715 lines
26 KiB
Python
Raw Normal View History

2025-09-29 09:35:08 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
VWED Modbus TCP模块
提供Modbus TCP通信相关的内置函数
"""
from utils.logger import get_logger
from pymodbus.client.tcp import ModbusTcpClient
logger = get_logger("services.modbus_module")
class VWEDModbusModule:
"""VWED Modbus TCP模块"""
def __init__(self, script_id: str):
self.script_id = script_id
self._modbus_instances = {} # 存储配置的Modbus实例
logger.debug(f"初始化 VWEDModbusModule, script_id: {script_id}")
def _create_client(self, ip: str, port: int, timeout: float = 3.0) -> ModbusTcpClient:
"""创建Modbus TCP客户端"""
return ModbusTcpClient(host=ip, port=port, timeout=timeout)
def _get_modbus_instance(self, name: str):
"""获取配置的Modbus实例"""
if name not in self._modbus_instances:
raise Exception(f"Modbus实例 '{name}' 未配置")
return self._modbus_instances[name]
def configure_modbus_instance(self, name: str, ip: str, port: int, slave_id: int, type_str: str = '4x'):
"""配置Modbus实例"""
self._modbus_instances[name] = {
'ip': ip,
'port': port,
'slave_id': slave_id,
'type_str': type_str
}
logger.debug(f"配置Modbus实例: {name} -> IP={ip}, Port={port}, SlaveID={slave_id}, Type={type_str}")
def read_coil_status(self, ip: str, port: int, slave_id: int, offset: int) -> bool | None:
"""
Modbus 读取线圈量功能码 01
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
Returns:
bool | None: 读取成功返回布尔值失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_coils(offset, count=1, slave=slave_id)
if result.isError():
logger.error(f"读取线圈状态失败: {result}")
return None
logger.debug(f"读取线圈状态成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={result.bits[0]}")
return result.bits[0]
except Exception as e:
logger.error(f"读取线圈状态异常: {e}")
return None
finally:
if client:
client.close()
def read_input_status(self, ip: str, port: int, slave_id: int, offset: int) -> bool | None:
"""
Modbus 读取离散输入功能码 02
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
Returns:
bool | None: 读取成功返回布尔值失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_discrete_inputs(offset, count=1, slave=slave_id)
if result.isError():
logger.error(f"读取离散输入失败: {result}")
return None
logger.debug(f"读取离散输入成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={result.bits[0]}")
return result.bits[0]
except Exception as e:
logger.error(f"读取离散输入异常: {e}")
return None
finally:
if client:
client.close()
def read_holding_register(self, ip: str, port: int, slave_id: int, offset: int, data_type: int) -> int | None:
"""
Modbus 读取保持寄存器功能码 03
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
data_type: 数据类型2无符号整形3有符号整形
Returns:
int | None: 读取成功返回整数值失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_holding_registers(offset, count=1, slave=slave_id)
if result.isError():
logger.error(f"读取保持寄存器失败: {result}")
return None
value = result.registers[0]
# 根据数据类型进行转换
if data_type == 3: # 有符号整形
if value > 32767:
value = value - 65536
logger.debug(f"读取保持寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={value}")
return value
except Exception as e:
logger.error(f"读取保持寄存器异常: {e}")
return None
finally:
if client:
client.close()
def read_input_register(self, ip: str, port: int, slave_id: int, offset: int, data_type: int) -> int | None:
"""
Modbus 读取输入寄存器功能码 04
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
data_type: 数据类型2无符号整形3有符号整形
Returns:
int | None: 读取成功返回整数值失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_input_registers(offset, count=1, slave=slave_id)
if result.isError():
logger.error(f"读取输入寄存器失败: {result}")
return None
value = result.registers[0]
# 根据数据类型进行转换
if data_type == 3: # 有符号整形
if value > 32767:
value = value - 65536
logger.debug(f"读取输入寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={value}")
return value
except Exception as e:
logger.error(f"读取输入寄存器异常: {e}")
return None
finally:
if client:
client.close()
def batch_read_coil_status(self, ip: str, port: int, slave_id: int, offset: int, length: int) -> list[bool] | None:
"""
Modbus 批量读取线圈量功能码 01
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 首地址
length: Modbus 地址数量
Returns:
list[bool] | None: 读取成功返回布尔值列表失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_coils(offset, count=length, slave=slave_id)
if result.isError():
logger.error(f"批量读取线圈状态失败: {result}")
return None
values = result.bits[:length] # 只返回请求的数量
logger.debug(f"批量读取线圈状态成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Length={length}")
return values
except Exception as e:
logger.error(f"批量读取线圈状态异常: {e}")
return None
finally:
if client:
client.close()
def batch_read_input_status(self, ip: str, port: int, slave_id: int, offset: int, length: int) -> list[bool] | None:
"""
Modbus 批量读取离散输入功能码 02
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 首地址
length: Modbus 地址数量
Returns:
list[bool] | None: 读取成功返回布尔值列表失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_discrete_inputs(offset, count=length, slave=slave_id)
if result.isError():
logger.error(f"批量读取离散输入失败: {result}")
return None
values = result.bits[:length] # 只返回请求的数量
logger.debug(f"批量读取离散输入成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Length={length}")
return values
except Exception as e:
logger.error(f"批量读取离散输入异常: {e}")
return None
finally:
if client:
client.close()
def batch_read_holding_registers(self, ip: str, port: int, slave_id: int, offset: int, length: int) -> list[int] | None:
"""
Modbus 批量读取保持寄存器功能码 03
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 首地址
length: Modbus 地址数量
Returns:
list[int] | None: 读取成功返回整数值列表失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_holding_registers(offset, count=length, slave=slave_id)
if result.isError():
logger.error(f"批量读取保持寄存器失败: {result}")
return None
values = list(result.registers)
logger.debug(f"批量读取保持寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Length={length}")
return values
except Exception as e:
logger.error(f"批量读取保持寄存器异常: {e}")
return None
finally:
if client:
client.close()
def batch_read_input_registers(self, ip: str, port: int, slave_id: int, offset: int, length: int) -> list[int] | None:
"""
Modbus 批量读取输入寄存器功能码 04
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 首地址
length: Modbus 地址数量
Returns:
list[int] | None: 读取成功返回整数值列表失败返回None
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.read_input_registers(offset, count=length, slave=slave_id)
if result.isError():
logger.error(f"批量读取输入寄存器失败: {result}")
return None
values = list(result.registers)
logger.debug(f"批量读取输入寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Length={length}")
return values
except Exception as e:
logger.error(f"批量读取输入寄存器异常: {e}")
return None
finally:
if client:
client.close()
def write_coil_status(self, ip: str, port: int, slave_id: int, offset: int, value: bool) -> bool:
"""
Modbus 写入线圈量功能码 05
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
value: 写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.write_coil(offset, value, slave=slave_id)
if result.isError():
logger.error(f"写入线圈状态失败: {result}")
return False
logger.debug(f"写入线圈状态成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={value}")
return True
except Exception as e:
logger.error(f"写入线圈状态异常: {e}")
return False
finally:
if client:
client.close()
def write_holding_register(self, ip: str, port: int, slave_id: int, offset: int, data_type: int, value: int) -> bool:
"""
Modbus 写入保持寄存器功能码 06
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
data_type: 数据类型2无符号整形3有符号整形
value: 写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
# 根据数据类型进行转换
write_value = value
if data_type == 3 and value < 0: # 有符号整形且为负数
write_value = value + 65536
result = client.write_register(offset, write_value, slave=slave_id)
if result.isError():
logger.error(f"写入保持寄存器失败: {result}")
return False
logger.debug(f"写入保持寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Value={value}")
return True
except Exception as e:
logger.error(f"写入保持寄存器异常: {e}")
return False
finally:
if client:
client.close()
def batch_write_coil_status(self, ip: str, port: int, slave_id: int, offset: int, values: list[bool]) -> bool:
"""
Modbus 批量写入线圈量功能码 0f
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
values: 多个需写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.write_coils(offset, values, slave=slave_id)
if result.isError():
logger.error(f"批量写入线圈状态失败: {result}")
return False
logger.debug(f"批量写入线圈状态成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Count={len(values)}")
return True
except Exception as e:
logger.error(f"批量写入线圈状态异常: {e}")
return False
finally:
if client:
client.close()
def batch_write_holding_register(self, ip: str, port: int, slave_id: int, offset: int, values: list[int]) -> bool:
"""
Modbus 批量写入保持寄存器功能码 10
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
offset: Modbus 地址
values: 多个需写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
client = None
try:
client = self._create_client(ip, port)
client.connect()
result = client.write_registers(offset, values, slave=slave_id)
if result.isError():
logger.error(f"批量写入保持寄存器失败: {result}")
return False
logger.debug(f"批量写入保持寄存器成功: IP={ip}, Port={port}, SlaveID={slave_id}, Offset={offset}, Count={len(values)}")
return True
except Exception as e:
logger.error(f"批量写入保持寄存器异常: {e}")
return False
finally:
if client:
client.close()
def write_single_modbus_value(self, ip: str, port: int, slave_id: int, type_str: str, offset: int, value: int) -> bool:
"""
Modbus 通用写入单个地址
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
type_str: 地址类型可选择'0x''1x''3x''4x'
offset: Modbus 地址
value: 写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
try:
if type_str == '0x': # 线圈,只支持写入
return self.write_coil_status(ip, port, slave_id, offset, bool(value))
elif type_str == '1x': # 离散输入,不支持写入
logger.error(f"离散输入(1x)不支持写入操作")
return False
elif type_str == '3x': # 输入寄存器,不支持写入
logger.error(f"输入寄存器(3x)不支持写入操作")
return False
elif type_str == '4x': # 保持寄存器
return self.write_holding_register(ip, port, slave_id, offset, 2, value) # 默认无符号
else:
logger.error(f"不支持的地址类型: {type_str}")
return False
except Exception as e:
logger.error(f"通用写入单个地址异常: {e}")
return False
def write_batch_modbus_value(self, ip: str, port: int, slave_id: int, type_str: str, offset: int, values: list[int]) -> bool:
"""
Modbus 通用写入多个地址
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
type_str: 地址类型可选择'0x''1x''3x''4x'
offset: Modbus 地址
values: 写入的值
Returns:
bool: 写入成功返回True失败返回False
"""
try:
if type_str == '0x': # 线圈,只支持写入
bool_values = [bool(v) for v in values]
return self.batch_write_coil_status(ip, port, slave_id, offset, bool_values)
elif type_str == '1x': # 离散输入,不支持写入
logger.error(f"离散输入(1x)不支持写入操作")
return False
elif type_str == '3x': # 输入寄存器,不支持写入
logger.error(f"输入寄存器(3x)不支持写入操作")
return False
elif type_str == '4x': # 保持寄存器
return self.batch_write_holding_register(ip, port, slave_id, offset, values)
else:
logger.error(f"不支持的地址类型: {type_str}")
return False
except Exception as e:
logger.error(f"通用写入多个地址异常: {e}")
return False
def write_single_modbus_value_by_instance_name(self, name: str, offset: int, value: int) -> bool:
"""
通过实例名称写入单个地址
Args:
name: 已经配置的实例名称
offset: Modbus 地址
value: 写入的值
Returns:
bool: 写入成功返回True失败返回False
Raises:
Exception: 如果没有配置 modbus 实例则会抛出初始化异常
"""
try:
instance = self._get_modbus_instance(name)
return self.write_single_modbus_value(
instance['ip'],
instance['port'],
instance['slave_id'],
instance['type_str'],
offset,
value
)
except Exception as e:
logger.error(f"通过实例名称写入单个地址失败: {e}")
raise e
def write_batch_modbus_value_by_instance_name(self, name: str, offset: int, values: list[int]) -> bool:
"""
通过实例名称写入多个地址
Args:
name: 已经配置的实例名称
offset: Modbus 地址
values: 写入的值
Returns:
bool: 写入成功返回True失败返回False
Raises:
Exception: 如果没有配置 modbus 实例则会抛出初始化异常
"""
try:
instance = self._get_modbus_instance(name)
return self.write_batch_modbus_value(
instance['ip'],
instance['port'],
instance['slave_id'],
instance['type_str'],
offset,
values
)
except Exception as e:
logger.error(f"通过实例名称写入多个地址失败: {e}")
raise e
def read_single_modbus_value(self, ip: str, port: int, slave_id: int, type_str: str, offset: int) -> int | None:
"""
Modbus 通用读取单个地址
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
type_str: 地址类型可选择'0x''1x''3x''4x'
offset: Modbus 地址
Returns:
int | None: 读取成功返回整数值失败返回None
"""
try:
if type_str == '0x': # 线圈
result = self.read_coil_status(ip, port, slave_id, offset)
return 1 if result else 0 if result is not None else None
elif type_str == '1x': # 离散输入
result = self.read_input_status(ip, port, slave_id, offset)
return 1 if result else 0 if result is not None else None
elif type_str == '3x': # 输入寄存器
return self.read_input_register(ip, port, slave_id, offset, 2) # 默认无符号
elif type_str == '4x': # 保持寄存器
return self.read_holding_register(ip, port, slave_id, offset, 2) # 默认无符号
else:
logger.error(f"不支持的地址类型: {type_str}")
return None
except Exception as e:
logger.error(f"通用读取单个地址异常: {e}")
return None
def read_batch_modbus_value(self, ip: str, port: int, slave_id: int, type_str: str, offset: int, length: int) -> list[int] | None:
"""
Modbus 通用读取多个地址
Args:
ip: 从机 IP
port: 从机端口
slave_id: 从机 slave ID
type_str: 地址类型可选择'0x''1x''3x''4x'
offset: Modbus 地址
length: 读取的数据长度
Returns:
list[int] | None: 读取成功返回整数值列表失败返回None
"""
try:
if type_str == '0x': # 线圈
result = self.batch_read_coil_status(ip, port, slave_id, offset, length)
return [1 if b else 0 for b in result] if result is not None else None
elif type_str == '1x': # 离散输入
result = self.batch_read_input_status(ip, port, slave_id, offset, length)
return [1 if b else 0 for b in result] if result is not None else None
elif type_str == '3x': # 输入寄存器
return self.batch_read_input_registers(ip, port, slave_id, offset, length)
elif type_str == '4x': # 保持寄存器
return self.batch_read_holding_registers(ip, port, slave_id, offset, length)
else:
logger.error(f"不支持的地址类型: {type_str}")
return None
except Exception as e:
logger.error(f"通用读取多个地址异常: {e}")
return None
def read_single_modbus_value_by_instance_name(self, name: str, offset: int) -> int | None:
"""
通过实例名称读取单个地址
Args:
name: 配置的唯一实例名称
offset: Modbus 地址
Returns:
int | None: 读取成功返回整数值失败返回None
Raises:
Exception: 如果没有配置 modbus 实例则会抛出初始化异常
"""
try:
instance = self._get_modbus_instance(name)
return self.read_single_modbus_value(
instance['ip'],
instance['port'],
instance['slave_id'],
instance['type_str'],
offset
)
except Exception as e:
logger.error(f"通过实例名称读取单个地址失败: {e}")
raise e
def read_batch_modbus_value_by_instance_name(self, name: str, offset: int, length: int) -> list[int] | None:
"""
通过实例名称读取多个地址
Args:
name: 配置的唯一实例名称
offset: Modbus 地址
length: 读取的数据长度
Returns:
list[int] | None: 读取成功返回整数值列表失败返回None
Raises:
Exception: 如果没有配置 modbus 实例则会抛出初始化异常
"""
try:
instance = self._get_modbus_instance(name)
return self.read_batch_modbus_value(
instance['ip'],
instance['port'],
instance['slave_id'],
instance['type_str'],
offset,
length
)
except Exception as e:
logger.error(f"通过实例名称读取多个地址失败: {e}")
raise e