576 lines
22 KiB
Python
Raw Permalink Normal View History

2025-04-30 16:57:46 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Modbus设备操作处理器模块
提供与Modbus设备操作相关的各种处理器
"""
import logging
import json
import asyncio
import uuid
from typing import Dict, Any, Optional
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusException
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from config.settings import settings
from sqlalchemy import select
from data.models.modbusconfig import VWEDModbusConfig
from data.session import get_async_session
from utils.logger import get_logger
from .model.block_name import ModbusBlockName
# 获取日志记录器
logger = get_logger("services.execution.handlers.modbus_equipment")
# 模拟模式下的延迟时间(秒)
MOCK_DELAY = 0.5
async def get_modbus_config(instance_name: str, address_number: Optional[int] = None) -> Optional[Dict[str, Any]]:
"""
从数据库获取Modbus配置信息
Args:
instance_name: Modbus实例名称
address_number: 地址编号(可选)
Returns:
Optional[Dict[str, Any]]: Modbus配置信息如果未找到则返回None
"""
try:
async with get_async_session() as session:
# 构建查询条件
query = select(VWEDModbusConfig).where(VWEDModbusConfig.name == instance_name)
# 如果指定了地址编号,添加条件
if address_number.strip() and address_number is not None :
query = query.where(VWEDModbusConfig.address_number == address_number)
# 执行查询
result = await session.execute(query)
config = result.scalars().first()
if not config:
logger.warning(f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address_number}" if address_number is not None else ""))
return None
# 将ORM对象转换为字典
config_dict = {
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"target_value": config.target_value,
"remark": config.remark,
"status": config.status
}
return config_dict
except Exception as e:
logger.error(f"获取Modbus配置出错: {str(e)}")
return None
async def read_modbus_value(config: Dict[str, Any]) -> Dict[str, Any]:
"""
读取Modbus值
Args:
config: Modbus配置信息
Returns:
Dict[str, Any]: 读取结果包含success和value字段
"""
# 检查是否启用测试模式
if settings.MODBUS_MOCK_MODE:
logger.info(f"[测试模式] 模拟读取Modbus值: {config['name']}, 地址: {config['address_number']}")
# 模拟读取延迟
await asyncio.sleep(MOCK_DELAY)
# 构造测试模式下的响应
mock_value = (config.get('address_number', 0) + uuid.uuid4().int % 100) % 1000
return {
"success": True,
"value": mock_value,
"message": f"[测试模式] 成功读取Modbus值: {mock_value}"
}
try:
# 创建Modbus TCP客户端
client = ModbusTcpClient(
host=config['ip'],
port=config['port'],
timeout=settings.MODBUS_TIMEOUT
)
# 连接到Modbus服务器
if not client.connect():
return {
"success": False,
"message": f"无法连接到Modbus服务器: {config['ip']}:{config['port']}"
}
# 根据地址类型读取不同的寄存器
address_type = config['address_type']
address = config['address_number']
slave_id = config['slave_id']
value = None
if address_type == 'coil':
# 读取线圈状态
result = client.read_coils(address, 1, unit=slave_id)
if not result.isError():
value = 1 if result.bits[0] else 0
elif address_type == 'discrete_input':
# 读取离散输入
result = client.read_discrete_inputs(address, 1, unit=slave_id)
if not result.isError():
value = 1 if result.bits[0] else 0
elif address_type == 'holding_register':
# 读取保持寄存器
result = client.read_holding_registers(address, 1, unit=slave_id)
if not result.isError():
value = result.registers[0]
elif address_type == 'input_register':
# 读取输入寄存器
result = client.read_input_registers(address, 1, unit=slave_id)
if not result.isError():
value = result.registers[0]
else:
# 未知的地址类型
client.close()
return {
"success": False,
"message": f"不支持的地址类型: {address_type}"
}
# 关闭连接
client.close()
# 检查读取结果
if value is None:
return {
"success": False,
"message": f"读取Modbus值失败: {result if 'result' in locals() else '未知错误'}"
}
return {
"success": True,
"value": value,
"message": f"成功读取Modbus值: {value}"
}
except ConnectionException as e:
logger.error(f"Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"Modbus连接异常: {str(e)}"
}
except ModbusException as e:
logger.error(f"Modbus操作异常: {str(e)}")
return {
"success": False,
"message": f"Modbus操作异常: {str(e)}"
}
except Exception as e:
logger.error(f"读取Modbus值异常: {str(e)}")
return {
"success": False,
"message": f"读取Modbus值异常: {str(e)}"
}
async def write_modbus_value(config: Dict[str, Any], value: int) -> Dict[str, Any]:
"""
写入Modbus值
Args:
config: Modbus配置信息
value: 要写入的值
Returns:
Dict[str, Any]: 写入结果包含success字段
"""
# 检查是否启用测试模式
if settings.MODBUS_MOCK_MODE:
logger.info(f"[测试模式] 模拟写入Modbus值: {config['name']}, 地址: {config['address_number']}, 值: {value}")
# 模拟写入延迟
await asyncio.sleep(MOCK_DELAY)
# 构造测试模式下的响应
return {
"success": True,
"message": f"[测试模式] 成功写入Modbus值: {value}"
}
try:
# 创建Modbus TCP客户端
client = ModbusTcpClient(
host=config['ip'],
port=config['port'],
timeout=settings.MODBUS_TIMEOUT
)
# 连接到Modbus服务器
if not client.connect():
return {
"success": False,
"message": f"无法连接到Modbus服务器: {config['ip']}:{config['port']}"
}
# 根据地址类型写入不同的寄存器
address_type = config['address_type']
address = config['address_number']
slave_id = config['slave_id']
result = None
if address_type == 'coil':
# 写入线圈状态
result = client.write_coil(address, value == 1, unit=slave_id)
elif address_type == 'holding_register':
# 写入保持寄存器
result = client.write_register(address, value, unit=slave_id)
else:
# 不支持写入离散输入和输入寄存器
client.close()
return {
"success": False,
"message": f"不支持向地址类型 {address_type} 写入值"
}
# 关闭连接
client.close()
# 检查写入结果
if result and not result.isError():
return {
"success": True,
"message": f"成功写入Modbus值: {value}"
}
else:
return {
"success": False,
"message": f"写入Modbus值失败: {result if result else '未知错误'}"
}
except ConnectionException as e:
logger.error(f"Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"Modbus连接异常: {str(e)}"
}
except ModbusException as e:
logger.error(f"Modbus操作异常: {str(e)}")
return {
"success": False,
"message": f"Modbus操作异常: {str(e)}"
}
except Exception as e:
logger.error(f"写入Modbus值异常: {str(e)}")
return {
"success": False,
"message": f"写入Modbus值异常: {str(e)}"
}
class ModbusBlockHandler(BlockHandler):
"""Modbus设备操作处理器基类"""
async def _get_modbus_config(self, instance_name: str, address_number: Optional[int] = None) -> Optional[Dict[str, Any]]:
"""获取Modbus配置信息"""
return await get_modbus_config(instance_name, address_number)
async def _read_modbus_value(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""读取Modbus值"""
return await read_modbus_value(config)
async def _write_modbus_value(self, config: Dict[str, Any], value: int) -> Dict[str, Any]:
"""写入Modbus值"""
return await write_modbus_value(config, value)
# 通用读取Modbus值处理器
@register_handler(ModbusBlockName.COMMON_READ_NAME)
class ModbusCommonReadNameBlockHandler(ModbusBlockHandler):
"""通用读取Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用读取Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 重试标志
retry = True
retry_count = 0
while retry:
# 读取Modbus值
read_result = await self._read_modbus_value(config)
if read_result.get("success", False):
# 读取成功,设置上下文变量
modbus_value = read_result.get("value")
context.set_variable("modbusValue", str(modbus_value))
context.set_block_output(block.get("name"), {"modbusValue": str(modbus_value)})
# 构建成功消息
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
2025-05-12 15:43:21 +08:00
"output": {
2025-04-30 16:57:46 +08:00
"modbusValue": str(modbus_value)
},
"message": f"成功读取Modbus值: 实例: {instance_name}{address_info}, 值: {modbus_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 读取失败,重试
retry_count += 1
logger.warning(f"读取Modbus值失败准备第 {retry_count} 次重试: {read_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用读取Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
# 通用写入Modbus值处理器
@register_handler(ModbusBlockName.COMMON_WRITE_NAME)
class ModbusCommonWriteNameBlockHandler(ModbusBlockHandler):
"""通用写入Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用写入Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
new_value = input_params.get("newValue")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
if new_value is None:
result = {
"success": False,
"message": "写入值不能为空"
}
await self._record_task_log(block, result, context)
return result
# 确保写入值是整数类型
try:
new_value = int(new_value)
except (ValueError, TypeError):
result = {
"success": False,
"message": f"写入值必须是整数类型,当前值: {new_value}"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 重试标志
retry = True
retry_count = 0
while retry:
# 写入Modbus值
write_result = await self._write_modbus_value(config, new_value)
if write_result.get("success", False):
# 写入成功
# 构建成功消息
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
"message": f"成功写入Modbus值: 实例: {instance_name}{address_info}, 值: {new_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 写入失败,重试
retry_count += 1
logger.warning(f"写入Modbus值失败准备第 {retry_count} 次重试: {write_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用写入Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
# 通用等待Modbus值处理器
@register_handler(ModbusBlockName.COMMON_WAIT_NAME)
class ModbusCommonWaitNameBlockHandler(ModbusBlockHandler):
"""通用等待Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用等待Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
target_value = input_params.get("targetValue")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
if target_value is None:
result = {
"success": False,
"message": "目标值不能为空"
}
await self._record_task_log(block, result, context)
return result
# 确保目标值是整数类型
try:
target_value = int(target_value)
except (ValueError, TypeError):
result = {
"success": False,
"message": f"目标值必须是整数类型,当前值: {target_value}"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 轮询标志
polling = True
poll_count = 0
# 记录开始等待的日志
start_log = {
"success": True,
"message": f"开始等待Modbus值达到目标值: 实例: {instance_name}, 目标值: {target_value}"
}
await self._record_task_log(block, start_log, context)
while polling:
# 读取Modbus值
read_result = await self._read_modbus_value(config)
if read_result.get("success", False):
# 读取成功,检查值是否达到目标
current_value = read_result.get("value")
if int(current_value) == target_value:
# 达到目标值
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
"message": f"Modbus值已达到目标值: 实例: {instance_name}{address_info}, 值: {current_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 未达到目标值,继续轮询
poll_count += 1
if poll_count % 10 == 0: # 每10次轮询记录一次日志
logger.info(f"等待Modbus值当前值: {current_value}, 目标值: {target_value}, 已轮询 {poll_count}")
# 等待一段时间再检查
await asyncio.sleep(settings.MODBUS_POLL_INTERVAL)
else:
# 读取失败,记录错误但继续轮询
poll_count += 1
logger.warning(f"读取Modbus值失败继续轮询第 {poll_count} 次: {read_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用等待Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result