233 lines
8.1 KiB
Python
Raw Normal View History

2025-09-29 09:35:08 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
VWED.melsec 模块 - Melsec PLC通信相关功能
"""
from typing import Optional, Union
from utils.logger import get_logger
import pymcprotocol
class VWEDMelsecModule:
"""VWED.melsec 模块 - Melsec PLC通信相关功能"""
def __init__(self, script_id: str):
self.script_id = script_id
self.logger = get_logger(f"script.{script_id}.melsec")
def read_boolean(self, ip: str, port: int, address: str) -> Optional[bool]:
"""Melsec 读取 boolean
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 读取的地址位
Returns:
Optional[bool]: 读取成功的返回值失败返回 None
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 读取布尔值
result = plc.batchread_bitunits(address, 1)
plc.close()
if result and len(result) > 0:
value = bool(result[0])
self.logger.info(f"读取Melsec boolean成功: {ip}:{port} 地址: {address} 值: {value}")
return value
else:
self.logger.warning(f"读取Melsec boolean无数据: {ip}:{port} 地址: {address}")
return None
except Exception as e:
self.logger.error(f"读取Melsec boolean失败: {ip}:{port} 地址: {address} 错误: {e}")
return None
def read_number(self, ip: str, port: int, address: str) -> Optional[float]:
"""Melsec 读取 number
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 读取的地址位
Returns:
Optional[float]: 读取成功的返回值失败返回 None
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 读取数值假设为16位整数
result = plc.batchread_wordunits(address, 1)
plc.close()
if result and len(result) > 0:
value = float(result[0])
self.logger.info(f"读取Melsec number成功: {ip}:{port} 地址: {address} 值: {value}")
return value
else:
self.logger.warning(f"读取Melsec number无数据: {ip}:{port} 地址: {address}")
return None
except Exception as e:
self.logger.error(f"读取Melsec number失败: {ip}:{port} 地址: {address} 错误: {e}")
return None
def read_string(self, ip: str, port: int, address: str, length: int) -> Optional[str]:
"""Melsec 读取 string
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 读取的地址位
length (int): 读取的长度
Returns:
Optional[str]: 读取成功的返回值失败返回 None
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 读取字符串每个字符占用2字节
word_count = (length + 1) // 2 # 计算需要读取的字数
result = plc.batchread_wordunits(address, word_count)
plc.close()
if result and len(result) > 0:
# 将字数据转换为字符串
byte_data = []
for word in result:
byte_data.extend([word & 0xFF, (word >> 8) & 0xFF])
# 转换为字符串并去除空字符
value = bytes(byte_data[:length]).decode('ascii', errors='ignore').rstrip('\x00')
self.logger.info(f"读取Melsec string成功: {ip}:{port} 地址: {address} 长度: {length} 值: {value}")
return value
else:
self.logger.warning(f"读取Melsec string无数据: {ip}:{port} 地址: {address}")
return None
except Exception as e:
self.logger.error(f"读取Melsec string失败: {ip}:{port} 地址: {address} 错误: {e}")
return None
def write_boolean(self, ip: str, port: int, address: str, value: bool) -> bool:
"""Melsec 写入 boolean
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 写入的地址位
value (bool): 写入的值
Returns:
bool: 成功返回 True失败返回 False
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 写入布尔值
plc.batchwrite_bitunits(address, [1 if value else 0])
plc.close()
self.logger.info(f"写入Melsec boolean成功: {ip}:{port} 地址: {address} 值: {value}")
return True
except Exception as e:
self.logger.error(f"写入Melsec boolean失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}")
return False
def write_number(self, ip: str, port: int, address: str, value: float) -> bool:
"""Melsec 写入 number
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 写入的地址位
value (float): 写入的值
Returns:
bool: 成功返回 True失败返回 False
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 写入数值(转换为整数)
int_value = int(value)
plc.batchwrite_wordunits(address, [int_value])
plc.close()
self.logger.info(f"写入Melsec number成功: {ip}:{port} 地址: {address} 值: {value}")
return True
except Exception as e:
self.logger.error(f"写入Melsec number失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}")
return False
def write_string(self, ip: str, port: int, address: str, value: str) -> bool:
"""Melsec 写入 string
Args:
ip (str): 从机 IP
port (int): 从机端口
address (str): 写入的地址位
value (str): 写入的值
Returns:
bool: 成功返回 True失败返回 False
"""
try:
# 创建Melsec连接
plc = pymcprotocol.Type3E()
plc.setaccessopt(commtype="binary")
plc.connect(ip, port)
# 准备字符串数据
byte_data = value.encode('ascii')[:254] # 限制长度并编码为ASCII
# 补齐到偶数长度
if len(byte_data) % 2 == 1:
byte_data += b'\x00'
# 转换为字数据
word_data = []
for i in range(0, len(byte_data), 2):
word = byte_data[i] | (byte_data[i+1] << 8)
word_data.append(word)
# 写入字符串
plc.batchwrite_wordunits(address, word_data)
plc.close()
self.logger.info(f"写入Melsec string成功: {ip}:{port} 地址: {address} 值: {value}")
return True
except Exception as e:
self.logger.error(f"写入Melsec string失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}")
return False