From 1f1c0e98812f345553410b74a53a108692786527 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 15 Apr 2025 17:56:19 -0400 Subject: [PATCH] use @meshtastic/protobuf --- mqtt/index.js | 147 ++++++++++++++++++----------------------- mqtt/package-lock.json | 114 +++++--------------------------- mqtt/package.json | 5 +- 3 files changed, 85 insertions(+), 181 deletions(-) diff --git a/mqtt/index.js b/mqtt/index.js index f960619..d799dcd 100644 --- a/mqtt/index.js +++ b/mqtt/index.js @@ -1,11 +1,12 @@ const crypto = require("crypto"); -const path = require("path"); const mqtt = require("mqtt"); -const protobufjs = require("protobufjs"); const commandLineArgs = require("command-line-args"); const commandLineUsage = require("command-line-usage"); const PositionUtil = require("./utils/position_util"); +const { Mesh, Mqtt, Portnums, Telemetry } = require("@meshtastic/protobufs"); +const { fromBinary } = require("@bufbuild/protobuf"); + // create prisma db client const { PrismaClient } = require("@prisma/client"); const prisma = new PrismaClient(); @@ -209,8 +210,8 @@ if(options.help){ const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org"; const mqttUsername = options["mqtt-username"] ?? "meshdev"; const mqttPassword = options["mqtt-password"] ?? "large4cats"; -const mqttClientId = options["mqtt-client-id"] ?? null; -const mqttTopics = options["mqtt-topic"] ?? ["msh/#"]; +const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7"; +const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"]; const allowedPortnums = options["allowed-portnums"] ?? null; const logUnknownPortnums = options["log-unknown-portnums"] ?? false; const collectServiceEnvelopes = options["collect-service-envelopes"] ?? false; @@ -247,20 +248,6 @@ const client = mqtt.connect(mqttBrokerUrl, { clientId: mqttClientId, }); -// load protobufs -const root = new protobufjs.Root(); -root.resolvePath = (origin, target) => path.join(__dirname, "protos", target); -root.loadSync('meshtastic/mqtt.proto'); -const Data = root.lookupType("Data"); -const ServiceEnvelope = root.lookupType("ServiceEnvelope"); -const MapReport = root.lookupType("MapReport"); -const NeighborInfo = root.lookupType("NeighborInfo"); -const Position = root.lookupType("Position"); -const RouteDiscovery = root.lookupType("RouteDiscovery"); -const Telemetry = root.lookupType("Telemetry"); -const User = root.lookupType("User"); -const Waypoint = root.lookupType("Waypoint"); - // run automatic purge if configured if(purgeIntervalSeconds){ setInterval(async () => { @@ -634,7 +621,6 @@ function decrypt(packet) { // attempt to decrypt with all available decryption keys for(const decryptionKey of decryptionKeys){ try { - // convert encryption key to buffer const key = Buffer.from(decryptionKey, "base64"); @@ -657,10 +643,10 @@ function decrypt(packet) { const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer); // decrypt encrypted packet - const decryptedBuffer = Buffer.concat([decipher.update(packet.encrypted), decipher.final()]); + const decryptedBuffer = Buffer.concat([decipher.update(packet.payloadVariant.value), decipher.final()]); // parse as data message - return Data.decode(decryptedBuffer); + return fromBinary(Mesh.DataSchema, decryptedBuffer); } catch(e){} } @@ -689,55 +675,40 @@ client.on("connect", () => { // handle message received client.on("message", async (topic, message) => { try { - // decode service envelope - const envelope = ServiceEnvelope.decode(message); - if(!envelope.packet){ + let envelope = null; + try { + envelope = fromBinary(Mqtt.ServiceEnvelopeSchema, message); + } catch(e) { } + if (!envelope) { return; } + let dataPacket = envelope.packet.payloadVariant.value; // attempt to decrypt encrypted packets - const isEncrypted = envelope.packet.encrypted?.length > 0; - if(isEncrypted){ - const decoded = decrypt(envelope.packet); - if(decoded){ - envelope.packet.decoded = decoded; - } + if (envelope.packet.payloadVariant.case === 'encrypted') { + dataPacket = decrypt(envelope.packet); } - // get portnum from decoded packet - const portnum = envelope.packet?.decoded?.portnum; - - // get bitfield from decoded packet - // bitfield was added in v2.5 of meshtastic firmware - // this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above - const bitfield = envelope.packet?.decoded?.bitfield; - - // check if we can see the decrypted packet data - if(envelope.packet.decoded != null){ - + if (dataPacket !== null) { + const bitfield = dataPacket.payload.bitfield ?? null; // check if bitfield is available (v2.5.x firmware or newer) - if(bitfield != null){ - + if (bitfield != null){ // drop packets where "OK to MQTT" is false const isOkToMqtt = bitfield & BITFIELD_OK_TO_MQTT_MASK; - if(dropPacketsNotOkToMqtt && !isOkToMqtt){ + if (dropPacketsNotOkToMqtt && !isOkToMqtt) { return; } - } // if bitfield is not available for this packet, check if we want to drop this portnum - if(bitfield == null){ - + if (bitfield == null) { // drop packet if portnum is in drop list // this is useful for dropping specific packet types from firmware older than v2.5 - if(dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(portnum)){ + if (dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(portnum)) { return; } - } - } // create service envelope in db @@ -774,6 +745,19 @@ client.on("message", async (topic, message) => { // don't care if updating mqtt timestamp fails } + // bail if we don't have a usable Data packet + if (dataPacket === null) { + return; + } + + const payload = dataPacket.payload; + // get portnum from decoded packet + const portnum = dataPacket.portnum; + // get bitfield from decoded packet + // bitfield was added in v2.5 of meshtastic firmware + // this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above + const bitfield = dataPacket.bitfield; + const logKnownPacketTypes = false; // if allowed portnums are configured, ignore portnums that are not in the list @@ -781,7 +765,7 @@ client.on("message", async (topic, message) => { return; } - if(portnum === 1) { + if(portnum === Portnums.PortNum.TEXT_MESSAGE_APP) { if(!collectTextMessages){ return; @@ -796,7 +780,7 @@ client.on("message", async (topic, message) => { console.log("TEXT_MESSAGE_APP", { to: envelope.packet.to.toString(16), from: envelope.packet.from.toString(16), - text: envelope.packet.decoded.payload.toString(), + text: payload.toString(), }); } @@ -809,7 +793,7 @@ client.on("message", async (topic, message) => { packet_id: envelope.packet.id, channel_id: envelope.channelId, gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, - text: envelope.packet.decoded.payload.toString(), + text: payload.toString(), rx_time: envelope.packet.rxTime, rx_snr: envelope.packet.rxSnr, rx_rssi: envelope.packet.rxRssi, @@ -822,9 +806,8 @@ client.on("message", async (topic, message) => { } - else if(portnum === 3) { - - const position = Position.decode(envelope.packet.decoded.payload); + else if(portnum === Portnums.PortNum.POSITION_APP) { + const position = fromBinary(Mesh.PositionSchema, payload); if(logKnownPacketTypes){ console.log("POSITION_APP", { @@ -911,9 +894,9 @@ client.on("message", async (topic, message) => { } - else if(portnum === 4) { + else if(portnum === Portnums.PortNum.NODEINFO_APP) { - const user = User.decode(envelope.packet.decoded.payload); + const user = fromBinary(Mesh.UserSchema, payload); if(logKnownPacketTypes) { console.log("NODEINFO_APP", { @@ -950,13 +933,13 @@ client.on("message", async (topic, message) => { } - else if(portnum === 8) { + else if(portnum === Portnums.PortNum.WAYPOINT_APP) { if(!collectWaypoints){ return; } - const waypoint = Waypoint.decode(envelope.packet.decoded.payload); + const waypoint = fromBinary(Mesh.WaypointSchema, payload); if(logKnownPacketTypes) { console.log("WAYPOINT_APP", { @@ -991,9 +974,9 @@ client.on("message", async (topic, message) => { } - else if(portnum === 71) { + else if(portnum === Portnums.PortNum.NEIGHBORINFO_APP) { - const neighbourInfo = NeighborInfo.decode(envelope.packet.decoded.payload); + const neighbourInfo = fromBinary(Mesh.NeighborInfoSchema, payload); if(logKnownPacketTypes) { console.log("NEIGHBORINFO_APP", { @@ -1048,9 +1031,9 @@ client.on("message", async (topic, message) => { } - else if(portnum === 67) { + else if(portnum === Portnums.PortNum.TELEMETRY_APP) { - const telemetry = Telemetry.decode(envelope.packet.decoded.payload); + const telemetry = fromBinary(Telemetry.TelemetrySchema, payload); if(logKnownPacketTypes) { console.log("TELEMETRY_APP", { @@ -1232,15 +1215,15 @@ client.on("message", async (topic, message) => { } - else if(portnum === 70) { + else if(portnum === Portnums.PortNum.TRACEROUTE_APP) { - const routeDiscovery = RouteDiscovery.decode(envelope.packet.decoded.payload); + const routeDiscovery = fromBinary(Mesh.RouteDiscoverySchema, payload); if(logKnownPacketTypes) { console.log("TRACEROUTE_APP", { to: envelope.packet.to.toString(16), from: envelope.packet.from.toString(16), - want_response: envelope.packet.decoded.wantResponse, + want_response: payload.wantResponse, route_discovery: routeDiscovery, }); } @@ -1267,9 +1250,9 @@ client.on("message", async (topic, message) => { } - else if(portnum === 73) { + else if(portnum === Portnums.PortNum.MAP_REPORT_APP) { - const mapReport = MapReport.decode(envelope.packet.decoded.payload); + const mapReport = fromBinary(Mqtt.MapReportSchema, payload); if(logKnownPacketTypes) { console.log("MAP_REPORT_APP", { @@ -1362,23 +1345,21 @@ client.on("message", async (topic, message) => { } else { - if(logUnknownPortnums){ - + if (logUnknownPortnums) { + const ignoredPortnums = [ + Portnums.PortNum.UNKNOWN_APP, + Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP, + Portnums.PortNum.ROUTING_APP, + Portnums.PortNum.PAXCOUNTER_APP, + Portnums.PortNum.STORE_FORWARD_APP, + Portnums.PortNum.RANGE_TEST_APP, + Portnums.PortNum.ATAK_PLUGIN, + Portnums.PortNum.ATAK_FORWARDER, + ]; // ignore packets we don't want to see for now - if(portnum === undefined // ignore failed to decrypt - || portnum === 0 // ignore UNKNOWN_APP - || portnum === 1 // ignore TEXT_MESSAGE_APP - || portnum === 5 // ignore ROUTING_APP - || portnum === 34 // ignore PAXCOUNTER_APP - || portnum === 65 // ignore STORE_FORWARD_APP - || portnum === 66 // ignore RANGE_TEST_APP - || portnum === 72 // ignore ATAK_PLUGIN - || portnum === 257 // ignore ATAK_FORWARDER - || portnum > 511 // ignore above MAX - ){ + if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) { return; } - console.log(portnum, envelope); } diff --git a/mqtt/package-lock.json b/mqtt/package-lock.json index 7b84046..f62bab1 100644 --- a/mqtt/package-lock.json +++ b/mqtt/package-lock.json @@ -9,11 +9,12 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "@bufbuild/protobuf": "^2.2.5", + "@meshtastic/protobufs": "npm:@jsr/meshtastic__protobufs@^2.6.2", "@prisma/client": "^5.11.0", "command-line-args": "^5.2.1", "command-line-usage": "^7.0.1", - "mqtt": "^5.11.0", - "protobufjs": "^7.5.0" + "mqtt": "^5.11.0" }, "devDependencies": { "prisma": "^5.10.2" @@ -31,6 +32,21 @@ "node": ">=6.9.0" } }, + "node_modules/@bufbuild/protobuf": { + "version": "2.2.5", + "resolved": "https://registry.npmjs.org/@bufbuild/protobuf/-/protobuf-2.2.5.tgz", + "integrity": "sha512-/g5EzJifw5GF8aren8wZ/G5oMuPoGeS6MQD3ca8ddcvdXR5UELUfdTZITCGNhNXynY/AYl3Z4plmxdj/tRl/hQ==", + "license": "(Apache-2.0 AND BSD-3-Clause)" + }, + "node_modules/@meshtastic/protobufs": { + "name": "@jsr/meshtastic__protobufs", + "version": "2.6.2", + "resolved": "https://npm.jsr.io/~/11/@jsr/meshtastic__protobufs/2.6.2.tgz", + "integrity": "sha512-bIENtFnUEru28GrAeSdiBS9skp0hN/3HZunMbF/IjvUrXOlx2fptKVj3b+pzjOWnLBZxllrByV/W+XDmrxqJ6g==", + "dependencies": { + "@bufbuild/protobuf": "^2.2.3" + } + }, "node_modules/@prisma/client": { "version": "5.22.0", "resolved": "https://registry.npmjs.org/@prisma/client/-/client-5.22.0.tgz", @@ -99,70 +115,6 @@ "@prisma/debug": "5.22.0" } }, - "node_modules/@protobufjs/aspromise": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", - "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/base64": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz", - "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/codegen": { - "version": "2.0.4", - "resolved": "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz", - "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/eventemitter": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", - "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/fetch": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", - "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", - "license": "BSD-3-Clause", - "dependencies": { - "@protobufjs/aspromise": "^1.1.1", - "@protobufjs/inquire": "^1.1.0" - } - }, - "node_modules/@protobufjs/float": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", - "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/inquire": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", - "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/path": { - "version": "1.1.2", - "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", - "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/pool": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", - "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==", - "license": "BSD-3-Clause" - }, - "node_modules/@protobufjs/utf8": { - "version": "1.1.0", - "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", - "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==", - "license": "BSD-3-Clause" - }, "node_modules/@types/node": { "version": "22.14.1", "resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.1.tgz", @@ -572,12 +524,6 @@ "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==", "license": "MIT" }, - "node_modules/long": { - "version": "5.3.1", - "resolved": "https://registry.npmjs.org/long/-/long-5.3.1.tgz", - "integrity": "sha512-ka87Jz3gcx/I7Hal94xaN2tZEOPoUOEVftkQqZx2EeQRN7LGdfLlI3FvZ+7WDplm+vK2Urx9ULrvSowtdCieng==", - "license": "Apache-2.0" - }, "node_modules/lru-cache": { "version": "10.4.3", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", @@ -688,30 +634,6 @@ "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==", "license": "MIT" }, - "node_modules/protobufjs": { - "version": "7.5.0", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.5.0.tgz", - "integrity": "sha512-Z2E/kOY1QjoMlCytmexzYfDm/w5fKAiRwpSzGtdnXW1zC88Z2yXazHHrOtwCzn+7wSxyE8PYM4rvVcMphF9sOA==", - "hasInstallScript": true, - "license": "BSD-3-Clause", - "dependencies": { - "@protobufjs/aspromise": "^1.1.2", - "@protobufjs/base64": "^1.1.2", - "@protobufjs/codegen": "^2.0.4", - "@protobufjs/eventemitter": "^1.1.0", - "@protobufjs/fetch": "^1.1.0", - "@protobufjs/float": "^1.0.2", - "@protobufjs/inquire": "^1.1.0", - "@protobufjs/path": "^1.1.2", - "@protobufjs/pool": "^1.1.0", - "@protobufjs/utf8": "^1.1.0", - "@types/node": ">=13.7.0", - "long": "^5.0.0" - }, - "engines": { - "node": ">=12.0.0" - } - }, "node_modules/readable-stream": { "version": "4.7.0", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.7.0.tgz", diff --git a/mqtt/package.json b/mqtt/package.json index 454daa7..90b1ca3 100644 --- a/mqtt/package.json +++ b/mqtt/package.json @@ -6,11 +6,12 @@ "license": "ISC", "description": "", "dependencies": { + "@bufbuild/protobuf": "^2.2.5", + "@meshtastic/protobufs": "npm:@jsr/meshtastic__protobufs@^2.6.2", "@prisma/client": "^5.11.0", "command-line-args": "^5.2.1", "command-line-usage": "^7.0.1", - "mqtt": "^5.11.0", - "protobufjs": "^7.5.0" + "mqtt": "^5.11.0" }, "devDependencies": { "prisma": "^5.10.2"