api-amr/modbus_manager.ts
2025-06-04 19:15:02 +08:00

240 lines
6.7 KiB
TypeScript

// modbus_manager.ts - Modbus TCP 管理器,支持写操作高优先级
import Modbus from "npm:jsmodbus@4.0.10";
import net from "node:net";
type ReadJob = {
fn: "readHoldingRegisters" | "readInputRegisters" | "readCoils" | "readDiscreteInputs";
start: number;
len: number;
resolve: (vals: number[]) => void;
reject: (err: Error) => void;
};
type WriteJob = {
fn: "writeSingleRegister" | "writeMultipleRegisters";
start: number;
payload: number | number[];
resolve: () => void;
reject: (err: Error) => void;
};
export class ModbusManager {
private socket!: net.Socket;
private client!: any;
// 双队列实现写操作高优先级
private writeQueue: WriteJob[] = [];
private readQueue: ReadJob[] = [];
private reconnectTimer?: number;
private isConnected = false;
private isRunning = false;
constructor(
private host: string,
private port: number,
private unitId: number,
private logger: (msg: string) => void
) {}
async init() {
this.logger("Initializing ModbusManager...");
await this.connect();
this.runLoop();
this.logger("ModbusManager initialized successfully");
}
private connect(): Promise<void> {
return new Promise((resolve, reject) => {
try {
this.socket = new net.Socket();
this.client = new Modbus.client.TCP(this.socket, this.unitId);
// 设置socket选项
this.socket.setKeepAlive(true, 60000);
this.socket.setTimeout(5000);
const cleanup = () => {
this.socket.off("error", onError);
this.socket.off("close", onClose);
this.socket.off("timeout", onTimeout);
};
const onError = (error: Error) => {
cleanup();
this.isConnected = false;
this.logger(`Modbus socket error: ${error.message}`);
reject(error);
this.scheduleReconnect();
};
const onClose = () => {
cleanup();
this.isConnected = false;
this.logger("Modbus socket closed");
this.scheduleReconnect();
};
const onTimeout = () => {
cleanup();
this.isConnected = false;
this.logger("Modbus socket timeout");
this.socket.destroy();
this.scheduleReconnect();
};
this.socket.once("connect", () => {
cleanup();
this.isConnected = true;
this.logger(`Modbus connected to ${this.host}:${this.port}, unitId: ${this.unitId}`);
resolve();
});
this.socket.once("error", onError);
this.socket.once("close", onClose);
this.socket.once("timeout", onTimeout);
// 连接到Modbus设备
this.socket.connect({ host: this.host, port: this.port });
} catch (error) {
this.logger(`Failed to create Modbus connection: ${(error as Error).message}`);
reject(error);
}
});
}
private scheduleReconnect() {
if (this.reconnectTimer) return;
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = undefined;
this.logger("Attempting Modbus reconnect...");
try {
await this.connect();
this.logger("Modbus reconnected successfully");
} catch (error) {
this.logger(`Modbus reconnect failed: ${(error as Error).message}`);
// 会继续重试
}
}, 3000);
}
enqueueRead(
fn: ReadJob["fn"],
start: number,
len: number
): Promise<number[]> {
return new Promise((resolve, reject) => {
this.readQueue.push({ fn, start, len, resolve, reject });
});
}
enqueueWrite(
fn: WriteJob["fn"],
start: number,
payload: number | number[]
): Promise<void> {
return new Promise((resolve, reject) => {
this.writeQueue.push({ fn, start, payload, resolve, reject });
});
}
private async runLoop() {
if (this.isRunning) return;
this.isRunning = true;
this.logger("Starting Modbus job processing loop");
while (this.isRunning) {
let job: ReadJob | WriteJob | undefined;
// 写操作高优先级:永远先检查写队列
if (this.writeQueue.length > 0) {
job = this.writeQueue.shift();
} else if (this.readQueue.length > 0) {
job = this.readQueue.shift();
}
if (!job) {
// 没有任务时短暂休眠
await new Promise((resolve) => setTimeout(resolve, 10));
continue;
}
// 检查连接状态
if (!this.isConnected) {
job.reject(new Error("Modbus not connected"));
continue;
}
try {
if ("len" in job) {
// 读操作
const response = await this.client[job.fn](job.start, job.len);
const values = Array.from(response.response.body.values as number[]);
job.resolve(values);
this.logger(`Read ${job.fn} at ${job.start}, length ${job.len}: [${values.join(', ')}]`);
} else {
// 写操作
await this.client[job.fn](job.start, job.payload);
job.resolve();
this.logger(`Write ${job.fn} at ${job.start}, payload: ${Array.isArray(job.payload) ? `[${job.payload.join(', ')}]` : job.payload}`);
}
} catch (error) {
const errorMsg = (error as Error).message;
this.logger(`Modbus operation failed: ${errorMsg}`);
job.reject(error as Error);
// 如果是连接相关错误,标记连接断开
if (errorMsg.includes("ECONNRESET") || errorMsg.includes("ENOTCONN") || errorMsg.includes("ETIMEDOUT")) {
this.isConnected = false;
this.scheduleReconnect();
}
}
// 操作间隔,避免过于频繁的请求
await new Promise((resolve) => setTimeout(resolve, 5));
}
this.logger("Modbus job processing loop stopped");
}
getQueueStatus() {
return {
writeQueue: this.writeQueue.length,
readQueue: this.readQueue.length,
isConnected: this.isConnected
};
}
async close() {
this.logger("Closing ModbusManager...");
this.isRunning = false;
// 清空队列,拒绝所有等待的任务
[...this.writeQueue, ...this.readQueue].forEach(job => {
job.reject(new Error("ModbusManager is closing"));
});
this.writeQueue = [];
this.readQueue = [];
// 清理重连定时器
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
// 关闭socket连接
if (this.socket) {
try {
this.socket.destroy();
this.logger("Modbus socket closed");
} catch (error) {
this.logger(`Error closing Modbus socket: ${(error as Error).message}`);
}
}
this.isConnected = false;
this.logger("ModbusManager closed");
}
}