// modbus_manager.ts - Modbus TCP manager in which write operations take priority over reads
import Modbus from "npm:jsmodbus@4.0.10";
import net from "node:net";
/**
 * A queued read request together with the callbacks that settle the promise
 * handed back to the caller who enqueued it.
 */
type ReadJob = {
  /** Name of the client read method to invoke. */
  fn: "readHoldingRegisters" | "readInputRegisters" | "readCoils" | "readDiscreteInputs";
  /** Start address passed as the first argument of the read call. */
  start: number;
  /** Length passed as the second argument of the read call. */
  len: number;
  /** Settles the caller's promise with the values that were read. */
  resolve: (vals: number[]) => void;
  /** Settles the caller's promise with a failure. */
  reject: (err: Error) => void;
};
/**
 * A queued write request together with the callbacks that settle the promise
 * handed back to the caller who enqueued it.
 */
type WriteJob = {
  /** Name of the client write method to invoke. */
  fn: "writeSingleRegister" | "writeMultipleRegisters";
  /** Start address passed as the first argument of the write call. */
  start: number;
  /** Value (or list of values) passed as the second argument of the write call. */
  payload: number | number[];
  /** Settles the caller's promise once the write has completed. */
  resolve: () => void;
  /** Settles the caller's promise with a failure. */
  reject: (err: Error) => void;
};
/**
 * Runs Modbus TCP requests one at a time over a single socket.
 *
 * Requests are queued through {@link ModbusManager.enqueueRead} and
 * {@link ModbusManager.enqueueWrite}. A background loop drains the queues and
 * always takes a pending write before any pending read. When the socket fails,
 * closes or times out while connecting, a reconnect is scheduled automatically
 * until {@link ModbusManager.close} is called.
 */
export class ModbusManager {
  /** How long a connection attempt may take before it is abandoned. */
  private static readonly CONNECT_TIMEOUT_MS = 5000;
  /** Initial delay handed to `socket.setKeepAlive`. */
  private static readonly KEEP_ALIVE_DELAY_MS = 60000;
  /** Delay between losing the connection and the next connection attempt. */
  private static readonly RECONNECT_DELAY_MS = 3000;
  /** Sleep used by the job loop when both queues are empty. */
  private static readonly IDLE_POLL_MS = 10;
  /** Pause between two consecutive jobs so the device is not flooded. */
  private static readonly JOB_GAP_MS = 5;
  /** Substrings of an error message that mark the connection as lost. */
  private static readonly CONNECTION_ERROR_CODES: readonly string[] = [
    "ECONNRESET",
    "ENOTCONN",
    "ETIMEDOUT",
  ];

  /** Socket of the current connection attempt / connection, if any. */
  private socket?: net.Socket;
  // The client is indexed dynamically by method name (`job.fn`), so it is kept
  // untyped rather than coupled to the library's exact method signatures.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private client: any;

  // Two queues implement "writes first": the loop only looks at reads when no
  // write is waiting.
  private writeQueue: WriteJob[] = [];
  private readQueue: ReadJob[] = [];

  private reconnectTimer?: ReturnType<typeof setTimeout>;
  private isConnected = false;
  private isRunning = false;
  /** Set by close(); suppresses reconnects and rejects newly enqueued jobs. */
  private isClosed = false;
  /** Incremented per loop start so a stale loop exits after close() + init(). */
  private loopGeneration = 0;

  /**
   * @param host Host name or address of the Modbus TCP device.
   * @param port TCP port of the device.
   * @param unitId Unit id handed to the Modbus client.
   * @param logger Sink for human-readable status messages.
   */
  constructor(
    private readonly host: string,
    private readonly port: number,
    private readonly unitId: number,
    private readonly logger: (msg: string) => void
  ) {}

  /**
   * Connects to the device and starts the job loop.
   *
   * The loop is started even when the first connection attempt fails: a
   * reconnect is already scheduled in that case, and without a running loop
   * every queued job would wait forever.
   *
   * @throws The connection error if the first connection attempt fails.
   */
  async init(): Promise<void> {
    this.logger("Initializing ModbusManager...");
    this.isClosed = false;
    try {
      await this.connect();
    } finally {
      void this.runLoop();
    }
    this.logger("ModbusManager initialized successfully");
  }

  /**
   * Opens a fresh socket and Modbus client.
   *
   * The returned promise settles exactly once: it resolves on `connect` and
   * rejects on the first `error`, `close` or `timeout` seen before that. The
   * `error` and `close` listeners stay attached after the connection is
   * established so later failures are handled (and trigger a reconnect)
   * instead of surfacing as an unhandled `error` event.
   */
  private connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      let pending = true;
      const succeed = () => {
        if (pending) {
          pending = false;
          resolve();
        }
      };
      const fail = (reason: Error) => {
        if (pending) {
          pending = false;
          reject(reason);
        }
      };

      try {
        const previous = this.socket;
        const socket = new net.Socket();
        this.socket = socket;
        this.client = new Modbus.client.TCP(socket, this.unitId);
        this.isConnected = false;
        // Release the socket of an earlier attempt; its late events are
        // ignored below because it is no longer the current socket.
        previous?.destroy();

        // Socket options
        socket.setKeepAlive(true, ModbusManager.KEEP_ALIVE_DELAY_MS);
        socket.setTimeout(ModbusManager.CONNECT_TIMEOUT_MS);

        const isCurrent = () => this.socket === socket;
        const markDown = () => {
          if (!isCurrent()) return;
          this.isConnected = false;
          this.scheduleReconnect();
        };

        const onTimeout = () => {
          this.logger("Modbus socket timeout");
          fail(new Error("Modbus socket timeout"));
          socket.destroy();
          markDown();
        };

        socket.once("connect", () => {
          // The timeout only guards the connection attempt; an idle but
          // healthy connection must not be torn down.
          socket.off("timeout", onTimeout);
          socket.setTimeout(0);
          if (!isCurrent()) {
            socket.destroy();
            fail(new Error("Modbus connection attempt superseded"));
            return;
          }
          this.isConnected = true;
          this.logger(`Modbus connected to ${this.host}:${this.port}, unitId: ${this.unitId}`);
          succeed();
        });

        socket.on("error", (error: Error) => {
          this.logger(`Modbus socket error: ${error.message}`);
          fail(error);
          markDown();
        });

        socket.once("close", () => {
          fail(new Error("Modbus socket closed"));
          if (!isCurrent()) return;
          this.logger("Modbus socket closed");
          markDown();
        });

        socket.once("timeout", onTimeout);

        // Connect to the Modbus device
        socket.connect({ host: this.host, port: this.port });
      } catch (error) {
        const failure = ModbusManager.toError(error);
        this.logger(`Failed to create Modbus connection: ${failure.message}`);
        fail(failure);
      }
    });
  }

  /**
   * Arms a single reconnect attempt unless one is already armed or the
   * manager has been closed. A failed attempt re-arms itself through the
   * socket handlers installed by connect().
   */
  private scheduleReconnect(): void {
    if (this.isClosed || this.reconnectTimer !== undefined) return;

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      if (this.isClosed) return;
      this.logger("Attempting Modbus reconnect...");
      this.connect().then(
        () => this.logger("Modbus reconnected successfully"),
        (error: unknown) =>
          // The socket handlers have already scheduled the next retry.
          this.logger(`Modbus reconnect failed: ${ModbusManager.toError(error).message}`)
      );
    }, ModbusManager.RECONNECT_DELAY_MS);
  }

  /**
   * Queues a read request.
   *
   * @param fn Name of the client read method to call.
   * @param start Start address passed to the read call.
   * @param len Length passed to the read call.
   * @returns The values read; rejects if the operation fails, the manager is
   *          not connected when the job is picked up, or the manager is closed.
   */
  enqueueRead(
    fn: ReadJob["fn"],
    start: number,
    len: number
  ): Promise<number[]> {
    if (this.isClosed) {
      return Promise.reject(new Error("ModbusManager is closed"));
    }
    return new Promise((resolve, reject) => {
      this.readQueue.push({ fn, start, len, resolve, reject });
    });
  }

  /**
   * Queues a write request; pending writes are processed before pending reads.
   *
   * @param fn Name of the client write method to call.
   * @param start Start address passed to the write call.
   * @param payload Value or values passed to the write call.
   * @returns Resolves when the write completed; rejects if the operation
   *          fails, the manager is not connected when the job is picked up,
   *          or the manager is closed.
   */
  enqueueWrite(
    fn: WriteJob["fn"],
    start: number,
    payload: number | number[]
  ): Promise<void> {
    if (this.isClosed) {
      return Promise.reject(new Error("ModbusManager is closed"));
    }
    return new Promise((resolve, reject) => {
      this.writeQueue.push({ fn, start, payload, resolve, reject });
    });
  }

  /**
   * Processes queued jobs one at a time until close() stops the loop.
   * Calling it while a loop is already running is a no-op.
   */
  private async runLoop(): Promise<void> {
    if (this.isRunning) return;
    this.isRunning = true;
    const generation = ++this.loopGeneration;
    const active = () => this.isRunning && generation === this.loopGeneration;

    this.logger("Starting Modbus job processing loop");

    while (active()) {
      // Writes have priority: the read queue is only consulted when no write
      // is waiting.
      const job: ReadJob | WriteJob | undefined =
        this.writeQueue.shift() ?? this.readQueue.shift();

      if (!job) {
        // Nothing to do: sleep briefly before polling again.
        await ModbusManager.delay(ModbusManager.IDLE_POLL_MS);
        continue;
      }

      // Fail fast instead of holding jobs while the connection is down.
      if (!this.isConnected) {
        job.reject(new Error("Modbus not connected"));
        continue;
      }

      try {
        if ("len" in job) {
          // Read operation
          const response = await this.client[job.fn](job.start, job.len);
          const values = Array.from(response.response.body.values as number[]);
          job.resolve(values);
          this.logger(`Read ${job.fn} at ${job.start}, length ${job.len}: [${values.join(', ')}]`);
        } else {
          // Write operation
          await this.client[job.fn](job.start, job.payload);
          job.resolve();
          this.logger(`Write ${job.fn} at ${job.start}, payload: ${Array.isArray(job.payload) ? `[${job.payload.join(', ')}]` : job.payload}`);
        }
      } catch (error) {
        // Rejection values are not guaranteed to be Error instances; normalise
        // first so that inspecting the message can never break the loop.
        const failure = ModbusManager.toError(error);
        this.logger(`Modbus operation failed: ${failure.message}`);
        job.reject(failure);

        // A connection-level failure marks the link as down and reconnects.
        if (ModbusManager.CONNECTION_ERROR_CODES.some((code) => failure.message.includes(code))) {
          this.isConnected = false;
          this.scheduleReconnect();
        }
      }

      // Short gap between operations to avoid hammering the device.
      await ModbusManager.delay(ModbusManager.JOB_GAP_MS);
    }

    this.logger("Modbus job processing loop stopped");
  }

  /** @returns Current queue lengths and the connection flag. */
  getQueueStatus() {
    return {
      writeQueue: this.writeQueue.length,
      readQueue: this.readQueue.length,
      isConnected: this.isConnected
    };
  }

  /**
   * Stops the job loop, rejects every queued job, cancels any pending
   * reconnect and destroys the socket. No reconnect is attempted afterwards.
   */
  async close(): Promise<void> {
    this.logger("Closing ModbusManager...");
    this.isClosed = true;
    this.isRunning = false;

    // Empty the queues first, then reject, so a reject callback that enqueues
    // again cannot be picked up by this pass.
    const abandoned: Array<ReadJob | WriteJob> = [...this.writeQueue, ...this.readQueue];
    this.writeQueue = [];
    this.readQueue = [];
    for (const job of abandoned) {
      job.reject(new Error("ModbusManager is closing"));
    }

    // Cancel a pending reconnect attempt.
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }

    // Detach the socket before destroying it so its `close` event is treated
    // as stale and does not log twice or schedule a reconnect.
    const socket = this.socket;
    this.socket = undefined;
    if (socket) {
      try {
        socket.destroy();
        this.logger("Modbus socket closed");
      } catch (error) {
        this.logger(`Error closing Modbus socket: ${ModbusManager.toError(error).message}`);
      }
    }

    this.isConnected = false;
    this.logger("ModbusManager closed");
  }

  /** Resolves after `ms` milliseconds. */
  private static delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Turns an arbitrary thrown/rejected value into something with a string
   * `message`. Error instances and error-like objects are passed through
   * unchanged so callers keep any extra fields; everything else is wrapped.
   */
  private static toError(reason: unknown): Error {
    if (reason instanceof Error) return reason;
    if (
      typeof reason === "object" &&
      reason !== null &&
      typeof (reason as { message?: unknown }).message === "string"
    ) {
      return reason as Error;
    }
    return new Error(String(reason));
  }
}