2025-05-12 15:43:21 +08:00

576 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Modbus设备操作处理器模块
提供与Modbus设备操作相关的各种处理器
"""
import logging
import json
import asyncio
import uuid
from typing import Dict, Any, Optional
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusException
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from config.settings import settings
from sqlalchemy import select
from data.models.modbusconfig import VWEDModbusConfig
from data.session import get_async_session
from utils.logger import get_logger
from .model.block_name import ModbusBlockName
# 获取日志记录器
logger = get_logger("services.execution.handlers.modbus_equipment")
# 模拟模式下的延迟时间(秒)
MOCK_DELAY = 0.5
async def get_modbus_config(instance_name: str, address_number: Optional[int] = None) -> Optional[Dict[str, Any]]:
"""
从数据库获取Modbus配置信息
Args:
instance_name: Modbus实例名称
address_number: 地址编号(可选)
Returns:
Optional[Dict[str, Any]]: Modbus配置信息如果未找到则返回None
"""
try:
async with get_async_session() as session:
# 构建查询条件
query = select(VWEDModbusConfig).where(VWEDModbusConfig.name == instance_name)
# 如果指定了地址编号,添加条件
if address_number.strip() and address_number is not None :
query = query.where(VWEDModbusConfig.address_number == address_number)
# 执行查询
result = await session.execute(query)
config = result.scalars().first()
if not config:
logger.warning(f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address_number}" if address_number is not None else ""))
return None
# 将ORM对象转换为字典
config_dict = {
"id": config.id,
"name": config.name,
"ip": config.ip,
"port": config.port,
"slave_id": config.slave_id,
"address_type": config.address_type,
"address_number": config.address_number,
"task_id": config.task_id,
"target_value": config.target_value,
"remark": config.remark,
"status": config.status
}
return config_dict
except Exception as e:
logger.error(f"获取Modbus配置出错: {str(e)}")
return None
async def read_modbus_value(config: Dict[str, Any]) -> Dict[str, Any]:
"""
读取Modbus值
Args:
config: Modbus配置信息
Returns:
Dict[str, Any]: 读取结果包含success和value字段
"""
# 检查是否启用测试模式
if settings.MODBUS_MOCK_MODE:
logger.info(f"[测试模式] 模拟读取Modbus值: {config['name']}, 地址: {config['address_number']}")
# 模拟读取延迟
await asyncio.sleep(MOCK_DELAY)
# 构造测试模式下的响应
mock_value = (config.get('address_number', 0) + uuid.uuid4().int % 100) % 1000
return {
"success": True,
"value": mock_value,
"message": f"[测试模式] 成功读取Modbus值: {mock_value}"
}
try:
# 创建Modbus TCP客户端
client = ModbusTcpClient(
host=config['ip'],
port=config['port'],
timeout=settings.MODBUS_TIMEOUT
)
# 连接到Modbus服务器
if not client.connect():
return {
"success": False,
"message": f"无法连接到Modbus服务器: {config['ip']}:{config['port']}"
}
# 根据地址类型读取不同的寄存器
address_type = config['address_type']
address = config['address_number']
slave_id = config['slave_id']
value = None
if address_type == 'coil':
# 读取线圈状态
result = client.read_coils(address, 1, unit=slave_id)
if not result.isError():
value = 1 if result.bits[0] else 0
elif address_type == 'discrete_input':
# 读取离散输入
result = client.read_discrete_inputs(address, 1, unit=slave_id)
if not result.isError():
value = 1 if result.bits[0] else 0
elif address_type == 'holding_register':
# 读取保持寄存器
result = client.read_holding_registers(address, 1, unit=slave_id)
if not result.isError():
value = result.registers[0]
elif address_type == 'input_register':
# 读取输入寄存器
result = client.read_input_registers(address, 1, unit=slave_id)
if not result.isError():
value = result.registers[0]
else:
# 未知的地址类型
client.close()
return {
"success": False,
"message": f"不支持的地址类型: {address_type}"
}
# 关闭连接
client.close()
# 检查读取结果
if value is None:
return {
"success": False,
"message": f"读取Modbus值失败: {result if 'result' in locals() else '未知错误'}"
}
return {
"success": True,
"value": value,
"message": f"成功读取Modbus值: {value}"
}
except ConnectionException as e:
logger.error(f"Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"Modbus连接异常: {str(e)}"
}
except ModbusException as e:
logger.error(f"Modbus操作异常: {str(e)}")
return {
"success": False,
"message": f"Modbus操作异常: {str(e)}"
}
except Exception as e:
logger.error(f"读取Modbus值异常: {str(e)}")
return {
"success": False,
"message": f"读取Modbus值异常: {str(e)}"
}
async def write_modbus_value(config: Dict[str, Any], value: int) -> Dict[str, Any]:
"""
写入Modbus值
Args:
config: Modbus配置信息
value: 要写入的值
Returns:
Dict[str, Any]: 写入结果包含success字段
"""
# 检查是否启用测试模式
if settings.MODBUS_MOCK_MODE:
logger.info(f"[测试模式] 模拟写入Modbus值: {config['name']}, 地址: {config['address_number']}, 值: {value}")
# 模拟写入延迟
await asyncio.sleep(MOCK_DELAY)
# 构造测试模式下的响应
return {
"success": True,
"message": f"[测试模式] 成功写入Modbus值: {value}"
}
try:
# 创建Modbus TCP客户端
client = ModbusTcpClient(
host=config['ip'],
port=config['port'],
timeout=settings.MODBUS_TIMEOUT
)
# 连接到Modbus服务器
if not client.connect():
return {
"success": False,
"message": f"无法连接到Modbus服务器: {config['ip']}:{config['port']}"
}
# 根据地址类型写入不同的寄存器
address_type = config['address_type']
address = config['address_number']
slave_id = config['slave_id']
result = None
if address_type == 'coil':
# 写入线圈状态
result = client.write_coil(address, value == 1, unit=slave_id)
elif address_type == 'holding_register':
# 写入保持寄存器
result = client.write_register(address, value, unit=slave_id)
else:
# 不支持写入离散输入和输入寄存器
client.close()
return {
"success": False,
"message": f"不支持向地址类型 {address_type} 写入值"
}
# 关闭连接
client.close()
# 检查写入结果
if result and not result.isError():
return {
"success": True,
"message": f"成功写入Modbus值: {value}"
}
else:
return {
"success": False,
"message": f"写入Modbus值失败: {result if result else '未知错误'}"
}
except ConnectionException as e:
logger.error(f"Modbus连接异常: {str(e)}")
return {
"success": False,
"message": f"Modbus连接异常: {str(e)}"
}
except ModbusException as e:
logger.error(f"Modbus操作异常: {str(e)}")
return {
"success": False,
"message": f"Modbus操作异常: {str(e)}"
}
except Exception as e:
logger.error(f"写入Modbus值异常: {str(e)}")
return {
"success": False,
"message": f"写入Modbus值异常: {str(e)}"
}
class ModbusBlockHandler(BlockHandler):
"""Modbus设备操作处理器基类"""
async def _get_modbus_config(self, instance_name: str, address_number: Optional[int] = None) -> Optional[Dict[str, Any]]:
"""获取Modbus配置信息"""
return await get_modbus_config(instance_name, address_number)
async def _read_modbus_value(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""读取Modbus值"""
return await read_modbus_value(config)
async def _write_modbus_value(self, config: Dict[str, Any], value: int) -> Dict[str, Any]:
"""写入Modbus值"""
return await write_modbus_value(config, value)
# 通用读取Modbus值处理器
@register_handler(ModbusBlockName.COMMON_READ_NAME)
class ModbusCommonReadNameBlockHandler(ModbusBlockHandler):
"""通用读取Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用读取Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 重试标志
retry = True
retry_count = 0
while retry:
# 读取Modbus值
read_result = await self._read_modbus_value(config)
if read_result.get("success", False):
# 读取成功,设置上下文变量
modbus_value = read_result.get("value")
context.set_variable("modbusValue", str(modbus_value))
context.set_block_output(block.get("name"), {"modbusValue": str(modbus_value)})
# 构建成功消息
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
"output": {
"modbusValue": str(modbus_value)
},
"message": f"成功读取Modbus值: 实例: {instance_name}{address_info}, 值: {modbus_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 读取失败,重试
retry_count += 1
logger.warning(f"读取Modbus值失败准备第 {retry_count} 次重试: {read_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用读取Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
# 通用写入Modbus值处理器
@register_handler(ModbusBlockName.COMMON_WRITE_NAME)
class ModbusCommonWriteNameBlockHandler(ModbusBlockHandler):
"""通用写入Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用写入Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
new_value = input_params.get("newValue")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
if new_value is None:
result = {
"success": False,
"message": "写入值不能为空"
}
await self._record_task_log(block, result, context)
return result
# 确保写入值是整数类型
try:
new_value = int(new_value)
except (ValueError, TypeError):
result = {
"success": False,
"message": f"写入值必须是整数类型,当前值: {new_value}"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 重试标志
retry = True
retry_count = 0
while retry:
# 写入Modbus值
write_result = await self._write_modbus_value(config, new_value)
if write_result.get("success", False):
# 写入成功
# 构建成功消息
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
"message": f"成功写入Modbus值: 实例: {instance_name}{address_info}, 值: {new_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 写入失败,重试
retry_count += 1
logger.warning(f"写入Modbus值失败准备第 {retry_count} 次重试: {write_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用写入Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result
# 通用等待Modbus值处理器
@register_handler(ModbusBlockName.COMMON_WAIT_NAME)
class ModbusCommonWaitNameBlockHandler(ModbusBlockHandler):
"""通用等待Modbus值(Name)处理器"""
async def execute(
self,
block: Dict[str, Any],
input_params: Dict[str, Any],
context: TaskContext
) -> Dict[str, Any]:
"""执行通用等待Modbus值操作"""
try:
# 获取关键参数
instance_name = input_params.get("instanceName")
target_value = input_params.get("targetValue")
address = input_params.get("address")
remark = input_params.get("remark", "")
# 参数检查
if not instance_name:
result = {
"success": False,
"message": "Modbus实例名称不能为空"
}
await self._record_task_log(block, result, context)
return result
if target_value is None:
result = {
"success": False,
"message": "目标值不能为空"
}
await self._record_task_log(block, result, context)
return result
# 确保目标值是整数类型
try:
target_value = int(target_value)
except (ValueError, TypeError):
result = {
"success": False,
"message": f"目标值必须是整数类型,当前值: {target_value}"
}
await self._record_task_log(block, result, context)
return result
# 获取Modbus配置信息
config = await self._get_modbus_config(instance_name, address)
if not config:
result = {
"success": False,
"message": f"未找到Modbus配置: {instance_name}" +
(f", 地址编号: {address}" if address is not None else "")
}
await self._record_task_log(block, result, context)
return result
# 轮询标志
polling = True
poll_count = 0
# 记录开始等待的日志
start_log = {
"success": True,
"message": f"开始等待Modbus值达到目标值: 实例: {instance_name}, 目标值: {target_value}"
}
await self._record_task_log(block, start_log, context)
while polling:
# 读取Modbus值
read_result = await self._read_modbus_value(config)
if read_result.get("success", False):
# 读取成功,检查值是否达到目标
current_value = read_result.get("value")
if int(current_value) == target_value:
# 达到目标值
remark_info = f", 说明: {remark}" if remark else ""
address_info = f", 地址: {config['address_number']}" if 'address_number' in config else ""
result = {
"success": True,
"message": f"Modbus值已达到目标值: 实例: {instance_name}{address_info}, 值: {current_value}{remark_info}"
}
await self._record_task_log(block, result, context)
return result
else:
# 未达到目标值,继续轮询
poll_count += 1
if poll_count % 10 == 0: # 每10次轮询记录一次日志
logger.info(f"等待Modbus值当前值: {current_value}, 目标值: {target_value}, 已轮询 {poll_count}")
# 等待一段时间再检查
await asyncio.sleep(settings.MODBUS_POLL_INTERVAL)
else:
# 读取失败,记录错误但继续轮询
poll_count += 1
logger.warning(f"读取Modbus值失败继续轮询第 {poll_count} 次: {read_result.get('message')}")
# 等待一段时间再重试
await asyncio.sleep(settings.MODBUS_RETRY_INTERVAL)
except Exception as e:
result = {
"success": False,
"message": f"执行通用等待Modbus值异常: {str(e)}"
}
await self._record_task_log(block, result, context)
return result