# core/context.py """ 任务上下文 """ from typing import Dict, Any, Optional import threading import time class TaskContext: """ 任务上下文,存储任务变量和执行结果 使用线程本地存储确保线程安全 """ _thread_local = threading.local() @classmethod def get_instance(cls): """获取当前线程的上下文实例""" if not hasattr(cls._thread_local, 'instance'): cls._thread_local.instance = TaskContext() return cls._thread_local.instance def __init__(self): self.task_id: Optional[str] = None self.workflow_id: Optional[str] = None self.variables: Dict[str, Any] = {} # 任务变量 self.blocks: Dict[str, Any] = {} # 块执行结果 self.task_inputs: Dict[str, Any] = {} # 任务输入参数 self.task_status: str = "pending" # 任务状态 self.start_time: Optional[int] = None # 开始时间 self.end_time: Optional[int] = None # 结束时间 self.executing_robot: Optional[str] = None # 执行机器人 def reset(self): """重置上下文""" self.__init__() def initialize_task(self, task_id: str, workflow_id: str, inputs: Dict[str, Any] = None): """初始化任务""" self.task_id = task_id self.workflow_id = workflow_id self.start_time = int(time.time() * 1000) self.task_status = "executing" if inputs: self.task_inputs = inputs.copy() self.variables["taskInputs"] = self.task_inputs def set_variable(self, name: str, value: Any): """设置变量值""" self.variables[name] = value def get_variable(self, name: str, default=None): """获取变量值""" return self.variables.get(name, default) def set_block_result(self, block_id: str, result: Dict[str, Any]): """设置块执行结果""" self.blocks[block_id] = result def get_block_result(self, block_id: str, default=None): """获取块执行结果""" return self.blocks.get(block_id, default) def set_task_status(self, status: str): """设置任务状态""" self.task_status = status self.variables["taskStatus"] = status def set_executing_robot(self, robot_id: str): """设置执行机器人""" self.executing_robot = robot_id self.variables["executingRobot"] = robot_id def complete_task(self, status: str = "completed"): """完成任务""" self.task_status = status self.variables["taskStatus"] = status self.end_time = int(time.time() * 1000) def to_dict(self): """转换为字典""" return { "task_id": self.task_id, "workflow_id": self.workflow_id, "variables": self.variables, "blocks": self.blocks, "task_inputs": self.task_inputs, "task_status": self.task_status, "start_time": self.start_time, "end_time": self.end_time, "executing_robot": self.executing_robot }