VWED_server/tests/demo_concurrent_fix.py
2025-07-30 15:11:59 +08:00

262 lines
9.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
获取密集库位处理器并发问题修复演示
展示优化前后的区别
"""
import asyncio
import time
import random
from typing import Dict, Any, List
from datetime import datetime
class MockDatabase:
"""模拟数据库"""
def __init__(self):
# 模拟5个空闲库位
self.storage_sites = {
f"A00{i}": {
"is_locked": False,
"is_occupied": False,
"last_access": datetime.now()
}
for i in range(1, 6)
}
self.operation_log = []
async def query_available_sites(self) -> List[str]:
"""查询可用库位"""
await asyncio.sleep(0.01) # 模拟数据库查询延迟
available = [
site_id for site_id, info in self.storage_sites.items()
if not info["is_locked"] and not info["is_occupied"]
]
return sorted(available)
async def lock_site_old_way(self, site_id: str, task_id: str) -> bool:
"""旧方式:分别查询后锁定(有竞争条件)"""
# 先查询
available_sites = await self.query_available_sites()
if site_id not in available_sites:
return False
# 模拟网络延迟和处理时间
await asyncio.sleep(random.uniform(0.01, 0.05))
# 尝试锁定(可能已被其他请求抢占)
if self.storage_sites[site_id]["is_locked"]:
return False
self.storage_sites[site_id]["is_locked"] = True
self.operation_log.append(f"[OLD] Task {task_id} locked {site_id}")
return True
async def lock_site_new_way(self, site_id: str, task_id: str) -> bool:
"""新方式:原子性锁定(无竞争条件)"""
# 原子操作:检查并锁定
if not self.storage_sites[site_id]["is_locked"] and not self.storage_sites[site_id]["is_occupied"]:
self.storage_sites[site_id]["is_locked"] = True
self.operation_log.append(f"[NEW] Task {task_id} locked {site_id}")
return True
return False
def reset(self):
"""重置数据库状态"""
for site_info in self.storage_sites.values():
site_info["is_locked"] = False
self.operation_log.clear()
async def simulate_old_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]:
"""模拟旧的实现方式"""
print(f"\n=== 模拟旧方式:查询后锁定(存在竞争条件)===")
async def old_task(task_id: int):
# 查询可用库位
available_sites = await db.query_available_sites()
if not available_sites:
return {"task_id": task_id, "success": False, "reason": "no_sites"}
# 选择第一个可用库位
target_site = available_sites[0]
# 尝试锁定
success = await db.lock_site_old_way(target_site, f"task_{task_id}")
return {
"task_id": task_id,
"success": success,
"site_id": target_site if success else None,
"reason": "success" if success else "race_condition"
}
# 重置数据库
db.reset()
start_time = time.time()
# 并发执行任务
tasks = [old_task(i) for i in range(num_tasks)]
results = await asyncio.gather(*tasks)
end_time = time.time()
# 分析结果
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
# 检查是否有重复分配
allocated_sites = [r["site_id"] for r in successful if r["site_id"]]
duplicates = len(allocated_sites) - len(set(allocated_sites))
print(f"执行时间: {end_time - start_time:.3f}s")
print(f"成功任务: {len(successful)}")
print(f"失败任务: {len(failed)}")
print(f"重复分配: {duplicates} 个库位被重复分配")
if duplicates > 0:
print("❌ 发现并发竞争问题:有库位被重复分配!")
else:
print("✓ 未发现重复分配")
print("操作日志:")
for log in db.operation_log[:10]: # 只显示前10条
print(f" {log}")
return {
"total_tasks": num_tasks,
"successful": len(successful),
"failed": len(failed),
"duplicates": duplicates,
"execution_time": end_time - start_time
}
async def simulate_new_approach(db: MockDatabase, num_tasks: int) -> Dict[str, Any]:
"""模拟新的实现方式"""
print(f"\n=== 模拟新方式:原子性获取(无竞争条件)===")
async def new_task(task_id: int):
# 查询可用库位
available_sites = await db.query_available_sites()
if not available_sites:
return {"task_id": task_id, "success": False, "reason": "no_sites"}
# 按优先级尝试原子性锁定每个库位
for site_id in available_sites:
success = await db.lock_site_new_way(site_id, f"task_{task_id}")
if success:
return {
"task_id": task_id,
"success": True,
"site_id": site_id,
"reason": "success"
}
return {"task_id": task_id, "success": False, "reason": "all_occupied"}
# 重置数据库
db.reset()
start_time = time.time()
# 并发执行任务
tasks = [new_task(i) for i in range(num_tasks)]
results = await asyncio.gather(*tasks)
end_time = time.time()
# 分析结果
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
# 检查是否有重复分配
allocated_sites = [r["site_id"] for r in successful if r["site_id"]]
duplicates = len(allocated_sites) - len(set(allocated_sites))
print(f"执行时间: {end_time - start_time:.3f}s")
print(f"成功任务: {len(successful)}")
print(f"失败任务: {len(failed)}")
print(f"重复分配: {duplicates} 个库位被重复分配")
if duplicates > 0:
print("❌ 仍然存在并发竞争问题!")
else:
print("✓ 成功解决并发竞争问题")
print("操作日志:")
for log in db.operation_log[:10]: # 只显示前10条
print(f" {log}")
return {
"total_tasks": num_tasks,
"successful": len(successful),
"failed": len(failed),
"duplicates": duplicates,
"execution_time": end_time - start_time
}
async def run_comparison_demo():
"""运行对比演示"""
print("🔧 获取密集库位处理器并发问题修复演示")
print("=" * 60)
db = MockDatabase()
num_concurrent_tasks = 10 # 10个并发任务竞争5个库位
print(f"测试场景: {num_concurrent_tasks} 个并发任务竞争 {len(db.storage_sites)} 个库位")
# 测试旧方式
old_results = await simulate_old_approach(db, num_concurrent_tasks)
# 测试新方式
new_results = await simulate_new_approach(db, num_concurrent_tasks)
# 对比结果
print(f"\n=== 对比结果 ===")
print(f"旧方式 - 成功: {old_results['successful']}, 失败: {old_results['failed']}, 重复分配: {old_results['duplicates']}")
print(f"新方式 - 成功: {new_results['successful']}, 失败: {new_results['failed']}, 重复分配: {new_results['duplicates']}")
if old_results['duplicates'] > 0 and new_results['duplicates'] == 0:
print("🎉 优化成功!新方式完全解决了并发竞争问题")
elif old_results['duplicates'] == new_results['duplicates'] == 0:
print(" 在当前测试条件下,两种方式都没有出现竞争问题")
print(" 可能需要增加并发压力或调整延迟来复现问题")
else:
print("⚠️ 优化可能不够完善,需要进一步检查")
# 性能对比
performance_diff = ((new_results['execution_time'] - old_results['execution_time']) / old_results['execution_time']) * 100
if performance_diff > 0:
print(f"📊 性能: 新方式比旧方式慢 {performance_diff:.1f}%")
else:
print(f"📊 性能: 新方式比旧方式快 {abs(performance_diff):.1f}%")
async def stress_test():
"""压力测试"""
print(f"\n=== 压力测试 ===")
print("高并发场景下测试并发安全性...")
db = MockDatabase()
num_tasks = 50 # 50个并发任务
print(f"压力测试: {num_tasks} 个高并发任务")
# 只测试新方式的压力承受能力
results = await simulate_new_approach(db, num_tasks)
success_rate = (results['successful'] / results['total_tasks']) * 100
print(f"成功率: {success_rate:.1f}%")
print(f"平均每任务耗时: {results['execution_time']/results['total_tasks']*1000:.2f}ms")
if results['duplicates'] == 0:
print("✅ 压力测试通过:无并发竞争问题")
else:
print("❌ 压力测试失败:仍存在并发竞争问题")
if __name__ == "__main__":
asyncio.run(run_comparison_demo())
asyncio.run(stress_test())