// modbus_manager.ts - Modbus TCP manager that gives write operations priority over reads.
import Modbus from "npm:jsmodbus@4.0.10";
import net from "node:net";

/** A queued read request together with the callbacks that settle the caller's promise. */
type ReadJob = {
  fn: "readHoldingRegisters" | "readInputRegisters" | "readCoils" | "readDiscreteInputs";
  start: number;
  len: number;
  resolve: (vals: number[]) => void;
  reject: (err: Error) => void;
};

/** A queued write request together with the callbacks that settle the caller's promise. */
type WriteJob = {
  fn: "writeSingleRegister" | "writeMultipleRegisters";
  start: number;
  payload: number | number[];
  resolve: () => void;
  reject: (err: Error) => void;
};

/** Delay before a reconnect attempt after the socket has gone away. */
const RECONNECT_DELAY_MS = 3000;
/** Socket inactivity timeout applied while the connection is being established. */
const CONNECT_TIMEOUT_MS = 5000;
/** Initial delay passed to `socket.setKeepAlive`. */
const KEEPALIVE_DELAY_MS = 60000;
/** Sleep between polls when both queues are empty. */
const IDLE_POLL_MS = 10;
/** Pause after each processed job so the device is not flooded with requests. */
const JOB_GAP_MS = 5;
/** Error codes that are treated as "the connection is gone". */
const CONNECTION_ERROR_CODES = ["ECONNRESET", "ENOTCONN", "ETIMEDOUT"];

/** Resolves after `ms` milliseconds. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/** Extracts a printable message from anything that can be thrown. */
function describeError(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "object" && error !== null && "message" in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/** True when the error's `code` or message names one of the connection-level error codes. */
function isConnectionError(error: unknown): boolean {
  const code =
    typeof error === "object" && error !== null && "code" in error
      ? String((error as { code: unknown }).code)
      : "";
  const message = describeError(error);
  return CONNECTION_ERROR_CODES.some((c) => code === c || message.includes(c));
}

/**
 * Serializes Modbus TCP requests over a single socket.
 *
 * Requests are queued and executed one at a time by a background loop; the
 * write queue is always drained before the read queue. When the socket closes,
 * a reconnect is attempted every {@link RECONNECT_DELAY_MS} ms until
 * {@link ModbusManager.close} is called.
 */
export class ModbusManager {
  private socket?: net.Socket;
  // The jsmodbus client is invoked by method name taken from the job, so it is
  // kept untyped here.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private client!: any;

  // Two queues implement the write-before-read priority.
  private writeQueue: WriteJob[] = [];
  private readQueue: ReadJob[] = [];

  private reconnectTimer?: ReturnType<typeof setTimeout>;
  private isConnected = false;
  private isRunning = false;
  /** Set by close(); suppresses reconnects and rejects newly enqueued jobs. */
  private isClosed = false;
  /** Incremented per loop start so a superseded loop can notice and exit. */
  private loopGeneration = 0;

  /**
   * @param host Hostname or IP address of the Modbus TCP device.
   * @param port TCP port of the device.
   * @param unitId Unit identifier passed to the jsmodbus TCP client.
   * @param logger Sink for human-readable status messages.
   */
  constructor(
    private host: string,
    private port: number,
    private unitId: number,
    private logger: (msg: string) => void
  ) {}

  /**
   * Connects to the device and starts the job-processing loop.
   *
   * @throws Rejects with the connection error when the first connection
   *   attempt fails. Reconnect attempts keep running in the background and the
   *   processing loop is started regardless, so the manager becomes usable as
   *   soon as a later attempt succeeds.
   */
  async init(): Promise<void> {
    this.logger("Initializing ModbusManager...");
    this.isClosed = false;
    try {
      await this.connect();
    } finally {
      // Start the loop even on failure: a background reconnect may succeed
      // later and queued jobs must not be left without a consumer.
      if (!this.isClosed) void this.runLoop();
    }
    this.logger("ModbusManager initialized successfully");
  }

  /**
   * Opens a fresh socket and resolves once it is connected.
   *
   * Any previous socket is destroyed first. The returned promise always
   * settles: it rejects on error, on connect timeout, or when the socket
   * closes before connecting. The error and close listeners stay attached for
   * the lifetime of the socket so that a later disconnect is noticed and
   * triggers a reconnect.
   */
  private connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // A manual connect supersedes any pending timer-driven attempt.
      this.clearReconnectTimer();
      this.disposeSocket();

      let socket: net.Socket;
      try {
        socket = new net.Socket();
        this.client = new Modbus.client.TCP(socket, this.unitId);
        socket.setKeepAlive(true, KEEPALIVE_DELAY_MS);
        socket.setTimeout(CONNECT_TIMEOUT_MS);
      } catch (error) {
        this.logger(`Failed to create Modbus connection: ${describeError(error)}`);
        reject(error);
        return;
      }
      this.socket = socket;

      let settled = false;
      const fail = (error: Error) => {
        if (settled) return;
        settled = true;
        reject(error);
      };

      socket.once("connect", () => {
        settled = true;
        // The timeout only guards connection establishment; an idle but
        // healthy link must not emit 'timeout'.
        socket.setTimeout(0);
        this.isConnected = true;
        this.logger(`Modbus connected to ${this.host}:${this.port}, unitId: ${this.unitId}`);
        resolve();
      });

      // Must stay attached: an 'error' event without a listener is thrown by
      // the EventEmitter. 'close' follows 'error', so reconnecting is left to
      // the close handler.
      socket.on("error", (error: Error) => {
        this.logger(`Modbus socket error: ${error.message}`);
        fail(error);
      });

      socket.once("timeout", () => {
        this.logger("Modbus socket timeout");
        fail(new Error("Modbus socket timeout"));
        socket.destroy();
      });

      socket.once("close", () => {
        fail(new Error("Modbus socket closed before connecting"));
        // Ignore sockets that were replaced by a newer connect() or detached
        // by close(); only the current socket drives connection state.
        if (this.socket !== socket) return;
        this.isConnected = false;
        this.logger("Modbus socket closed");
        this.scheduleReconnect();
      });

      // Connect to the Modbus device.
      socket.connect({ host: this.host, port: this.port });
    });
  }

  /** Detaches and destroys the current socket, if any. */
  private disposeSocket(): void {
    const previous = this.socket;
    this.socket = undefined;
    this.isConnected = false;
    if (previous && !previous.destroyed) previous.destroy();
  }

  /** Cancels a pending reconnect attempt, if any. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
  }

  /**
   * Arms a single reconnect attempt after {@link RECONNECT_DELAY_MS}.
   * Does nothing when an attempt is already pending or the manager is closed.
   */
  private scheduleReconnect(): void {
    if (this.isClosed || this.reconnectTimer !== undefined) return;
    this.reconnectTimer = setTimeout(async () => {
      this.reconnectTimer = undefined;
      if (this.isClosed) return;
      this.logger("Attempting Modbus reconnect...");
      try {
        await this.connect();
        this.logger("Modbus reconnected successfully");
      } catch (error) {
        // The failed socket's 'close' handler schedules the next attempt.
        this.logger(`Modbus reconnect failed: ${describeError(error)}`);
      }
    }, RECONNECT_DELAY_MS);
  }

  /**
   * Queues a read request.
   *
   * @param fn Name of the jsmodbus read method to invoke.
   * @param start Start address.
   * @param len Number of items to read.
   * @returns The values from the response body, as an array.
   * @throws Rejects if the manager has been closed, the device is not
   *   connected when the job is dequeued, or the request itself fails.
   */
  enqueueRead(fn: ReadJob["fn"], start: number, len: number): Promise<number[]> {
    if (this.isClosed) return Promise.reject(new Error("ModbusManager is closed"));
    return new Promise<number[]>((resolve, reject) => {
      this.readQueue.push({ fn, start, len, resolve, reject });
    });
  }

  /**
   * Queues a write request. Writes are executed before any queued reads.
   *
   * @param fn Name of the jsmodbus write method to invoke.
   * @param start Start address.
   * @param payload Value(s) passed to the write method unchanged.
   * @throws Rejects if the manager has been closed, the device is not
   *   connected when the job is dequeued, or the request itself fails.
   */
  enqueueWrite(fn: WriteJob["fn"], start: number, payload: number | number[]): Promise<void> {
    if (this.isClosed) return Promise.reject(new Error("ModbusManager is closed"));
    return new Promise<void>((resolve, reject) => {
      this.writeQueue.push({ fn, start, payload, resolve, reject });
    });
  }

  /**
   * Processes queued jobs one at a time until the manager is closed.
   * At most one loop is active; calling this while a loop runs is a no-op.
   */
  private async runLoop(): Promise<void> {
    if (this.isRunning) return;
    this.isRunning = true;
    // A loop that was asleep across close() + init() sees a newer generation
    // and exits instead of running alongside its replacement.
    const generation = ++this.loopGeneration;
    this.logger("Starting Modbus job processing loop");

    while (this.isRunning && generation === this.loopGeneration) {
      // Write priority: the write queue is always checked first.
      const job: ReadJob | WriteJob | undefined =
        this.writeQueue.shift() ?? this.readQueue.shift();

      if (!job) {
        // Nothing to do; sleep briefly before polling again.
        await sleep(IDLE_POLL_MS);
        continue;
      }

      // Fail fast instead of holding jobs while the link is down.
      if (!this.isConnected) {
        job.reject(new Error("Modbus not connected"));
        continue;
      }

      try {
        if ("len" in job) {
          // Read operation.
          const response = await this.client[job.fn](job.start, job.len);
          const values = Array.from(response.response.body.values as number[]);
          job.resolve(values);
          this.logger(`Read ${job.fn} at ${job.start}, length ${job.len}: [${values.join(", ")}]`);
        } else {
          // Write operation.
          await this.client[job.fn](job.start, job.payload);
          job.resolve();
          const shown = Array.isArray(job.payload)
            ? `[${job.payload.join(", ")}]`
            : job.payload;
          this.logger(`Write ${job.fn} at ${job.start}, payload: ${shown}`);
        }
      } catch (error) {
        this.logger(`Modbus operation failed: ${describeError(error)}`);
        // The thrown value is passed through untouched so callers can inspect it.
        job.reject(error as Error);

        // On a connection-level error, mark the link down and reconnect.
        if (isConnectionError(error)) {
          this.isConnected = false;
          this.scheduleReconnect();
        }
      }

      // Gap between operations to avoid overly frequent requests.
      await sleep(JOB_GAP_MS);
    }

    this.logger("Modbus job processing loop stopped");
  }

  /** Returns the current queue lengths and connection flag. */
  getQueueStatus(): { writeQueue: number; readQueue: number; isConnected: boolean } {
    return {
      writeQueue: this.writeQueue.length,
      readQueue: this.readQueue.length,
      isConnected: this.isConnected,
    };
  }

  /**
   * Stops the processing loop, rejects every queued job, cancels any pending
   * reconnect and destroys the socket. No reconnect is attempted afterwards.
   */
  async close(): Promise<void> {
    this.logger("Closing ModbusManager...");
    this.isClosed = true;
    this.isRunning = false;

    // Empty the queues, rejecting every job that is still waiting.
    for (const job of [...this.writeQueue, ...this.readQueue]) {
      job.reject(new Error("ModbusManager is closing"));
    }
    this.writeQueue = [];
    this.readQueue = [];

    // Cancel the reconnect timer.
    this.clearReconnectTimer();

    // Close the socket. It is detached first so its 'close' handler treats it
    // as stale and neither logs again nor schedules a reconnect.
    const socket = this.socket;
    this.socket = undefined;
    if (socket) {
      try {
        socket.destroy();
        this.logger("Modbus socket closed");
      } catch (error) {
        this.logger(`Error closing Modbus socket: ${describeError(error)}`);
      }
    }

    this.isConnected = false;
    this.logger("ModbusManager closed");
  }
}