2025-03-17 14:58:05 +08:00

94 lines
3.1 KiB
Python

# core/context.py
"""
Task context: per-thread storage for task variables and block execution results.
"""
from typing import Dict, Any, Optional
import threading
import time
class TaskContext:
    """
    Task context holding the variables and block results of one task run.

    ``get_instance()`` keeps one context per thread in a ``threading.local``
    slot, so code running on different threads gets different contexts.
    Instances created directly with ``TaskContext()`` are plain objects and
    are not bound to any thread.
    """

    # Shared holder; each thread sees its own ``instance`` attribute on it.
    _thread_local = threading.local()

    @classmethod
    def get_instance(cls) -> "TaskContext":
        """Return the calling thread's context, creating it on first use."""
        instance = getattr(cls._thread_local, "instance", None)
        if instance is None:
            instance = TaskContext()
            cls._thread_local.instance = instance
        return instance

    def __init__(self) -> None:
        """Create an empty context in the ``"pending"`` state."""
        self.task_id: Optional[str] = None
        self.workflow_id: Optional[str] = None
        self.variables: Dict[str, Any] = {}  # task variables
        self.blocks: Dict[str, Any] = {}  # block execution results, keyed by block id
        self.task_inputs: Dict[str, Any] = {}  # task input parameters
        self.task_status: str = "pending"  # task status
        self.start_time: Optional[int] = None  # start time, epoch milliseconds
        self.end_time: Optional[int] = None  # end time, epoch milliseconds
        self.executing_robot: Optional[str] = None  # executing robot id

    def reset(self) -> None:
        """Reset every field back to its initial (empty, pending) value."""
        # Re-running __init__ keeps the defaults defined in exactly one place.
        self.__init__()

    def initialize_task(
        self,
        task_id: str,
        workflow_id: str,
        inputs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Mark the start of a task on this context.

        Records the ids and the start time (epoch milliseconds), switches the
        status to ``"executing"`` and publishes a shallow copy of ``inputs``
        as the ``taskInputs`` variable.

        Existing entries in ``variables`` and ``blocks`` are left alone; call
        ``reset()`` first when a completely clean context is required.

        Args:
            task_id: Identifier of the task being started.
            workflow_id: Identifier of the workflow the task belongs to.
            inputs: Optional task input parameters; ``None`` or an empty dict
                results in empty task inputs.
        """
        self.task_id = task_id
        self.workflow_id = workflow_id
        self.start_time = int(time.time() * 1000)
        # A context may be reused for several tasks, so an end time left over
        # from a previously completed task must not survive into this one.
        self.end_time = None
        # Go through the setter so the "taskStatus" variable stays in sync
        # with the attribute, exactly as for every later status change.
        self.set_task_status("executing")
        # Always replace the inputs (copying so later caller-side mutation
        # does not leak in); otherwise a call without inputs would keep
        # exposing the previous task's inputs.
        self.task_inputs = inputs.copy() if inputs else {}
        self.variables["taskInputs"] = self.task_inputs

    def set_variable(self, name: str, value: Any) -> None:
        """Store ``value`` under ``name`` in the task variables."""
        self.variables[name] = value

    def get_variable(self, name: str, default=None):
        """Return the variable ``name``, or ``default`` when it is not set."""
        return self.variables.get(name, default)

    def set_block_result(self, block_id: str, result: Dict[str, Any]) -> None:
        """Store the execution result of the block ``block_id``."""
        self.blocks[block_id] = result

    def get_block_result(self, block_id: str, default=None):
        """Return the stored result of ``block_id``, or ``default`` if absent."""
        return self.blocks.get(block_id, default)

    def set_task_status(self, status: str) -> None:
        """Set the task status and mirror it into the ``taskStatus`` variable."""
        self.task_status = status
        self.variables["taskStatus"] = status

    def set_executing_robot(self, robot_id: str) -> None:
        """Set the executing robot and mirror it into ``executingRobot``."""
        self.executing_robot = robot_id
        self.variables["executingRobot"] = robot_id

    def complete_task(self, status: str = "completed") -> None:
        """
        Finish the task: set the final status and record the end time.

        Args:
            status: Final status to store; defaults to ``"completed"``.
        """
        self.set_task_status(status)
        self.end_time = int(time.time() * 1000)

    def to_dict(self) -> Dict[str, Any]:
        """
        Return the context as a plain dictionary.

        The ``variables``, ``blocks`` and ``task_inputs`` values are the
        context's own dict objects, not copies.
        """
        return {
            "task_id": self.task_id,
            "workflow_id": self.workflow_id,
            "variables": self.variables,
            "blocks": self.blocks,
            "task_inputs": self.task_inputs,
            "task_status": self.task_status,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "executing_robot": self.executing_robot,
        }