#!/usr/bin/env python # -*- coding: utf-8 -*- """ VWED.melsec 模块 - Melsec PLC通信相关功能 """ from typing import Optional, Union from utils.logger import get_logger import pymcprotocol class VWEDMelsecModule: """VWED.melsec 模块 - Melsec PLC通信相关功能""" def __init__(self, script_id: str): self.script_id = script_id self.logger = get_logger(f"script.{script_id}.melsec") def read_boolean(self, ip: str, port: int, address: str) -> Optional[bool]: """Melsec 读取 boolean Args: ip (str): 从机 IP port (int): 从机端口 address (str): 读取的地址位 Returns: Optional[bool]: 读取成功的返回值,失败返回 None """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 读取布尔值 result = plc.batchread_bitunits(address, 1) plc.close() if result and len(result) > 0: value = bool(result[0]) self.logger.info(f"读取Melsec boolean成功: {ip}:{port} 地址: {address} 值: {value}") return value else: self.logger.warning(f"读取Melsec boolean无数据: {ip}:{port} 地址: {address}") return None except Exception as e: self.logger.error(f"读取Melsec boolean失败: {ip}:{port} 地址: {address} 错误: {e}") return None def read_number(self, ip: str, port: int, address: str) -> Optional[float]: """Melsec 读取 number Args: ip (str): 从机 IP port (int): 从机端口 address (str): 读取的地址位 Returns: Optional[float]: 读取成功的返回值,失败返回 None """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 读取数值(假设为16位整数) result = plc.batchread_wordunits(address, 1) plc.close() if result and len(result) > 0: value = float(result[0]) self.logger.info(f"读取Melsec number成功: {ip}:{port} 地址: {address} 值: {value}") return value else: self.logger.warning(f"读取Melsec number无数据: {ip}:{port} 地址: {address}") return None except Exception as e: self.logger.error(f"读取Melsec number失败: {ip}:{port} 地址: {address} 错误: {e}") return None def read_string(self, ip: str, port: int, address: str, length: int) -> Optional[str]: """Melsec 读取 string Args: ip (str): 从机 IP port (int): 从机端口 address (str): 读取的地址位 length (int): 读取的长度 Returns: Optional[str]: 读取成功的返回值,失败返回 None """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 读取字符串(每个字符占用2字节) word_count = (length + 1) // 2 # 计算需要读取的字数 result = plc.batchread_wordunits(address, word_count) plc.close() if result and len(result) > 0: # 将字数据转换为字符串 byte_data = [] for word in result: byte_data.extend([word & 0xFF, (word >> 8) & 0xFF]) # 转换为字符串并去除空字符 value = bytes(byte_data[:length]).decode('ascii', errors='ignore').rstrip('\x00') self.logger.info(f"读取Melsec string成功: {ip}:{port} 地址: {address} 长度: {length} 值: {value}") return value else: self.logger.warning(f"读取Melsec string无数据: {ip}:{port} 地址: {address}") return None except Exception as e: self.logger.error(f"读取Melsec string失败: {ip}:{port} 地址: {address} 错误: {e}") return None def write_boolean(self, ip: str, port: int, address: str, value: bool) -> bool: """Melsec 写入 boolean Args: ip (str): 从机 IP port (int): 从机端口 address (str): 写入的地址位 value (bool): 写入的值 Returns: bool: 成功返回 True,失败返回 False """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 写入布尔值 plc.batchwrite_bitunits(address, [1 if value else 0]) plc.close() self.logger.info(f"写入Melsec boolean成功: {ip}:{port} 地址: {address} 值: {value}") return True except Exception as e: self.logger.error(f"写入Melsec boolean失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}") return False def write_number(self, ip: str, port: int, address: str, value: float) -> bool: """Melsec 写入 number Args: ip (str): 从机 IP port (int): 从机端口 address (str): 写入的地址位 value (float): 写入的值 Returns: bool: 成功返回 True,失败返回 False """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 写入数值(转换为整数) int_value = int(value) plc.batchwrite_wordunits(address, [int_value]) plc.close() self.logger.info(f"写入Melsec number成功: {ip}:{port} 地址: {address} 值: {value}") return True except Exception as e: self.logger.error(f"写入Melsec number失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}") return False def write_string(self, ip: str, port: int, address: str, value: str) -> bool: """Melsec 写入 string Args: ip (str): 从机 IP port (int): 从机端口 address (str): 写入的地址位 value (str): 写入的值 Returns: bool: 成功返回 True,失败返回 False """ try: # 创建Melsec连接 plc = pymcprotocol.Type3E() plc.setaccessopt(commtype="binary") plc.connect(ip, port) # 准备字符串数据 byte_data = value.encode('ascii')[:254] # 限制长度并编码为ASCII # 补齐到偶数长度 if len(byte_data) % 2 == 1: byte_data += b'\x00' # 转换为字数据 word_data = [] for i in range(0, len(byte_data), 2): word = byte_data[i] | (byte_data[i+1] << 8) word_data.append(word) # 写入字符串 plc.batchwrite_wordunits(address, word_data) plc.close() self.logger.info(f"写入Melsec string成功: {ip}:{port} 地址: {address} 值: {value}") return True except Exception as e: self.logger.error(f"写入Melsec string失败: {ip}:{port} 地址: {address} 值: {value} 错误: {e}") return False