89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
# components/device_components.py
|
||
"""
|
||
设备组件
|
||
"""
|
||
from typing import Dict, Any, Optional
|
||
import time
|
||
from core.component import Component, ComponentFactory
|
||
from core.exceptions import ComponentError
|
||
|
||
@ComponentFactory.register("wait_modbus_value")
|
||
class WaitModbusValueComponent(Component):
|
||
"""通用等待Modbus值(Name)组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
target_value = self.resolve_param("target_value")
|
||
name = self.resolve_param("name")
|
||
address = self.resolve_param("address")
|
||
description = self.resolve_param("description", "")
|
||
timeout = self.resolve_param("timeout", 30000) # 默认30秒超时
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["target_value", "name"])
|
||
|
||
# 实际实现中,这里应该轮询Modbus设备直到值匹配或超时
|
||
# 这里简化为返回模拟数据
|
||
start_time = time.time() * 1000
|
||
current_value = None
|
||
success = False
|
||
|
||
# 模拟等待过程
|
||
while (time.time() * 1000 - start_time) < timeout:
|
||
# 模拟读取当前值
|
||
current_value = target_value # 假设值已经匹配
|
||
|
||
if current_value == target_value:
|
||
success = True
|
||
break
|
||
|
||
# 等待一段时间再次检查
|
||
time.sleep(0.1)
|
||
|
||
if not success:
|
||
result = {
|
||
"success": False,
|
||
"timeout": True,
|
||
"name": name,
|
||
"address": address,
|
||
"targetValue": target_value,
|
||
"currentValue": current_value
|
||
}
|
||
else:
|
||
result = {
|
||
"success": True,
|
||
"name": name,
|
||
"address": address,
|
||
"value": current_value
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result
|
||
|
||
@ComponentFactory.register("write_modbus_value")
|
||
class WriteModbusValueComponent(Component):
|
||
"""通用写入Modbus值(Name)组件"""
|
||
|
||
def execute(self) -> Dict[str, Any]:
|
||
# 获取参数
|
||
write_value = self.resolve_param("write_value")
|
||
name = self.resolve_param("name")
|
||
address = self.resolve_param("address")
|
||
description = self.resolve_param("description", "")
|
||
|
||
# 验证必要参数
|
||
self.validate_required_params(["write_value", "name"])
|
||
|
||
# 实际实现中,这里应该调用Modbus设备API写入值
|
||
# 这里简化为返回模拟数据
|
||
result = {
|
||
"success": True,
|
||
"name": name,
|
||
"address": address,
|
||
"value": write_value
|
||
}
|
||
|
||
# 存储结果
|
||
self.store_result(result)
|
||
return result |