VWED_server/utils/area_lock_manager.py

243 lines
8.2 KiB
Python
Raw Normal View History

2025-09-09 10:41:27 +08:00
"""
库区级别的分布式锁管理器
用于解决密集库区获取库位时的并发竞争问题
"""
import asyncio
import time
from typing import Dict, Optional
# from utils.logger import Logger
from config.settings import settings
from utils.logger import get_logger
logger = get_logger("utils.area_lock_manager")
class AreaLockManager:
"""库区级别的分布式锁管理器"""
def __init__(self, default_timeout: Optional[int] = None):
"""
初始化锁管理器
Args:
default_timeout: 默认锁超时时间None时使用配置文件中的值
"""
self.locks: Dict[str, Dict[str, any]] = {} # 锁信息存储
self.lock = asyncio.Lock() # 保护locks字典的锁
self.default_timeout = default_timeout or settings.AREA_LOCK_DEFAULT_TIMEOUT
self.cleanup_interval = settings.AREA_LOCK_CLEANUP_INTERVAL
self.enabled = settings.AREA_LOCK_ENABLE
async def acquire_lock(self, lock_key: str, owner_id: str, timeout: Optional[int] = None) -> bool:
"""
获取锁
Args:
lock_key: 锁的唯一标识
owner_id: 锁的拥有者ID通常是task_record_id
timeout: 锁超时时间None使用默认值
Returns:
bool: 是否成功获取锁
"""
# 如果库区锁功能被禁用,直接返回成功
if not self.enabled:
logger.debug(f"库区锁功能已禁用,跳过锁获取: {lock_key}")
return True
timeout = timeout or self.default_timeout
current_time = time.time()
async with self.lock:
# 检查锁是否存在
if lock_key in self.locks:
lock_info = self.locks[lock_key]
# 检查锁是否过期
if current_time > lock_info['expires_at']:
# 锁已过期,清理并重新获取
logger.info(f"库区锁已过期,清理锁: {lock_key}, 原拥有者: {lock_info['owner_id']}")
del self.locks[lock_key]
elif lock_info['owner_id'] == owner_id:
# 同一个拥有者重新获取锁,更新过期时间
lock_info['expires_at'] = current_time + timeout
logger.debug(f"库区锁续期成功: {lock_key}, 拥有者: {owner_id}")
return True
else:
# 锁被其他拥有者占用
logger.debug(f"库区锁被占用: {lock_key}, 当前拥有者: {lock_info['owner_id']}, 请求者: {owner_id}")
return False
# 获取新锁
self.locks[lock_key] = {
'owner_id': owner_id,
'acquired_at': current_time,
'expires_at': current_time + timeout
}
logger.debug(f"库区锁获取成功: {lock_key}, 拥有者: {owner_id}, 超时时间: {timeout}")
return True
async def release_lock(self, lock_key: str, owner_id: str) -> bool:
"""
释放锁
Args:
lock_key: 锁的唯一标识
owner_id: 锁的拥有者ID
Returns:
bool: 是否成功释放锁
"""
# 如果库区锁功能被禁用,直接返回成功
if not self.enabled:
logger.debug(f"库区锁功能已禁用,跳过锁释放: {lock_key}")
return True
async with self.lock:
if lock_key not in self.locks:
logger.warning(f"尝试释放不存在的库区锁: {lock_key}, 请求者: {owner_id}")
return False
lock_info = self.locks[lock_key]
if lock_info['owner_id'] != owner_id:
logger.warning(f"尝试释放他人的库区锁: {lock_key}, 当前拥有者: {lock_info['owner_id']}, 请求者: {owner_id}")
return False
del self.locks[lock_key]
logger.debug(f"库区锁释放成功: {lock_key}, 拥有者: {owner_id}")
return True
async def is_locked(self, lock_key: str) -> bool:
"""
检查锁是否被占用
Args:
lock_key: 锁的唯一标识
Returns:
bool: 锁是否被占用
"""
current_time = time.time()
async with self.lock:
if lock_key not in self.locks:
return False
lock_info = self.locks[lock_key]
if current_time > lock_info['expires_at']:
# 锁已过期,清理
del self.locks[lock_key]
return False
return True
async def get_lock_info(self, lock_key: str) -> Optional[Dict[str, any]]:
"""
获取锁信息
Args:
lock_key: 锁的唯一标识
Returns:
Optional[Dict]: 锁信息如果锁不存在返回None
"""
current_time = time.time()
async with self.lock:
if lock_key not in self.locks:
return None
lock_info = self.locks[lock_key]
if current_time > lock_info['expires_at']:
# 锁已过期,清理
del self.locks[lock_key]
return None
return lock_info.copy()
async def cleanup_expired_locks(self):
"""清理过期的锁"""
current_time = time.time()
expired_keys = []
async with self.lock:
for lock_key, lock_info in self.locks.items():
if current_time > lock_info['expires_at']:
expired_keys.append(lock_key)
for key in expired_keys:
del self.locks[key]
logger.info(f"清理过期的库区锁: {key}")
return len(expired_keys)
async def force_release_all_locks_by_owner(self, owner_id: str) -> int:
"""
强制释放指定拥有者的所有锁用于任务取消或异常情况
Args:
owner_id: 锁的拥有者ID
Returns:
int: 释放的锁数量
"""
released_count = 0
keys_to_remove = []
async with self.lock:
for lock_key, lock_info in self.locks.items():
if lock_info['owner_id'] == owner_id:
keys_to_remove.append(lock_key)
for key in keys_to_remove:
del self.locks[key]
released_count += 1
logger.info(f"强制释放库区锁: {key}, 拥有者: {owner_id}")
return released_count
def get_stats(self) -> Dict[str, any]:
"""
获取锁管理器统计信息
Returns:
Dict: 统计信息
"""
current_time = time.time()
active_locks = 0
expired_locks = 0
for lock_info in self.locks.values():
if current_time > lock_info['expires_at']:
expired_locks += 1
else:
active_locks += 1
return {
'total_locks': len(self.locks),
'active_locks': active_locks,
'expired_locks': expired_locks,
'default_timeout': self.default_timeout
}
# 全局锁管理器实例
area_lock_manager = AreaLockManager()
async def start_lock_cleanup_task():
"""启动锁清理任务"""
async def cleanup_task():
while True:
try:
await asyncio.sleep(area_lock_manager.cleanup_interval)
cleaned = await area_lock_manager.cleanup_expired_locks()
if cleaned > 0:
logger.info(f"定时清理过期库区锁: {cleaned}")
except Exception as e:
logger.error(f"库区锁清理任务异常: {str(e)}")
asyncio.create_task(cleanup_task())
logger.info(f"库区锁清理任务已启动,清理间隔: {area_lock_manager.cleanup_interval}")