""" 库区级别的分布式锁管理器 用于解决密集库区获取库位时的并发竞争问题 """ import asyncio import time from typing import Dict, Optional # from utils.logger import Logger from config.settings import settings from utils.logger import get_logger logger = get_logger("utils.area_lock_manager") class AreaLockManager: """库区级别的分布式锁管理器""" def __init__(self, default_timeout: Optional[int] = None): """ 初始化锁管理器 Args: default_timeout: 默认锁超时时间(秒),None时使用配置文件中的值 """ self.locks: Dict[str, Dict[str, any]] = {} # 锁信息存储 self.lock = asyncio.Lock() # 保护locks字典的锁 self.default_timeout = default_timeout or settings.AREA_LOCK_DEFAULT_TIMEOUT self.cleanup_interval = settings.AREA_LOCK_CLEANUP_INTERVAL self.enabled = settings.AREA_LOCK_ENABLE async def acquire_lock(self, lock_key: str, owner_id: str, timeout: Optional[int] = None) -> bool: """ 获取锁 Args: lock_key: 锁的唯一标识 owner_id: 锁的拥有者ID(通常是task_record_id) timeout: 锁超时时间(秒),None使用默认值 Returns: bool: 是否成功获取锁 """ # 如果库区锁功能被禁用,直接返回成功 if not self.enabled: logger.debug(f"库区锁功能已禁用,跳过锁获取: {lock_key}") return True timeout = timeout or self.default_timeout current_time = time.time() async with self.lock: # 检查锁是否存在 if lock_key in self.locks: lock_info = self.locks[lock_key] # 检查锁是否过期 if current_time > lock_info['expires_at']: # 锁已过期,清理并重新获取 logger.info(f"库区锁已过期,清理锁: {lock_key}, 原拥有者: {lock_info['owner_id']}") del self.locks[lock_key] elif lock_info['owner_id'] == owner_id: # 同一个拥有者重新获取锁,更新过期时间 lock_info['expires_at'] = current_time + timeout logger.debug(f"库区锁续期成功: {lock_key}, 拥有者: {owner_id}") return True else: # 锁被其他拥有者占用 logger.debug(f"库区锁被占用: {lock_key}, 当前拥有者: {lock_info['owner_id']}, 请求者: {owner_id}") return False # 获取新锁 self.locks[lock_key] = { 'owner_id': owner_id, 'acquired_at': current_time, 'expires_at': current_time + timeout } logger.debug(f"库区锁获取成功: {lock_key}, 拥有者: {owner_id}, 超时时间: {timeout}秒") return True async def release_lock(self, lock_key: str, owner_id: str) -> bool: """ 释放锁 Args: lock_key: 锁的唯一标识 owner_id: 锁的拥有者ID Returns: bool: 是否成功释放锁 """ # 如果库区锁功能被禁用,直接返回成功 if not self.enabled: logger.debug(f"库区锁功能已禁用,跳过锁释放: {lock_key}") return True async with self.lock: if lock_key not in self.locks: logger.warning(f"尝试释放不存在的库区锁: {lock_key}, 请求者: {owner_id}") return False lock_info = self.locks[lock_key] if lock_info['owner_id'] != owner_id: logger.warning(f"尝试释放他人的库区锁: {lock_key}, 当前拥有者: {lock_info['owner_id']}, 请求者: {owner_id}") return False del self.locks[lock_key] logger.debug(f"库区锁释放成功: {lock_key}, 拥有者: {owner_id}") return True async def is_locked(self, lock_key: str) -> bool: """ 检查锁是否被占用 Args: lock_key: 锁的唯一标识 Returns: bool: 锁是否被占用 """ current_time = time.time() async with self.lock: if lock_key not in self.locks: return False lock_info = self.locks[lock_key] if current_time > lock_info['expires_at']: # 锁已过期,清理 del self.locks[lock_key] return False return True async def get_lock_info(self, lock_key: str) -> Optional[Dict[str, any]]: """ 获取锁信息 Args: lock_key: 锁的唯一标识 Returns: Optional[Dict]: 锁信息,如果锁不存在返回None """ current_time = time.time() async with self.lock: if lock_key not in self.locks: return None lock_info = self.locks[lock_key] if current_time > lock_info['expires_at']: # 锁已过期,清理 del self.locks[lock_key] return None return lock_info.copy() async def cleanup_expired_locks(self): """清理过期的锁""" current_time = time.time() expired_keys = [] async with self.lock: for lock_key, lock_info in self.locks.items(): if current_time > lock_info['expires_at']: expired_keys.append(lock_key) for key in expired_keys: del self.locks[key] logger.info(f"清理过期的库区锁: {key}") return len(expired_keys) async def force_release_all_locks_by_owner(self, owner_id: str) -> int: """ 强制释放指定拥有者的所有锁(用于任务取消或异常情况) Args: owner_id: 锁的拥有者ID Returns: int: 释放的锁数量 """ released_count = 0 keys_to_remove = [] async with self.lock: for lock_key, lock_info in self.locks.items(): if lock_info['owner_id'] == owner_id: keys_to_remove.append(lock_key) for key in keys_to_remove: del self.locks[key] released_count += 1 logger.info(f"强制释放库区锁: {key}, 拥有者: {owner_id}") return released_count def get_stats(self) -> Dict[str, any]: """ 获取锁管理器统计信息 Returns: Dict: 统计信息 """ current_time = time.time() active_locks = 0 expired_locks = 0 for lock_info in self.locks.values(): if current_time > lock_info['expires_at']: expired_locks += 1 else: active_locks += 1 return { 'total_locks': len(self.locks), 'active_locks': active_locks, 'expired_locks': expired_locks, 'default_timeout': self.default_timeout } # 全局锁管理器实例 area_lock_manager = AreaLockManager() async def start_lock_cleanup_task(): """启动锁清理任务""" async def cleanup_task(): while True: try: await asyncio.sleep(area_lock_manager.cleanup_interval) cleaned = await area_lock_manager.cleanup_expired_locks() if cleaned > 0: logger.info(f"定时清理过期库区锁: {cleaned}个") except Exception as e: logger.error(f"库区锁清理任务异常: {str(e)}") asyncio.create_task(cleanup_task()) logger.info(f"库区锁清理任务已启动,清理间隔: {area_lock_manager.cleanup_interval}秒")