428 lines
12 KiB
TypeScript
428 lines
12 KiB
TypeScript
import mqtt, { MqttClient, IClientOptions } from "npm:mqtt";
|
||
|
||
// ==== 消息/配置类型 ====
|
||
|
||
/** init 消息接口,从主线程传入 */
|
||
interface InitMessage {
|
||
type: "init";
|
||
mqtt: {
|
||
brokerUrl: string;
|
||
clientId: string;
|
||
username?: string;
|
||
password?: string;
|
||
reconnectInterval?: number;
|
||
qos?: 0 | 1 | 2;
|
||
};
|
||
/**
|
||
* 这个数组里每项配置一对 topic 和对应的映射规则
|
||
*/
|
||
mappings: Array<{
|
||
/** MQTT 订阅的源 topic,可以包含 + 或 # 通配符 */
|
||
sourceTopic: string;
|
||
/** 发布时使用的目标 topic 模板,可包含 {agvId} 等占位符 */
|
||
targetTopicTemplate: string;
|
||
/** transformGeneric 用到的映射规则 */
|
||
mapping: Record<string, MappingRule>;
|
||
}>;
|
||
instanceId: string;
|
||
}
|
||
|
||
/** 停止 Worker 的消息 */
|
||
interface StopMessage {
|
||
type: "stop" | "shutdown"; // 支持 stop 和 shutdown
|
||
}
|
||
|
||
// transformGeneric 中用到的规则
|
||
type MappingRule =
|
||
| string // 简单字符串映射,直接从源路径复制
|
||
| {
|
||
op: "const";
|
||
value: any;
|
||
source?: string;
|
||
}
|
||
| {
|
||
type: "object";
|
||
op?: "object";
|
||
source?: string;
|
||
mapping: Record<string, MappingRule>;
|
||
}
|
||
| {
|
||
type: "array";
|
||
op?: "array";
|
||
source: string;
|
||
mapping: Record<string, MappingRule>;
|
||
}
|
||
| {
|
||
op: "toString";
|
||
source: string;
|
||
}
|
||
| {
|
||
// 通用对象映射(没有 type 字段)
|
||
mapping: Record<string, MappingRule>;
|
||
source?: string;
|
||
};
|
||
|
||
// ==== 通用工具函数 ====
|
||
|
||
/** 按路径 a.b.c 从 obj 取值 */
|
||
function getByPath(obj: any, path: string): any {
|
||
return path.split(".").reduce((o, key) => (o != null ? o[key] : undefined), obj);
|
||
}
|
||
|
||
/** 按路径 a.b.c 在 target 上写入 value */
|
||
function setByPath(target: any, path: string, value: any) {
|
||
const keys = path.split(".");
|
||
let cur = target;
|
||
for (let i = 0; i < keys.length - 1; i++) {
|
||
const k = keys[i];
|
||
if (!(k in cur) || typeof cur[k] !== "object") {
|
||
cur[k] = {};
|
||
}
|
||
cur = cur[k];
|
||
}
|
||
cur[keys[keys.length - 1]] = value;
|
||
}
|
||
|
||
/** 通用的 JSON 转换器 */
|
||
function transformGeneric(raw: any, mapping: Record<string, MappingRule>): any {
|
||
const out: any = {};
|
||
|
||
for (const [outKey, rule] of Object.entries(mapping)) {
|
||
try {
|
||
// 处理简单字符串映射
|
||
if (typeof rule === "string") {
|
||
const value = getByPath(raw, rule);
|
||
if (value !== undefined) {
|
||
setByPath(out, outKey, value);
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// 处理对象规则
|
||
if (typeof rule === "object" && rule !== null) {
|
||
// 常量值映射
|
||
if ("op" in rule && rule.op === "const") {
|
||
setByPath(out, outKey, rule.value);
|
||
continue;
|
||
}
|
||
|
||
// toString 操作
|
||
if ("op" in rule && rule.op === "toString") {
|
||
const sourceValue = getByPath(raw, rule.source);
|
||
if (sourceValue !== undefined) {
|
||
// 如果已经是字符串,直接使用;否则进行 JSON 序列化
|
||
const stringValue = typeof sourceValue === "string"
|
||
? sourceValue
|
||
: JSON.stringify(sourceValue);
|
||
setByPath(out, outKey, stringValue);
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// 对象映射
|
||
if ("type" in rule && rule.type === "object") {
|
||
const subRaw = rule.source ? getByPath(raw, rule.source) : raw;
|
||
if (subRaw !== undefined) {
|
||
const transformedObj = transformGeneric(subRaw, rule.mapping);
|
||
setByPath(out, outKey, transformedObj);
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// 数组映射
|
||
if ("type" in rule && rule.type === "array") {
|
||
const sourceArray = getByPath(raw, rule.source);
|
||
if (Array.isArray(sourceArray)) {
|
||
const transformedArray = sourceArray.map(item =>
|
||
transformGeneric(item, rule.mapping)
|
||
);
|
||
setByPath(out, outKey, transformedArray);
|
||
} else {
|
||
setByPath(out, outKey, []);
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// 如果是对象但没有特殊操作,尝试作为嵌套映射处理
|
||
if ("mapping" in rule && typeof rule.mapping === "object") {
|
||
const mappingRule = rule as { mapping: Record<string, MappingRule>; source?: string };
|
||
const subRaw = mappingRule.source ? getByPath(raw, mappingRule.source) : raw;
|
||
if (subRaw !== undefined) {
|
||
const transformedObj = transformGeneric(subRaw, mappingRule.mapping);
|
||
setByPath(out, outKey, transformedObj);
|
||
}
|
||
continue;
|
||
}
|
||
}
|
||
|
||
console.warn(`Unknown mapping rule for key "${outKey}":`, rule);
|
||
} catch (error) {
|
||
console.error(`Error processing mapping rule for key "${outKey}":`, error);
|
||
}
|
||
}
|
||
|
||
return out;
|
||
}
|
||
|
||
/**
|
||
* 判断一个 topic 是否匹配源 topic 模式(含 +, #)
|
||
*/
|
||
function topicMatches(pattern: string, topic: string): boolean {
|
||
const patSeg = pattern.split("/");
|
||
const topSeg = topic.split("/");
|
||
for (let i = 0; i < patSeg.length; i++) {
|
||
const p = patSeg[i];
|
||
const t = topSeg[i];
|
||
if (p === "#") return true; // 后面的全部都匹配
|
||
if (p === "+") continue; // 单层通配符
|
||
if (t === undefined) return false;
|
||
if (p !== t) return false;
|
||
}
|
||
return patSeg.length === topSeg.length;
|
||
}
|
||
|
||
/**
|
||
* 从 topic 根据 pattern 抽取 '+' 通配符对应的位置上的参数
|
||
* 目前只支持单个 {agvId} 占位
|
||
*/
|
||
function extractAgvId(pattern: string, topic: string): string | null {
|
||
const patSeg = pattern.split("/");
|
||
const topSeg = topic.split("/");
|
||
for (let i = 0; i < patSeg.length; i++) {
|
||
if (patSeg[i] === "+") {
|
||
return topSeg[i];
|
||
}
|
||
}
|
||
return null;
|
||
}
|
||
|
||
// ==== 全局状态 ====
|
||
|
||
let mqttClient: MqttClient | null = null;
|
||
let reconnectTimer: any = null;
|
||
const CONFIG: {
|
||
mqtt: {
|
||
brokerUrl: string;
|
||
clientId: string;
|
||
username?: string;
|
||
password?: string;
|
||
reconnectInterval?: number;
|
||
qos?: 0 | 1 | 2;
|
||
};
|
||
mappings: Array<{
|
||
sourceTopic: string;
|
||
targetTopicTemplate: string;
|
||
mapping: Record<string, MappingRule>;
|
||
}>;
|
||
} = {
|
||
mqtt: {
|
||
brokerUrl: "",
|
||
clientId: "",
|
||
username: undefined,
|
||
password: undefined,
|
||
reconnectInterval: 5000,
|
||
qos: 1,
|
||
},
|
||
mappings: [],
|
||
};
|
||
|
||
// instance identifier generated from config.manufacturer and UUID
|
||
let INSTANCE_ID: string = "";
|
||
|
||
// 连续 MQTT 错误计数,只有超过10次才触发重连
|
||
let consecutiveErrors = 0;
|
||
|
||
// ==== MQTT 逻辑 ====
|
||
|
||
async function connectAndSubscribe() {
|
||
try {
|
||
const opts: IClientOptions = {
|
||
clientId: CONFIG.mqtt.clientId,
|
||
username: CONFIG.mqtt.username,
|
||
password: CONFIG.mqtt.password,
|
||
reconnectPeriod: 3000, // 我们自己重连
|
||
};
|
||
mqttClient = mqtt.connect(CONFIG.mqtt.brokerUrl, opts);
|
||
|
||
// 预先注册错误、断开和关闭处理器
|
||
mqttClient.on("error", onMqttError);
|
||
mqttClient.on("disconnect", onMqttDisconnect);
|
||
mqttClient.on("close", onMqttClose);
|
||
|
||
mqttClient.on("connect", async () => {
|
||
// 连接成功后重置错误计数
|
||
consecutiveErrors = 0;
|
||
// 订阅所有 mappings 里配置的源 topic
|
||
for (const m of CONFIG.mappings) {
|
||
const inTopic = m.sourceTopic
|
||
.replace("{instanceId}", INSTANCE_ID);
|
||
// console.log("inTopic", inTopic);
|
||
try {
|
||
await mqttClient!.subscribe(inTopic, { qos: CONFIG.mqtt.qos! });
|
||
} catch (err) {
|
||
console.error(`✗ MQTT subscribe failed for ${inTopic}:`, err);
|
||
self.postMessage({ type: "error", error: "subscribe_failed", details: (err as Error).message });
|
||
scheduleReconnect();
|
||
return; // 订阅失败,中断后续处理,尝试重连
|
||
}
|
||
}
|
||
// 订阅完成后监听消息
|
||
mqttClient!.on("message", onMqttMessage);
|
||
|
||
if (reconnectTimer) {
|
||
clearTimeout(reconnectTimer);
|
||
reconnectTimer = null;
|
||
}
|
||
self.postMessage({ type: "status", status: "connected" });
|
||
});
|
||
} catch (err) {
|
||
console.error("✗ MQTT connect failed:", err);
|
||
self.postMessage({
|
||
type: "error",
|
||
error: "connect_failed",
|
||
details: err instanceof Error ? err.message : String(err),
|
||
});
|
||
scheduleReconnect();
|
||
}
|
||
}
|
||
|
||
async function onMqttMessage(topic: string, payload: Uint8Array) {
|
||
try {
|
||
const raw = JSON.parse(new TextDecoder().decode(payload));
|
||
|
||
// 对每个匹配的 mappingConfig,做一次转换并发布
|
||
for (const m of CONFIG.mappings) {
|
||
const inTopic = m.sourceTopic
|
||
.replace("{instanceId}", INSTANCE_ID);
|
||
if (!topicMatches(inTopic, topic)) continue;
|
||
|
||
// 提取占位符 agvId
|
||
const agvId = extractAgvId(inTopic, topic) ?? "";
|
||
|
||
// 转换
|
||
const transformed = transformGeneric(raw, m.mapping);
|
||
|
||
// 填充目标 topic
|
||
const outTopic = m.targetTopicTemplate
|
||
.replace("{instanceId}", INSTANCE_ID)
|
||
.replace("{agvId}", agvId);
|
||
|
||
// 发布
|
||
// console.log("outTopic", outTopic);
|
||
await mqttClient!.publish(outTopic, JSON.stringify(transformed), {
|
||
qos: CONFIG.mqtt.qos!,
|
||
});
|
||
|
||
self.postMessage({
|
||
type: "published",
|
||
topic: outTopic,
|
||
agvId,
|
||
sourceTopic: topic,
|
||
});
|
||
}
|
||
} catch (err) {
|
||
console.error("✗ onMqttMessage error:", err);
|
||
self.postMessage({
|
||
type: "error",
|
||
error: "process_message",
|
||
details: err instanceof Error ? err.message : String(err),
|
||
});
|
||
}
|
||
}
|
||
|
||
function onMqttClose() {
|
||
console.log("vda onMqttClose");
|
||
self.postMessage({ type: "status", status: "closed" });
|
||
// 确保在 close 时释放底层 socket
|
||
if (mqttClient) {
|
||
mqttClient.end(true);
|
||
mqttClient = null;
|
||
}
|
||
self.postMessage({ type: "reconnect-down" });
|
||
// connectAndSubscribe();
|
||
}
|
||
|
||
function onMqttDisconnect() {
|
||
self.postMessage({ type: "status", status: "disconnected" });
|
||
console.log("vda onMqttDisconnect");
|
||
// 断开时也关闭客户端,避免累积未关闭的连接
|
||
if (mqttClient) {
|
||
mqttClient.end(true);
|
||
mqttClient = null;
|
||
}
|
||
scheduleReconnect();
|
||
}
|
||
|
||
function onMqttError(err: Error) {
|
||
// 如果是连接被拒绝错误,通知主线程重启所有 worker
|
||
const anyErr = err as any;
|
||
if (anyErr.code === 'ECONNREFUSED' || anyErr.code === 'ECONNRESET') {
|
||
console.error('❌ MQTT connection refused, requesting full restart:', anyErr);
|
||
self.postMessage({ type: 'reconnect-all' });
|
||
}
|
||
// 增加错误计数,只在超过10次后才重连
|
||
consecutiveErrors++;
|
||
console.error(`! MQTT error (#${consecutiveErrors}):`, err);
|
||
self.postMessage({
|
||
type: "error",
|
||
error: "mqtt_error",
|
||
details: err.message,
|
||
});
|
||
if (consecutiveErrors >= 5) {
|
||
// 重置错误计数
|
||
consecutiveErrors = 0;
|
||
if (mqttClient) {
|
||
mqttClient.end(true);
|
||
mqttClient = null;
|
||
}
|
||
scheduleReconnect();
|
||
}
|
||
}
|
||
|
||
function scheduleReconnect() {
|
||
if (!reconnectTimer) {
|
||
const t = CONFIG.mqtt.reconnectInterval;
|
||
|
||
reconnectTimer = setTimeout(() => {
|
||
reconnectTimer = null;
|
||
console.log("vda schedule Worker 自身重连");
|
||
self.postMessage({ type: "reconnect-down" });
|
||
connectAndSubscribe();
|
||
}, t);
|
||
}
|
||
}
|
||
|
||
// ==== 主线程消息处理 ====
|
||
|
||
self.onmessage = (e: MessageEvent<InitMessage | StopMessage>) => {
|
||
const msg = e.data;
|
||
if (msg.type === "init") {
|
||
// 覆盖配置
|
||
CONFIG.mqtt = {
|
||
...CONFIG.mqtt,
|
||
brokerUrl: msg.mqtt.brokerUrl,
|
||
clientId: msg.mqtt.clientId,
|
||
username: msg.mqtt.username,
|
||
password: msg.mqtt.password,
|
||
reconnectInterval: msg.mqtt.reconnectInterval ?? CONFIG.mqtt.reconnectInterval,
|
||
qos: msg.mqtt.qos ?? CONFIG.mqtt.qos,
|
||
};
|
||
CONFIG.mappings = msg.mappings;
|
||
// set our instance identifier
|
||
INSTANCE_ID = msg.instanceId;
|
||
console.log("vda5050_transformer_worker: instanceId", INSTANCE_ID);
|
||
// 启动连接
|
||
connectAndSubscribe();
|
||
} else if (msg.type === "stop" || msg.type === "shutdown") {
|
||
// 支持 stop 和 shutdown,统一关闭
|
||
if (mqttClient) {
|
||
mqttClient.end(true);
|
||
mqttClient = null;
|
||
}
|
||
if (reconnectTimer) {
|
||
clearTimeout(reconnectTimer);
|
||
reconnectTimer = null;
|
||
}
|
||
self.postMessage({ type: "status", status: "stopped" });
|
||
}
|
||
}; |