api-amr/vda5050_transformer_worker.ts
2025-06-04 19:15:02 +08:00

428 lines
12 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import mqtt, { MqttClient, IClientOptions } from "npm:mqtt";
// ==== Message / configuration types ====
/**
 * `init` message posted by the main thread to configure and start this worker.
 * The message handler copies these values into the module-level CONFIG and
 * INSTANCE_ID, then opens the MQTT connection.
 */
interface InitMessage {
type: "init";
/** MQTT connection settings. */
mqtt: {
brokerUrl: string;
clientId: string;
username?: string;
password?: string;
/** Delay in milliseconds used by scheduleReconnect; when omitted the current CONFIG value (initially 5000) is kept. */
reconnectInterval?: number;
/** QoS used for both subscribe and publish; when omitted the current CONFIG value (initially 1) is kept. */
qos?: 0 | 1 | 2;
};
/**
* Each entry pairs one source topic with a target topic template and the
* mapping rules applied to messages received on that source topic.
*/
mappings: Array<{
/** Source topic to subscribe to; may contain the MQTT wildcards + or #, and the {instanceId} placeholder. */
sourceTopic: string;
/** Template of the topic to publish to; may contain placeholders such as {agvId} and {instanceId}. */
targetTopicTemplate: string;
/** Mapping rules handed to transformGeneric. */
mapping: Record<string, MappingRule>;
}>;
/** Value substituted for the {instanceId} placeholder in source and target topics. */
instanceId: string;
}
/** Message asking the worker to stop: the handler closes the MQTT client and cancels any pending reconnect. */
interface StopMessage {
type: "stop" | "shutdown"; // both "stop" and "shutdown" are accepted and handled identically
}
// Rules consumed by transformGeneric. The key a rule is stored under is the
// dot-separated output path; the rule describes how to compute the value.
type MappingRule =
| string // simple string mapping: copy the value found at this source path (skipped when undefined)
| {
// Constant: always writes `value`. transformGeneric does not read `source` for this rule.
op: "const";
value: any;
source?: string;
}
| {
// Nested object: applies `mapping` to the value at `source` (or to the whole input when `source` is absent).
// transformGeneric selects this branch by `type`; `op` is not inspected.
type: "object";
op?: "object";
source?: string;
mapping: Record<string, MappingRule>;
}
| {
// Array: applies `mapping` to every element of the array at `source`;
// when the source value is not an array, an empty array is written.
type: "array";
op?: "array";
source: string;
mapping: Record<string, MappingRule>;
}
| {
// Stringify: strings are copied as-is, any other defined value is JSON.stringify-ed.
op: "toString";
source: string;
}
| {
// Generic object mapping (no `type` field); handled the same way as the "object" rule.
mapping: Record<string, MappingRule>;
source?: string;
};
// ==== Generic utility functions ====
/**
 * Reads the value stored at a dot-separated path (e.g. "a.b.c") inside `obj`.
 *
 * @param obj  Value to read from; may be null/undefined.
 * @param path Dot-separated property path.
 * @returns The value at the path, or undefined once any step along the way
 *          is null or undefined.
 */
function getByPath(obj: any, path: string): any {
  let current = obj;
  for (const segment of path.split(".")) {
    // A null/undefined step yields undefined for every remaining segment.
    current = current != null ? current[segment] : undefined;
  }
  return current;
}
/**
 * Writes `value` at a dot-separated path (e.g. "a.b.c") on `target`,
 * creating intermediate objects as needed.
 *
 * Any intermediate value that is not a usable container (missing, a
 * primitive, a function, or null) is replaced by a fresh empty object.
 *
 * @param target Object to mutate.
 * @param path   Dot-separated property path.
 * @param value  Value to store at the final path segment.
 */
function setByPath(target: any, path: string, value: any) {
  const keys = path.split(".");
  let cur = target;
  for (const k of keys.slice(0, -1)) {
    const next = cur[k];
    // `typeof null` is "object", so null must be rejected explicitly;
    // otherwise the next step would dereference null and throw.
    if (next === null || typeof next !== "object") {
      cur[k] = {};
    }
    cur = cur[k];
  }
  cur[keys[keys.length - 1]] = value;
}
/**
 * Generic JSON transformer: builds a new object by applying every rule in
 * `mapping` to `raw`.
 *
 * Each mapping key is the dot-separated output path. Rules are handled as:
 *  - string: copy the value at that source path (skipped when undefined);
 *  - op "const": write the constant `value`;
 *  - op "toString": copy strings as-is, JSON.stringify anything else defined;
 *  - type "object" / untyped `{ mapping }`: recurse into the value at
 *    `source` (or into `raw` itself when `source` is absent);
 *  - type "array": recurse into each element; a non-array source yields [].
 * Unrecognised rules are reported with console.warn; an exception raised by
 * a single rule is logged with console.error and does not abort the others.
 *
 * @param raw     Source value to read from.
 * @param mapping Output path -> rule.
 * @returns The newly built object.
 */
function transformGeneric(raw: any, mapping: Record<string, MappingRule>): any {
  const result: any = {};

  // Shared by `type: "object"` rules and untyped `{ mapping }` rules:
  // transform the selected sub-value and store it, unless it is undefined.
  const writeNested = (
    targetPath: string,
    sourcePath: string | undefined,
    nestedMapping: Record<string, MappingRule>,
  ): void => {
    const scope = sourcePath ? getByPath(raw, sourcePath) : raw;
    if (scope !== undefined) {
      const nested = transformGeneric(scope, nestedMapping);
      setByPath(result, targetPath, nested);
    }
  };

  Object.entries(mapping).forEach(([targetPath, rule]) => {
    try {
      // Plain string: direct copy from the source path.
      if (typeof rule === "string") {
        const copied = getByPath(raw, rule);
        if (copied !== undefined) {
          setByPath(result, targetPath, copied);
        }
        return;
      }

      if (typeof rule === "object" && rule !== null) {
        // Operation-style rules take precedence over structural ones.
        if ("op" in rule) {
          switch (rule.op) {
            case "const":
              setByPath(result, targetPath, rule.value);
              return;
            case "toString": {
              const found = getByPath(raw, rule.source);
              if (found !== undefined) {
                // Already a string: keep it; otherwise serialise as JSON.
                const text = typeof found === "string" ? found : JSON.stringify(found);
                setByPath(result, targetPath, text);
              }
              return;
            }
            default:
              break;
          }
        }

        // Nested object mapping.
        if ("type" in rule && rule.type === "object") {
          writeNested(targetPath, rule.source, rule.mapping);
          return;
        }

        // Array mapping.
        if ("type" in rule && rule.type === "array") {
          const items = getByPath(raw, rule.source);
          const itemMapping = rule.mapping;
          const mapped = Array.isArray(items)
            ? items.map((item) => transformGeneric(item, itemMapping))
            : [];
          setByPath(result, targetPath, mapped);
          return;
        }

        // Object rule without a recognised op/type: treat as a nested mapping.
        if ("mapping" in rule && typeof rule.mapping === "object") {
          writeNested(targetPath, rule.source, rule.mapping);
          return;
        }
      }

      console.warn(`Unknown mapping rule for key "${targetPath}":`, rule);
    } catch (error) {
      console.error(`Error processing mapping rule for key "${targetPath}":`, error);
    }
  });

  return result;
}
/**
 * Tests whether a concrete topic matches a subscription pattern that may
 * contain the MQTT wildcards `+` (exactly one level) and `#` (this level
 * and everything below it).
 *
 * @param pattern Subscription pattern, levels separated by "/".
 * @param topic   Concrete topic name.
 * @returns true when the topic is matched by the pattern.
 */
function topicMatches(pattern: string, topic: string): boolean {
  const filterLevels = pattern.split("/");
  const topicLevels = topic.split("/");
  let index = 0;
  for (const level of filterLevels) {
    // Multi-level wildcard: everything from here on matches.
    if (level === "#") return true;
    // A literal level must equal the topic level; a missing topic level
    // (undefined) never equals a literal, so it is rejected here as well.
    if (level !== "+" && level !== topicLevels[index]) return false;
    index++;
  }
  // Without "#", pattern and topic must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}
/**
 * Extracts the topic level that sits at the position of the first `+`
 * wildcard in `pattern`. Only a single placeholder ({agvId}) is supported,
 * so later `+` wildcards are ignored.
 *
 * @param pattern Subscription pattern, levels separated by "/".
 * @param topic   Concrete topic name.
 * @returns The matching topic level, or null when the pattern has no `+`
 *          or the topic has no level at that position.
 */
function extractAgvId(pattern: string, topic: string): string | null {
  const wildcardIndex = pattern.split("/").indexOf("+");
  if (wildcardIndex === -1) return null;
  // A topic shorter than the pattern has no level here; report null rather
  // than leaking undefined through the `string | null` return type.
  return topic.split("/")[wildcardIndex] ?? null;
}
// ==== Module-level state ====
// Currently held MQTT client, or null when none is held. Assigned in
// connectAndSubscribe and reset to null by the event/stop handlers.
let mqttClient: MqttClient | null = null;
// Handle of the pending timeout created by scheduleReconnect, or null when no reconnect is pending.
let reconnectTimer: any = null;
// Effective worker configuration. The values below are defaults; the "init"
// message handler overwrites them.
const CONFIG: {
mqtt: {
brokerUrl: string;
clientId: string;
username?: string;
password?: string;
reconnectInterval?: number;
qos?: 0 | 1 | 2;
};
mappings: Array<{
sourceTopic: string;
targetTopicTemplate: string;
mapping: Record<string, MappingRule>;
}>;
} = {
mqtt: {
brokerUrl: "",
clientId: "",
username: undefined,
password: undefined,
reconnectInterval: 5000,
qos: 1,
},
mappings: [],
};
// Instance identifier, assigned from the init message's `instanceId`.
// NOTE(review): the original comment says it is generated from config.manufacturer
// and a UUID; that happens outside this file — confirm with the sender.
let INSTANCE_ID: string = "";
// Number of MQTT errors seen since the last successful connect or forced reconnect.
// onMqttError forces a reconnect once this reaches 5 (the previous comment said 10,
// which does not match the code).
let consecutiveErrors = 0;
// ==== MQTT logic ====
/**
 * Creates a new MQTT client from CONFIG, attaches the worker's event
 * handlers and, once connected, subscribes to every configured source topic.
 *
 * Posts `{type:"status", status:"connected"}` after all subscriptions
 * succeed. A subscribe failure posts an "error" message and schedules a
 * reconnect; an exception thrown while creating the client does the same.
 */
async function connectAndSubscribe() {
  // A client left over from an earlier attempt (e.g. after a subscribe
  // failure or a repeated init) must be closed first. Otherwise it keeps its
  // socket open and, when it finally closes, its handlers would tear down
  // the new client stored in the shared `mqttClient` variable.
  const previous = mqttClient;
  if (previous) {
    mqttClient = null;
    previous.removeListener("error", onMqttError);
    previous.removeListener("disconnect", onMqttDisconnect);
    previous.removeListener("close", onMqttClose);
    previous.removeListener("message", onMqttMessage);
    // Keep a no-op listener so a late "error" event cannot become an
    // unhandled EventEmitter error.
    previous.on("error", () => {});
    previous.end(true);
  }

  try {
    const opts: IClientOptions = {
      clientId: CONFIG.mqtt.clientId,
      username: CONFIG.mqtt.username,
      password: CONFIG.mqtt.password,
      // NOTE(review): the original comment said "we reconnect ourselves", but a
      // non-zero reconnectPeriod also enables the library's auto-reconnect.
      // Set this to 0 if only scheduleReconnect should reconnect — confirm.
      reconnectPeriod: 3000,
    };
    const client = mqtt.connect(CONFIG.mqtt.brokerUrl, opts);
    mqttClient = client;

    // Register all handlers once per client. The message handler is attached
    // here rather than inside "connect" so that repeated connect events do not
    // stack duplicate listeners (which would publish every message twice).
    client.on("error", onMqttError);
    client.on("disconnect", onMqttDisconnect);
    client.on("close", onMqttClose);
    client.on("message", onMqttMessage);

    client.on("connect", async () => {
      // A successful connection resets the error counter.
      consecutiveErrors = 0;
      const qos = CONFIG.mqtt.qos ?? 1;

      // Subscribe to every source topic listed in the mappings.
      for (const m of CONFIG.mappings) {
        const inTopic = m.sourceTopic.replace("{instanceId}", INSTANCE_ID);
        try {
          // subscribe() reports its outcome through a callback; wrap it so the
          // await really waits and failures reach the catch below.
          await new Promise<void>((resolve, reject) => {
            client.subscribe(inTopic, { qos }, (err) => {
              if (err) reject(err);
              else resolve();
            });
          });
        } catch (err) {
          // Ignore failures of a client that has been replaced or shut down.
          if (mqttClient !== client) return;
          console.error(`✗ MQTT subscribe failed for ${inTopic}:`, err);
          self.postMessage({
            type: "error",
            error: "subscribe_failed",
            details: err instanceof Error ? err.message : String(err),
          });
          scheduleReconnect();
          return; // subscription failed: stop here and retry via reconnect
        }
        // The client may have been replaced or stopped while we were waiting.
        if (mqttClient !== client) return;
      }

      // Fully set up: a pending reconnect is no longer needed.
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      self.postMessage({ type: "status", status: "connected" });
    });
  } catch (err) {
    console.error("✗ MQTT connect failed:", err);
    self.postMessage({
      type: "error",
      error: "connect_failed",
      details: err instanceof Error ? err.message : String(err),
    });
    scheduleReconnect();
  }
}
/**
 * Handles one incoming MQTT message: parses the JSON payload, and for every
 * mapping whose source topic matches, transforms the payload and publishes
 * it to the mapping's target topic.
 *
 * Posts a "published" message after each publish has been confirmed through
 * the publish callback. Any failure (invalid JSON, missing client, publish
 * error) is logged and posted as an "error" message with
 * `error: "process_message"`; it aborts the remaining mappings for this
 * message.
 *
 * @param topic   Topic the message arrived on.
 * @param payload Raw message bytes, expected to be UTF-8 encoded JSON.
 */
async function onMqttMessage(topic: string, payload: Uint8Array) {
  try {
    const raw = JSON.parse(new TextDecoder().decode(payload));
    const qos = CONFIG.mqtt.qos ?? 1;

    // Transform and publish once for every matching mapping configuration.
    for (const m of CONFIG.mappings) {
      const inTopic = m.sourceTopic.replace("{instanceId}", INSTANCE_ID);
      if (!topicMatches(inTopic, topic)) continue;

      // Value for the {agvId} placeholder, taken from the "+" level.
      const agvId = extractAgvId(inTopic, topic) ?? "";
      const transformed = transformGeneric(raw, m.mapping);

      // agvId comes from the incoming topic, so insert it through a replacer
      // function: a plain replacement string would interpret "$&", "$$", etc.
      const outTopic = m.targetTopicTemplate
        .replace("{instanceId}", INSTANCE_ID)
        .replace("{agvId}", () => agvId);

      // The client can be cleared (stop/close) while a message is in flight;
      // fail with a clear message instead of a null dereference.
      const client = mqttClient;
      if (!client) {
        throw new Error("MQTT client is not available; message dropped");
      }

      // publish() reports completion through a callback; wrap it so that
      // "published" is only announced after success and errors are caught.
      await new Promise<void>((resolve, reject) => {
        client.publish(outTopic, JSON.stringify(transformed), { qos }, (err) => {
          if (err) reject(err);
          else resolve();
        });
      });

      self.postMessage({
        type: "published",
        topic: outTopic,
        agvId,
        sourceTopic: topic,
      });
    }
  } catch (err) {
    console.error("✗ onMqttMessage error:", err);
    self.postMessage({
      type: "error",
      error: "process_message",
      details: err instanceof Error ? err.message : String(err),
    });
  }
}
/**
 * Handler for the client's "close" event. Reports the closed status,
 * force-ends the held client so its socket is released, and then posts
 * "reconnect-down" to the main thread. It does not reconnect by itself.
 */
function onMqttClose() {
  console.log("vda onMqttClose");
  self.postMessage({ type: "status", status: "closed" });

  // Make sure the underlying socket is released on close.
  const active = mqttClient;
  if (active !== null) {
    active.end(true);
    mqttClient = null;
  }

  self.postMessage({ type: "reconnect-down" });
}
/**
 * Handler for the client's "disconnect" event. Reports the disconnected
 * status, force-ends the held client so connections do not pile up, and
 * schedules the worker's own reconnect.
 */
function onMqttDisconnect() {
  self.postMessage({ type: "status", status: "disconnected" });
  console.log("vda onMqttDisconnect");

  // Close the client as well, so unclosed connections do not accumulate.
  const active = mqttClient;
  if (active !== null) {
    active.end(true);
    mqttClient = null;
  }

  scheduleReconnect();
}
/**
 * Handler for the client's "error" event.
 *
 * - ECONNREFUSED / ECONNRESET additionally post "reconnect-all" so the main
 *   thread can restart all workers.
 * - Every error is counted, logged and posted as an "error" message.
 * - On the 5th consecutive error the counter is reset, the client is
 *   force-ended and a reconnect is scheduled.
 *
 * @param err Error emitted by the MQTT client.
 */
function onMqttError(err: Error) {
  // Socket-level errors carry a `code` property that plain Error does not declare.
  const { code } = err as Error & { code?: unknown };
  if (code === 'ECONNREFUSED' || code === 'ECONNRESET') {
    console.error('❌ MQTT connection refused, requesting full restart:', err);
    self.postMessage({ type: 'reconnect-all' });
  }

  consecutiveErrors += 1;
  console.error(`! MQTT error (#${consecutiveErrors}):`, err);
  self.postMessage({
    type: "error",
    error: "mqtt_error",
    details: err.message,
  });

  // Below the threshold nothing else happens.
  if (consecutiveErrors < 5) return;

  // Threshold reached: start counting afresh and force a reconnect.
  consecutiveErrors = 0;
  const active = mqttClient;
  if (active !== null) {
    active.end(true);
    mqttClient = null;
  }
  scheduleReconnect();
}
/**
 * Arms a single reconnect timer. If one is already pending this is a no-op.
 * When the timer fires it posts "reconnect-down" and calls
 * connectAndSubscribe. The delay is CONFIG.mqtt.reconnectInterval, read at
 * scheduling time.
 */
function scheduleReconnect() {
  // Only one reconnect may be pending at a time.
  if (reconnectTimer) return;

  const onTimeout = () => {
    reconnectTimer = null;
    console.log("vda schedule Worker 自身重连");
    self.postMessage({ type: "reconnect-down" });
    connectAndSubscribe();
  };
  reconnectTimer = setTimeout(onTimeout, CONFIG.mqtt.reconnectInterval);
}
// ==== Main-thread message handling ====
/**
 * Entry point for messages from the main thread.
 *  - "init": stores the MQTT settings, mappings and instance id, then connects.
 *  - "stop" / "shutdown": force-ends the client, cancels a pending reconnect
 *    and posts `{type:"status", status:"stopped"}`.
 * Messages of any other type are ignored.
 */
self.onmessage = (e: MessageEvent<InitMessage | StopMessage>) => {
  const msg = e.data;

  switch (msg.type) {
    case "init": {
      const { brokerUrl, clientId, username, password, reconnectInterval, qos } = msg.mqtt;
      // Overwrite the configuration; optional tuning values fall back to
      // whatever CONFIG currently holds.
      CONFIG.mqtt = {
        ...CONFIG.mqtt,
        brokerUrl,
        clientId,
        username,
        password,
        reconnectInterval: reconnectInterval ?? CONFIG.mqtt.reconnectInterval,
        qos: qos ?? CONFIG.mqtt.qos,
      };
      CONFIG.mappings = msg.mappings;

      // Remember our instance identifier.
      INSTANCE_ID = msg.instanceId;
      console.log("vda5050_transformer_worker: instanceId", INSTANCE_ID);

      // Open the connection.
      connectAndSubscribe();
      break;
    }

    // "stop" and "shutdown" share one shutdown path.
    case "stop":
    case "shutdown": {
      const active = mqttClient;
      if (active !== null) {
        active.end(true);
        mqttClient = null;
      }
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      self.postMessage({ type: "status", status: "stopped" });
      break;
    }
  }
};