262 lines
9.0 KiB
Python
262 lines
9.0 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
获取密集库位处理器并发问题修复演示
|
||
展示优化前后的区别
|
||
"""
|
||
|
||
import asyncio
|
||
import time
|
||
import random
|
||
from typing import Dict, Any, List
|
||
from datetime import datetime
|
||
|
||
|
||
class MockDatabase:
|
||
"""模拟数据库"""
|
||
def __init__(self):
|
||
# 模拟5个空闲库位
|
||
self.storage_sites = {
|
||
f"A00{i}": {
|
||
"is_locked": False,
|
||
"is_occupied": False,
|
||
"last_access": datetime.now()
|
||
}
|
||
for i in range(1, 6)
|
||
}
|
||
self.operation_log = []
|
||
|
||
async def query_available_sites(self) -> List[str]:
|
||
"""查询可用库位"""
|
||
await asyncio.sleep(0.01) # 模拟数据库查询延迟
|
||
available = [
|
||
site_id for site_id, info in self.storage_sites.items()
|
||
if not info["is_locked"] and not info["is_occupied"]
|
||
]
|
||
return sorted(available)
|
||
|
||
async def lock_site_old_way(self, site_id: str, task_id: str) -> bool:
|
||
"""旧方式:分别查询后锁定(有竞争条件)"""
|
||
# 先查询
|
||
available_sites = await self.query_available_sites()
|
||
if site_id not in available_sites:
|
||
return False
|
||
|
||
# 模拟网络延迟和处理时间
|
||
await asyncio.sleep(random.uniform(0.01, 0.05))
|
||
|
||
# 尝试锁定(可能已被其他请求抢占)
|
||
if self.storage_sites[site_id]["is_locked"]:
|
||
return False
|
||
|
||
self.storage_sites[site_id]["is_locked"] = True
|
||
self.operation_log.append(f"[OLD] Task {task_id} locked {site_id}")
|
||
return True
|
||
|
||
async def lock_site_new_way(self, site_id: str, task_id: str) -> bool:
|
||
"""新方式:原子性锁定(无竞争条件)"""
|
||
# 原子操作:检查并锁定
|
||
if not self.storage_sites[site_id]["is_locked"] and not self.storage_sites[site_id]["is_occupied"]:
|
||
self.storage_sites[site_id]["is_locked"] = True
|
||
self.operation_log.append(f"[NEW] Task {task_id} locked {site_id}")
|
||
return True
|
||
return False
|
||
|
||
def reset(self):
|
||
"""重置数据库状态"""
|
||
for site_info in self.storage_sites.values():
|
||
site_info["is_locked"] = False
|
||
self.operation_log.clear()
|
||
|
||
|
||
async def simulate_old_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]:
|
||
"""模拟旧的实现方式"""
|
||
print(f"\n=== 模拟旧方式:查询后锁定(存在竞争条件)===")
|
||
|
||
async def old_task(task_id: int):
|
||
# 查询可用库位
|
||
available_sites = await db.query_available_sites()
|
||
if not available_sites:
|
||
return {"task_id": task_id, "success": False, "reason": "no_sites"}
|
||
|
||
# 选择第一个可用库位
|
||
target_site = available_sites[0]
|
||
|
||
# 尝试锁定
|
||
success = await db.lock_site_old_way(target_site, f"task_{task_id}")
|
||
return {
|
||
"task_id": task_id,
|
||
"success": success,
|
||
"site_id": target_site if success else None,
|
||
"reason": "success" if success else "race_condition"
|
||
}
|
||
|
||
# 重置数据库
|
||
db.reset()
|
||
start_time = time.time()
|
||
|
||
# 并发执行任务
|
||
tasks = [old_task(i) for i in range(num_tasks)]
|
||
results = await asyncio.gather(*tasks)
|
||
|
||
end_time = time.time()
|
||
|
||
# 分析结果
|
||
successful = [r for r in results if r["success"]]
|
||
failed = [r for r in results if not r["success"]]
|
||
|
||
# 检查是否有重复分配
|
||
allocated_sites = [r["site_id"] for r in successful if r["site_id"]]
|
||
duplicates = len(allocated_sites) - len(set(allocated_sites))
|
||
|
||
print(f"执行时间: {end_time - start_time:.3f}s")
|
||
print(f"成功任务: {len(successful)}")
|
||
print(f"失败任务: {len(failed)}")
|
||
print(f"重复分配: {duplicates} 个库位被重复分配")
|
||
|
||
if duplicates > 0:
|
||
print("❌ 发现并发竞争问题:有库位被重复分配!")
|
||
else:
|
||
print("✓ 未发现重复分配")
|
||
|
||
print("操作日志:")
|
||
for log in db.operation_log[:10]: # 只显示前10条
|
||
print(f" {log}")
|
||
|
||
return {
|
||
"total_tasks": num_tasks,
|
||
"successful": len(successful),
|
||
"failed": len(failed),
|
||
"duplicates": duplicates,
|
||
"execution_time": end_time - start_time
|
||
}
|
||
|
||
|
||
async def simulate_new_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]:
|
||
"""模拟新的实现方式"""
|
||
print(f"\n=== 模拟新方式:原子性获取(无竞争条件)===")
|
||
|
||
async def new_task(task_id: int):
|
||
# 查询可用库位
|
||
available_sites = await db.query_available_sites()
|
||
if not available_sites:
|
||
return {"task_id": task_id, "success": False, "reason": "no_sites"}
|
||
|
||
# 按优先级尝试原子性锁定每个库位
|
||
for site_id in available_sites:
|
||
success = await db.lock_site_new_way(site_id, f"task_{task_id}")
|
||
if success:
|
||
return {
|
||
"task_id": task_id,
|
||
"success": True,
|
||
"site_id": site_id,
|
||
"reason": "success"
|
||
}
|
||
|
||
return {"task_id": task_id, "success": False, "reason": "all_occupied"}
|
||
|
||
# 重置数据库
|
||
db.reset()
|
||
start_time = time.time()
|
||
|
||
# 并发执行任务
|
||
tasks = [new_task(i) for i in range(num_tasks)]
|
||
results = await asyncio.gather(*tasks)
|
||
|
||
end_time = time.time()
|
||
|
||
# 分析结果
|
||
successful = [r for r in results if r["success"]]
|
||
failed = [r for r in results if not r["success"]]
|
||
|
||
# 检查是否有重复分配
|
||
allocated_sites = [r["site_id"] for r in successful if r["site_id"]]
|
||
duplicates = len(allocated_sites) - len(set(allocated_sites))
|
||
|
||
print(f"执行时间: {end_time - start_time:.3f}s")
|
||
print(f"成功任务: {len(successful)}")
|
||
print(f"失败任务: {len(failed)}")
|
||
print(f"重复分配: {duplicates} 个库位被重复分配")
|
||
|
||
if duplicates > 0:
|
||
print("❌ 仍然存在并发竞争问题!")
|
||
else:
|
||
print("✓ 成功解决并发竞争问题")
|
||
|
||
print("操作日志:")
|
||
for log in db.operation_log[:10]: # 只显示前10条
|
||
print(f" {log}")
|
||
|
||
return {
|
||
"total_tasks": num_tasks,
|
||
"successful": len(successful),
|
||
"failed": len(failed),
|
||
"duplicates": duplicates,
|
||
"execution_time": end_time - start_time
|
||
}
|
||
|
||
|
||
async def run_comparison_demo():
|
||
"""运行对比演示"""
|
||
print("🔧 获取密集库位处理器并发问题修复演示")
|
||
print("=" * 60)
|
||
|
||
db = MockDatabase()
|
||
num_concurrent_tasks = 10 # 10个并发任务竞争5个库位
|
||
|
||
print(f"测试场景: {num_concurrent_tasks} 个并发任务竞争 {len(db.storage_sites)} 个库位")
|
||
|
||
# 测试旧方式
|
||
old_results = await simulate_old_approach(db, num_concurrent_tasks)
|
||
|
||
# 测试新方式
|
||
new_results = await simulate_new_approach(db, num_concurrent_tasks)
|
||
|
||
# 对比结果
|
||
print(f"\n=== 对比结果 ===")
|
||
print(f"旧方式 - 成功: {old_results['successful']}, 失败: {old_results['failed']}, 重复分配: {old_results['duplicates']}")
|
||
print(f"新方式 - 成功: {new_results['successful']}, 失败: {new_results['failed']}, 重复分配: {new_results['duplicates']}")
|
||
|
||
if old_results['duplicates'] > 0 and new_results['duplicates'] == 0:
|
||
print("🎉 优化成功!新方式完全解决了并发竞争问题")
|
||
elif old_results['duplicates'] == new_results['duplicates'] == 0:
|
||
print("ℹ️ 在当前测试条件下,两种方式都没有出现竞争问题")
|
||
print(" 可能需要增加并发压力或调整延迟来复现问题")
|
||
else:
|
||
print("⚠️ 优化可能不够完善,需要进一步检查")
|
||
|
||
# 性能对比
|
||
performance_diff = ((new_results['execution_time'] - old_results['execution_time']) / old_results['execution_time']) * 100
|
||
if performance_diff > 0:
|
||
print(f"📊 性能: 新方式比旧方式慢 {performance_diff:.1f}%")
|
||
else:
|
||
print(f"📊 性能: 新方式比旧方式快 {abs(performance_diff):.1f}%")
|
||
|
||
|
||
async def stress_test():
|
||
"""压力测试"""
|
||
print(f"\n=== 压力测试 ===")
|
||
print("高并发场景下测试并发安全性...")
|
||
|
||
db = MockDatabase()
|
||
num_tasks = 50 # 50个并发任务
|
||
|
||
print(f"压力测试: {num_tasks} 个高并发任务")
|
||
|
||
# 只测试新方式的压力承受能力
|
||
results = await simulate_new_approach(db, num_tasks)
|
||
|
||
success_rate = (results['successful'] / results['total_tasks']) * 100
|
||
print(f"成功率: {success_rate:.1f}%")
|
||
print(f"平均每任务耗时: {results['execution_time']/results['total_tasks']*1000:.2f}ms")
|
||
|
||
if results['duplicates'] == 0:
|
||
print("✅ 压力测试通过:无并发竞争问题")
|
||
else:
|
||
print("❌ 压力测试失败:仍存在并发竞争问题")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(run_comparison_demo())
|
||
asyncio.run(stress_test()) |