import crypto from 'crypto';
import mqtt from "mqtt";
import commandLineArgs from 'command-line-args';
import commandLineUsage from 'command-line-usage';
import { fromBinary } from '@bufbuild/protobuf';
import { Mesh, Mqtt, Portnums, Telemetry } from '@meshtastic/protobufs';
import PositionUtil from './utils/position_util.js';

// create prisma db client
import { PrismaClient } from "@prisma/client";
const prisma = new PrismaClient();

// meshtastic bitfield flags
const BITFIELD_OK_TO_MQTT_SHIFT = 0;
const BITFIELD_OK_TO_MQTT_MASK = (1 << BITFIELD_OK_TO_MQTT_SHIFT);

// command line options, also used to render the usage guide
const optionsList = [
    {
        name: 'help',
        alias: 'h',
        type: Boolean,
        description: 'Display this usage guide.'
    },
    {
        name: "mqtt-broker-url",
        type: String,
        description: "MQTT Broker URL (e.g: mqtt://mqtt.meshtastic.org)",
    },
    {
        name: "mqtt-username",
        type: String,
        description: "MQTT Username (e.g: meshdev)",
    },
    {
        name: "mqtt-password",
        type: String,
        description: "MQTT Password (e.g: large4cats)",
    },
    {
        name: "mqtt-client-id",
        type: String,
        description: "MQTT Client ID (e.g: map.example.com)",
    },
    {
        name: "mqtt-topic",
        type: String,
        multiple: true,
        typeLabel: ' ...',
        description: "MQTT Topic to subscribe to (e.g: msh/#)",
    },
    {
        name: "allowed-portnums",
        type: Number,
        multiple: true,
        typeLabel: ' ...',
        description: "If provided, only packets with these portnums will be processed.",
    },
    {
        name: "log-unknown-portnums",
        type: Boolean,
        description: "This option will log packets for unknown portnums to the console.",
    },
    {
        name: "collect-service-envelopes",
        type: Boolean,
        description: "This option will save all received service envelopes to the database.",
    },
    {
        name: "collect-positions",
        type: Boolean,
        description: "This option will save all received positions to the database.",
    },
    {
        name: "collect-text-messages",
        type: Boolean,
        description: "This option will save all received text messages to the database.",
    },
    {
        name: "ignore-direct-messages",
        type: Boolean,
        description: "This option will prevent saving direct messages to the database.",
    },
    {
        name: "collect-waypoints",
        type: Boolean,
        description: "This option will save all received waypoints to the database.",
    },
    {
        name: "collect-neighbor-info",
        type: Boolean,
        description: "This option will save all received neighbor infos to the database.",
    },
    {
        name: "collect-map-reports",
        type: Boolean,
        description: "This option will save all received map reports to the database.",
    },
    {
        name: "decryption-keys",
        type: String,
        multiple: true,
        typeLabel: ' ...',
        description: "Decryption keys encoded in base64 to use when decrypting service envelopes.",
    },
    {
        name: "drop-packets-not-ok-to-mqtt",
        type: Boolean,
        description: "This option will drop all packets that have 'OK to MQTT' set to false.",
    },
    {
        name: "debug-incoming-packets",
        type: Boolean,
        description: "This option will print out all known packets as they arrive.",
    },
    {
        name: "drop-portnums-without-bitfield",
        type: Number,
        multiple: true,
        typeLabel: ' ...',
        description: "If provided, packets with these portnums will be dropped if they don't have a bitfield. (bitfield available from firmware v2.5+)",
    },
    {
        name: "old-firmware-position-precision",
        type: Number,
        description: "If provided, position packets from firmware v2.4 and older will be truncated to this many decimal places.",
    },
    {
        name: "forget-outdated-node-positions-after-seconds",
        type: Number,
        description: "If provided, nodes that haven't sent a position report in this time will have their current position cleared.",
    },
    {
        name: "purge-interval-seconds",
        type: Number,
        description: "How long to wait between each automatic database purge.",
    },
    {
        name: "purge-device-metrics-after-seconds",
        type: Number,
        description: "Device Metrics older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-environment-metrics-after-seconds",
        type: Number,
        description: "Environment Metrics older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-power-metrics-after-seconds",
        type: Number,
        description: "Power Metrics older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-map-reports-after-seconds",
        type: Number,
        description: "Map reports older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-neighbour-infos-after-seconds",
        type: Number,
        description: "Neighbour infos older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-nodes-unheard-for-seconds",
        type: Number,
        description: "Nodes that haven't been heard from in this many seconds will be purged from the database.",
    },
    {
        name: "purge-positions-after-seconds",
        type: Number,
        description: "Positions older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-service-envelopes-after-seconds",
        type: Number,
        description: "Service envelopes older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-text-messages-after-seconds",
        type: Number,
        description: "Text Messages older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-traceroutes-after-seconds",
        type: Number,
        description: "Traceroutes older than this many seconds will be purged from the database.",
    },
    {
        name: "purge-waypoints-after-seconds",
        type: Number,
        description: "Waypoints older than this many seconds will be purged from the database.",
    },
];

// parse command line args
const options = commandLineArgs(optionsList);

// show help
if(options.help){
    const usage = commandLineUsage([
        {
            header: 'Meshtastic MQTT Collector',
            content: 'Collects and processes service envelopes from a Meshtastic MQTT server.',
        },
        {
            header: 'Options',
            optionList: optionsList,
        },
    ]);
    console.log(usage);
    process.exit(1);
}

// get options and fallback to default values
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
const mqttUsername = options["mqtt-username"] ?? "meshdev";
const mqttPassword = options["mqtt-password"] ?? "large4cats";
const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7";
const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"];
const allowedPortnums = options["allowed-portnums"] ?? null;
const logUnknownPortnums = options["log-unknown-portnums"] ?? false;
const collectorEnabled = {
    serviceEnvelopes: options["collect-service-envelopes"] ?? false,
    positions: options["collect-positions"] ?? false,
    textMessages: options["collect-text-messages"] ?? false,
    waypoints: options["collect-waypoints"] ?? false,
    mapReports: options["collect-map-reports"] ?? false,
    directMessages: !(options["ignore-direct-messages"] ?? false),
    neighborInfo: options["collect-neighbor-info"] ?? false,
};
const decryptionKeys = options["decryption-keys"] ?? [
    "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
];
const logKnownPacketTypes = options["debug-incoming-packets"] ?? false;
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
const oldFirmwarePositionPrecision = options["old-firmware-position-precision"] ?? null;
const forgetOutdatedNodePositionsAfterSeconds = options["forget-outdated-node-positions-after-seconds"] ?? null;
const purgeIntervalSeconds = options["purge-interval-seconds"] ?? 10;
const purgeNodesUnheardForSeconds = options["purge-nodes-unheard-for-seconds"] ?? null;
const purgeDeviceMetricsAfterSeconds = options["purge-device-metrics-after-seconds"] ?? null;
const purgeEnvironmentMetricsAfterSeconds = options["purge-environment-metrics-after-seconds"] ?? null;
const purgeMapReportsAfterSeconds = options["purge-map-reports-after-seconds"] ?? null;
const purgeNeighbourInfosAfterSeconds = options["purge-neighbour-infos-after-seconds"] ?? null;
const purgePowerMetricsAfterSeconds = options["purge-power-metrics-after-seconds"] ?? null;
const purgePositionsAfterSeconds = options["purge-positions-after-seconds"] ?? null;
const purgeServiceEnvelopesAfterSeconds = options["purge-service-envelopes-after-seconds"] ?? null;
const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds"] ?? null;
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;

// create mqtt client
const client = mqtt.connect(mqttBrokerUrl, {
    username: mqttUsername,
    password: mqttPassword,
    clientId: mqttClientId,
});

// run automatic purge if configured
// each purge function handles its own errors, so one failure doesn't prevent the others from running
if(purgeIntervalSeconds){
    setInterval(async () => {
        await purgeUnheardNodes();
        await purgeOldDeviceMetrics();
        await purgeOldEnvironmentMetrics();
        await purgeOldMapReports();
        await purgeOldNeighbourInfos();
        await purgeOldPowerMetrics();
        await purgeOldPositions();
        await purgeOldServiceEnvelopes();
        await purgeOldTextMessages();
        await purgeOldTraceroutes();
        await purgeOldWaypoints();
        await forgetOutdatedNodePositions();
    }, purgeIntervalSeconds * 1000);
}

/**
 * Prints the configured broker, topics and which collectors are enabled.
 */
function printStatus() {
    console.log(`MQTT server: ${mqttBrokerUrl}`);
    console.log(`MQTT topic(s): ${mqttTopics.join(', ')}`);
    for (const key of Object.keys(collectorEnabled)) {
        console.log(`${key} collection: ${collectorEnabled[key] ? 'enabled' : 'disabled'}`);
    }
}

printStatus();

/**
 * Deletes all rows of a prisma model whose timestamp column is older than the provided age.
 * Does nothing when no age is configured. Failures are logged and never thrown.
 * @param {string} modelName name of the model on the prisma client (e.g: "deviceMetric")
 * @param {string} timestampColumn name of the date column to compare (e.g: "created_at")
 * @param {number|null} maxAgeSeconds rows older than this many seconds are deleted
 */
async function purgeRowsOlderThan(modelName, timestampColumn, maxAgeSeconds) {

    // make sure seconds provided
    if(!maxAgeSeconds){
        return;
    }

    try {
        await prisma[modelName].deleteMany({
            where: {
                [timestampColumn]: {
                    // older than x seconds ago
                    lt: new Date(Date.now() - maxAgeSeconds * 1000),
                },
            },
        });
    } catch(e) {
        // purging is best effort and is retried on the next interval, but don't hide the failure
        console.error(`Failed to purge ${modelName}`, e);
    }

}

/**
 * Purges all nodes from the database that haven't been heard from within the configured timeframe.
 */
async function purgeUnheardNodes() {
    await purgeRowsOlderThan("node", "updated_at", purgeNodesUnheardForSeconds);
}

/**
 * Purges all device metrics from the database that are older than the configured timeframe.
 */
async function purgeOldDeviceMetrics() {
    await purgeRowsOlderThan("deviceMetric", "created_at", purgeDeviceMetricsAfterSeconds);
}

/**
 * Purges all environment metrics from the database that are older than the configured timeframe.
 */
async function purgeOldEnvironmentMetrics() {
    await purgeRowsOlderThan("environmentMetric", "created_at", purgeEnvironmentMetricsAfterSeconds);
}

/**
 * Purges all map reports from the database that are older than the configured timeframe.
 */
async function purgeOldMapReports() {
    await purgeRowsOlderThan("mapReport", "created_at", purgeMapReportsAfterSeconds);
}

/**
 * Purges all neighbour infos from the database that are older than the configured timeframe.
 */
async function purgeOldNeighbourInfos() {
    await purgeRowsOlderThan("neighbourInfo", "created_at", purgeNeighbourInfosAfterSeconds);
}

/**
 * Purges all power metrics from the database that are older than the configured timeframe.
 */
async function purgeOldPowerMetrics() {
    await purgeRowsOlderThan("powerMetric", "created_at", purgePowerMetricsAfterSeconds);
}

/**
 * Purges all positions from the database that are older than the configured timeframe.
 */
async function purgeOldPositions() {
    await purgeRowsOlderThan("position", "created_at", purgePositionsAfterSeconds);
}

/**
 * Purges all service envelopes from the database that are older than the configured timeframe.
 */
async function purgeOldServiceEnvelopes() {
    await purgeRowsOlderThan("serviceEnvelope", "created_at", purgeServiceEnvelopesAfterSeconds);
}

/**
 * Purges all text messages from the database that are older than the configured timeframe.
 */
async function purgeOldTextMessages() {
    await purgeRowsOlderThan("textMessage", "created_at", purgeTextMessagesAfterSeconds);
}

/**
 * Purges all traceroutes from the database that are older than the configured timeframe.
 */
async function purgeOldTraceroutes() {
    await purgeRowsOlderThan("traceRoute", "created_at", purgeTraceroutesAfterSeconds);
}

/**
 * Purges all waypoints from the database that are older than the configured timeframe.
 */
async function purgeOldWaypoints() {
    await purgeRowsOlderThan("waypoint", "created_at", purgeWaypointsAfterSeconds);
}

/**
 * Clears the current position stored for nodes if the position hasn't been updated within the configured timeframe.
 * This allows the node position to drop off the map if the user disabled position reporting, but still wants telemetry lookup etc
 */
async function forgetOutdatedNodePositions() {

    // make sure seconds provided
    if(!forgetOutdatedNodePositionsAfterSeconds){
        return;
    }

    // clear latitude/longitude/altitude for nodes that haven't updated their position in the configured timeframe
    try {
        await prisma.node.updateMany({
            where: {
                position_updated_at: {
                    // position_updated_at before x seconds ago
                    lt: new Date(Date.now() - forgetOutdatedNodePositionsAfterSeconds * 1000),
                },
                // don't forget outdated node positions for nodes that don't actually have a position set
                // otherwise the updated_at is updated, when nothing changed
                NOT: {
                    latitude: null,
                    longitude: null,
                    altitude: null,
                },
            },
            data: {
                latitude: null,
                longitude: null,
                altitude: null,
            },
        });
    } catch(e) {
        // best effort, retried on the next interval, but don't hide the failure
        console.error("Failed to forget outdated node positions", e);
    }

}

/**
 * Builds the 16 byte AES-CTR nonce for a packet.
 * Layout (little endian): packet id as 64 bits at offset 0, sender node id as 32 bits at offset 8,
 * and a 32-bit block counter starting at zero at offset 12.
 * @param packetId the id of the mesh packet
 * @param fromNode the numeric node id of the sender
 * @returns {Buffer} the nonce
 */
function createNonce(packetId, fromNode) {

    // Expand packetId to 64 bits
    const packetId64 = BigInt(packetId);

    // Initialize block counter (32-bit, starts at zero)
    const blockCounter = 0;

    // Create a buffer for the nonce
    const buf = Buffer.alloc(16);

    // Write packetId, fromNode, and block counter to the buffer
    buf.writeBigUInt64LE(packetId64, 0);
    buf.writeUInt32LE(fromNode, 8);
    buf.writeUInt32LE(blockCounter, 12);

    return buf;

}

/**
 * Attempts to decrypt an encrypted mesh packet with each configured decryption key, in order.
 *
 * References:
 * https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42
 * https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381
 *
 * @param packet the mesh packet, with the encrypted bytes in packet.payloadVariant.value
 * @returns the parsed Data message for the first key that decrypts and parses, otherwise null
 */
function decrypt(packet) {

    // attempt to decrypt with all available decryption keys
    for(const decryptionKey of decryptionKeys){
        try {

            // convert encryption key to buffer
            const key = Buffer.from(decryptionKey, "base64");

            // create decryption iv/nonce for this packet
            const nonceBuffer = createNonce(packet.id, packet.from);

            // determine algorithm based on key length
            let algorithm = null;
            if(key.length === 16){
                algorithm = "aes-128-ctr";
            } else if(key.length === 32){
                algorithm = "aes-256-ctr";
            } else {
                // skip this key, try the next one...
                console.error(`Skipping decryption key with invalid length: ${key.length}`);
                continue;
            }

            // create decipher
            const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer);

            // decrypt encrypted packet
            const decryptedBuffer = Buffer.concat([decipher.update(packet.payloadVariant.value), decipher.final()]);

            // parse as data message
            return fromBinary(Mesh.DataSchema, decryptedBuffer);

        } catch(e){
            // decrypting or parsing failed with this key, this is expected when it's the wrong key
            // so silently move on to the next one
        }
    }

    // couldn't decrypt
    return null;

}

/**
 * converts hex id to numeric id, for example: !FFFFFFFF to 4294967295
 * @param hexId a node id in hex format with a prepended "!"
 * @returns {bigint} the node id in numeric form
 */
function convertHexIdToNumericId(hexId) {
    return BigInt('0x' + hexId.replaceAll("!", ""));
}

// subscribe to everything when connected
client.on('connect', () => {
    for(const mqttTopic of mqttTopics){
        client.subscribe(mqttTopic);
    }
});

// handle message received
client.on('message', async (topic, message) => {
    try {

        // decode service envelope
        let envelope = null;
        try {
            envelope = fromBinary(Mqtt.ServiceEnvelopeSchema, message);
        } catch(e) {
            // message is not a service envelope, nothing we can do with it
        }

        // nothing to process without an envelope that carries a mesh packet
        if(!envelope || !envelope.packet){
            return;
        }

        // a packet without any payload variant is treated the same as a packet we couldn't decrypt
        const payloadVariant = envelope.packet.payloadVariant;
        let dataPacket = payloadVariant.value ?? null;

        // attempt to decrypt encrypted packets
        // the decrypted Data message (or null) replaces the encrypted bytes on the envelope,
        // as the packet handlers read Data fields from envelope.packet.payloadVariant.value
        if(payloadVariant.case === 'encrypted'){
            dataPacket = decrypt(envelope.packet);
            payloadVariant.value = dataPacket;
        }

        // get portnum and bitfield from decoded packet
        // these are fields of the Data message itself, not of its payload bytes
        // bitfield was added in v2.5 of meshtastic firmware
        // it will be missing for packets from v2.4.x and below, and will be an integer in v2.5.x and above
        const portnum = dataPacket?.portnum;
        const bitfield = dataPacket?.bitfield ?? null;

        if(dataPacket !== null){
            if(bitfield != null){

                // bitfield is available (v2.5.x firmware or newer)
                // drop packets where "OK to MQTT" is false
                const isOkToMqtt = bitfield & BITFIELD_OK_TO_MQTT_MASK;
                if(dropPacketsNotOkToMqtt && !isOkToMqtt){
                    return;
                }

            } else if(dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(portnum)){

                // bitfield is not available for this packet, and its portnum is in the drop list
                // this is useful for dropping specific packet types from firmware older than v2.5
                return;

            }
        }

        // create service envelope in db
        if(collectorEnabled.serviceEnvelopes){
            try {
                await prisma.serviceEnvelope.create({
                    data: {
                        mqtt_topic: topic,
                        channel_id: envelope.channelId,
                        gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
                        to: envelope.packet.to,
                        from: envelope.packet.from,
                        protobuf: message,
                    },
                });
            } catch (e) {
                console.error(e, {
                    envelope: envelope.packet,
                });
            }
        }

        // track when a node last gated a packet to mqtt
        // skipped when there is no gateway id, as there is no node to look up
        if(envelope.gatewayId){
            try {
                await prisma.node.updateMany({
                    where: {
                        node_id: convertHexIdToNumericId(envelope.gatewayId),
                    },
                    data: {
                        mqtt_connection_state_updated_at: new Date(),
                    },
                });
            } catch(e) {
                // don't care if updating mqtt timestamp fails
            }
        }

        // bail if we don't have a usable Data packet
        if(dataPacket === null){
            return;
        }

        // if allowed portnums are configured, ignore portnums that are not in the list
        if(allowedPortnums != null && !allowedPortnums.includes(portnum)){
            return;
        }

        // pick the handler for this portnum, and the schema used to decode the payload bytes
        // handlers without a schema receive the raw payload bytes
        let callback = null;
        let schema = null;
        switch(portnum) {
            case Portnums.PortNum.TEXT_MESSAGE_APP:
                callback = onTextMessage;
                break;
            case Portnums.PortNum.POSITION_APP:
                callback = onPosition;
                schema = Mesh.PositionSchema;
                break;
            case Portnums.PortNum.NODEINFO_APP:
                callback = onNodeInfo;
                schema = Mesh.NodeInfoSchema;
                break;
            case Portnums.PortNum.WAYPOINT_APP:
                callback = onWaypoint;
                schema = Mesh.WaypointSchema;
                break;
            case Portnums.PortNum.NEIGHBORINFO_APP:
                callback = onNeighborInfo;
                schema = Mesh.NeighborInfoSchema;
                break;
            case Portnums.PortNum.TELEMETRY_APP:
                callback = onTelemetry;
                schema = Telemetry.TelemetrySchema;
                break;
            case Portnums.PortNum.TRACEROUTE_APP:
                callback = onRouteDiscovery;
                schema = Mesh.RouteDiscoverySchema;
                break;
            case Portnums.PortNum.MAP_REPORT_APP:
                callback = onMapReport;
                schema = Mqtt.MapReportSchema;
                break;
            default:
                // handle unknown port nums here
                if(logUnknownPortnums){
                    const ignoredPortnums = [
                        Portnums.PortNum.UNKNOWN_APP,
                        Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP,
                        Portnums.PortNum.ROUTING_APP,
                        Portnums.PortNum.PAXCOUNTER_APP,
                        Portnums.PortNum.STORE_FORWARD_APP,
                        Portnums.PortNum.RANGE_TEST_APP,
                        Portnums.PortNum.ATAK_PLUGIN,
                        Portnums.PortNum.ATAK_FORWARDER,
                    ];

                    // ignore packets we don't want to see for now
                    if(portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511){
                        return;
                    }

                    console.log(portnum, envelope);
                }
        }

        if(callback !== null){

            // decode the payload bytes when the handler expects a decoded message
            let payload = dataPacket.payload;
            if(schema !== null){
                try {
                    payload = fromBinary(schema, payload);
                } catch(e) {
                    // payload doesn't match the schema for its portnum, nothing to process
                    return;
                }
            }

            await callback(envelope, payload);

        }

    } catch(e) {
        // a single bad message must never take the collector down, but don't hide the failure
        console.error("Failed to process message", e);
    }
});

/**
 * Handles TEXT_MESSAGE_APP packets, and saves the text message if text message collection is enabled.
 * @param envelope the decoded service envelope
 * @param payload the raw payload bytes of the data packet
 */
async function onTextMessage(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('TEXT_MESSAGE_APP', {
            to: envelope.packet.to.toString(16),
            from: envelope.packet.from.toString(16),
            text: payload.toString(),
        });
    }

    if(!collectorEnabled.textMessages){
        return;
    }

    // check if we want to ignore direct messages
    // messages sent to 0xFFFFFFFF are not treated as direct messages
    if(!collectorEnabled.directMessages && envelope.packet.to !== 0xFFFFFFFF){
        return;
    }

    try {
        await prisma.textMessage.create({
            data: {
                to: envelope.packet.to,
                from: envelope.packet.from,
                channel: envelope.packet.channel,
                packet_id: envelope.packet.id,
                channel_id: envelope.channelId,
                gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
                text: payload.toString(),
                rx_time: envelope.packet.rxTime,
                rx_snr: envelope.packet.rxSnr,
                rx_rssi: envelope.packet.rxRssi,
                hop_limit: envelope.packet.hopLimit,
            },
        });
    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles POSITION_APP packets.
 * Always updates the position stored on the node, and saves a position history entry if position collection is enabled.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mesh.PositionSchema
 */
async function onPosition(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('POSITION_APP', {
            from: envelope.packet.from.toString(16),
            position: payload,
        });
    }

    // process position
    // both coordinates are checked against null/undefined, as zero is a valid coordinate
    if(payload.latitudeI != null && payload.longitudeI != null){

        const bitfield = envelope.packet.payloadVariant.value.bitfield;

        // if bitfield is not available, we are on firmware v2.4 or below
        // if configured, position packets should have their precision reduced
        if(bitfield == null && oldFirmwarePositionPrecision != null){

            // adjust precision of latitude and longitude
            payload.latitudeI = PositionUtil.setPositionPrecision(payload.latitudeI, oldFirmwarePositionPrecision);
            payload.longitudeI = PositionUtil.setPositionPrecision(payload.longitudeI, oldFirmwarePositionPrecision);

            // update position precision on packet to show that it is no longer full precision
            payload.precisionBits = oldFirmwarePositionPrecision;

        }

        // update node position in db
        try {
            await prisma.node.updateMany({
                where: {
                    node_id: envelope.packet.from,
                },
                data: {
                    position_updated_at: new Date(),
                    latitude: payload.latitudeI,
                    longitude: payload.longitudeI,
                    altitude: nullIfZero(payload.altitude),
                    position_precision: payload.precisionBits,
                },
            });
        } catch (e) {
            console.error(e);
        }

    }

    // don't collect position history if not enabled, but we still want to update the node above
    if(!collectorEnabled.positions){
        return;
    }

    try {

        // find an existing position with duplicate information created in the last 60 seconds
        const existingDuplicatePosition = await prisma.position.findFirst({
            where: {
                node_id: envelope.packet.from,
                packet_id: envelope.packet.id,
                created_at: {
                    gte: new Date(Date.now() - 60000), // created in the last 60 seconds
                },
            }
        });

        // create position if no duplicates found
        if(!existingDuplicatePosition){
            await prisma.position.create({
                data: {
                    node_id: envelope.packet.from,
                    to: envelope.packet.to,
                    from: envelope.packet.from,
                    channel: envelope.packet.channel,
                    packet_id: envelope.packet.id,
                    channel_id: envelope.channelId,
                    gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
                    latitude: payload.latitudeI,
                    longitude: payload.longitudeI,
                    altitude: payload.altitude,
                },
            });
        }

    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles NODEINFO_APP packets by creating or updating the sending node in the database.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mesh.NodeInfoSchema
 */
async function onNodeInfo(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('NODEINFO_APP', {
            from: envelope.packet.from.toString(16),
            user: payload,
        });
    }

    // fields set on the node for both create and update
    const data = {
        long_name: payload.longName,
        short_name: payload.shortName,
        hardware_model: payload.hwModel,
        is_licensed: payload.isLicensed === true,
        role: payload.role,
    };

    // create or update node in db
    try {
        await prisma.node.upsert({
            where: {
                node_id: envelope.packet.from,
            },
            create: {
                node_id: envelope.packet.from,
                ...data,
            },
            update: data,
        });
    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles WAYPOINT_APP packets, and saves the waypoint if waypoint collection is enabled.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mesh.WaypointSchema
 */
async function onWaypoint(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('WAYPOINT_APP', {
            to: envelope.packet.to.toString(16),
            from: envelope.packet.from.toString(16),
            waypoint: payload,
        });
    }

    if(!collectorEnabled.waypoints){
        return;
    }

    try {
        await prisma.waypoint.create({
            data: {
                to: envelope.packet.to,
                from: envelope.packet.from,
                waypoint_id: payload.id,
                latitude: payload.latitudeI,
                longitude: payload.longitudeI,
                expire: payload.expire,
                locked_to: payload.lockedTo,
                name: payload.name,
                description: payload.description,
                icon: payload.icon,
                channel: envelope.packet.channel,
                packet_id: envelope.packet.id,
                channel_id: envelope.channelId,
                gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
            },
        });
    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles MAP_REPORT_APP packets.
 * Always creates or updates the sending node, and saves a map report history entry if map report collection is enabled.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mqtt.MapReportSchema
 */
async function onMapReport(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('MAP_REPORT_APP', {
            from: envelope.packet.from.toString(16),
            map_report: payload,
        });
    }

    // create or update node in db
    try {

        // data to set on node
        const data = {
            long_name: payload.longName,
            short_name: payload.shortName,
            hardware_model: payload.hwModel,
            role: payload.role,
            latitude: payload.latitudeI,
            longitude: payload.longitudeI,
            altitude: nullIfZero(payload.altitude),
            firmware_version: payload.firmwareVersion,
            region: payload.region,
            modem_preset: payload.modemPreset,
            has_default_channel: payload.hasDefaultChannel,
            position_precision: payload.positionPrecision,
            num_online_local_nodes: payload.numOnlineLocalNodes,
            position_updated_at: new Date(),
        };

        await prisma.node.upsert({
            where: {
                node_id: envelope.packet.from,
            },
            create: {
                node_id: envelope.packet.from,
                ...data,
            },
            update: data,
        });

    } catch (e) {
        console.error(e);
    }

    // don't collect map report history if not enabled, but we still want to update the node above
    if(!collectorEnabled.mapReports){
        return;
    }

    try {

        // find an existing map with duplicate information created in the last 60 seconds
        const existingDuplicateMapReport = await prisma.mapReport.findFirst({
            where: {
                node_id: envelope.packet.from,
                long_name: payload.longName,
                short_name: payload.shortName,
                created_at: {
                    gte: new Date(Date.now() - 60000), // created in the last 60 seconds
                },
            }
        });

        // create map report if no duplicates found
        if(!existingDuplicateMapReport){
            await prisma.mapReport.create({
                data: {
                    node_id: envelope.packet.from,
                    long_name: payload.longName,
                    short_name: payload.shortName,
                    role: payload.role,
                    hardware_model: payload.hwModel,
                    firmware_version: payload.firmwareVersion,
                    region: payload.region,
                    modem_preset: payload.modemPreset,
                    has_default_channel: payload.hasDefaultChannel,
                    latitude: payload.latitudeI,
                    longitude: payload.longitudeI,
                    altitude: payload.altitude,
                    position_precision: payload.positionPrecision,
                    num_online_local_nodes: payload.numOnlineLocalNodes,
                },
            });
        }

    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles NEIGHBORINFO_APP packets.
 * Always updates the neighbours stored on the node, and saves a neighbour info history entry if neighbor info collection is enabled.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mesh.NeighborInfoSchema
 */
async function onNeighborInfo(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('NEIGHBORINFO_APP', {
            from: envelope.packet.from.toString(16),
            neighbor_info: payload,
        });
    }

    // only the node id and snr of each neighbour are stored
    const neighbours = payload.neighbors.map((neighbour) => {
        return {
            node_id: neighbour.nodeId,
            snr: neighbour.snr,
        };
    });

    // update node neighbour info in db
    try {
        await prisma.node.updateMany({
            where: {
                node_id: envelope.packet.from,
            },
            data: {
                neighbours_updated_at: new Date(),
                neighbour_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
                neighbours: neighbours,
            },
        });
    } catch (e) {
        console.error(e);
    }

    // don't store all neighbour infos, but we want to update the existing node above
    if(!collectorEnabled.neighborInfo){
        return;
    }

    // create neighbour info
    try {
        await prisma.neighbourInfo.create({
            data: {
                node_id: envelope.packet.from,
                node_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
                neighbours: neighbours,
            },
        });
    } catch (e) {
        console.error(e);
    }

}

/**
 * Handles TRACEROUTE_APP packets by saving the traceroute to the database.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Mesh.RouteDiscoverySchema
 */
async function onRouteDiscovery(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('TRACEROUTE_APP', {
            to: envelope.packet.to.toString(16),
            from: envelope.packet.from.toString(16),
            want_response: envelope.packet.payloadVariant.value.wantResponse,
            route_discovery: payload,
        });
    }

    try {
        await prisma.traceRoute.create({
            data: {
                to: envelope.packet.to,
                from: envelope.packet.from,
                want_response: envelope.packet.payloadVariant.value.wantResponse,
                route: payload.route,
                snr_towards: payload.snrTowards,
                route_back: payload.routeBack,
                snr_back: payload.snrBack,
                channel: envelope.packet.channel,
                packet_id: envelope.packet.id,
                channel_id: envelope.channelId,
                gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
            },
        });
    } catch (e) {
        console.error(e);
    }

}

/**
 * Converts a metric value of exactly zero to null, any other value is returned as is.
 * @param value the metric value
 * @returns the value, or null if the value is zero
 */
function nullIfZero(value) {
    return value !== 0 ? value : null;
}

/**
 * Handles TELEMETRY_APP packets.
 * Saves device, environment and power metrics (skipping recent duplicates), and updates the latest telemetry stored on the node.
 * @param envelope the decoded service envelope
 * @param payload the payload decoded with Telemetry.TelemetrySchema
 */
async function onTelemetry(envelope, payload) {

    if(logKnownPacketTypes){
        console.log('TELEMETRY_APP', {
            from: envelope.packet.from.toString(16),
            telemetry: payload,
        });
    }

    // data to update
    const data = {};

    // handle device metrics
    if(payload.deviceMetrics){

        data.battery_level = nullIfZero(payload.deviceMetrics.batteryLevel);
        data.voltage = nullIfZero(payload.deviceMetrics.voltage);
        data.channel_utilization = nullIfZero(payload.deviceMetrics.channelUtilization);
        data.air_util_tx = nullIfZero(payload.deviceMetrics.airUtilTx);
        data.uptime_seconds = nullIfZero(payload.deviceMetrics.uptimeSeconds);

        // create device metric
        try {

            // find an existing metric with duplicate information created in the last 15 seconds
            const existingDuplicateDeviceMetric = await prisma.deviceMetric.findFirst({
                where: {
                    node_id: envelope.packet.from,
                    battery_level: data.battery_level,
                    voltage: data.voltage,
                    channel_utilization: data.channel_utilization,
                    air_util_tx: data.air_util_tx,
                    created_at: {
                        gte: new Date(Date.now() - 15000), // created in the last 15 seconds
                    },
                }
            });

            // create metric if no duplicates found
            if(!existingDuplicateDeviceMetric){
                await prisma.deviceMetric.create({
                    data: {
                        node_id: envelope.packet.from,
                        battery_level: data.battery_level,
                        voltage: data.voltage,
                        channel_utilization: data.channel_utilization,
                        air_util_tx: data.air_util_tx,
                    },
                });
            }

        } catch (e) {
            console.error(e);
        }

    }

    // handle environment metrics
    if(payload.environmentMetrics){

        // get metric values
        const temperature = nullIfZero(payload.environmentMetrics.temperature);
        const relativeHumidity = nullIfZero(payload.environmentMetrics.relativeHumidity);
        const barometricPressure = nullIfZero(payload.environmentMetrics.barometricPressure);
        const gasResistance = nullIfZero(payload.environmentMetrics.gasResistance);
        const voltage = nullIfZero(payload.environmentMetrics.voltage);
        const current = nullIfZero(payload.environmentMetrics.current);
        const iaq = nullIfZero(payload.environmentMetrics.iaq);
        const windDirection = payload.environmentMetrics.windDirection;
        const windSpeed = payload.environmentMetrics.windSpeed;
        const windGust = payload.environmentMetrics.windGust;
        const windLull = payload.environmentMetrics.windLull;

        // set metrics to update on node table
        data.temperature = temperature;
        data.relative_humidity = relativeHumidity;
        data.barometric_pressure = barometricPressure;

        // create environment metric
        try {

            // find an existing metric with duplicate information created in the last 15 seconds
            const existingDuplicateEnvironmentMetric = await prisma.environmentMetric.findFirst({
                where: {
                    node_id: envelope.packet.from,
                    packet_id: envelope.packet.id,
                    created_at: {
                        gte: new Date(Date.now() - 15000), // created in the last 15 seconds
                    },
                }
            });

            // create metric if no duplicates found
            if(!existingDuplicateEnvironmentMetric){
                await prisma.environmentMetric.create({
                    data: {
                        node_id: envelope.packet.from,
                        packet_id: envelope.packet.id,
                        temperature: temperature,
                        relative_humidity: relativeHumidity,
                        barometric_pressure: barometricPressure,
                        gas_resistance: gasResistance,
                        voltage: voltage,
                        current: current,
                        iaq: iaq,
                        wind_direction: windDirection,
                        wind_speed: windSpeed,
                        wind_gust: windGust,
                        wind_lull: windLull,
                    },
                });
            }

        } catch (e) {
            console.error(e);
        }

    }

    // handle power metrics
    if(payload.powerMetrics){

        // get metric values
        const ch1Voltage = nullIfZero(payload.powerMetrics.ch1Voltage);
        const ch1Current = nullIfZero(payload.powerMetrics.ch1Current);
        const ch2Voltage = nullIfZero(payload.powerMetrics.ch2Voltage);
        const ch2Current = nullIfZero(payload.powerMetrics.ch2Current);
        const ch3Voltage = nullIfZero(payload.powerMetrics.ch3Voltage);
        const ch3Current = nullIfZero(payload.powerMetrics.ch3Current);

        // create power metric
        try {

            // find an existing metric with duplicate information created in the last 15 seconds
            const existingDuplicatePowerMetric = await prisma.powerMetric.findFirst({
                where: {
                    node_id: envelope.packet.from,
                    packet_id: envelope.packet.id,
                    created_at: {
                        gte: new Date(Date.now() - 15000), // created in the last 15 seconds
                    },
                }
            });

            // create metric if no duplicates found
            if(!existingDuplicatePowerMetric){
                await prisma.powerMetric.create({
                    data: {
                        node_id: envelope.packet.from,
                        packet_id: envelope.packet.id,
                        ch1_voltage: ch1Voltage,
                        ch1_current: ch1Current,
                        ch2_voltage: ch2Voltage,
                        ch2_current: ch2Current,
                        ch3_voltage: ch3Voltage,
                        ch3_current: ch3Current,
                    },
                });
            }

        } catch (e) {
            console.error(e);
        }

    }

    // update node telemetry in db
    if(Object.keys(data).length > 0){
        try {
            await prisma.node.updateMany({
                where: {
                    node_id: envelope.packet.from,
                },
                data: data,
            });
        } catch (e) {
            console.error(e);
        }
    }

}