#!/usr/bin/env python # -*- coding: utf-8 -*- """ 获取密集库位处理器并发问题修复演示 展示优化前后的区别 """ import asyncio import time import random from typing import Dict, Any, List from datetime import datetime class MockDatabase: """模拟数据库""" def __init__(self): # 模拟5个空闲库位 self.storage_sites = { f"A00{i}": { "is_locked": False, "is_occupied": False, "last_access": datetime.now() } for i in range(1, 6) } self.operation_log = [] async def query_available_sites(self) -> List[str]: """查询可用库位""" await asyncio.sleep(0.01) # 模拟数据库查询延迟 available = [ site_id for site_id, info in self.storage_sites.items() if not info["is_locked"] and not info["is_occupied"] ] return sorted(available) async def lock_site_old_way(self, site_id: str, task_id: str) -> bool: """旧方式:分别查询后锁定(有竞争条件)""" # 先查询 available_sites = await self.query_available_sites() if site_id not in available_sites: return False # 模拟网络延迟和处理时间 await asyncio.sleep(random.uniform(0.01, 0.05)) # 尝试锁定(可能已被其他请求抢占) if self.storage_sites[site_id]["is_locked"]: return False self.storage_sites[site_id]["is_locked"] = True self.operation_log.append(f"[OLD] Task {task_id} locked {site_id}") return True async def lock_site_new_way(self, site_id: str, task_id: str) -> bool: """新方式:原子性锁定(无竞争条件)""" # 原子操作:检查并锁定 if not self.storage_sites[site_id]["is_locked"] and not self.storage_sites[site_id]["is_occupied"]: self.storage_sites[site_id]["is_locked"] = True self.operation_log.append(f"[NEW] Task {task_id} locked {site_id}") return True return False def reset(self): """重置数据库状态""" for site_info in self.storage_sites.values(): site_info["is_locked"] = False self.operation_log.clear() async def simulate_old_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]: """模拟旧的实现方式""" print(f"\n=== 模拟旧方式:查询后锁定(存在竞争条件)===") async def old_task(task_id: int): # 查询可用库位 available_sites = await db.query_available_sites() if not available_sites: return {"task_id": task_id, "success": False, "reason": "no_sites"} # 选择第一个可用库位 target_site = available_sites[0] # 尝试锁定 success = await db.lock_site_old_way(target_site, f"task_{task_id}") return { "task_id": task_id, "success": success, "site_id": target_site if success else None, "reason": "success" if success else "race_condition" } # 重置数据库 db.reset() start_time = time.time() # 并发执行任务 tasks = [old_task(i) for i in range(num_tasks)] results = await asyncio.gather(*tasks) end_time = time.time() # 分析结果 successful = [r for r in results if r["success"]] failed = [r for r in results if not r["success"]] # 检查是否有重复分配 allocated_sites = [r["site_id"] for r in successful if r["site_id"]] duplicates = len(allocated_sites) - len(set(allocated_sites)) print(f"执行时间: {end_time - start_time:.3f}s") print(f"成功任务: {len(successful)}") print(f"失败任务: {len(failed)}") print(f"重复分配: {duplicates} 个库位被重复分配") if duplicates > 0: print("❌ 发现并发竞争问题:有库位被重复分配!") else: print("✓ 未发现重复分配") print("操作日志:") for log in db.operation_log[:10]: # 只显示前10条 print(f" {log}") return { "total_tasks": num_tasks, "successful": len(successful), "failed": len(failed), "duplicates": duplicates, "execution_time": end_time - start_time } async def simulate_new_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]: """模拟新的实现方式""" print(f"\n=== 模拟新方式:原子性获取(无竞争条件)===") async def new_task(task_id: int): # 查询可用库位 available_sites = await db.query_available_sites() if not available_sites: return {"task_id": task_id, "success": False, "reason": "no_sites"} # 按优先级尝试原子性锁定每个库位 for site_id in available_sites: success = await db.lock_site_new_way(site_id, f"task_{task_id}") if success: return { "task_id": task_id, "success": True, "site_id": site_id, "reason": "success" } return {"task_id": task_id, "success": False, "reason": "all_occupied"} # 重置数据库 db.reset() start_time = time.time() # 并发执行任务 tasks = [new_task(i) for i in range(num_tasks)] results = await asyncio.gather(*tasks) end_time = time.time() # 分析结果 successful = [r for r in results if r["success"]] failed = [r for r in results if not r["success"]] # 检查是否有重复分配 allocated_sites = [r["site_id"] for r in successful if r["site_id"]] duplicates = len(allocated_sites) - len(set(allocated_sites)) print(f"执行时间: {end_time - start_time:.3f}s") print(f"成功任务: {len(successful)}") print(f"失败任务: {len(failed)}") print(f"重复分配: {duplicates} 个库位被重复分配") if duplicates > 0: print("❌ 仍然存在并发竞争问题!") else: print("✓ 成功解决并发竞争问题") print("操作日志:") for log in db.operation_log[:10]: # 只显示前10条 print(f" {log}") return { "total_tasks": num_tasks, "successful": len(successful), "failed": len(failed), "duplicates": duplicates, "execution_time": end_time - start_time } async def run_comparison_demo(): """运行对比演示""" print("🔧 获取密集库位处理器并发问题修复演示") print("=" * 60) db = MockDatabase() num_concurrent_tasks = 10 # 10个并发任务竞争5个库位 print(f"测试场景: {num_concurrent_tasks} 个并发任务竞争 {len(db.storage_sites)} 个库位") # 测试旧方式 old_results = await simulate_old_approach(db, num_concurrent_tasks) # 测试新方式 new_results = await simulate_new_approach(db, num_concurrent_tasks) # 对比结果 print(f"\n=== 对比结果 ===") print(f"旧方式 - 成功: {old_results['successful']}, 失败: {old_results['failed']}, 重复分配: {old_results['duplicates']}") print(f"新方式 - 成功: {new_results['successful']}, 失败: {new_results['failed']}, 重复分配: {new_results['duplicates']}") if old_results['duplicates'] > 0 and new_results['duplicates'] == 0: print("🎉 优化成功!新方式完全解决了并发竞争问题") elif old_results['duplicates'] == new_results['duplicates'] == 0: print("ℹ️ 在当前测试条件下,两种方式都没有出现竞争问题") print(" 可能需要增加并发压力或调整延迟来复现问题") else: print("⚠️ 优化可能不够完善,需要进一步检查") # 性能对比 performance_diff = ((new_results['execution_time'] - old_results['execution_time']) / old_results['execution_time']) * 100 if performance_diff > 0: print(f"📊 性能: 新方式比旧方式慢 {performance_diff:.1f}%") else: print(f"📊 性能: 新方式比旧方式快 {abs(performance_diff):.1f}%") async def stress_test(): """压力测试""" print(f"\n=== 压力测试 ===") print("高并发场景下测试并发安全性...") db = MockDatabase() num_tasks = 50 # 50个并发任务 print(f"压力测试: {num_tasks} 个高并发任务") # 只测试新方式的压力承受能力 results = await simulate_new_approach(db, num_tasks) success_rate = (results['successful'] / results['total_tasks']) * 100 print(f"成功率: {success_rate:.1f}%") print(f"平均每任务耗时: {results['execution_time']/results['total_tasks']*1000:.2f}ms") if results['duplicates'] == 0: print("✅ 压力测试通过:无并发竞争问题") else: print("❌ 压力测试失败:仍存在并发竞争问题") if __name__ == "__main__": asyncio.run(run_comparison_demo()) asyncio.run(stress_test())