#!/usr/bin/env python # -*- coding: utf-8 -*- """ 自动化任务测试脚本 用于测试create_new_task和gen_agv_scheduling_task接口 """ import asyncio import random import time import uuid import sys import os from typing import Dict, List, Any, Optional import aiohttp import json from datetime import datetime from asyncio import Semaphore, Queue # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from data.session import get_async_session from data.models.operate_point_layer import OperatePointLayer from data.models.external_task_record import VWEDExternalTaskRecord from services.task_record_service import TaskRecordService from data.enum.task_record_enum import TaskStatus from sqlalchemy import select, and_ # API配置 BASE_URL = "http://localhost:8001" CREATE_TASK_URL = f"{BASE_URL}/newTask" GEN_AGV_TASK_URL = f"{BASE_URL}/GenAgvSchedulingTask" # 任务类型配置 CREATE_TASK_TYPES = ["GG2MP", "GGFK2MP", "GT2MP", "GTFK2MP", "ZG2MP", "QZ2MP", "LG2MP", "PHZ2MP"] # 任务类型分组配置 - 确保同时运行的4个任务尽可能不同类型 TASK_TYPE_GROUPS = { "GG": ["GG2MP", "GGFK2MP"], # 钢构类型 "GT": ["GT2MP", "GTFK2MP"], # 钢台类型 "ZG": ["ZG2MP"], # 铸钢类型 "QZ": ["QZ2MP"], # 其他类型1 "LG": ["LG2MP"], # 其他类型2 "PHZ": ["PHZ2MP"] # 其他类型3 } # GEN_TASK_TYPES = ["MP2GG", "MP2GGFK", "MP2GT", "MP2GTFK", "MP2ZG", "MP2QZ", "MP2LG", "MP2PHZ"] CREATE_TASK_TYPES_JSON = { "GG2MP": "MP2GG", "GGFK2MP": "MP2GGFK", "GT2MP": "MP2GT", "GTFK2MP": "MP2GTFK", "ZG2MP": "MP2ZG", "QZ2MP": "MP2QZ", "LG2MP": "MP2LG", "PHZ2MP": "MP2PHZ", } # 任务类型对应的动作点映射 CREATE_TASK_TARGET_MAPPING = { "QZ2MP": ["AP135", "AP136"], "PHZ2MP": ["AP208", "AP209", "AP210", "AP211", "AP212", "AP213", "AP214", "AP215"], "LG2MP": ["AP216", "AP217"], "GT2MP": ["AP255", "AP256", "AP257", "AP258"], "GTFK2MP": ["AP255", "AP256", "AP257", "AP258"], "GG2MP": ["AP310", "AP311", "AP312", "AP313"], "GGFK2MP": ["AP310", "AP311", "AP312", "AP313"], "ZG2MP": ["AP297", "AP298"] } GEN_TASK_ENDPOINT_MAPPING = { "MP2GG": ["AP310", "AP311", "AP312", "AP313"], "MP2GGFK": ["AP310", "AP311", "AP312", "AP313"], "MP2GT": ["AP255", "AP256", "AP257", "AP258"], "MP2GTFK": ["AP255", "AP256", "AP257", "AP258"], "MP2ZG": ["AP297", "AP298"], "MP2QZ": ["AP135", "AP136"], "MP2LG": ["AP216", "AP217"], "MP2PHZ": ["AP208", "AP209", "AP210", "AP211", "AP212", "AP213", "AP214", "AP215"] } # 任务类型对应库区映射 TASK_TYPE_AREA_MAPPING = { "MP2GG": ["ZK/ZKG"], "MP2GGFK": ["ZK/ZKG"], "MP2GT": ["ZK/ZKG"], "MP2GTFK": ["ZK/ZKG"], "MP2ZG": ["ZK/ZKG"], "MP2QZ": ["KW"], "MP2LG": ["AGW/PL"], "MP2PHZ": ["AGW/PL"] } class AutomatedTaskTester: """自动化任务测试器""" def __init__(self, max_concurrent_tasks: int = 4, max_total_tasks: int = 400): self.used_req_codes = set() self.session = None self.create_task_records = [] # 存储create_new_task的记录 self.max_concurrent_tasks = max_concurrent_tasks # 最大并发任务数 self.max_total_tasks = max_total_tasks # 最大总任务数 self.semaphore = Semaphore(max_concurrent_tasks) # 并发控制信号量 self.task_queue = Queue(maxsize=max_total_tasks) # 任务队列 self.completed_tasks = 0 # 已完成任务数 self.success_count = 0 # 成功任务数 self.failed_count = 0 # 失败任务数 self.task_lock = asyncio.Lock() # 任务统计锁 self.running_task_types = set() # 当前运行的任务类型组 self.task_type_lock = asyncio.Lock() # 任务类型锁 async def __aenter__(self): """异步上下文管理器入口""" self.session = aiohttp.ClientSession() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" if self.session: await self.session.close() def generate_unique_req_code(self) -> str: """生成唯一的请求码""" while True: req_code = f"TEST_{int(time.time() * 1000)}_{random.randint(1000, 9999)}" if req_code not in self.used_req_codes: self.used_req_codes.add(req_code) return req_code async def get_available_storage_locations(self, area_names: List[str]) -> List[str]: """ 查询指定库区中已占用且未锁定的库位,如果找不到则持续查询直到找到 Args: area_names: 库区名称列表 Returns: List[str]: 可用库位的station_name列表 """ print(f"开始在库区 {area_names} 中查询可用库位...") attempt = 0 while True: attempt += 1 try: async with get_async_session() as db_session: stmt = select(OperatePointLayer).where( and_( OperatePointLayer.area_name.in_(area_names), OperatePointLayer.is_occupied == True, OperatePointLayer.is_locked == False, OperatePointLayer.is_deleted == False ) ) result = await db_session.execute(stmt) layers = result.scalars().all() station_names = [layer.station_name for layer in layers if layer.station_name] if station_names: print(f"✓ 第{attempt}次尝试:在库区 {area_names} 中找到 {len(station_names)} 个可用库位") return station_names else: print(f"✗ 第{attempt}次尝试:在库区 {area_names} 中未找到可用库位,等待2秒后重试...") await asyncio.sleep(2) except Exception as e: print(f"✗ 第{attempt}次查询库位时出错: {str(e)},等待2秒后重试...") await asyncio.sleep(2) def get_task_type_group(self, task_type: str) -> str: """ 获取任务类型对应的组名 Args: task_type: 任务类型 Returns: str: 组名 """ for group_name, types in TASK_TYPE_GROUPS.items(): if task_type in types: return group_name return "UNKNOWN" async def select_task_type_with_diversity(self) -> Optional[str]: """ 选择任务类型,确保尽可能不重复组类型 Returns: Optional[str]: 选择的任务类型,如果没有可用类型返回None """ async with self.task_type_lock: available_groups = [] # 找出当前未使用的组 for group_name in TASK_TYPE_GROUPS.keys(): if group_name not in self.running_task_types: available_groups.append(group_name) # 如果没有未使用的组,且当前运行任务少于最大并发数,可以重复使用组 if not available_groups and len(self.running_task_types) < self.max_concurrent_tasks: available_groups = list(TASK_TYPE_GROUPS.keys()) if not available_groups: return None # 所有组都在使用中 # 随机选择一个组 selected_group = random.choice(available_groups) # 从该组中随机选择一个任务类型 task_types = TASK_TYPE_GROUPS[selected_group] selected_task_type = random.choice(task_types) # 标记该组为使用中 self.running_task_types.add(selected_group) return selected_task_type async def release_task_type_group(self, task_type: str): """ 释放任务类型组 Args: task_type: 任务类型 """ async with self.task_type_lock: group_name = self.get_task_type_group(task_type) self.running_task_types.discard(group_name) async def monitor_task_status(self, req_code: str, timeout: int = 5) -> bool: """ 监控任务状态,超时或失败时返回False Args: req_code: 请求码 timeout: 超时时间(秒) Returns: bool: True表示任务成功或继续运行,False表示失败或超时 """ start_time = time.time() while time.time() - start_time < timeout: try: # 通过req_code查询external_task_record表获取task_record_id async with get_async_session() as db_session: stmt = select(VWEDExternalTaskRecord).where( VWEDExternalTaskRecord.req_code == req_code ) result = await db_session.execute(stmt) external_record = result.scalar_one_or_none() if external_record and external_record.task_record_id: # 调用TaskRecordService.get_task_record_detail获取任务状态 task_detail_result = await TaskRecordService.get_task_record_detail( external_record.task_record_id ) if task_detail_result.get("success", False): task_detail = task_detail_result.get("data", {}) task_status = task_detail.get("status", "") # 如果任务失败,返回False if task_status == TaskStatus.FAILED: print(f"任务 {req_code} 失败,状态: {task_status}") return False # 如果任务成功完成,返回True(可以继续后续步骤) if task_status == TaskStatus.COMPLETED: print(f"任务 {req_code} 完成,状态: {task_status}") return True # 任务还在运行中,继续监控 print(f"任务 {req_code} 运行中,状态: {task_status}") await asyncio.sleep(1) # 每秒检查一次 except Exception as e: print(f"监控任务状态异常: {str(e)}") await asyncio.sleep(1) # 超时,返回True继续执行(不阻塞后续流程) print(f"任务 {req_code} 监控超时,继续执行后续流程") return True async def monitor_agv_task_completion(self, req_code: str) -> bool: """ 监控GenAgvSchedulingTask任务完成状态,必须等待任务完成(成功或失败) Args: req_code: GenAgvSchedulingTask的请求码 Returns: bool: True表示任务完成(成功或失败),用于统计 """ print(f"开始监控AGV调度任务完成状态: {req_code}") while True: try: # 通过req_code查询external_task_record表获取task_record_id async with get_async_session() as db_session: stmt = select(VWEDExternalTaskRecord).where( VWEDExternalTaskRecord.req_code == req_code ) result = await db_session.execute(stmt) external_record = result.scalar_one_or_none() if external_record and external_record.task_record_id: # 调用TaskRecordService.get_task_record_detail获取任务状态 task_detail_result = await TaskRecordService.get_task_record_detail( external_record.task_record_id ) if task_detail_result.get("success", False): task_detail = task_detail_result.get("data", {}) task_status = task_detail.get("status", "") print(f"AGV调度任务 {req_code} 当前状态: {task_status}") # 如果任务完成(成功或失败),返回对应结果 if task_status == TaskStatus.COMPLETED: print(f"✓ AGV调度任务 {req_code} 执行成功完成") return True elif task_status == TaskStatus.FAILED: print(f"✗ AGV调度任务 {req_code} 执行失败") return False elif task_status == TaskStatus.CANCELED: print(f"⚠ AGV调度任务 {req_code} 被取消") return False # 任务还在运行中,继续监控 else: print(f"AGV调度任务 {req_code} 仍在运行中,状态: {task_status}") else: print(f"无法获取AGV调度任务 {req_code} 的详情,继续等待...") else: print(f"未找到AGV调度任务 {req_code} 的记录或task_record_id,继续等待...") await asyncio.sleep(2) # 每2秒检查一次 except Exception as e: print(f"监控AGV调度任务状态异常: {str(e)}") await asyncio.sleep(2) # 异常时等待2秒后重试 async def create_new_task(self, task_type: str, target_id: str, req_code: str) -> Optional[Dict[str, Any]]: """ 调用create_new_task接口创建新任务 Args: task_type: 任务类型 target_id: 目标ID Returns: Dict: 接口响应结果 """ payload = { "ReqCode": req_code, "SourceID": "", "TargetID": target_id, "TaskType": task_type } try: async with self.session.post(CREATE_TASK_URL, json=payload) as response: result = await response.json() print(f"创建任务 - 类型: {task_type}, 目标: {target_id}, 请求码: {req_code}") print(f"响应: {result}") if result.get("code") == 0: self.create_task_records.append({ "req_code": req_code, "task_type": task_type, "target_id": target_id, "response": result }) return result except Exception as e: print(f"创建任务失败: {str(e)}") return None async def gen_agv_scheduling_task(self, task_type: str, task_code: str, start_point: str, end_point: str) -> \ Optional[Dict[str, Any]]: """ 调用gen_agv_scheduling_task接口生成AGV调度任务 Args: task_type: 任务类型 task_code: 任务代码(create_new_task的ReqCode) start_point: 起点 end_point: 终点 Returns: Dict: 接口响应结果 """ req_code = self.generate_unique_req_code() payload = { "ReqCode": req_code, "TaskTyp": task_type, "TaskCode": task_code, "SecurityKey": "", "Type": "", "SubType": "", "AreaPositonCode": "", "AreaPositonName": "", "PositionCodePath": [ {"PositionCode": start_point, "Type": "start"}, {"PositionCode": end_point, "Type": "end"} ], "ClientCode": "", "TokenCode": "" } try: async with self.session.post(GEN_AGV_TASK_URL, json=payload) as response: result = await response.json() print(f"生成AGV调度 - 类型: {task_type}, 任务码: {task_code}, 路径: {start_point} -> {end_point}") print(f"响应: {result}") # 确保返回的结果包含我们生成的req_code,用于后续监控 if result and isinstance(result, dict): result["generated_req_code"] = req_code # 添加生成的req_code return result except Exception as e: print(f"生成AGV调度任务失败: {str(e)}") return None async def execute_single_task(self, task_id: int): """ 执行单个完整任务流程(在并发控制下) Args: task_id: 任务ID """ create_task_type = None async with self.semaphore: # 获取信号量,控制并发数 try: print(f"\n========== 任务 {task_id} 开始执行 ==========") # Step 1: 智能选择create_new_task的任务类型,确保不同类型分布 create_task_type = await self.select_task_type_with_diversity() if not create_task_type: print(f"✗ 任务 {task_id} - 无法选择任务类型(所有类型都在使用中),跳过") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return print(f"任务 {task_id} - 选择任务类型: {create_task_type} (组: {self.get_task_type_group(create_task_type)})") target_points = CREATE_TASK_TARGET_MAPPING.get(create_task_type, []) if not target_points: print(f"✗ 任务 {task_id} - 任务类型 {create_task_type} 没有配置对应的动作点,跳过") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return target_id = random.choice(target_points) print(f"任务 {task_id} - 1. 调用 newTask - 类型: {create_task_type}, 目标: {target_id}") req_code = self.generate_unique_req_code() create_result = await self.create_new_task(create_task_type, target_id, req_code) if not create_result or create_result.get("code") != 0: print(f"✗ 任务 {task_id} - newTask 调用失败,跳过后续步骤") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return print(f"✓ 任务 {task_id} - newTask 调用成功") # Step 1.5: 监控create_new_task创建的任务状态(5秒超时) print(f"任务 {task_id} - 开始监控任务状态...") task_continue = await self.monitor_task_status(req_code, timeout=5) if not task_continue: print(f"✗ 任务 {task_id} - 任务监控失败,终止执行") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return print(f"✓ 任务 {task_id} - 任务监控通过,继续执行") # Step 2: 随机选择gen_agv_scheduling_task的任务类型 gen_task_type = CREATE_TASK_TYPES_JSON.get(create_task_type) task_code = req_code # 获取起点(从库区中持续查询直到找到) area_names = TASK_TYPE_AREA_MAPPING.get(gen_task_type, []) if not area_names: print(f"✗ 任务 {task_id} - 任务类型 {gen_task_type} 没有配置对应的库区,跳过") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return print(f"任务 {task_id} - 2. 查询库区 {area_names} 中的可用库位...") available_locations = await self.get_available_storage_locations(area_names) start_point = random.choice(available_locations) # 获取终点 end_points = GEN_TASK_ENDPOINT_MAPPING.get(gen_task_type, []) if not end_points: print(f"✗ 任务 {task_id} - 任务类型 {gen_task_type} 没有配置对应的终点,跳过") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 return print(f"任务 {task_id} - 3. 调用 GenAgvSchedulingTask - 类型: {gen_task_type}, 任务码: {task_code}") gen_result = await self.gen_agv_scheduling_task(gen_task_type, task_code, start_point, target_id) if gen_result: print(f"✓ 任务 {task_id} - GenAgvSchedulingTask 调用成功") # Step 3.5: 监控GenAgvSchedulingTask任务直到完成 # 获取GenAgvSchedulingTask的req_code (gen_agv_scheduling_task方法中生成的req_code) # 优先使用接口返回的reqCode,如果没有则使用我们生成的generated_req_code agv_req_code = gen_result.get("reqCode") or gen_result.get("generated_req_code") if agv_req_code: print(f"任务 {task_id} - 4. 开始监控AGV调度任务完成状态...") agv_task_success = await self.monitor_agv_task_completion(agv_req_code) async with self.task_lock: if agv_task_success: print(f"✓ 任务 {task_id} - 完整流程执行成功(AGV任务已完成)") self.success_count += 1 else: print(f"✗ 任务 {task_id} - AGV调度任务执行失败") self.failed_count += 1 self.completed_tasks += 1 else: # 如果无法获取agv_req_code,仍然记录为成功(接口调用成功) print(f"⚠ 任务 {task_id} - 无法获取AGV调度任务的请求码,标记为成功") async with self.task_lock: self.success_count += 1 self.completed_tasks += 1 else: print(f"✗ 任务 {task_id} - GenAgvSchedulingTask 调用失败") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 # 任务间隔 await asyncio.sleep(1) except Exception as e: print(f"✗ 任务 {task_id} 执行异常: {str(e)}") async with self.task_lock: self.failed_count += 1 self.completed_tasks += 1 finally: # 确保任务类型组被释放 if create_task_type: await self.release_task_type_group(create_task_type) async def run_complete_task_workflow(self, count: int = 500): """ 运行完整的任务工作流测试(多线程并发执行,最多4个并发任务) Args: count: 测试次数(最大400个) """ # 限制总任务数不超过最大值 count = min(count, self.max_total_tasks) print(f"\n开始运行完整任务工作流测试,共 {count} 个任务") print(f"最大并发数: {self.max_concurrent_tasks},最大总任务数: {self.max_total_tasks}") print("=" * 80) # 重置统计计数器 async with self.task_lock: self.completed_tasks = 0 self.success_count = 0 self.failed_count = 0 # 创建所有任务 tasks = [] for i in range(count): task = asyncio.create_task(self.execute_single_task(i + 1)) tasks.append(task) # 启动进度监控任务 monitor_task = asyncio.create_task(self.monitor_progress(count)) # 等待所有任务完成 await asyncio.gather(*tasks, return_exceptions=True) # 停止进度监控 monitor_task.cancel() # 最终统计 print(f"\n========== 最终统计 ===========") print(f"总任务数: {count}") print(f"成功: {self.success_count} 个") print(f"失败: {self.failed_count} 个") print(f"成功率: {self.success_count / count * 100:.1f}%") print("=" * 40) async def monitor_progress(self, total_count: int): """ 监控任务执行进度 Args: total_count: 总任务数 """ try: while True: await asyncio.sleep(10) # 每10秒输出一次进度 async with self.task_lock: if self.completed_tasks >= total_count: break print(f"\n========== 进度统计 ===========") print(f"已完成: {self.completed_tasks}/{total_count} 个任务") print(f"成功: {self.success_count} 个") print(f"失败: {self.failed_count} 个") if self.completed_tasks > 0: print(f"成功率: {self.success_count / self.completed_tasks * 100:.1f}%") print(f"当前并发数: {self.max_concurrent_tasks - self.semaphore._value}") print(f"运行中的任务类型组: {list(self.running_task_types)}") print("=" * 40) except asyncio.CancelledError: pass async def main(): """主函数""" print("自动化任务压测脚本启动") print(f"启动时间: {datetime.now()}") print(f"API基础URL: {BASE_URL}") print(f"压测任务量: 400个(最大并发4个)") print(f"测试流程: newTask -> 监控任务状态 -> GenAgvSchedulingTask -> 监控AGV任务完成") start_time = time.time() async with AutomatedTaskTester(max_concurrent_tasks=4, max_total_tasks=400) as tester: # 运行完整工作流测试 - 最大并发4个任务 await tester.run_complete_task_workflow(count=400) end_time = time.time() duration = end_time - start_time print(f"\n========== 压测完成 ===========") print(f"结束时间: {datetime.now()}") print(f"总耗时: {duration:.2f} 秒") print(f"平均每个任务耗时: {duration / 400:.2f} 秒") print("=" * 40) if __name__ == "__main__": asyncio.run(main())