VWED_server/routes/external_task_api.py

1115 lines
45 KiB
Python
Raw Permalink Normal View History

2025-07-30 15:11:59 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
外部任务接口API模块
提供外部系统调用的任务创建接口
"""
import json
2025-08-13 15:27:04 +08:00
import asyncio
import aiohttp
2025-07-30 15:11:59 +08:00
from typing import Dict, Any
2025-08-13 15:27:04 +08:00
from fastapi import APIRouter, Body, Request, Path
from routes.model.external_task_model import ExternalTaskRequest, ExternalTaskResponse, TaskTypeEnum, GenAgvSchedulingTaskRequest, CancelTaskRequest
2025-07-30 15:11:59 +08:00
from routes.model.task_edit_model import TaskEditRunRequest, TaskInputParamNew, InputParamType
from services.task_edit_service import TaskEditService
2025-08-13 15:27:04 +08:00
from services.external_task_record_service import ExternalTaskRecordService
from services.task_record_service import TaskRecordService
from services.sync_service import set_task_terminated
2025-07-30 15:11:59 +08:00
from routes.common_api import format_response, error_response
from utils.logger import get_logger
2025-08-13 15:27:04 +08:00
from data.enum.task_record_enum import SourceType, TaskStatus
from data.models.external_task_record import ExternalTaskStatusEnum
from config.tf_api_config import TF_API_TOKEN, TF_API_BASE_URL, CM_ID, DG_ID, TASK_TYPE_PRIORITY, TASK_TYPE_AREA, TF_WEB_POST, sync_disabled_label
2025-07-30 15:11:59 +08:00
# 创建路由
router = APIRouter(
prefix="",
tags=["外部任务接口"]
)
# 设置日志
logger = get_logger("app.external_task_api")
2025-08-13 15:27:04 +08:00
# 外部回调接口URL
EXTERNAL_CALLBACK_URL = "http://roh.vwfawedl.mobi:9001/AGVService/ContainerSendBackRequest" # 生产线到毛坯库任务
AGV_GOODS_MOVE_URL = "http://roh.vwfawedl.mobi:9001/AGVService/HUGoodsMoveRequest" # 毛坯库到产线任务
async def call_external_callback(arrival_no: str, arrival_user: str = "000307") -> bool:
"""
调用外部回调接口
Args:
arrival_no: 到货编号ReqCode
arrival_user: 到货用户固定值 000307
Returns:
bool: 调用是否成功返回result为0
"""
payload = {
"arrival_no": arrival_no,
"arrival_user": arrival_user
}
max_retries = 1000 # 最大重试次数,防止无限循环
retry_count = 0
while retry_count < max_retries:
try:
async with aiohttp.ClientSession() as session:
async with session.post(EXTERNAL_CALLBACK_URL, json=payload) as response:
result = await response.json()
logger.info(f"外部接口调用响应: {result}, arrival_no={arrival_no}, 重试次数={retry_count}")
# 检查响应结果
if result.get("result") == "0":
logger.info(f"外部接口调用成功: arrival_no={arrival_no}, 总重试次数={retry_count}")
return True
elif result.get("result") == "1":
logger.info(f"外部接口返回result=1继续重试: arrival_no={arrival_no}, 重试次数={retry_count}")
retry_count += 1
await asyncio.sleep(5) # 等待5秒后重试
else:
logger.error(f"外部接口返回异常结果: {result}, arrival_no={arrival_no}")
retry_count += 1
await asyncio.sleep(5)
except Exception as e:
logger.error(f"调用外部接口异常: {str(e)}, arrival_no={arrival_no}, 重试次数={retry_count}")
retry_count += 1
await asyncio.sleep(5) # 等待5秒后重试
logger.error(f"外部接口调用失败,已达到最大重试次数: arrival_no={arrival_no}, 最大重试次数={max_retries}")
return False
async def call_agv_goods_move_callback(pid: str, user_id: str = "000307") -> bool:
"""
调用AGV货物移动回调接口
Args:
pid: 对应的req_code
user_id: 用户ID固定值 000307
Returns:
bool: 调用是否成功返回result为0
"""
payload = {
"PID": pid,
"UserID": user_id
}
2025-08-15 10:58:25 +08:00
max_retries = 100 # 最大重试次数,防止无限循环
2025-08-13 15:27:04 +08:00
retry_count = 0
while retry_count < max_retries:
try:
async with aiohttp.ClientSession() as session:
async with session.post(AGV_GOODS_MOVE_URL, json=payload) as response:
result = await response.json()
logger.info(f"AGV货物移动接口调用响应: {result}, PID={pid}, 重试次数={retry_count}")
# 检查响应结果
if result.get("result") == "0":
logger.info(f"AGV货物移动接口调用成功: PID={pid}, 总重试次数={retry_count}")
return True
elif result.get("result") == "1":
logger.info(f"AGV货物移动接口返回result=1继续重试: PID={pid}, 重试次数={retry_count}")
retry_count += 1
await asyncio.sleep(5) # 等待5秒后重试
else:
logger.error(f"AGV货物移动接口返回异常结果: {result}, PID={pid}")
retry_count += 1
await asyncio.sleep(5)
except Exception as e:
logger.error(f"调用AGV货物移动接口异常: {str(e)}, PID={pid}, 重试次数={retry_count}")
retry_count += 1
await asyncio.sleep(5) # 等待5秒后重试
logger.error(f"AGV货物移动接口调用失败已达到最大重试次数: PID={pid}, 最大重试次数={max_retries}")
return False
async def monitor_task_and_callback(task_record_id: str, req_code: str):
"""
异步监控任务状态并在成功时调用外部回调接口
Args:
task_record_id: 任务记录ID
req_code: 请求码用作arrival_no
"""
logger.info(f"开始监控任务状态: task_record_id={task_record_id}, req_code={req_code}")
# max_wait_time = 1800 # 最大等待时间30分钟
# wait_count = 0
while True:
try:
task_detail_result = await TaskRecordService.get_task_record_detail(task_record_id)
if task_detail_result.get("success", False):
task_detail = task_detail_result.get("data", {})
task_status = task_detail.get("status", "")
logger.info(f"监控任务状态: task_record_id={task_record_id}, status={task_status}")
# 如果任务已完成(成功)
if task_status == TaskStatus.COMPLETED:
logger.info(f"任务执行成功,开始调用外部回调接口: task_record_id={task_record_id}, req_code={req_code}")
# 调用外部回调接口
success = await call_external_callback(req_code)
if success:
logger.info(f"外部回调接口调用成功: task_record_id={task_record_id}, req_code={req_code}")
else:
logger.error(f"外部回调接口调用失败: task_record_id={task_record_id}, req_code={req_code}")
break
# 如果任务已失败或取消
elif task_status in [TaskStatus.FAILED, TaskStatus.CANCELED]:
logger.info(f"任务执行失败或取消,不调用外部回调接口: task_record_id={task_record_id}, status={task_status}")
break
# 任务还在运行中,继续等待
else:
logger.debug(f"任务仍在执行中,继续等待: task_record_id={task_record_id}, status={task_status}")
await asyncio.sleep(2) # 等待10秒
# wait_count += 10
else:
logger.warning(f"无法获取任务详情,继续等待: task_record_id={task_record_id}")
await asyncio.sleep(2) # 等待10秒
# wait_count += 10
except Exception as e:
logger.error(f"监控任务状态时出现异常: {str(e)}, task_record_id={task_record_id}")
await asyncio.sleep(2) # 等待10秒
# wait_count += 10
async def monitor_agv_task_and_callback(task_record_id: str, req_code: str):
"""
异步监控AGV调度任务状态并在成功时调用AGV货物移动回调接口
Args:
task_record_id: 任务记录ID
req_code: 请求码用作PID
"""
logger.info(f"开始监控AGV调度任务状态: task_record_id={task_record_id}, req_code={req_code}")
while True:
try:
task_detail_result = await TaskRecordService.get_task_record_detail(task_record_id)
if task_detail_result.get("success", False):
task_detail = task_detail_result.get("data", {})
task_status = task_detail.get("status", "")
logger.info(f"监控AGV调度任务状态: task_record_id={task_record_id}, status={task_status}")
# 如果任务已完成(成功)
if task_status == TaskStatus.COMPLETED:
logger.info(f"AGV调度任务执行成功开始调用AGV货物移动回调接口: task_record_id={task_record_id}, req_code={req_code}")
# 调用AGV货物移动回调接口
success = await call_agv_goods_move_callback(req_code)
if success:
logger.info(f"AGV货物移动回调接口调用成功: task_record_id={task_record_id}, req_code={req_code}")
else:
logger.error(f"AGV货物移动回调接口调用失败: task_record_id={task_record_id}, req_code={req_code}")
break
# 如果任务已失败或取消
elif task_status in [TaskStatus.FAILED, TaskStatus.CANCELED]:
logger.info(f"AGV调度任务执行失败或取消不调用AGV货物移动回调接口: task_record_id={task_record_id}, status={task_status}")
break
# 任务还在运行中,继续等待
else:
logger.debug(f"AGV调度任务仍在执行中继续等待: task_record_id={task_record_id}, status={task_status}")
await asyncio.sleep(2) # 等待2秒
else:
logger.warning(f"无法获取AGV调度任务详情继续等待: task_record_id={task_record_id}")
await asyncio.sleep(2) # 等待2秒
except Exception as e:
logger.error(f"监控AGV调度任务状态时出现异常: {str(e)}, task_record_id={task_record_id}")
await asyncio.sleep(2) # 等待2秒
async def check_task_permission(tf_api_token: str, tf_api_base_url: str, module_name: str = "其他") -> bool:
"""
检查是否允许处理任务
调用参数配置-三方接口调用接口检查系统限制
Args:
tf_api_token: API访问令牌
tf_api_base_url: API基础URL
module_name: 模块名称默认为"其他"
Returns:
bool: True表示允许处理任务False表示被限制
"""
headers = {
"X-Access-Token": tf_api_token,
"Content-Type": "text/plain"
}
# 构建 API URL
api_url = f"{tf_api_base_url}/parameter/getByModule"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, data=module_name, headers=headers) as response:
if response.status == 200:
result = await response.json()
logger.info(f"参数配置接口调用成功: result={result}")
# 检查响应格式
if result.get("success", False):
parameter_result = result.get("result", {})
sync_disabled = parameter_result.get(sync_disabled_label, "false")
# 如果 sync_disabled 为 "true",则被限制
if sync_disabled == "true":
logger.warning("系统限制创建任务: sync_disabled=true")
return False
else:
logger.info("系统允许创建任务: sync_disabled=false")
return True
else:
# 如果接口调用失败,默认允许处理任务
logger.warning(f"参数配置接口调用失败: {result.get('message', '未知错误')}")
return True
else:
logger.error(f"参数配置接口调用失败: status={response.status}")
response_text = await response.text()
logger.error(f"响应内容: {response_text}")
# 如果接口调用失败,默认允许处理任务
return True
except Exception as e:
logger.error(f"检查任务权限异常: error={str(e)}")
# 如果出现异常,默认允许处理任务
return True
async def get_amr_loading_state(task_record_id: str, tf_api_token: str) -> Dict[str, Any]:
"""
获取任务中小车负载状态
Args:
task_record_id: 天风任务ID
tf_api_token: API访问令牌
Returns:
Dict[str, Any]: 包含小车负载状态的响应数据
"""
headers = {
"X-Access-Token": tf_api_token,
"Content-Type": "application/json"
}
# 构建 API URL
api_url = f"{TF_API_BASE_URL}/task/vwedtask/{task_record_id}/getAmrState"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=headers) as response:
if response.status == 200:
result = await response.json()
logger.info(f"获取小车负载状态成功: task_record_id={task_record_id}, result={result}")
return result
else:
logger.error(f"获取小车负载状态失败: task_record_id={task_record_id}, status={response.status}")
response_text = await response.text()
logger.error(f"响应内容: {response_text}")
return {
"success": False,
"message": f"HTTP {response.status}: {response_text}",
"code": response.status
}
except Exception as e:
logger.error(f"获取小车负载状态异常: task_record_id={task_record_id}, error={str(e)}")
return {
"success": False,
"message": f"请求异常: {str(e)}",
"code": 500
}
# # 任务类型到任务优先级
2025-07-30 15:11:59 +08:00
TASK_TYPE_TEMPLATE_MAPPING = {
2025-08-13 15:27:04 +08:00
TaskTypeEnum.GG2MP: "GG",
TaskTypeEnum.GGFK2MP: "GG",
TaskTypeEnum.GT2MP: "GT",
TaskTypeEnum.GTFK2MP: "GT",
TaskTypeEnum.ZG2MP: "ZG",
TaskTypeEnum.QZ2MP: "QZ",
TaskTypeEnum.LG2MP: "LG",
TaskTypeEnum.PHZ2MP: "PHZ",
TaskTypeEnum.MP2GG: "GG",
TaskTypeEnum.MP2GGFK: "GG",
TaskTypeEnum.MP2GT: "GT",
TaskTypeEnum.MP2GTFK: "GT",
TaskTypeEnum.MP2ZG: "ZG",
TaskTypeEnum.MP2QZ: "QZ",
TaskTypeEnum.MP2LG: "LG",
TaskTypeEnum.MP2PHZ: "PHZ",
2025-07-30 15:11:59 +08:00
}
2025-08-13 15:27:04 +08:00
TASK_TYPE_REMARK = {
TaskTypeEnum.GG2MP: "缸盖:{0}-毛坯库:{1}",
TaskTypeEnum.GGFK2MP: "缸盖返空:{0}-毛坯库:{1}",
TaskTypeEnum.GT2MP: "缸体:{0}-毛坯库:{1}",
TaskTypeEnum.GTFK2MP: "缸体返空:{0}-毛坯库:{1}",
TaskTypeEnum.ZG2MP: "罩盖:{0}-毛坯库:{1}",
TaskTypeEnum.QZ2MP: "曲轴:{0}-毛坯库:{1}",
TaskTypeEnum.LG2MP: "连杆:{0}-毛坯库:{1}",
TaskTypeEnum.PHZ2MP: "平衡轴:{0}-毛坯库:{1}",
TaskTypeEnum.MP2GG: "毛坯库:{0}-缸盖:{1}",
TaskTypeEnum.MP2GGFK: "毛坯库:{0}-缸盖返空:{1}",
TaskTypeEnum.MP2GT: "毛坯库:{0}-缸体:{1}",
TaskTypeEnum.MP2GTFK: "毛坯库:{0}-缸体返空:{1}",
TaskTypeEnum.MP2ZG: "毛坯库:{0}-罩盖:{1}",
TaskTypeEnum.MP2QZ: "毛坯库:{0}-曲轴:{1}",
TaskTypeEnum.MP2LG: "毛坯库:{0}-连杆:{1}",
TaskTypeEnum.MP2PHZ: "毛坯库:{0}-平衡轴:{1}",
}
2025-07-30 15:11:59 +08:00
@router.post("/newTask")
async def create_new_task(request: Request, task_request: ExternalTaskRequest = Body(...)):
"""
创建新任务接口
根据任务类型自动选择对应的任务模板并执行任务
2025-08-13 15:27:04 +08:00
2025-07-30 15:11:59 +08:00
Args:
task_request: 外部任务创建请求包含ReqCodeSourceIDTargetIDTaskType
2025-08-13 15:27:04 +08:00
2025-07-30 15:11:59 +08:00
Returns:
ExternalTaskResponse: 包含codereqCodemessagerowCount的响应
"""
2025-08-13 15:27:04 +08:00
external_record = None
2025-07-30 15:11:59 +08:00
try:
2025-08-13 15:27:04 +08:00
logger.info(f"收到外部任务创建请求: {task_request}")
# 检查系统是否允许处理任务
tf_api_token = TF_API_TOKEN
is_allowed = await check_task_permission(tf_api_token, TF_API_BASE_URL)
if not is_allowed:
logger.error(f"系统限制创建任务: ReqCode={task_request.ReqCode}")
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message="由于系统限制创建任务失败,请联系管理员",
rowCount=0
)
# 获取客户端信息
client_ip = request.client.host if request.client else None
user_agent = request.headers.get("user-agent", "")
client_info = {
"user_agent": user_agent,
"headers": dict(request.headers),
"method": request.method,
"url": str(request.url)
}
client_info_str = json.dumps(client_info, ensure_ascii=False)
2025-07-30 15:11:59 +08:00
# 根据任务类型获取对应的模板ID
2025-08-13 15:27:04 +08:00
template_id = CM_ID
2025-07-30 15:11:59 +08:00
if not template_id:
logger.error(f"不支持的任务类型: {task_request.TaskType}")
return ExternalTaskResponse(
code=400,
reqCode=task_request.ReqCode,
message=f"不支持的任务类型: {task_request.TaskType}",
rowCount=0
)
2025-08-13 15:27:04 +08:00
# 创建外部任务记录
external_record = await ExternalTaskRecordService.create_new_task_record(
req_code=task_request.ReqCode,
source_id=task_request.SourceID or "",
target_id=task_request.TargetID,
business_task_type=task_request.TaskType.value,
template_id=template_id,
request_params=task_request.dict(),
client_ip=client_ip,
client_info=client_info_str
)
# 根据站点名称查询对应的库位ID
from data.session import get_async_session
from data.models.operate_point_layer import OperatePointLayer
from sqlalchemy import select
target_storage_location_id = ""
async with get_async_session() as session:
# 查询target_id对应的库位ID选择第一个
stmt = select(OperatePointLayer).where(
OperatePointLayer.station_name == task_request.TargetID,
OperatePointLayer.is_deleted == False
).limit(1)
result = await session.execute(stmt)
target_layer = result.scalar_one_or_none()
if target_layer:
target_storage_location_id = target_layer.layer_name
storage_area = TASK_TYPE_AREA.get(TASK_TYPE_TEMPLATE_MAPPING.get(task_request.TaskType))
priority = TASK_TYPE_PRIORITY.get(TASK_TYPE_TEMPLATE_MAPPING.get(task_request.TaskType, "OR"))
remark = TASK_TYPE_REMARK.get(task_request.TaskType)
2025-08-15 10:58:25 +08:00
remark = remark.format(target_storage_location_id, storage_area)
2025-08-13 15:27:04 +08:00
area_obj = TaskInputParamNew(
name="area",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="库区",
2025-07-30 15:11:59 +08:00
required=True,
2025-08-13 15:27:04 +08:00
defaultValue=storage_area,
remark="任务类型对应所属库区")
target_obj = TaskInputParamNew(
name="target_id",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="库位id",
2025-07-30 15:11:59 +08:00
required=True,
2025-08-13 15:27:04 +08:00
defaultValue=target_storage_location_id,
remark="取货id")
priority_obj = TaskInputParamNew(
name="priority",
type=InputParamType.STRING,
label="优先级",
required=True,
defaultValue=priority,
remark="任务级优先级")
remark_obj = TaskInputParamNew(
name="REMARK",
type=InputParamType.STRING,
label="任务备注",
required=True,
defaultValue=remark,
remark="任务详情备注")
2025-08-15 10:58:25 +08:00
req_code = TaskInputParamNew(
name="req_code",
type=InputParamType.STRING,
label="请求码",
required=True,
defaultValue=task_request.ReqCode,
remark="任务请求的唯一编码")
task_type = TaskInputParamNew(
name="task_type",
type=InputParamType.STRING,
label="任务类型",
required=True,
defaultValue=task_request.TaskType,
remark="任务类型")
task_params = [area_obj, target_obj, priority_obj, remark_obj, req_code, task_type]
2025-08-13 15:27:04 +08:00
2025-07-30 15:11:59 +08:00
# 构造任务执行请求
run_request = TaskEditRunRequest(
taskId=template_id,
params=task_params,
source_type=SourceType.SYSTEM_SCHEDULING, # 第三方系统
source_system="EXTERNAL_API", # 外部接口系统标识
source_device=request.client.host if request.client else "unknown", # 使用客户端IP作为设备标识
use_modbus=False,
2025-08-13 15:27:04 +08:00
modbus_timeout=5000,
priority = priority
2025-07-30 15:11:59 +08:00
)
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为运行中
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.RUNNING
)
2025-07-30 15:11:59 +08:00
tf_api_token = TF_API_TOKEN
2025-08-13 15:27:04 +08:00
2025-07-30 15:11:59 +08:00
# 调用任务执行服务
result = await TaskEditService.run_task(
run_request,
client_ip=client_ip,
client_info=client_info_str,
2025-08-13 15:27:04 +08:00
tf_api_token=tf_api_token,
# priority=priority
2025-07-30 15:11:59 +08:00
)
if result is None:
logger.error(f"任务启动失败: ReqCode={task_request.ReqCode}")
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为失败
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message="任务启动失败",
response_row_count=0,
error_message="任务启动失败"
)
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message="任务启动失败",
rowCount=0
)
2025-08-13 15:27:04 +08:00
2025-07-30 15:11:59 +08:00
if not result.get("success", False):
2025-08-13 15:27:04 +08:00
error_msg = result.get("message", "任务启动失败")
logger.error(f"任务启动失败: {error_msg}, ReqCode={task_request.ReqCode}")
# 更新外部任务记录状态为失败
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message=error_msg,
response_row_count=0,
error_message=error_msg
)
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
2025-08-13 15:27:04 +08:00
message=error_msg,
2025-07-30 15:11:59 +08:00
rowCount=0
)
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为成功
task_record_id = result.get('taskRecordId')
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.SUCCESS,
task_record_id=task_record_id,
response_code=0,
response_message="成功",
response_row_count=1,
response_data=result
)
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=0,
reqCode=task_request.ReqCode,
message="成功",
rowCount=1
)
except Exception as e:
logger.error(f"创建外部任务异常: {str(e)}, ReqCode={task_request.ReqCode}")
2025-08-13 15:27:04 +08:00
# 如果已创建外部任务记录,更新状态为失败
if external_record:
try:
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message=f"创建任务失败: {str(e)}",
response_row_count=0,
error_message=str(e)
)
except Exception as update_error:
logger.error(f"更新外部任务记录状态失败: {str(update_error)}")
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message=f"创建任务失败: {str(e)}",
rowCount=0
)
@router.post("/GenAgvSchedulingTask")
async def gen_agv_scheduling_task(request: Request, task_request: GenAgvSchedulingTaskRequest = Body(...)):
"""
AGV调度任务接口
用于生成AGV调度任务
2025-08-13 15:27:04 +08:00
逻辑
1. 根据 taskcode 参数查询 external_task_record 表获取对应的 task_record_id
2. 调用 get_task_record_detail 接口查询任务运行状态
3. 如果 TaskTyp 对应的 TASK_TYPE_TEMPLATE_MAPPING 值不等于 "GT"需要等任务运行成功后才能运行本接口
4. 如果是 "GT" 类型直接运行任务
2025-07-30 15:11:59 +08:00
Args:
task_request: AGV调度任务请求包含ReqCodeTaskTypSecurityKey等参数
Returns:
ExternalTaskResponse: 包含codereqCodemessagerowCount的响应
"""
2025-08-13 15:27:04 +08:00
priority = TASK_TYPE_PRIORITY.get(TASK_TYPE_TEMPLATE_MAPPING.get(task_request.TaskTyp, "OR"))
remark = TASK_TYPE_REMARK.get(task_request.TaskTyp)
external_record = None
2025-07-30 15:11:59 +08:00
try:
2025-08-13 15:27:04 +08:00
logger.info(f"收到AGV调度任务请求:{task_request}")
# 检查系统是否允许处理任务
tf_api_token = TF_API_TOKEN
is_allowed = await check_task_permission(tf_api_token, TF_API_BASE_URL)
if not is_allowed:
logger.error(f"系统限制创建AGV调度任务: ReqCode={task_request.ReqCode}")
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message="由于系统限制创建任务失败,请联系管理员",
rowCount=0
)
# 导入数据库相关模块
from data.session import get_async_session
from data.models.operate_point_layer import OperatePointLayer
from sqlalchemy import select
# 验证任务条件已移至脚本处理器中,此处保留简单检查
if not task_request.TaskCode:
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message=f"创建任务失败: task_request.TaskCode 任务id为空",
rowCount=0
)
# 获取客户端信息
client_ip = request.client.host if request.client else None
user_agent = request.headers.get("user-agent", "")
client_info = {
"user_agent": user_agent,
"headers": dict(request.headers),
"method": request.method,
"url": str(request.url)
}
client_info_str = json.dumps(client_info, ensure_ascii=False)
2025-07-30 15:11:59 +08:00
# 根据任务类型获取对应的模板ID
2025-08-13 15:27:04 +08:00
template_id = DG_ID
# 创建外部任务记录
external_record = await ExternalTaskRecordService.create_agv_scheduling_task_record(
req_code=task_request.ReqCode,
task_code=task_request.TaskCode,
business_task_type=task_request.TaskTyp.value if hasattr(task_request.TaskTyp, 'value') else str(task_request.TaskTyp),
security_key=task_request.SecurityKey or "",
type_field=task_request.Type,
sub_type=task_request.SubType,
area_position_code=task_request.AreaPositonCode,
area_position_name=task_request.AreaPositonName,
position_code_path=[path.dict() for path in task_request.PositionCodePath],
template_id=template_id or "",
request_params=task_request.dict(),
client_code=task_request.ClientCode or "",
token_code=task_request.TokenCode or "",
client_ip=client_ip,
client_info=client_info_str
)
start_node_obj, end_node_obj = task_request.PositionCodePath
start_node = start_node_obj.PositionCode
end_node = end_node_obj.PositionCode
# 根据站点名称查询对应的库位ID
start_storage_location_id = ""
end_storage_location_id = ""
async with get_async_session() as session:
# 查询start_node对应的库位ID选择第一个
stmt = select(OperatePointLayer).where(
OperatePointLayer.station_name == start_node,
OperatePointLayer.is_deleted == False
).limit(1)
result = await session.execute(stmt)
start_layer = result.scalar_one_or_none()
if start_layer:
start_storage_location_id = start_layer.layer_name
# 查询end_node对应的库位ID选择第一个
stmt = select(OperatePointLayer).where(
OperatePointLayer.station_name == end_node,
OperatePointLayer.is_deleted == False
).limit(1)
result = await session.execute(stmt)
end_layer = result.scalar_one_or_none()
if end_layer:
end_storage_location_id = end_layer.layer_name
2025-07-30 15:11:59 +08:00
# 构造任务运行参数
task_params = []
2025-08-15 10:58:25 +08:00
remark = remark.format(start_storage_location_id, end_storage_location_id)
2025-07-30 15:11:59 +08:00
# 添加任务代码参数
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="START_WL",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="取货库位id",
required=False,
defaultValue=start_storage_location_id,
remark="要取货所属库位"
2025-07-30 15:11:59 +08:00
))
# 添加类型参数
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="END_WL",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="放货库位id",
required=False,
defaultValue=end_storage_location_id,
remark="要放货所属库位"
2025-07-30 15:11:59 +08:00
))
# 添加子类型参数
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="priority",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="优先级",
required=False,
defaultValue=priority,
remark="选车优先级"
2025-07-30 15:11:59 +08:00
))
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="TASK_CODE",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="任务id",
required=False,
defaultValue=task_request.TaskCode,
remark="创建任务时任务id"
2025-07-30 15:11:59 +08:00
))
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="TASK_TYPE",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="任务类型",
required=False,
defaultValue=task_request.TaskTyp,
remark="创建任务时任务类型"
2025-07-30 15:11:59 +08:00
))
task_params.append(TaskInputParamNew(
2025-08-13 15:27:04 +08:00
name="REMARK",
2025-07-30 15:11:59 +08:00
type=InputParamType.STRING,
2025-08-13 15:27:04 +08:00
label="任务备注",
required=False,
defaultValue=remark,
remark="详细业务描述"
2025-07-30 15:11:59 +08:00
))
# 构造任务执行请求
run_request = TaskEditRunRequest(
taskId=template_id,
params=task_params,
source_type=SourceType.SYSTEM_SCHEDULING, # 第三方系统
source_system="AGV_SCHEDULING", # AGV调度系统标识
source_device=request.client.host if request.client else "unknown", # 使用客户端IP作为设备标识
use_modbus=False,
2025-08-13 15:27:04 +08:00
modbus_timeout=5000,
priority = priority
2025-07-30 15:11:59 +08:00
)
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为运行中
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.RUNNING
)
2025-07-30 15:11:59 +08:00
tf_api_token = TF_API_TOKEN
# 调用任务执行服务
result = await TaskEditService.run_task(
run_request,
client_ip=client_ip,
client_info=client_info_str,
tf_api_token=tf_api_token
)
if result is None:
logger.error(f"AGV调度任务启动失败: ReqCode={task_request.ReqCode}")
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为失败
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message="任务启动失败",
response_row_count=0,
error_message="AGV调度任务启动失败"
)
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message="任务启动失败",
rowCount=0
)
if not result.get("success", False):
2025-08-13 15:27:04 +08:00
error_msg = result.get("message", "任务启动失败")
logger.error(f"AGV调度任务启动失败: {error_msg}, ReqCode={task_request.ReqCode}")
# 更新外部任务记录状态为失败
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message=error_msg,
response_row_count=0,
error_message=error_msg
)
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
2025-08-13 15:27:04 +08:00
message=error_msg,
2025-07-30 15:11:59 +08:00
rowCount=0
)
2025-08-13 15:27:04 +08:00
# 更新外部任务记录状态为成功
task_record_id = result.get('taskRecordId')
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.SUCCESS,
task_record_id=task_record_id,
response_code=0,
response_message="成功",
response_row_count=0,
response_data=result
)
logger.info(f"AGV调度任务启动成功: ReqCode={task_request.ReqCode}, TaskRecordId={task_record_id}")
# 定义需要监控的任务类型
agv_callback_task_types = ["MP2GG", "MP2GT", "MP2ZG", "MP2QZ", "MP2LG", "MP2PHZ"]
# 启动异步任务监控,不阻塞当前接口
if task_record_id and task_request.TaskTyp in agv_callback_task_types and TF_WEB_POST:
asyncio.create_task(monitor_agv_task_and_callback(
task_record_id=task_record_id,
req_code=task_request.TaskCode
))
logger.info(f"已启动AGV调度任务监控: TaskType={task_request.TaskTyp}, TaskRecordId={task_record_id}")
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=0,
2025-08-13 15:27:04 +08:00
reqCode=task_request.TaskCode,
2025-07-30 15:11:59 +08:00
message="成功",
rowCount=0
)
except Exception as e:
logger.error(f"创建AGV调度任务异常: {str(e)}, ReqCode={task_request.ReqCode}")
2025-08-13 15:27:04 +08:00
# 如果已创建外部任务记录,更新状态为失败
if external_record:
try:
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.FAILED,
response_code=500,
response_message=f"创建任务失败: {str(e)}",
response_row_count=0,
error_message=str(e)
)
except Exception as update_error:
logger.error(f"更新外部任务记录状态失败: {str(update_error)}")
2025-07-30 15:11:59 +08:00
return ExternalTaskResponse(
code=500,
reqCode=task_request.ReqCode,
message=f"创建任务失败: {str(e)}",
rowCount=0
2025-08-13 15:27:04 +08:00
)
@router.post("/cancelTask")
async def cancel_task(request: Request, cancel_request: CancelTaskRequest = Body(...)):
"""
取消任务接口
根据ReqCode查询对应的task_record_id然后调用内部接口终止任务并通知主系统
Args:
cancel_request: 取消任务请求包含ReqCode
Returns:
ExternalTaskResponse: 包含codereqCodemessagerowCount的响应
"""
try:
logger.info(f"收到取消任务请求: {cancel_request}")
# 检查系统是否允许处理任务
tf_api_token = TF_API_TOKEN
is_allowed = await check_task_permission(tf_api_token, TF_API_BASE_URL)
if not is_allowed:
logger.error(f"系统限制取消任务: ReqCode={cancel_request.ReqCode}")
return ExternalTaskResponse(
code=500,
reqCode=cancel_request.ReqCode,
message="由于系统限制创建任务失败,请联系管理员",
rowCount=0
)
req_code = cancel_request.ReqCode
# 根据req_code查询external_task_record获取task_record_id
external_record = await ExternalTaskRecordService.get_external_task_record(req_code)
if not external_record:
logger.error(f"未找到对应的外部任务记录: ReqCode={req_code}")
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message="未找到对应的任务记录",
rowCount=0
)
task_record_id = external_record.task_record_id
if not task_record_id:
logger.error(f"外部任务记录中没有关联的task_record_id: ReqCode={req_code}")
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message="任务记录中没有关联的内部任务ID",
rowCount=0
)
# 通过task_record_id查询任务详情检查任务状态
task_detail_result = await TaskRecordService.get_task_record_detail(task_record_id)
if not task_detail_result.get("success", False):
logger.error(f"获取任务详情失败: ReqCode={req_code}, task_record_id={task_record_id}")
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message="获取任务详情失败",
rowCount=0
)
task_detail = task_detail_result.get("data", {})
task_status = task_detail.get("status", "")
# 检查任务状态,只有运行状态的任务才允许取消
if task_status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELED]:
logger.warning(f"任务已处于终止状态,无法取消: ReqCode={req_code}, TaskStatus={task_status}")
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message=f"任务已处于{task_status}状态,无法取消",
rowCount=0
)
# 检查小车负载状态
tf_api_token = TF_API_TOKEN
logger.info(f"检查小车负载状态: task_record_id={task_record_id}")
amr_state_result = await get_amr_loading_state(task_record_id, tf_api_token)
if amr_state_result.get("success", False):
amr_state_data = amr_state_result.get("result", {})
amr_loading = amr_state_data.get("amr_loading", False)
amr_name = amr_state_data.get("amr_name", "")
logger.info(f"小车负载状态: task_record_id={task_record_id}, amr_loading={amr_loading}, amr_name={amr_name}")
# 如果小车处于负载状态,不允许取消任务
if amr_loading:
logger.warning(f"小车处于负载状态,不允许终止任务: ReqCode={req_code}, AMR={amr_name}")
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message="已载货,请人工干预",
rowCount=0
)
else:
# 如果获取小车状态失败,记录警告但继续执行取消操作
logger.warning(f"获取小车负载状态失败,继续执行取消操作: {amr_state_result.get('message', '')}")
# 调用内部接口停止任务
logger.info(f"调用内部接口停止任务: task_record_id={task_record_id}")
stop_result = await TaskRecordService.stop_task_record(task_record_id)
if not stop_result.get("success", False):
error_msg = stop_result.get("message", "停止任务失败")
logger.error(f"停止任务失败: {error_msg}, task_record_id={task_record_id}")
# 检查是否是"已载货,请人工干预"的情况
if "已载货" in error_msg or "人工干预" in error_msg:
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message="已载货,请人工干预",
rowCount=0
)
return ExternalTaskResponse(
code=1,
reqCode=req_code,
message=error_msg,
rowCount=0
)
# 通知主系统任务已终止
logger.info(f"通知主系统任务已终止: task_record_id={task_record_id}")
try:
await set_task_terminated(task_record_id, tf_api_token)
logger.info(f"成功通知主系统任务已终止: task_record_id={task_record_id}")
except Exception as sync_error:
logger.warning(f"通知主系统失败,但任务已成功取消: {str(sync_error)}, task_record_id={task_record_id}")
# 更新外部任务记录状态为已取消
await ExternalTaskRecordService.update_task_record_status(
req_code=external_record.id,
task_status=ExternalTaskStatusEnum.CANCELLED,
response_code=0,
response_message="任务已取消",
response_row_count=0
)
logger.info(f"任务取消成功: ReqCode={req_code}, TaskRecordId={task_record_id}")
return ExternalTaskResponse(
code=0,
reqCode=req_code,
message="成功",
rowCount=0
)
except Exception as e:
logger.error(f"取消任务异常: {str(e)}, ReqCode={cancel_request.ReqCode}")
return ExternalTaskResponse(
code=1,
reqCode=cancel_request.ReqCode,
message=f"取消任务失败: {str(e)}",
rowCount=0
)
@router.get("/api/external-task-record/by-req-code/{req_code}")
async def get_external_task_record_by_req_code(
req_code: str = Path(..., description="请求标识码")
):
"""
根据ReqCode查询外部任务记录
Args:
req_code: 请求标识码
Returns:
包含外部任务记录信息的响应
"""
try:
# 查询外部任务记录
external_record = await ExternalTaskRecordService.get_external_task_record(req_code)
if not external_record:
return error_response(
message=f"未找到ReqCode为 {req_code} 的外部任务记录",
code=404
)
# 构建响应数据
response_data = {
"id": external_record.id,
"req_code": external_record.req_code,
"task_type": external_record.task_type.value if external_record.task_type else None,
"task_status": external_record.task_status.value if external_record.task_status else None,
"task_record_id": external_record.task_record_id,
"task_code": external_record.task_code,
"related_req_code": external_record.related_req_code,
"source_id": external_record.source_id,
"target_id": external_record.target_id,
"business_task_type": external_record.business_task_type,
"template_id": external_record.template_id,
"response_code": external_record.response_code,
"response_message": external_record.response_message,
"start_time": external_record.start_time.isoformat() if external_record.start_time else None,
"end_time": external_record.end_time.isoformat() if external_record.end_time else None,
"duration": external_record.duration,
"created_at": external_record.created_at.isoformat() if external_record.created_at else None,
"updated_at": external_record.updated_at.isoformat() if external_record.updated_at else None
}
return format_response(
data=response_data,
message="成功获取外部任务记录"
)
except Exception as e:
logger.error(f"查询外部任务记录异常: {str(e)}, req_code={req_code}")
return error_response(
message=f"查询外部任务记录失败: {str(e)}",
code=500
2025-07-30 15:11:59 +08:00
)