#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Storage location request queue manager.

Provides a stable queueing mechanism for storage location allocation,
avoiding conflicts under high concurrency.
"""

import asyncio
import time
import json
import uuid
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
import heapq

from utils.logger import get_logger
from config.settings import settings

logger = get_logger("services.execution.handlers.storage_queue_manager")


class RequestPriority(Enum):
    """Request priority."""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4


class RequestStatus(Enum):
    """Request status."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"


@dataclass
class StorageRequest:
    """A storage location request."""
    request_id: str
    handler_type: str  # "idle_site" or "idle_crowded_site"
    input_params: Dict[str, Any]
    context_data: Dict[str, Any]
    map_id: str
    task_record_id: str
    priority: RequestPriority = RequestPriority.NORMAL
    created_at: float = field(default_factory=time.time)
    timeout: float = field(default_factory=lambda: settings.STORAGE_QUEUE_DEFAULT_TIMEOUT
                           if settings.STORAGE_QUEUE_ENABLE_TIMEOUT else float('inf'))
    retry_count: int = 0
    max_retries: int = 3
    status: RequestStatus = RequestStatus.PENDING
    result: Optional[Dict[str, Any]] = None
    error_message: Optional[str] = None

    def __lt__(self, other):
        """Ordering for the priority queue."""
        if self.priority.value != other.priority.value:
            return self.priority.value > other.priority.value  # higher priority first
        return self.created_at < other.created_at  # earlier requests first


class StorageQueueManager:
    """Storage location request queue manager."""

    def __init__(self, max_workers: int = settings.STORAGE_QUEUE_MAX_WORKERS, max_queue_size: int = None):
        self.max_workers = max_workers or settings.STORAGE_QUEUE_MAX_WORKERS
        self.max_queue_size = max_queue_size or settings.STORAGE_QUEUE_MAX_SIZE
        self.enable_timeout = settings.STORAGE_QUEUE_ENABLE_TIMEOUT
        self.cleanup_interval = settings.STORAGE_QUEUE_CLEANUP_INTERVAL
        self.completed_request_ttl = settings.STORAGE_QUEUE_COMPLETED_REQUEST_TTL

        # Priority queue
        self.request_queue: List[StorageRequest] = []
        self.queue_lock = asyncio.Lock()

        # Request tracking
        self.pending_requests: Dict[str, StorageRequest] = {}
        self.processing_requests: Dict[str, StorageRequest] = {}
        self.completed_requests: Dict[str, StorageRequest] = {}

        # Worker management
        self.workers: List[asyncio.Task] = []
        self.worker_semaphore = asyncio.Semaphore(self.max_workers)
        self.shutdown_event = asyncio.Event()

        # Handler registry
        self.handlers: Dict[str, Callable] = {}

        # Statistics
        self.stats = {
            'requests_total': 0,
            'requests_completed': 0,
            'requests_failed': 0,
            'requests_timeout': 0,
            'avg_processing_time': 0,
            'queue_size': 0,
            'active_workers': 0
        }
        # Number of processing attempts, used only for the rolling average processing time
        self._processed_count = 0

        # Cleanup task (created in start())
        self.cleanup_task = None

    async def start(self):
        """Start the queue manager."""
        logger.info("Starting storage location request queue manager")

        # Start workers
        for i in range(self.max_workers):
            worker = asyncio.create_task(self._worker(f"worker-{i}"))
            self.workers.append(worker)

        # Start the cleanup task
        self.cleanup_task = asyncio.create_task(self._cleanup_completed_requests())

        logger.info(f"Queue manager started, workers: {self.max_workers}")

    async def stop(self):
        """Stop the queue manager."""
        logger.info("Stopping storage location request queue manager")

        # Signal shutdown
        self.shutdown_event.set()

        # Wait for all workers to finish
        if self.workers:
            await asyncio.gather(*self.workers, return_exceptions=True)

        # Stop the cleanup task
        if self.cleanup_task:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass

        logger.info("Queue manager stopped")

    def register_handler(self, handler_type: str, handler_func: Callable):
        """Register a handler."""
        self.handlers[handler_type] = handler_func
        logger.info(f"Registered handler: {handler_type}")

    async def submit_request(self, request: StorageRequest) -> str:
        """Submit a request to the queue."""
        async with self.queue_lock:
            # Check capacity inside the lock to avoid racing with concurrent submitters
            if len(self.request_queue) >= self.max_queue_size:
                raise Exception("Queue is full, cannot submit new requests")

            # Add to the priority queue
            heapq.heappush(self.request_queue, request)
            self.pending_requests[request.request_id] = request

            # Update statistics
            self.stats['requests_total'] += 1
            self.stats['queue_size'] = len(self.request_queue)

        logger.info(f"Submitted request to queue: {request.request_id}, priority: {request.priority.name}")
        return request.request_id

    async def get_request_status(self, request_id: str) -> Optional[Dict[str, Any]]:
        """Get the status of a request."""
        # Check each request store in turn
        if request_id in self.pending_requests:
            request = self.pending_requests[request_id]
            return {
                "request_id": request_id,
                "status": request.status.value,
                "created_at": request.created_at,
                "queue_position": self._get_queue_position(request_id)
            }
        elif request_id in self.processing_requests:
            request = self.processing_requests[request_id]
            return {
                "request_id": request_id,
                "status": request.status.value,
                "created_at": request.created_at,
                "processing_time": time.time() - request.created_at
            }
        elif request_id in self.completed_requests:
            request = self.completed_requests[request_id]
            return {
                "request_id": request_id,
                "status": request.status.value,
                "created_at": request.created_at,
                "result": request.result,
                "error_message": request.error_message,
                "processing_time": time.time() - request.created_at
            }

        return None

    async def wait_for_result(self, request_id: str, timeout: float = None) -> Dict[str, Any]:
        """Wait for the result of a request."""
        start_time = time.time()

        # Decide whether to apply a timeout based on configuration
        if not self.enable_timeout:
            # Timeouts disabled: wait indefinitely
            check_timeout = float('inf')
            logger.info(f"Waiting for request result (no timeout): {request_id}")
        else:
            # Timeouts enabled: use the supplied timeout or the default
            check_timeout = timeout or settings.STORAGE_QUEUE_DEFAULT_TIMEOUT
            logger.info(f"Waiting for request result (timeout {check_timeout}s): {request_id}")

        while True:
            # Check whether the request has finished
            if request_id in self.completed_requests:
                request = self.completed_requests[request_id]
                if request.status == RequestStatus.COMPLETED:
                    return request.result
                else:
                    raise Exception(f"Request failed: {request.error_message}")

            # Only check timeouts when they are enabled
            if self.enable_timeout:
                # Check the wait timeout
                if time.time() - start_time >= check_timeout:
                    await self._mark_request_timeout(request_id)
                    raise Exception("Timed out waiting for result")

                # Check whether the request itself has timed out
                if request_id in self.pending_requests:
                    request = self.pending_requests[request_id]
                    if request.timeout != float('inf') and time.time() - request.created_at > request.timeout:
                        await self._mark_request_timeout(request_id)
                        raise Exception("Request timed out")

            await asyncio.sleep(0.1)  # avoid busy waiting

    async def cancel_request(self, request_id: str) -> bool:
        """Cancel a request."""
        async with self.queue_lock:
            if request_id in self.pending_requests:
                request = self.pending_requests[request_id]
                request.status = RequestStatus.CANCELLED
                request.error_message = "Request cancelled"

                # Remove from the queue
                self.request_queue = [r for r in self.request_queue if r.request_id != request_id]
                heapq.heapify(self.request_queue)

                # Move to the completed store
                del self.pending_requests[request_id]
                self.completed_requests[request_id] = request

                self.stats['queue_size'] = len(self.request_queue)
                logger.info(f"Cancelled request: {request_id}")
                return True

        return False

    def get_queue_stats(self) -> Dict[str, Any]:
        """Get queue statistics."""
        self.stats['queue_size'] = len(self.request_queue)
        self.stats['active_workers'] = sum(1 for w in self.workers if not w.done())
        return self.stats.copy()

    async def _worker(self, worker_name: str):
        """Worker coroutine."""
        logger.info(f"Worker {worker_name} started")

        while not self.shutdown_event.is_set():
            try:
                # Fetch a request
                request = await self._get_next_request()
                if not request:
                    await asyncio.sleep(0.1)
                    continue

                # Process the request
                async with self.worker_semaphore:
                    await self._process_request(request, worker_name)

            except Exception as e:
                logger.error(f"Worker {worker_name} error while processing request: {str(e)}")
                await asyncio.sleep(1)

        logger.info(f"Worker {worker_name} stopped")

    async def _get_next_request(self) -> Optional[StorageRequest]:
        """Get the next request."""
        async with self.queue_lock:
            while self.request_queue:
                request = heapq.heappop(self.request_queue)

                # Check that the request is still pending (it may have been cancelled)
                if request.request_id in self.pending_requests:
                    # Only check expiry when timeouts are enabled and the request has a finite timeout
                    if (self.enable_timeout and
                            request.timeout != float('inf') and
                            time.time() - request.created_at > request.timeout):
                        await self._mark_request_timeout(request.request_id)
                        continue

                    # Move to the processing store
                    del self.pending_requests[request.request_id]
                    self.processing_requests[request.request_id] = request
                    request.status = RequestStatus.PROCESSING

                    self.stats['queue_size'] = len(self.request_queue)
                    return request

        return None

    async def _process_request(self, request: StorageRequest, worker_name: str):
        """Process a single request."""
        start_time = time.time()
        logger.info(f"Worker {worker_name} started processing request: {request.request_id}")

        try:
            # Look up the handler
            handler = self.handlers.get(request.handler_type)
            if not handler:
                raise Exception(f"No handler registered for: {request.handler_type}")

            # Run the handler
            result = await handler(request.input_params, request.context_data,
                                   request.map_id, request.task_record_id)

            # Mark as completed
            await self._mark_request_completed(request.request_id, result)

            processing_time = time.time() - start_time
            logger.info(f"Worker {worker_name} completed request: {request.request_id}, took: {processing_time:.3f}s")

        except Exception as e:
            # Handle failure
            error_msg = str(e)
            logger.error(f"Worker {worker_name} failed to process request: {request.request_id}, error: {error_msg}")

            # Retry if attempts remain
            if request.retry_count < request.max_retries:
                await self._retry_request(request)
            else:
                await self._mark_request_failed(request.request_id, error_msg)

        finally:
            # Update the rolling average processing time. The completed/failed counters
            # are maintained by the _mark_request_* helpers, so they are not incremented here.
            processing_time = time.time() - start_time
            total_time = self.stats['avg_processing_time'] * self._processed_count
            self._processed_count += 1
            self.stats['avg_processing_time'] = (total_time + processing_time) / self._processed_count

    async def _mark_request_completed(self, request_id: str, result: Dict[str, Any]):
        """Mark a request as completed."""
        if request_id in self.processing_requests:
            request = self.processing_requests[request_id]
            request.status = RequestStatus.COMPLETED
            request.result = result

            del self.processing_requests[request_id]
            self.completed_requests[request_id] = request

            self.stats['requests_completed'] += 1

    async def _mark_request_failed(self, request_id: str, error_message: str):
        """Mark a request as failed."""
        if request_id in self.processing_requests:
            request = self.processing_requests[request_id]
            request.status = RequestStatus.FAILED
            request.error_message = error_message

            del self.processing_requests[request_id]
            self.completed_requests[request_id] = request

            self.stats['requests_failed'] += 1

    async def _mark_request_timeout(self, request_id: str):
        """Mark a request as timed out."""
        request = None
        if request_id in self.pending_requests:
            request = self.pending_requests[request_id]
            del self.pending_requests[request_id]
        elif request_id in self.processing_requests:
            request = self.processing_requests[request_id]
            del self.processing_requests[request_id]

        if request:
            request.status = RequestStatus.TIMEOUT
            request.error_message = "Request timed out"
            self.completed_requests[request_id] = request
            self.stats['requests_timeout'] += 1

    async def _retry_request(self, request: StorageRequest):
        """Retry a request."""
        request.retry_count += 1
        request.status = RequestStatus.PENDING

        # Re-enqueue
        async with self.queue_lock:
            heapq.heappush(self.request_queue, request)
            del self.processing_requests[request.request_id]
            self.pending_requests[request.request_id] = request

            self.stats['queue_size'] = len(self.request_queue)

        logger.info(f"Retrying request: {request.request_id}, attempt {request.retry_count}")

    def _get_queue_position(self, request_id: str) -> int:
        """Get the position of a request in the queue (approximate: heap order, not exact dequeue order)."""
        for i, request in enumerate(self.request_queue):
            if request.request_id == request_id:
                return i + 1
        return -1

    async def _cleanup_completed_requests(self):
        """Clean up completed requests."""
        while not self.shutdown_event.is_set():
            try:
                await asyncio.sleep(self.cleanup_interval)  # use the configured cleanup interval

                current_time = time.time()
                cleanup_threshold = self.completed_request_ttl  # use the configured retention time

                to_remove = []
                for request_id, request in self.completed_requests.items():
                    if current_time - request.created_at > cleanup_threshold:
                        to_remove.append(request_id)

                for request_id in to_remove:
                    del self.completed_requests[request_id]

                if to_remove:
                    logger.info(f"Cleaned up {len(to_remove)} completed requests")

            except Exception as e:
                logger.error(f"Error while cleaning up completed requests: {str(e)}")


# Global queue manager instance
storage_queue_manager = StorageQueueManager()
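

# Usage sketch (illustrative only): shows how a handler is registered and a request is
# submitted and awaited. The "idle_site" handler type matches the one documented on
# StorageRequest, and the handler signature follows the call in _process_request; the
# handler body, the returned {"site_id": ...} payload, and the demo IDs are placeholder
# assumptions, not the real allocation logic.
if __name__ == "__main__":

    async def _demo_idle_site_handler(input_params, context_data, map_id, task_record_id):
        """Placeholder handler: a real handler would resolve a free storage location here."""
        await asyncio.sleep(0.1)  # simulate lookup latency
        return {"site_id": "demo-site-001"}

    async def _demo():
        # Wire up the global manager with the demo handler and start the workers
        storage_queue_manager.register_handler("idle_site", _demo_idle_site_handler)
        await storage_queue_manager.start()

        request = StorageRequest(
            request_id=str(uuid.uuid4()),
            handler_type="idle_site",
            input_params={},
            context_data={},
            map_id="demo-map",
            task_record_id="demo-task",
            priority=RequestPriority.HIGH,
        )
        await storage_queue_manager.submit_request(request)

        # Block until a worker has processed the request (timeout only applies
        # when STORAGE_QUEUE_ENABLE_TIMEOUT is set)
        result = await storage_queue_manager.wait_for_result(request.request_id, timeout=10)
        logger.info(f"Demo result: {result}")

        await storage_queue_manager.stop()

    asyncio.run(_demo())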