diff --git a/mqtt/index.js b/mqtt/index.js index d799dcd..57af425 100644 --- a/mqtt/index.js +++ b/mqtt/index.js @@ -214,16 +214,19 @@ const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7"; const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"]; const allowedPortnums = options["allowed-portnums"] ?? null; const logUnknownPortnums = options["log-unknown-portnums"] ?? false; -const collectServiceEnvelopes = options["collect-service-envelopes"] ?? false; -const collectPositions = options["collect-positions"] ?? false; -const collectTextMessages = options["collect-text-messages"] ?? false; -const ignoreDirectMessages = options["ignore-direct-messages"] ?? false; -const collectWaypoints = options["collect-waypoints"] ?? false; -const collectNeighbourInfo = options["collect-neighbour-info"] ?? false; -const collectMapReports = options["collect-map-reports"] ?? false; +const collectorEnabled = { + serviceEnvelopes: options["collect-service-envelopes"] ?? false, + positions: options["collect-positions"] ?? false, + textMessages: options["collect-text-messages"] ?? false, + waypoints: options["collect-waypoints"] ?? false, + mapReports: options["collect-map-reports"] ?? false, + directMessages: !(options["ignore-direct-messages"] ?? false), + neighborInfo: options["collect-neighbor-info"] ?? false, +} const decryptionKeys = options["decryption-keys"] ?? [ "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key ]; +const logKnownPacketTypes = false; const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false; const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null; const oldFirmwarePositionPrecision = options["old-firmware-position-precision"] ?? null; @@ -241,6 +244,7 @@ const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null; const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null; + // create mqtt client const client = mqtt.connect(mqttBrokerUrl, { username: mqttUsername, @@ -266,6 +270,15 @@ if(purgeIntervalSeconds){ }, purgeIntervalSeconds * 1000); } +function printStatus() { + console.log(`MQTT server: ${mqttBrokerUrl}`) + console.log(`MQTT topic(s): ${mqttTopics.join(', ')}`) + for (const key of Object.keys(collectorEnabled)) { + console.log(`${key} collection: ${collectorEnabled[key] ? 'enabled' : 'disabled'}`) + } +} +printStatus(); + /** * Purges all nodes from the database that haven't been heard from within the configured timeframe. */ @@ -666,14 +679,14 @@ function convertHexIdToNumericId(hexId) { } // subscribe to everything when connected -client.on("connect", () => { +client.on('connect', () => { for(const mqttTopic of mqttTopics){ client.subscribe(mqttTopic); } }); // handle message received -client.on("message", async (topic, message) => { +client.on('message', async (topic, message) => { try { // decode service envelope let envelope = null; @@ -687,7 +700,7 @@ client.on("message", async (topic, message) => { let dataPacket = envelope.packet.payloadVariant.value; // attempt to decrypt encrypted packets if (envelope.packet.payloadVariant.case === 'encrypted') { - dataPacket = decrypt(envelope.packet); + envelope.packet.payloadVariant.value = dataPacket = decrypt(envelope.packet); } if (dataPacket !== null) { @@ -712,7 +725,7 @@ client.on("message", async (topic, message) => { } // create service envelope in db - if(collectServiceEnvelopes){ + if (collectorEnabled.serviceEnvelopes) { try { await prisma.serviceEnvelope.create({ data: { @@ -750,7 +763,7 @@ client.on("message", async (topic, message) => { return; } - const payload = dataPacket.payload; + let payload = dataPacket.payload; // get portnum from decoded packet const portnum = dataPacket.portnum; // get bitfield from decoded packet @@ -758,614 +771,630 @@ client.on("message", async (topic, message) => { // this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above const bitfield = dataPacket.bitfield; - const logKnownPacketTypes = false; - // if allowed portnums are configured, ignore portnums that are not in the list if(allowedPortnums != null && !allowedPortnums.includes(portnum)){ return; } - if(portnum === Portnums.PortNum.TEXT_MESSAGE_APP) { - - if(!collectTextMessages){ - return; - } - - // check if we want to ignore direct messages - if(ignoreDirectMessages && envelope.packet.to !== 0xFFFFFFFF){ - return; - } - - if(logKnownPacketTypes) { - console.log("TEXT_MESSAGE_APP", { - to: envelope.packet.to.toString(16), - from: envelope.packet.from.toString(16), - text: payload.toString(), - }); - } - - try { - await prisma.textMessage.create({ - data: { - to: envelope.packet.to, - from: envelope.packet.from, - channel: envelope.packet.channel, - packet_id: envelope.packet.id, - channel_id: envelope.channelId, - gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, - text: payload.toString(), - rx_time: envelope.packet.rxTime, - rx_snr: envelope.packet.rxSnr, - rx_rssi: envelope.packet.rxRssi, - hop_limit: envelope.packet.hopLimit, - }, - }); - } catch (e) { - console.error(e); - } - - } - - else if(portnum === Portnums.PortNum.POSITION_APP) { - const position = fromBinary(Mesh.PositionSchema, payload); - - if(logKnownPacketTypes){ - console.log("POSITION_APP", { - from: envelope.packet.from.toString(16), - position: position, - }); - } - - // process position - if(position.latitudeI != null && position.longitudeI){ - - // if bitfield is not available, we are on firmware v2.4 or below - // if configured, position packets should have their precision reduced - if(bitfield == null && oldFirmwarePositionPrecision != null){ - - // adjust precision of latitude and longitude - position.latitudeI = PositionUtil.setPositionPrecision(position.latitudeI, oldFirmwarePositionPrecision); - position.longitudeI = PositionUtil.setPositionPrecision(position.longitudeI, oldFirmwarePositionPrecision); - - // update position precision on packet to show that it is no longer full precision - position.precisionBits = oldFirmwarePositionPrecision; - - } - - // update node position in db - try { - await prisma.node.updateMany({ - where: { - node_id: envelope.packet.from, - }, - data: { - position_updated_at: new Date(), - latitude: position.latitudeI, - longitude: position.longitudeI, - altitude: position.altitude !== 0 ? position.altitude : null, - position_precision: position.precisionBits, - }, - }); - } catch (e) { - console.error(e); - } - - } - - // don't collect position history if not enabled, but we still want to update the node above - if(!collectPositions){ - return; - } - - try { - - // find an existing position with duplicate information created in the last 60 seconds - const existingDuplicatePosition = await prisma.position.findFirst({ - where: { - node_id: envelope.packet.from, - packet_id: envelope.packet.id, - created_at: { - gte: new Date(Date.now() - 60000), // created in the last 60 seconds - }, + let callback = null; + let schema = null; + switch(portnum) { + case Portnums.PortNum.TEXT_MESSAGE_APP: + callback = onTextMessage; + break; + case Portnums.PortNum.POSITION_APP: + callback = onPosition; + schema = Mesh.PositionSchema; + break; + case Portnums.PortNum.NODEINFO_APP: + callback = onNodeInfo; + schema = Mesh.NodeInfoSchema; + break; + case Portnums.PortNum.WAYPOINT_APP: + callback = onWaypoint; + schema = Mesh.WaypointSchema; + break; + case Portnums.PortNum.NEIGHBORINFO_APP: + callback = onNeighborInfo; + schema = Mesh.NeighborInfoSchema; + break; + case Portnums.PortNum.TELEMETRY_APP: + callback = onTelemetry; + schema = Telemetry.TelemetrySchema; + break; + case Portnums.PortNum.TRACEROUTE_APP: + callback = onRouteDiscovery; + schema = Mesh.RouteDiscoverySchema; + break; + case Portnums.PortNum.MAP_REPORT_APP: + callback = onMapReport; + schema = Mqtt.MapReportSchema; + break; + default: + // handle unknown port nums here + if (logUnknownPortnums) { + const ignoredPortnums = [ + Portnums.PortNum.UNKNOWN_APP, + Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP, + Portnums.PortNum.ROUTING_APP, + Portnums.PortNum.PAXCOUNTER_APP, + Portnums.PortNum.STORE_FORWARD_APP, + Portnums.PortNum.RANGE_TEST_APP, + Portnums.PortNum.ATAK_PLUGIN, + Portnums.PortNum.ATAK_FORWARDER, + ]; + // ignore packets we don't want to see for now + if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) { + return; } - }); - - // create position if no duplicates found - if(!existingDuplicatePosition){ - await prisma.position.create({ - data: { - node_id: envelope.packet.from, - to: envelope.packet.to, - from: envelope.packet.from, - channel: envelope.packet.channel, - packet_id: envelope.packet.id, - channel_id: envelope.channelId, - gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, - latitude: position.latitudeI, - longitude: position.longitudeI, - altitude: position.altitude, - }, - }); + console.log(portnum, envelope); } - - } catch (e) { - console.error(e); - } - } - else if(portnum === Portnums.PortNum.NODEINFO_APP) { - - const user = fromBinary(Mesh.UserSchema, payload); - - if(logKnownPacketTypes) { - console.log("NODEINFO_APP", { - from: envelope.packet.from.toString(16), - user: user, - }); - } - - // create or update node in db - try { - await prisma.node.upsert({ - where: { - node_id: envelope.packet.from, - }, - create: { - node_id: envelope.packet.from, - long_name: user.longName, - short_name: user.shortName, - hardware_model: user.hwModel, - is_licensed: user.isLicensed === true, - role: user.role, - }, - update: { - long_name: user.longName, - short_name: user.shortName, - hardware_model: user.hwModel, - is_licensed: user.isLicensed === true, - role: user.role, - }, - }); - } catch (e) { - console.error(e); - } - - } - - else if(portnum === Portnums.PortNum.WAYPOINT_APP) { - - if(!collectWaypoints){ - return; - } - - const waypoint = fromBinary(Mesh.WaypointSchema, payload); - - if(logKnownPacketTypes) { - console.log("WAYPOINT_APP", { - to: envelope.packet.to.toString(16), - from: envelope.packet.from.toString(16), - waypoint: waypoint, - }); - } - - try { - await prisma.waypoint.create({ - data: { - to: envelope.packet.to, - from: envelope.packet.from, - waypoint_id: waypoint.id, - latitude: waypoint.latitudeI, - longitude: waypoint.longitudeI, - expire: waypoint.expire, - locked_to: waypoint.lockedTo, - name: waypoint.name, - description: waypoint.description, - icon: waypoint.icon, - channel: envelope.packet.channel, - packet_id: envelope.packet.id, - channel_id: envelope.channelId, - gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, - }, - }); - } catch (e) { - console.error(e); - } - - } - - else if(portnum === Portnums.PortNum.NEIGHBORINFO_APP) { - - const neighbourInfo = fromBinary(Mesh.NeighborInfoSchema, payload); - - if(logKnownPacketTypes) { - console.log("NEIGHBORINFO_APP", { - from: envelope.packet.from.toString(16), - neighbour_info: neighbourInfo, - }); - } - - // update node neighbour info in db - try { - await prisma.node.updateMany({ - where: { - node_id: envelope.packet.from, - }, - data: { - neighbours_updated_at: new Date(), - neighbour_broadcast_interval_secs: neighbourInfo.nodeBroadcastIntervalSecs, - neighbours: neighbourInfo.neighbors.map((neighbour) => { - return { - node_id: neighbour.nodeId, - snr: neighbour.snr, - }; - }), - }, - }); - } catch (e) { - console.error(e); - } - - // don't store all neighbour infos, but we want to update the existing node above - if(!collectNeighbourInfo){ - return; - } - - // create neighbour info - try { - await prisma.neighbourInfo.create({ - data: { - node_id: envelope.packet.from, - node_broadcast_interval_secs: neighbourInfo.nodeBroadcastIntervalSecs, - neighbours: neighbourInfo.neighbors.map((neighbour) => { - return { - node_id: neighbour.nodeId, - snr: neighbour.snr, - }; - }), - }, - }); - } catch (e) { - console.error(e); - } - - } - - else if(portnum === Portnums.PortNum.TELEMETRY_APP) { - - const telemetry = fromBinary(Telemetry.TelemetrySchema, payload); - - if(logKnownPacketTypes) { - console.log("TELEMETRY_APP", { - from: envelope.packet.from.toString(16), - telemetry: telemetry, - }); - } - - // data to update - const data = {}; - - // handle device metrics - if(telemetry.deviceMetrics){ - - data.battery_level = telemetry.deviceMetrics.batteryLevel !== 0 ? telemetry.deviceMetrics.batteryLevel : null; - data.voltage = telemetry.deviceMetrics.voltage !== 0 ? telemetry.deviceMetrics.voltage : null; - data.channel_utilization = telemetry.deviceMetrics.channelUtilization !== 0 ? telemetry.deviceMetrics.channelUtilization : null; - data.air_util_tx = telemetry.deviceMetrics.airUtilTx !== 0 ? telemetry.deviceMetrics.airUtilTx : null; - data.uptime_seconds = telemetry.deviceMetrics.uptimeSeconds !== 0 ? telemetry.deviceMetrics.uptimeSeconds : null; - - // create device metric + if (callback !== null) { + if (schema !== null) { try { - - // find an existing metric with duplicate information created in the last 15 seconds - const existingDuplicateDeviceMetric = await prisma.deviceMetric.findFirst({ - where: { - node_id: envelope.packet.from, - battery_level: data.battery_level, - voltage: data.voltage, - channel_utilization: data.channel_utilization, - air_util_tx: data.air_util_tx, - created_at: { - gte: new Date(Date.now() - 15000), // created in the last 15 seconds - }, - } - }) - - // create metric if no duplicates found - if(!existingDuplicateDeviceMetric){ - await prisma.deviceMetric.create({ - data: { - node_id: envelope.packet.from, - battery_level: data.battery_level, - voltage: data.voltage, - channel_utilization: data.channel_utilization, - air_util_tx: data.air_util_tx, - }, - }); - } - - } catch (e) { - console.error(e); - } - - } - - // handle environment metrics - if(telemetry.environmentMetrics){ - - // get metric values - const temperature = telemetry.environmentMetrics.temperature !== 0 ? telemetry.environmentMetrics.temperature : null; - const relativeHumidity = telemetry.environmentMetrics.relativeHumidity !== 0 ? telemetry.environmentMetrics.relativeHumidity : null; - const barometricPressure = telemetry.environmentMetrics.barometricPressure !== 0 ? telemetry.environmentMetrics.barometricPressure : null; - const gasResistance = telemetry.environmentMetrics.gasResistance !== 0 ? telemetry.environmentMetrics.gasResistance : null; - const voltage = telemetry.environmentMetrics.voltage !== 0 ? telemetry.environmentMetrics.voltage : null; - const current = telemetry.environmentMetrics.current !== 0 ? telemetry.environmentMetrics.current : null; - const iaq = telemetry.environmentMetrics.iaq !== 0 ? telemetry.environmentMetrics.iaq : null; - const windDirection = telemetry.environmentMetrics.windDirection; - const windSpeed = telemetry.environmentMetrics.windSpeed; - const windGust = telemetry.environmentMetrics.windGust; - const windLull = telemetry.environmentMetrics.windLull; - - // set metrics to update on node table - data.temperature = temperature; - data.relative_humidity = relativeHumidity; - data.barometric_pressure = barometricPressure; - - // create environment metric - try { - - // find an existing metric with duplicate information created in the last 15 seconds - const existingDuplicateEnvironmentMetric = await prisma.environmentMetric.findFirst({ - where: { - node_id: envelope.packet.from, - packet_id: envelope.packet.id, - created_at: { - gte: new Date(Date.now() - 15000), // created in the last 15 seconds - }, - } - }) - - // create metric if no duplicates found - if(!existingDuplicateEnvironmentMetric){ - await prisma.environmentMetric.create({ - data: { - node_id: envelope.packet.from, - packet_id: envelope.packet.id, - temperature: temperature, - relative_humidity: relativeHumidity, - barometric_pressure: barometricPressure, - gas_resistance: gasResistance, - voltage: voltage, - current: current, - iaq: iaq, - wind_direction: windDirection, - wind_speed: windSpeed, - wind_gust: windGust, - wind_lull: windLull, - }, - }); - } - - } catch (e) { - console.error(e); - } - - } - - // handle power metrics - if(telemetry.powerMetrics){ - - // get metric values - const ch1Voltage = telemetry.powerMetrics.ch1Voltage !== 0 ? telemetry.powerMetrics.ch1Voltage : null; - const ch1Current = telemetry.powerMetrics.ch1Current !== 0 ? telemetry.powerMetrics.ch1Current : null; - const ch2Voltage = telemetry.powerMetrics.ch2Voltage !== 0 ? telemetry.powerMetrics.ch2Voltage : null; - const ch2Current = telemetry.powerMetrics.ch2Current !== 0 ? telemetry.powerMetrics.ch2Current : null; - const ch3Voltage = telemetry.powerMetrics.ch3Voltage !== 0 ? telemetry.powerMetrics.ch3Voltage : null; - const ch3Current = telemetry.powerMetrics.ch3Current !== 0 ? telemetry.powerMetrics.ch3Current : null; - - // create power metric - try { - - // find an existing metric with duplicate information created in the last 15 seconds - const existingDuplicatePowerMetric = await prisma.powerMetric.findFirst({ - where: { - node_id: envelope.packet.from, - packet_id: envelope.packet.id, - created_at: { - gte: new Date(Date.now() - 15000), // created in the last 15 seconds - }, - } - }) - - // create metric if no duplicates found - if(!existingDuplicatePowerMetric){ - await prisma.powerMetric.create({ - data: { - node_id: envelope.packet.from, - packet_id: envelope.packet.id, - ch1_voltage: ch1Voltage, - ch1_current: ch1Current, - ch2_voltage: ch2Voltage, - ch2_current: ch2Current, - ch3_voltage: ch3Voltage, - ch3_current: ch3Current, - }, - }); - } - - } catch (e) { - console.error(e); - } - - } - - // update node telemetry in db - if(Object.keys(data).length > 0){ - try { - await prisma.node.updateMany({ - where: { - node_id: envelope.packet.from, - }, - data: data, - }); - } catch (e) { - console.error(e); - } - } - - } - - else if(portnum === Portnums.PortNum.TRACEROUTE_APP) { - - const routeDiscovery = fromBinary(Mesh.RouteDiscoverySchema, payload); - - if(logKnownPacketTypes) { - console.log("TRACEROUTE_APP", { - to: envelope.packet.to.toString(16), - from: envelope.packet.from.toString(16), - want_response: payload.wantResponse, - route_discovery: routeDiscovery, - }); - } - - try { - await prisma.traceRoute.create({ - data: { - to: envelope.packet.to, - from: envelope.packet.from, - want_response: envelope.packet.decoded.wantResponse, - route: routeDiscovery.route, - snr_towards: routeDiscovery.snrTowards, - route_back: routeDiscovery.routeBack, - snr_back: routeDiscovery.snrBack, - channel: envelope.packet.channel, - packet_id: envelope.packet.id, - channel_id: envelope.channelId, - gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, - }, - }); - } catch (e) { - console.error(e); - } - - } - - else if(portnum === Portnums.PortNum.MAP_REPORT_APP) { - - const mapReport = fromBinary(Mqtt.MapReportSchema, payload); - - if(logKnownPacketTypes) { - console.log("MAP_REPORT_APP", { - from: envelope.packet.from.toString(16), - map_report: mapReport, - }); - } - - // create or update node in db - try { - - // data to set on node - const data = { - long_name: mapReport.longName, - short_name: mapReport.shortName, - hardware_model: mapReport.hwModel, - role: mapReport.role, - latitude: mapReport.latitudeI, - longitude: mapReport.longitudeI, - altitude: mapReport.altitude !== 0 ? mapReport.altitude : null, - firmware_version: mapReport.firmwareVersion, - region: mapReport.region, - modem_preset: mapReport.modemPreset, - has_default_channel: mapReport.hasDefaultChannel, - position_precision: mapReport.positionPrecision, - num_online_local_nodes: mapReport.numOnlineLocalNodes, - position_updated_at: new Date(), - }; - - await prisma.node.upsert({ - where: { - node_id: envelope.packet.from, - }, - create: { - node_id: envelope.packet.from, - ...data, - }, - update: data, - }); - - } catch (e) { - console.error(e); - } - - // don't collect map report history if not enabled, but we still want to update the node above - if(!collectMapReports){ - return; - } - - try { - - // find an existing map with duplicate information created in the last 60 seconds - const existingDuplicateMapReport = await prisma.mapReport.findFirst({ - where: { - node_id: envelope.packet.from, - long_name: mapReport.longName, - short_name: mapReport.shortName, - created_at: { - gte: new Date(Date.now() - 60000), // created in the last 60 seconds - }, - } - }); - - // create map report if no duplicates found - if(!existingDuplicateMapReport){ - await prisma.mapReport.create({ - data: { - node_id: envelope.packet.from, - long_name: mapReport.longName, - short_name: mapReport.shortName, - role: mapReport.role, - hardware_model: mapReport.hwModel, - firmware_version: mapReport.firmwareVersion, - region: mapReport.region, - modem_preset: mapReport.modemPreset, - has_default_channel: mapReport.hasDefaultChannel, - latitude: mapReport.latitudeI, - longitude: mapReport.longitudeI, - altitude: mapReport.altitude, - position_precision: mapReport.positionPrecision, - num_online_local_nodes: mapReport.numOnlineLocalNodes, - }, - }); - } - - } catch (e) { - console.error(e); - } - - } - - else { - if (logUnknownPortnums) { - const ignoredPortnums = [ - Portnums.PortNum.UNKNOWN_APP, - Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP, - Portnums.PortNum.ROUTING_APP, - Portnums.PortNum.PAXCOUNTER_APP, - Portnums.PortNum.STORE_FORWARD_APP, - Portnums.PortNum.RANGE_TEST_APP, - Portnums.PortNum.ATAK_PLUGIN, - Portnums.PortNum.ATAK_FORWARDER, - ]; - // ignore packets we don't want to see for now - if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) { + payload = fromBinary(schema, payload); + } catch(e) { return; } - console.log(portnum, envelope); - } + await callback(envelope, payload); } } catch(e) { // ignore errors } }); + +async function onTextMessage(envelope, payload) { + if(logKnownPacketTypes) { + console.log('TEXT_MESSAGE_APP', { + to: envelope.packet.to.toString(16), + from: envelope.packet.from.toString(16), + text: payload.toString(), + }); + } + + if (!collectorEnabled.textMessages) { + return; + } + + // check if we want to ignore direct messages + if(!collectorEnabled.directMessages && envelope.packet.to !== 0xFFFFFFFF){ + return; + } + + try { + await prisma.textMessage.create({ + data: { + to: envelope.packet.to, + from: envelope.packet.from, + channel: envelope.packet.channel, + packet_id: envelope.packet.id, + channel_id: envelope.channelId, + gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, + text: payload.toString(), + rx_time: envelope.packet.rxTime, + rx_snr: envelope.packet.rxSnr, + rx_rssi: envelope.packet.rxRssi, + hop_limit: envelope.packet.hopLimit, + }, + }); + } catch (e) { + console.error(e); + } +} + +async function onPosition(envelope, payload) { + if(logKnownPacketTypes){ + console.log('POSITION_APP', { + from: envelope.packet.from.toString(16), + position: payload, + }); + } + + // process position + if(payload.latitudeI != null && payload.longitudeI){ + + const bitfield = envelope.packet.payloadVariant.value.bitfield; + // if bitfield is not available, we are on firmware v2.4 or below + // if configured, position packets should have their precision reduced + if (bitfield == null && oldFirmwarePositionPrecision != null) { + + // adjust precision of latitude and longitude + payload.latitudeI = PositionUtil.setPositionPrecision(payload.latitudeI, oldFirmwarePositionPrecision); + payload.longitudeI = PositionUtil.setPositionPrecision(payload.longitudeI, oldFirmwarePositionPrecision); + + // update position precision on packet to show that it is no longer full precision + payload.precisionBits = oldFirmwarePositionPrecision; + + } + + // update node position in db + try { + await prisma.node.updateMany({ + where: { + node_id: envelope.packet.from, + }, + data: { + position_updated_at: new Date(), + latitude: payload.latitudeI, + longitude: payload.longitudeI, + altitude: payload.altitude !== 0 ? payload.altitude : null, + position_precision: payload.precisionBits, + }, + }); + } catch (e) { + console.error(e); + } + + } + + // don't collect position history if not enabled, but we still want to update the node above + if (!collectorEnabled.positions) { + return; + } + + try { + + // find an existing position with duplicate information created in the last 60 seconds + const existingDuplicatePosition = await prisma.position.findFirst({ + where: { + node_id: envelope.packet.from, + packet_id: envelope.packet.id, + created_at: { + gte: new Date(Date.now() - 60000), // created in the last 60 seconds + }, + } + }); + + // create position if no duplicates found + if(!existingDuplicatePosition){ + await prisma.position.create({ + data: { + node_id: envelope.packet.from, + to: envelope.packet.to, + from: envelope.packet.from, + channel: envelope.packet.channel, + packet_id: envelope.packet.id, + channel_id: envelope.channelId, + gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, + latitude: payload.latitudeI, + longitude: payload.longitudeI, + altitude: payload.altitude, + }, + }); + } + + } catch (e) { + console.error(e); + } +} + +async function onNodeInfo(envelope, payload) { + if(logKnownPacketTypes) { + console.log('NODEINFO_APP', { + from: envelope.packet.from.toString(16), + user: payload, + }); + } + + // create or update node in db + try { + await prisma.node.upsert({ + where: { + node_id: envelope.packet.from, + }, + create: { + node_id: envelope.packet.from, + long_name: payload.longName, + short_name: payload.shortName, + hardware_model: payload.hwModel, + is_licensed: payload.isLicensed === true, + role: payload.role, + }, + update: { + long_name: payload.longName, + short_name: payload.shortName, + hardware_model: payload.hwModel, + is_licensed: payload.isLicensed === true, + role: payload.role, + }, + }); + } catch (e) { + console.error(e); + } +} + +async function onWaypoint(envelope, payload) { + if(logKnownPacketTypes) { + console.log('WAYPOINT_APP', { + to: envelope.packet.to.toString(16), + from: envelope.packet.from.toString(16), + waypoint: payload, + }); + } + + if (!collectorEnabled.waypoints) { + return; + } + + try { + await prisma.waypoint.create({ + data: { + to: envelope.packet.to, + from: envelope.packet.from, + waypoint_id: payload.id, + latitude: payload.latitudeI, + longitude: payload.longitudeI, + expire: payload.expire, + locked_to: payload.lockedTo, + name: payload.name, + description: payload.description, + icon: payload.icon, + channel: envelope.packet.channel, + packet_id: envelope.packet.id, + channel_id: envelope.channelId, + gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, + }, + }); + } catch (e) { + console.error(e); + } +} + +async function onMapReport(envelope, payload) { + if(logKnownPacketTypes) { + console.log('MAP_REPORT_APP', { + from: envelope.packet.from.toString(16), + map_report: payload, + }); + } + + // create or update node in db + try { + + // data to set on node + const data = { + long_name: payload.longName, + short_name: payload.shortName, + hardware_model: payload.hwModel, + role: payload.role, + latitude: payload.latitudeI, + longitude: payload.longitudeI, + altitude: payload.altitude !== 0 ? payload.altitude : null, + firmware_version: payload.firmwareVersion, + region: payload.region, + modem_preset: payload.modemPreset, + has_default_channel: payload.hasDefaultChannel, + position_precision: payload.positionPrecision, + num_online_local_nodes: payload.numOnlineLocalNodes, + position_updated_at: new Date(), + }; + + await prisma.node.upsert({ + where: { + node_id: envelope.packet.from, + }, + create: { + node_id: envelope.packet.from, + ...data, + }, + update: data, + }); + + } catch (e) { + console.error(e); + } + + // don't collect map report history if not enabled, but we still want to update the node above + if (!collectorEnabled.mapReports) { + return; + } + + try { + + // find an existing map with duplicate information created in the last 60 seconds + const existingDuplicateMapReport = await prisma.mapReport.findFirst({ + where: { + node_id: envelope.packet.from, + long_name: payload.longName, + short_name: payload.shortName, + created_at: { + gte: new Date(Date.now() - 60000), // created in the last 60 seconds + }, + } + }); + + // create map report if no duplicates found + if(!existingDuplicateMapReport){ + await prisma.mapReport.create({ + data: { + node_id: envelope.packet.from, + long_name: payload.longName, + short_name: payload.shortName, + role: payload.role, + hardware_model: payload.hwModel, + firmware_version: payload.firmwareVersion, + region: payload.region, + modem_preset: payload.modemPreset, + has_default_channel: payload.hasDefaultChannel, + latitude: payload.latitudeI, + longitude: payload.longitudeI, + altitude: payload.altitude, + position_precision: payload.positionPrecision, + num_online_local_nodes: payload.numOnlineLocalNodes, + }, + }); + } + + } catch (e) { + console.error(e); + } +} + +async function onNeighborInfo(envelope, payload) { + if(logKnownPacketTypes) { + console.log('NEIGHBORINFO_APP', { + from: envelope.packet.from.toString(16), + neighbor_info: payload, + }); + } + + // update node neighbour info in db + try { + await prisma.node.updateMany({ + where: { + node_id: envelope.packet.from, + }, + data: { + neighbours_updated_at: new Date(), + neighbour_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs, + neighbours: payload.neighbors.map((neighbour) => { + return { + node_id: neighbour.nodeId, + snr: neighbour.snr, + }; + }), + }, + }); + } catch (e) { + console.error(e); + } + + // don't store all neighbour infos, but we want to update the existing node above + if (!collectorEnabled.neighborInfo) { + return; + } + + // create neighbour info + try { + await prisma.neighbourInfo.create({ + data: { + node_id: envelope.packet.from, + node_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs, + neighbours: payload.neighbors.map((neighbour) => { + return { + node_id: neighbour.nodeId, + snr: neighbour.snr, + }; + }), + }, + }); + } catch (e) { + console.error(e); + } +} + +async function onRouteDiscovery(envelope, payload) { + if(logKnownPacketTypes) { + console.log('TRACEROUTE_APP', { + to: envelope.packet.to.toString(16), + from: envelope.packet.from.toString(16), + want_response: envelope.packet.payloadVariant.value.wantResponse, + route_discovery: payload, + }); + } + + try { + await prisma.traceRoute.create({ + data: { + to: envelope.packet.to, + from: envelope.packet.from, + want_response: envelope.packet.payloadVariant.value.wantResponse, + route: payload.route, + snr_towards: payload.snrTowards, + route_back: payload.routeBack, + snr_back: payload.snrBack, + channel: envelope.packet.channel, + packet_id: envelope.packet.id, + channel_id: envelope.channelId, + gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null, + }, + }); + } catch (e) { + console.error(e); + } +} + +async function onTelemetry(envelope, payload) { + + if(logKnownPacketTypes) { + console.log('TELEMETRY_APP', { + from: envelope.packet.from.toString(16), + telemetry: payload, + }); + } + + // data to update + const data = {}; + + // handle device metrics + if(payload.deviceMetrics){ + + data.battery_level = payload.deviceMetrics.batteryLevel !== 0 ? payload.deviceMetrics.batteryLevel : null; + data.voltage = payload.deviceMetrics.voltage !== 0 ? payload.deviceMetrics.voltage : null; + data.channel_utilization = payload.deviceMetrics.channelUtilization !== 0 ? payload.deviceMetrics.channelUtilization : null; + data.air_util_tx = payload.deviceMetrics.airUtilTx !== 0 ? payload.deviceMetrics.airUtilTx : null; + data.uptime_seconds = payload.deviceMetrics.uptimeSeconds !== 0 ? payload.deviceMetrics.uptimeSeconds : null; + + // create device metric + try { + + // find an existing metric with duplicate information created in the last 15 seconds + const existingDuplicateDeviceMetric = await prisma.deviceMetric.findFirst({ + where: { + node_id: envelope.packet.from, + battery_level: data.battery_level, + voltage: data.voltage, + channel_utilization: data.channel_utilization, + air_util_tx: data.air_util_tx, + created_at: { + gte: new Date(Date.now() - 15000), // created in the last 15 seconds + }, + } + }) + + // create metric if no duplicates found + if(!existingDuplicateDeviceMetric){ + await prisma.deviceMetric.create({ + data: { + node_id: envelope.packet.from, + battery_level: data.battery_level, + voltage: data.voltage, + channel_utilization: data.channel_utilization, + air_util_tx: data.air_util_tx, + }, + }); + } + + } catch (e) { + console.error(e); + } + + } + + // handle environment metrics + if(payload.environmentMetrics){ + + // get metric values + const temperature = payload.environmentMetrics.temperature !== 0 ? payload.environmentMetrics.temperature : null; + const relativeHumidity = payload.environmentMetrics.relativeHumidity !== 0 ? payload.environmentMetrics.relativeHumidity : null; + const barometricPressure = payload.environmentMetrics.barometricPressure !== 0 ? payload.environmentMetrics.barometricPressure : null; + const gasResistance = payload.environmentMetrics.gasResistance !== 0 ? payload.environmentMetrics.gasResistance : null; + const voltage = payload.environmentMetrics.voltage !== 0 ? payload.environmentMetrics.voltage : null; + const current = payload.environmentMetrics.current !== 0 ? payload.environmentMetrics.current : null; + const iaq = payload.environmentMetrics.iaq !== 0 ? payload.environmentMetrics.iaq : null; + const windDirection = payload.environmentMetrics.windDirection; + const windSpeed = payload.environmentMetrics.windSpeed; + const windGust = payload.environmentMetrics.windGust; + const windLull = payload.environmentMetrics.windLull; + + // set metrics to update on node table + data.temperature = temperature; + data.relative_humidity = relativeHumidity; + data.barometric_pressure = barometricPressure; + + // create environment metric + try { + + // find an existing metric with duplicate information created in the last 15 seconds + const existingDuplicateEnvironmentMetric = await prisma.environmentMetric.findFirst({ + where: { + node_id: envelope.packet.from, + packet_id: envelope.packet.id, + created_at: { + gte: new Date(Date.now() - 15000), // created in the last 15 seconds + }, + } + }) + + // create metric if no duplicates found + if(!existingDuplicateEnvironmentMetric){ + await prisma.environmentMetric.create({ + data: { + node_id: envelope.packet.from, + packet_id: envelope.packet.id, + temperature: temperature, + relative_humidity: relativeHumidity, + barometric_pressure: barometricPressure, + gas_resistance: gasResistance, + voltage: voltage, + current: current, + iaq: iaq, + wind_direction: windDirection, + wind_speed: windSpeed, + wind_gust: windGust, + wind_lull: windLull, + }, + }); + } + + } catch (e) { + console.error(e); + } + + } + + // handle power metrics + if(payload.powerMetrics){ + + // get metric values + const ch1Voltage = payload.powerMetrics.ch1Voltage !== 0 ? payload.powerMetrics.ch1Voltage : null; + const ch1Current = payload.powerMetrics.ch1Current !== 0 ? payload.powerMetrics.ch1Current : null; + const ch2Voltage = payload.powerMetrics.ch2Voltage !== 0 ? payload.powerMetrics.ch2Voltage : null; + const ch2Current = payload.powerMetrics.ch2Current !== 0 ? payload.powerMetrics.ch2Current : null; + const ch3Voltage = payload.powerMetrics.ch3Voltage !== 0 ? payload.powerMetrics.ch3Voltage : null; + const ch3Current = payload.powerMetrics.ch3Current !== 0 ? payload.powerMetrics.ch3Current : null; + + // create power metric + try { + + // find an existing metric with duplicate information created in the last 15 seconds + const existingDuplicatePowerMetric = await prisma.powerMetric.findFirst({ + where: { + node_id: envelope.packet.from, + packet_id: envelope.packet.id, + created_at: { + gte: new Date(Date.now() - 15000), // created in the last 15 seconds + }, + } + }) + + // create metric if no duplicates found + if(!existingDuplicatePowerMetric){ + await prisma.powerMetric.create({ + data: { + node_id: envelope.packet.from, + packet_id: envelope.packet.id, + ch1_voltage: ch1Voltage, + ch1_current: ch1Current, + ch2_voltage: ch2Voltage, + ch2_current: ch2Current, + ch3_voltage: ch3Voltage, + ch3_current: ch3Current, + }, + }); + } + + } catch (e) { + console.error(e); + } + + } + + // update node telemetry in db + if(Object.keys(data).length > 0){ + try { + await prisma.node.updateMany({ + where: { + node_id: envelope.packet.from, + }, + data: data, + }); + } catch (e) { + console.error(e); + } + } +} \ No newline at end of file