use @meshtastic/protobuf
This commit is contained in:
147
mqtt/index.js
147
mqtt/index.js
@ -1,11 +1,12 @@
|
||||
const crypto = require("crypto");
|
||||
const path = require("path");
|
||||
const mqtt = require("mqtt");
|
||||
const protobufjs = require("protobufjs");
|
||||
const commandLineArgs = require("command-line-args");
|
||||
const commandLineUsage = require("command-line-usage");
|
||||
const PositionUtil = require("./utils/position_util");
|
||||
|
||||
const { Mesh, Mqtt, Portnums, Telemetry } = require("@meshtastic/protobufs");
|
||||
const { fromBinary } = require("@bufbuild/protobuf");
|
||||
|
||||
// create prisma db client
|
||||
const { PrismaClient } = require("@prisma/client");
|
||||
const prisma = new PrismaClient();
|
||||
@ -209,8 +210,8 @@ if(options.help){
|
||||
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
|
||||
const mqttUsername = options["mqtt-username"] ?? "meshdev";
|
||||
const mqttPassword = options["mqtt-password"] ?? "large4cats";
|
||||
const mqttClientId = options["mqtt-client-id"] ?? null;
|
||||
const mqttTopics = options["mqtt-topic"] ?? ["msh/#"];
|
||||
const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7";
|
||||
const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"];
|
||||
const allowedPortnums = options["allowed-portnums"] ?? null;
|
||||
const logUnknownPortnums = options["log-unknown-portnums"] ?? false;
|
||||
const collectServiceEnvelopes = options["collect-service-envelopes"] ?? false;
|
||||
@ -247,20 +248,6 @@ const client = mqtt.connect(mqttBrokerUrl, {
|
||||
clientId: mqttClientId,
|
||||
});
|
||||
|
||||
// load protobufs
|
||||
const root = new protobufjs.Root();
|
||||
root.resolvePath = (origin, target) => path.join(__dirname, "protos", target);
|
||||
root.loadSync('meshtastic/mqtt.proto');
|
||||
const Data = root.lookupType("Data");
|
||||
const ServiceEnvelope = root.lookupType("ServiceEnvelope");
|
||||
const MapReport = root.lookupType("MapReport");
|
||||
const NeighborInfo = root.lookupType("NeighborInfo");
|
||||
const Position = root.lookupType("Position");
|
||||
const RouteDiscovery = root.lookupType("RouteDiscovery");
|
||||
const Telemetry = root.lookupType("Telemetry");
|
||||
const User = root.lookupType("User");
|
||||
const Waypoint = root.lookupType("Waypoint");
|
||||
|
||||
// run automatic purge if configured
|
||||
if(purgeIntervalSeconds){
|
||||
setInterval(async () => {
|
||||
@ -634,7 +621,6 @@ function decrypt(packet) {
|
||||
// attempt to decrypt with all available decryption keys
|
||||
for(const decryptionKey of decryptionKeys){
|
||||
try {
|
||||
|
||||
// convert encryption key to buffer
|
||||
const key = Buffer.from(decryptionKey, "base64");
|
||||
|
||||
@ -657,10 +643,10 @@ function decrypt(packet) {
|
||||
const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer);
|
||||
|
||||
// decrypt encrypted packet
|
||||
const decryptedBuffer = Buffer.concat([decipher.update(packet.encrypted), decipher.final()]);
|
||||
const decryptedBuffer = Buffer.concat([decipher.update(packet.payloadVariant.value), decipher.final()]);
|
||||
|
||||
// parse as data message
|
||||
return Data.decode(decryptedBuffer);
|
||||
return fromBinary(Mesh.DataSchema, decryptedBuffer);
|
||||
|
||||
} catch(e){}
|
||||
}
|
||||
@ -689,55 +675,40 @@ client.on("connect", () => {
|
||||
// handle message received
|
||||
client.on("message", async (topic, message) => {
|
||||
try {
|
||||
|
||||
// decode service envelope
|
||||
const envelope = ServiceEnvelope.decode(message);
|
||||
if(!envelope.packet){
|
||||
let envelope = null;
|
||||
try {
|
||||
envelope = fromBinary(Mqtt.ServiceEnvelopeSchema, message);
|
||||
} catch(e) { }
|
||||
if (!envelope) {
|
||||
return;
|
||||
}
|
||||
|
||||
let dataPacket = envelope.packet.payloadVariant.value;
|
||||
// attempt to decrypt encrypted packets
|
||||
const isEncrypted = envelope.packet.encrypted?.length > 0;
|
||||
if(isEncrypted){
|
||||
const decoded = decrypt(envelope.packet);
|
||||
if(decoded){
|
||||
envelope.packet.decoded = decoded;
|
||||
}
|
||||
if (envelope.packet.payloadVariant.case === 'encrypted') {
|
||||
dataPacket = decrypt(envelope.packet);
|
||||
}
|
||||
|
||||
// get portnum from decoded packet
|
||||
const portnum = envelope.packet?.decoded?.portnum;
|
||||
|
||||
// get bitfield from decoded packet
|
||||
// bitfield was added in v2.5 of meshtastic firmware
|
||||
// this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above
|
||||
const bitfield = envelope.packet?.decoded?.bitfield;
|
||||
|
||||
// check if we can see the decrypted packet data
|
||||
if(envelope.packet.decoded != null){
|
||||
|
||||
if (dataPacket !== null) {
|
||||
const bitfield = dataPacket.payload.bitfield ?? null;
|
||||
// check if bitfield is available (v2.5.x firmware or newer)
|
||||
if(bitfield != null){
|
||||
|
||||
if (bitfield != null){
|
||||
// drop packets where "OK to MQTT" is false
|
||||
const isOkToMqtt = bitfield & BITFIELD_OK_TO_MQTT_MASK;
|
||||
if(dropPacketsNotOkToMqtt && !isOkToMqtt){
|
||||
if (dropPacketsNotOkToMqtt && !isOkToMqtt) {
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// if bitfield is not available for this packet, check if we want to drop this portnum
|
||||
if(bitfield == null){
|
||||
|
||||
if (bitfield == null) {
|
||||
// drop packet if portnum is in drop list
|
||||
// this is useful for dropping specific packet types from firmware older than v2.5
|
||||
if(dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(portnum)){
|
||||
if (dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(portnum)) {
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// create service envelope in db
|
||||
@ -774,6 +745,19 @@ client.on("message", async (topic, message) => {
|
||||
// don't care if updating mqtt timestamp fails
|
||||
}
|
||||
|
||||
// bail if we don't have a usable Data packet
|
||||
if (dataPacket === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = dataPacket.payload;
|
||||
// get portnum from decoded packet
|
||||
const portnum = dataPacket.portnum;
|
||||
// get bitfield from decoded packet
|
||||
// bitfield was added in v2.5 of meshtastic firmware
|
||||
// this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above
|
||||
const bitfield = dataPacket.bitfield;
|
||||
|
||||
const logKnownPacketTypes = false;
|
||||
|
||||
// if allowed portnums are configured, ignore portnums that are not in the list
|
||||
@ -781,7 +765,7 @@ client.on("message", async (topic, message) => {
|
||||
return;
|
||||
}
|
||||
|
||||
if(portnum === 1) {
|
||||
if(portnum === Portnums.PortNum.TEXT_MESSAGE_APP) {
|
||||
|
||||
if(!collectTextMessages){
|
||||
return;
|
||||
@ -796,7 +780,7 @@ client.on("message", async (topic, message) => {
|
||||
console.log("TEXT_MESSAGE_APP", {
|
||||
to: envelope.packet.to.toString(16),
|
||||
from: envelope.packet.from.toString(16),
|
||||
text: envelope.packet.decoded.payload.toString(),
|
||||
text: payload.toString(),
|
||||
});
|
||||
}
|
||||
|
||||
@ -809,7 +793,7 @@ client.on("message", async (topic, message) => {
|
||||
packet_id: envelope.packet.id,
|
||||
channel_id: envelope.channelId,
|
||||
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
||||
text: envelope.packet.decoded.payload.toString(),
|
||||
text: payload.toString(),
|
||||
rx_time: envelope.packet.rxTime,
|
||||
rx_snr: envelope.packet.rxSnr,
|
||||
rx_rssi: envelope.packet.rxRssi,
|
||||
@ -822,9 +806,8 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 3) {
|
||||
|
||||
const position = Position.decode(envelope.packet.decoded.payload);
|
||||
else if(portnum === Portnums.PortNum.POSITION_APP) {
|
||||
const position = fromBinary(Mesh.PositionSchema, payload);
|
||||
|
||||
if(logKnownPacketTypes){
|
||||
console.log("POSITION_APP", {
|
||||
@ -911,9 +894,9 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 4) {
|
||||
else if(portnum === Portnums.PortNum.NODEINFO_APP) {
|
||||
|
||||
const user = User.decode(envelope.packet.decoded.payload);
|
||||
const user = fromBinary(Mesh.UserSchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("NODEINFO_APP", {
|
||||
@ -950,13 +933,13 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 8) {
|
||||
else if(portnum === Portnums.PortNum.WAYPOINT_APP) {
|
||||
|
||||
if(!collectWaypoints){
|
||||
return;
|
||||
}
|
||||
|
||||
const waypoint = Waypoint.decode(envelope.packet.decoded.payload);
|
||||
const waypoint = fromBinary(Mesh.WaypointSchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("WAYPOINT_APP", {
|
||||
@ -991,9 +974,9 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 71) {
|
||||
else if(portnum === Portnums.PortNum.NEIGHBORINFO_APP) {
|
||||
|
||||
const neighbourInfo = NeighborInfo.decode(envelope.packet.decoded.payload);
|
||||
const neighbourInfo = fromBinary(Mesh.NeighborInfoSchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("NEIGHBORINFO_APP", {
|
||||
@ -1048,9 +1031,9 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 67) {
|
||||
else if(portnum === Portnums.PortNum.TELEMETRY_APP) {
|
||||
|
||||
const telemetry = Telemetry.decode(envelope.packet.decoded.payload);
|
||||
const telemetry = fromBinary(Telemetry.TelemetrySchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("TELEMETRY_APP", {
|
||||
@ -1232,15 +1215,15 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 70) {
|
||||
else if(portnum === Portnums.PortNum.TRACEROUTE_APP) {
|
||||
|
||||
const routeDiscovery = RouteDiscovery.decode(envelope.packet.decoded.payload);
|
||||
const routeDiscovery = fromBinary(Mesh.RouteDiscoverySchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("TRACEROUTE_APP", {
|
||||
to: envelope.packet.to.toString(16),
|
||||
from: envelope.packet.from.toString(16),
|
||||
want_response: envelope.packet.decoded.wantResponse,
|
||||
want_response: payload.wantResponse,
|
||||
route_discovery: routeDiscovery,
|
||||
});
|
||||
}
|
||||
@ -1267,9 +1250,9 @@ client.on("message", async (topic, message) => {
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 73) {
|
||||
else if(portnum === Portnums.PortNum.MAP_REPORT_APP) {
|
||||
|
||||
const mapReport = MapReport.decode(envelope.packet.decoded.payload);
|
||||
const mapReport = fromBinary(Mqtt.MapReportSchema, payload);
|
||||
|
||||
if(logKnownPacketTypes) {
|
||||
console.log("MAP_REPORT_APP", {
|
||||
@ -1362,23 +1345,21 @@ client.on("message", async (topic, message) => {
|
||||
}
|
||||
|
||||
else {
|
||||
if(logUnknownPortnums){
|
||||
|
||||
if (logUnknownPortnums) {
|
||||
const ignoredPortnums = [
|
||||
Portnums.PortNum.UNKNOWN_APP,
|
||||
Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP,
|
||||
Portnums.PortNum.ROUTING_APP,
|
||||
Portnums.PortNum.PAXCOUNTER_APP,
|
||||
Portnums.PortNum.STORE_FORWARD_APP,
|
||||
Portnums.PortNum.RANGE_TEST_APP,
|
||||
Portnums.PortNum.ATAK_PLUGIN,
|
||||
Portnums.PortNum.ATAK_FORWARDER,
|
||||
];
|
||||
// ignore packets we don't want to see for now
|
||||
if(portnum === undefined // ignore failed to decrypt
|
||||
|| portnum === 0 // ignore UNKNOWN_APP
|
||||
|| portnum === 1 // ignore TEXT_MESSAGE_APP
|
||||
|| portnum === 5 // ignore ROUTING_APP
|
||||
|| portnum === 34 // ignore PAXCOUNTER_APP
|
||||
|| portnum === 65 // ignore STORE_FORWARD_APP
|
||||
|| portnum === 66 // ignore RANGE_TEST_APP
|
||||
|| portnum === 72 // ignore ATAK_PLUGIN
|
||||
|| portnum === 257 // ignore ATAK_FORWARDER
|
||||
|| portnum > 511 // ignore above MAX
|
||||
){
|
||||
if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(portnum, envelope);
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user