tianfeng_task_modules/components/device_components.py
2025-03-17 14:58:05 +08:00

89 lines
2.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# components/device_components.py
"""
设备组件
"""
from typing import Dict, Any, Optional
import time
from core.component import Component, ComponentFactory
from core.exceptions import ComponentError
@ComponentFactory.register("wait_modbus_value")
class WaitModbusValueComponent(Component):
    """Wait until a named Modbus value equals a target value, or time out.

    The device read is currently simulated: each poll takes the target value
    itself as the value that was "read", so the wait succeeds on the first
    poll whenever the timeout is positive. A real implementation should poll
    the Modbus device until the value matches or the timeout expires.
    """

    def execute(self) -> Dict[str, Any]:
        """Poll for the target value, then store and return the outcome.

        Parameters (read through ``resolve_param``):
            target_value: value to wait for (required).
            name: name of the Modbus value (required).
            address: address, echoed back in the result.
            description: optional, defaults to ""; resolved but not used.
            timeout: maximum wait in milliseconds, defaults to 30000.

        Returns:
            On success, a dict with keys ``success`` (True), ``name``,
            ``address`` and ``value``. On timeout, a dict with keys
            ``success`` (False), ``timeout`` (True), ``name``, ``address``,
            ``targetValue`` and ``currentValue``.
        """
        # Resolve parameters.
        target_value = self.resolve_param("target_value")
        name = self.resolve_param("name")
        address = self.resolve_param("address")
        # The description is resolved as before but is not part of the result.
        self.resolve_param("description", "")
        timeout = self.resolve_param("timeout", 30000)  # default: 30 seconds

        # Check that the required parameters are present.
        self.validate_required_params(["target_value", "name"])

        # Use the monotonic clock for the elapsed-time measurement: unlike
        # time.time(), it cannot jump when the system clock is adjusted, so
        # the timeout can neither fire early nor be extended by such a jump.
        started = time.monotonic()
        current_value = None
        success = False

        # Simulated wait loop; elapsed time is compared in milliseconds.
        while (time.monotonic() - started) * 1000 < timeout:
            # Simulated read: assume the value already matches.
            current_value = target_value
            if current_value == target_value:
                success = True
                break
            # Pause briefly before polling again.
            time.sleep(0.1)

        if success:
            result = {
                "success": True,
                "name": name,
                "address": address,
                "value": current_value,
            }
        else:
            result = {
                "success": False,
                "timeout": True,
                "name": name,
                "address": address,
                "targetValue": target_value,
                "currentValue": current_value,
            }

        # Persist the result before handing it back to the caller.
        self.store_result(result)
        return result
@ComponentFactory.register("write_modbus_value")
class WriteModbusValueComponent(Component):
    """Write a value to a named Modbus point.

    The write is currently simulated: no device API is called and the
    component always reports success. A real implementation should call the
    Modbus device API to perform the write.
    """

    def execute(self) -> Dict[str, Any]:
        """Build, store and return the (simulated) write result.

        Parameters (read through ``resolve_param``):
            write_value: value to write (required).
            name: name of the Modbus value (required).
            address: address, echoed back in the result.
            description: optional, defaults to ""; resolved but not used.

        Returns:
            A dict with keys ``success`` (always True), ``name``,
            ``address`` and ``value`` (the value that was to be written).
        """
        # Resolve parameters in the same order as before.
        value_to_write = self.resolve_param("write_value")
        point_name = self.resolve_param("name")
        point_address = self.resolve_param("address")
        # The description is resolved but is not part of the result.
        self.resolve_param("description", "")

        # Check that the required parameters are present.
        self.validate_required_params(["write_value", "name"])

        # Simulated outcome; no device is contacted here.
        outcome: Dict[str, Any] = dict(
            success=True,
            name=point_name,
            address=point_address,
            value=value_to_write,
        )

        # Persist the result before handing it back to the caller.
        self.store_result(outcome)
        return outcome