1399 lines
46 KiB
JavaScript
1399 lines
46 KiB
JavaScript
import crypto from 'crypto';
|
|
import mqtt from "mqtt";
|
|
import commandLineArgs from 'command-line-args';
|
|
import commandLineUsage from 'command-line-usage';
|
|
import { fromBinary } from '@bufbuild/protobuf';
|
|
import { Mesh, Mqtt, Portnums, Telemetry } from '@meshtastic/protobufs';
|
|
import PositionUtil from './utils/position_util.js';
|
|
|
|
// create prisma db client
|
|
import { PrismaClient } from "@prisma/client";
|
|
// single shared database client used by every handler and purge task in this file
const prisma = new PrismaClient();

// meshtastic bitfield flags: bit position and matching mask of the "OK to MQTT" flag
const BITFIELD_OK_TO_MQTT_SHIFT = 0;
const BITFIELD_OK_TO_MQTT_MASK = 1 << BITFIELD_OK_TO_MQTT_SHIFT;
|
|
|
|
// Command line option definitions, shared by the argument parser and the usage guide.
// Entries are built with small local factories so each option reads as one line; the
// resulting array holds the same definitions, in the same order, as a hand-written list.
const optionsList = (() => {

    // boolean switch
    const flag = (name, description) => ({ name, type: Boolean, description });

    // option taking a single string value
    const text = (name, description) => ({ name, type: String, description });

    // option taking a single numeric value
    const number = (name, description) => ({ name, type: Number, description });

    // option that may be given several values
    const list = (name, type, typeLabel, description) => ({ name, type, multiple: true, typeLabel, description });

    // "collect-<slug>" switch that enables saving one kind of record
    const collect = (slug, label) => flag(`collect-${slug}`, `This option will save all received ${label} to the database.`);

    // "purge-<slug>-after-seconds" retention setting for one kind of record
    const purgeAfter = (slug, label) => number(`purge-${slug}-after-seconds`, `${label} older than this many seconds will be purged from the database.`);

    return [
        { name: 'help', alias: 'h', type: Boolean, description: 'Display this usage guide.' },
        text("mqtt-broker-url", "MQTT Broker URL (e.g: mqtt://mqtt.meshtastic.org)"),
        text("mqtt-username", "MQTT Username (e.g: meshdev)"),
        text("mqtt-password", "MQTT Password (e.g: large4cats)"),
        text("mqtt-client-id", "MQTT Client ID (e.g: map.example.com)"),
        list("mqtt-topic", String, '<topic> ...', "MQTT Topic to subscribe to (e.g: msh/#)"),
        list("allowed-portnums", Number, '<portnum> ...', "If provided, only packets with these portnums will be processed."),
        flag("log-unknown-portnums", "This option will log packets for unknown portnums to the console."),
        collect("service-envelopes", "service envelopes"),
        collect("positions", "positions"),
        collect("text-messages", "text messages"),
        flag("ignore-direct-messages", "This option will prevent saving direct messages to the database."),
        collect("waypoints", "waypoints"),
        collect("neighbour-info", "neighbour infos"),
        collect("map-reports", "map reports"),
        list("decryption-keys", String, '<base64DecryptionKey> ...', "Decryption keys encoded in base64 to use when decrypting service envelopes."),
        flag("drop-packets-not-ok-to-mqtt", "This option will drop all packets that have 'OK to MQTT' set to false."),
        list("drop-portnums-without-bitfield", Number, '<portnum> ...', "If provided, packets with these portnums will be dropped if they don't have a bitfield. (bitfield available from firmware v2.5+)"),
        number("old-firmware-position-precision", "If provided, position packets from firmware v2.4 and older will be truncated to this many decimal places."),
        number("forget-outdated-node-positions-after-seconds", "If provided, nodes that haven't sent a position report in this time will have their current position cleared."),
        number("purge-interval-seconds", "How long to wait between each automatic database purge."),
        purgeAfter("device-metrics", "Device Metrics"),
        purgeAfter("environment-metrics", "Environment Metrics"),
        purgeAfter("power-metrics", "Power Metrics"),
        purgeAfter("map-reports", "Map reports"),
        purgeAfter("neighbour-infos", "Neighbour infos"),
        number("purge-nodes-unheard-for-seconds", "Nodes that haven't been heard from in this many seconds will be purged from the database."),
        purgeAfter("positions", "Positions"),
        purgeAfter("service-envelopes", "Service envelopes"),
        purgeAfter("text-messages", "Text Messages"),
        purgeAfter("traceroutes", "Traceroutes"),
        purgeAfter("waypoints", "Waypoints"),
    ];

})();
|
|
|
|
// turn process arguments into an options object keyed by option name
const options = commandLineArgs(optionsList);

// when --help / -h is given, print the usage guide and stop
// NOTE(review): the process exits with status 1 here even though help was requested explicitly — confirm this is intended
if (options.help) {
    const usageSections = [
        {
            header: 'Meshtastic MQTT Collector',
            content: 'Collects and processes service envelopes from a Meshtastic MQTT server.',
        },
        {
            header: 'Options',
            optionList: optionsList,
        },
    ];
    console.log(commandLineUsage(usageSections));
    process.exit(1);
}
|
|
|
|
// get options and fallback to default values

// mqtt connection settings
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
const mqttUsername = options["mqtt-username"] ?? "meshdev";
const mqttPassword = options["mqtt-password"] ?? "large4cats";
const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7";
const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"];

// packet filtering and logging
const allowedPortnums = options["allowed-portnums"] ?? null;
const logUnknownPortnums = options["log-unknown-portnums"] ?? false;

// which kinds of records get written to their history tables
const collectorEnabled = {
    serviceEnvelopes: options["collect-service-envelopes"] ?? false,
    positions: options["collect-positions"] ?? false,
    textMessages: options["collect-text-messages"] ?? false,
    waypoints: options["collect-waypoints"] ?? false,
    mapReports: options["collect-map-reports"] ?? false,
    // direct messages are collected unless explicitly ignored
    directMessages: !(options["ignore-direct-messages"] ?? false),
    // the option is declared as "collect-neighbour-info" (British spelling); reading the
    // American spelling here meant the flag could never be picked up
    neighborInfo: options["collect-neighbour-info"] ?? false,
};

const decryptionKeys = options["decryption-keys"] ?? [
    "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
];

// debug switch: when true, handlers log every packet of a known type to the console
const logKnownPacketTypes = false;

// "OK to MQTT" / legacy firmware handling
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
const oldFirmwarePositionPrecision = options["old-firmware-position-precision"] ?? null;
const forgetOutdatedNodePositionsAfterSeconds = options["forget-outdated-node-positions-after-seconds"] ?? null;

// automatic purge settings; a null retention value disables that purge
const purgeIntervalSeconds = options["purge-interval-seconds"] ?? 10;
const purgeNodesUnheardForSeconds = options["purge-nodes-unheard-for-seconds"] ?? null;
const purgeDeviceMetricsAfterSeconds = options["purge-device-metrics-after-seconds"] ?? null;
const purgeEnvironmentMetricsAfterSeconds = options["purge-environment-metrics-after-seconds"] ?? null;
const purgeMapReportsAfterSeconds = options["purge-map-reports-after-seconds"] ?? null;
const purgeNeighbourInfosAfterSeconds = options["purge-neighbour-infos-after-seconds"] ?? null;
const purgePowerMetricsAfterSeconds = options["purge-power-metrics-after-seconds"] ?? null;
const purgePositionsAfterSeconds = options["purge-positions-after-seconds"] ?? null;
const purgeServiceEnvelopesAfterSeconds = options["purge-service-envelopes-after-seconds"] ?? null;
const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds"] ?? null;
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;
|
|
|
|
|
|
// connect to the mqtt broker using the configured credentials and client id
const mqttConnectionOptions = {
    username: mqttUsername,
    password: mqttPassword,
    clientId: mqttClientId,
};
const client = mqtt.connect(mqttBrokerUrl, mqttConnectionOptions);
|
|
|
|
// schedule the automatic purge, unless the interval is unset or zero
if (purgeIntervalSeconds) {

    // runs every purge task one after another, in this fixed order
    const runPurgeCycle = async () => {
        const tasks = [
            purgeUnheardNodes,
            purgeOldDeviceMetrics,
            purgeOldEnvironmentMetrics,
            purgeOldMapReports,
            purgeOldNeighbourInfos,
            purgeOldPowerMetrics,
            purgeOldPositions,
            purgeOldServiceEnvelopes,
            purgeOldTextMessages,
            purgeOldTraceroutes,
            purgeOldWaypoints,
            forgetOutdatedNodePositions,
        ];
        for (const task of tasks) {
            await task();
        }
    };

    setInterval(runPurgeCycle, purgeIntervalSeconds * 1000);

}
|
|
|
|
/**
 * Prints the broker url, the subscribed topics and the enabled/disabled state of each collector to the console.
 */
function printStatus() {
    console.log(`MQTT server: ${mqttBrokerUrl}`);
    console.log(`MQTT topic(s): ${mqttTopics.join(', ')}`);
    Object.entries(collectorEnabled).forEach(([name, enabled]) => {
        const state = enabled ? 'enabled' : 'disabled';
        console.log(`${name} collection: ${state}`);
    });
}

// report the effective configuration once at startup
printStatus();
|
|
|
|
/**
 * Deletes every node whose `updated_at` is more than `purgeNodesUnheardForSeconds` seconds in the past.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeUnheardNodes() {
    if (purgeNodesUnheardForSeconds) {
        try {
            // anything last updated before this instant counts as unheard
            const cutoff = new Date(Date.now() - purgeNodesUnheardForSeconds * 1000);
            await prisma.node.deleteMany({
                where: { updated_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every device metric created more than `purgeDeviceMetricsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldDeviceMetrics() {
    if (purgeDeviceMetricsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeDeviceMetricsAfterSeconds * 1000);
            await prisma.deviceMetric.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every environment metric created more than `purgeEnvironmentMetricsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldEnvironmentMetrics() {
    if (purgeEnvironmentMetricsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeEnvironmentMetricsAfterSeconds * 1000);
            await prisma.environmentMetric.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every map report created more than `purgeMapReportsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldMapReports() {
    if (purgeMapReportsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeMapReportsAfterSeconds * 1000);
            await prisma.mapReport.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every neighbour info created more than `purgeNeighbourInfosAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldNeighbourInfos() {
    if (purgeNeighbourInfosAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeNeighbourInfosAfterSeconds * 1000);
            await prisma.neighbourInfo.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every power metric created more than `purgePowerMetricsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldPowerMetrics() {
    if (purgePowerMetricsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgePowerMetricsAfterSeconds * 1000);
            await prisma.powerMetric.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every position created more than `purgePositionsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldPositions() {
    if (purgePositionsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgePositionsAfterSeconds * 1000);
            await prisma.position.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every service envelope created more than `purgeServiceEnvelopesAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldServiceEnvelopes() {
    if (purgeServiceEnvelopesAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeServiceEnvelopesAfterSeconds * 1000);
            await prisma.serviceEnvelope.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every text message created more than `purgeTextMessagesAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldTextMessages() {
    if (purgeTextMessagesAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeTextMessagesAfterSeconds * 1000);
            await prisma.textMessage.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every traceroute created more than `purgeTraceroutesAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldTraceroutes() {
    if (purgeTraceroutesAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeTraceroutesAfterSeconds * 1000);
            await prisma.traceRoute.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Deletes every waypoint created more than `purgeWaypointsAfterSeconds` seconds ago.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function purgeOldWaypoints() {
    if (purgeWaypointsAfterSeconds) {
        try {
            // anything created before this instant is considered expired
            const cutoff = new Date(Date.now() - purgeWaypointsAfterSeconds * 1000);
            await prisma.waypoint.deleteMany({
                where: { created_at: { lt: cutoff } },
            });
        } catch (e) {
            // best effort: a failed purge is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Nulls the stored latitude/longitude/altitude of nodes whose `position_updated_at` is more than
 * `forgetOutdatedNodePositionsAfterSeconds` seconds in the past, so a node that stopped reporting
 * its position drops off the map while the rest of its record stays available.
 * Does nothing when that setting is not configured. Database errors are ignored.
 */
async function forgetOutdatedNodePositions() {
    if (forgetOutdatedNodePositionsAfterSeconds) {
        try {
            // positions last updated before this instant are considered outdated
            const cutoff = new Date(Date.now() - forgetOutdatedNodePositionsAfterSeconds * 1000);
            const clearedPosition = { latitude: null, longitude: null, altitude: null };
            await prisma.node.updateMany({
                where: {
                    position_updated_at: { lt: cutoff },
                    // skip nodes whose position is already fully cleared,
                    // otherwise their updated_at would change although nothing did
                    NOT: { ...clearedPosition },
                },
                data: { ...clearedPosition },
            });
        } catch (e) {
            // best effort: a failed update is deliberately ignored
        }
    }
}
|
|
|
|
/**
 * Builds the 16 byte nonce used to decrypt a packet.
 * Layout (all little endian): bytes 0-7 packet id widened to 64 bits, bytes 8-11 sender node
 * number, bytes 12-15 a block counter that is always zero.
 * @param packetId numeric id of the packet
 * @param fromNode numeric id of the sending node
 * @returns {Buffer} the 16 byte nonce
 */
function createNonce(packetId, fromNode) {
    const widenedPacketId = BigInt(packetId);
    const nonce = Buffer.alloc(16);
    nonce.writeBigUInt64LE(widenedPacketId, 0);
    nonce.writeUInt32LE(fromNode, 8);
    nonce.writeUInt32LE(0, 12); // block counter
    return nonce;
}
|
|
|
|
/**
 * Tries each configured decryption key against an encrypted packet and returns the first result
 * that parses as a Data message, or null when no key works.
 *
 * References:
 * https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42
 * https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381
 *
 * @param packet packet whose `payloadVariant.value` holds the encrypted bytes
 * @returns the decoded Data message, or null if decryption failed with every key
 */
function decrypt(packet) {

    // cipher to use for each supported key size in bytes
    const cipherByKeyLength = new Map([
        [16, "aes-128-ctr"],
        [32, "aes-256-ctr"],
    ]);

    for (const decryptionKey of decryptionKeys) {
        try {

            const key = Buffer.from(decryptionKey, "base64");

            // iv/nonce is derived from the packet itself
            const nonceBuffer = createNonce(packet.id, packet.from);

            // keys of any other size can't be used, move on to the next one
            const algorithm = cipherByKeyLength.get(key.length);
            if (algorithm === undefined) {
                console.error(`Skipping decryption key with invalid length: ${key.length}`);
                continue;
            }

            const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer);
            const plaintext = Buffer.concat([
                decipher.update(packet.payloadVariant.value),
                decipher.final(),
            ]);

            // a result that doesn't parse as a Data message throws, which moves on to the next key
            return fromBinary(Mesh.DataSchema, plaintext);

        } catch (e) {
            // this key didn't work, try the next one
        }
    }

    // couldn't decrypt with any key
    return null;

}
|
|
|
|
/**
 * Turns a hex node id into its numeric form, e.g. "!FFFFFFFF" becomes 4294967295n.
 * Every "!" in the input is dropped before the remainder is parsed as hexadecimal.
 * @param hexId a node id in hex format with a prepended "!"
 * @returns {bigint} the node id in numeric form
 */
function convertHexIdToNumericId(hexId) {
    const hexDigits = hexId.split("!").join("");
    return BigInt(`0x${hexDigits}`);
}
|
|
|
|
// once connected, subscribe to every configured topic
client.on('connect', () => {
    mqttTopics.forEach((mqttTopic) => {
        client.subscribe(mqttTopic);
    });
});
|
|
|
|
// Handle every message received from the broker:
//   1. decode it as a service envelope (anything else is ignored)
//   2. decrypt the packet if needed
//   3. apply the "OK to MQTT" / missing-bitfield drop rules
//   4. optionally store the raw envelope and stamp the gateway node
//   5. decode the payload for its portnum and hand it to the matching handler
client.on('message', async (topic, message) => {
    try {

        // decode service envelope
        let envelope = null;
        try {
            envelope = fromBinary(Mqtt.ServiceEnvelopeSchema, message);
        } catch(e) { }
        if (!envelope) {
            return;
        }

        // for plaintext packets this is already the decoded Data message,
        // for encrypted packets it is the ciphertext until decrypt() replaces it
        let dataPacket = envelope.packet.payloadVariant.value;

        // attempt to decrypt encrypted packets; decrypt() yields null when no key works
        if (envelope.packet.payloadVariant.case === 'encrypted') {
            envelope.packet.payloadVariant.value = dataPacket = decrypt(envelope.packet);
        }

        if (dataPacket !== null) {

            // bitfield lives on the Data message itself (not on its raw byte payload);
            // it is only present for packets from firmware v2.5.x or newer
            const bitfield = dataPacket.bitfield ?? null;

            if (bitfield != null) {
                // drop packets where "OK to MQTT" is false
                const isOkToMqtt = bitfield & BITFIELD_OK_TO_MQTT_MASK;
                if (dropPacketsNotOkToMqtt && !isOkToMqtt) {
                    return;
                }
            } else if (dropPortnumsWithoutBitfield != null && dropPortnumsWithoutBitfield.includes(dataPacket.portnum)) {
                // no bitfield available: drop packet if its portnum is in the drop list
                // this is useful for dropping specific packet types from firmware older than v2.5
                return;
            }

        }

        // create service envelope in db
        if (collectorEnabled.serviceEnvelopes) {
            try {
                await prisma.serviceEnvelope.create({
                    data: {
                        mqtt_topic: topic,
                        channel_id: envelope.channelId,
                        gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
                        to: envelope.packet.to,
                        from: envelope.packet.from,
                        protobuf: message,
                    },
                });
            } catch (e) {
                console.error(e, {
                    envelope: envelope.packet,
                });
            }
        }

        // track when a node last gated a packet to mqtt
        try {
            await prisma.node.updateMany({
                where: {
                    node_id: convertHexIdToNumericId(envelope.gatewayId),
                },
                data: {
                    mqtt_connection_state_updated_at: new Date(),
                },
            });
        } catch(e) {
            // don't care if updating mqtt timestamp fails
        }

        // bail if we don't have a usable Data packet
        if (dataPacket === null) {
            return;
        }

        let payload = dataPacket.payload;

        // get portnum from decoded packet
        const portnum = dataPacket.portnum;

        // if allowed portnums are configured, ignore portnums that are not in the list
        if(allowedPortnums != null && !allowedPortnums.includes(portnum)){
            return;
        }

        // pick the handler for this portnum, and the schema its payload must be decoded with
        // (a null schema means the handler receives the raw payload bytes)
        let callback = null;
        let schema = null;
        switch(portnum) {
            case Portnums.PortNum.TEXT_MESSAGE_APP:
                callback = onTextMessage;
                break;
            case Portnums.PortNum.POSITION_APP:
                callback = onPosition;
                schema = Mesh.PositionSchema;
                break;
            case Portnums.PortNum.NODEINFO_APP:
                callback = onNodeInfo;
                // onNodeInfo reads User fields (longName, shortName, hwModel, ...), so the
                // payload has to be decoded as a User message rather than a NodeInfo message
                schema = Mesh.UserSchema;
                break;
            case Portnums.PortNum.WAYPOINT_APP:
                callback = onWaypoint;
                schema = Mesh.WaypointSchema;
                break;
            case Portnums.PortNum.NEIGHBORINFO_APP:
                callback = onNeighborInfo;
                schema = Mesh.NeighborInfoSchema;
                break;
            case Portnums.PortNum.TELEMETRY_APP:
                callback = onTelemetry;
                schema = Telemetry.TelemetrySchema;
                break;
            case Portnums.PortNum.TRACEROUTE_APP:
                callback = onRouteDiscovery;
                schema = Mesh.RouteDiscoverySchema;
                break;
            case Portnums.PortNum.MAP_REPORT_APP:
                callback = onMapReport;
                schema = Mqtt.MapReportSchema;
                break;
            default:
                // handle unknown port nums here
                if (logUnknownPortnums) {
                    const ignoredPortnums = [
                        Portnums.PortNum.UNKNOWN_APP,
                        Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP,
                        Portnums.PortNum.ROUTING_APP,
                        Portnums.PortNum.PAXCOUNTER_APP,
                        Portnums.PortNum.STORE_FORWARD_APP,
                        Portnums.PortNum.RANGE_TEST_APP,
                        Portnums.PortNum.ATAK_PLUGIN,
                        Portnums.PortNum.ATAK_FORWARDER,
                    ];
                    // ignore packets we don't want to see for now
                    if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) {
                        return;
                    }
                    console.log(portnum, envelope);
                }
        }

        if (callback !== null) {
            if (schema !== null) {
                // a payload that doesn't decode with the expected schema is dropped
                try {
                    payload = fromBinary(schema, payload);
                } catch(e) {
                    return;
                }
            }
            await callback(envelope, payload);
        }

    } catch(e) {
        // ignore errors
    }
});
|
|
|
|
/**
 * Stores a received text message, when text message collection is enabled.
 * Messages not addressed to 0xFFFFFFFF are skipped when direct messages are being ignored.
 * @param envelope the service envelope the message arrived in
 * @param payload the undecoded payload of the packet; stored as `payload.toString()`
 */
async function onTextMessage(envelope, payload) {

    const { packet } = envelope;

    if (logKnownPacketTypes) {
        console.log('TEXT_MESSAGE_APP', {
            to: packet.to.toString(16),
            from: packet.from.toString(16),
            text: payload.toString(),
        });
    }

    if (!collectorEnabled.textMessages) {
        return;
    }

    // anything not sent to 0xFFFFFFFF is treated as a direct message
    if (!collectorEnabled.directMessages && packet.to !== 0xFFFFFFFF) {
        return;
    }

    try {
        const record = {
            to: packet.to,
            from: packet.from,
            channel: packet.channel,
            packet_id: packet.id,
            channel_id: envelope.channelId,
            gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
            text: payload.toString(),
            rx_time: packet.rxTime,
            rx_snr: packet.rxSnr,
            rx_rssi: packet.rxRssi,
            hop_limit: packet.hopLimit,
        };
        await prisma.textMessage.create({ data: record });
    } catch (e) {
        console.error(e);
    }

}
|
|
|
|
/**
 * Handles a decoded position packet: updates the sending node's current position and,
 * when position collection is enabled, appends the position to the history table unless the
 * same packet from the same node was already stored within the last 60 seconds.
 * @param envelope the service envelope the packet arrived in
 * @param payload the decoded position; its latitudeI/longitudeI/precisionBits may be rewritten
 *                here when precision reduction for old firmware is configured
 */
async function onPosition(envelope, payload) {

    const { packet } = envelope;

    if (logKnownPacketTypes) {
        console.log('POSITION_APP', {
            from: packet.from.toString(16),
            position: payload,
        });
    }

    // NOTE(review): latitude only has to be present while longitude has to be truthy,
    // so a longitude of exactly 0 skips the node update — confirm this asymmetry is intended
    const hasUsablePosition = payload.latitudeI != null && payload.longitudeI;
    if (hasUsablePosition) {

        // no bitfield means firmware v2.4 or below; reduce precision for those if configured
        const { bitfield } = packet.payloadVariant.value;
        const shouldReducePrecision = bitfield == null && oldFirmwarePositionPrecision != null;
        if (shouldReducePrecision) {
            payload.latitudeI = PositionUtil.setPositionPrecision(payload.latitudeI, oldFirmwarePositionPrecision);
            payload.longitudeI = PositionUtil.setPositionPrecision(payload.longitudeI, oldFirmwarePositionPrecision);

            // record that the position is no longer full precision
            payload.precisionBits = oldFirmwarePositionPrecision;
        }

        // update the node's current position
        try {
            await prisma.node.updateMany({
                where: { node_id: packet.from },
                data: {
                    position_updated_at: new Date(),
                    latitude: payload.latitudeI,
                    longitude: payload.longitudeI,
                    altitude: payload.altitude !== 0 ? payload.altitude : null,
                    position_precision: payload.precisionBits,
                },
            });
        } catch (e) {
            console.error(e);
        }

    }

    // history is optional, the node update above always happens
    if (!collectorEnabled.positions) {
        return;
    }

    try {

        // same node and packet id stored within the last 60 seconds counts as a duplicate
        const recentDuplicate = await prisma.position.findFirst({
            where: {
                node_id: packet.from,
                packet_id: packet.id,
                created_at: {
                    gte: new Date(Date.now() - 60000),
                },
            },
        });
        if (recentDuplicate) {
            return;
        }

        await prisma.position.create({
            data: {
                node_id: packet.from,
                to: packet.to,
                from: packet.from,
                channel: packet.channel,
                packet_id: packet.id,
                channel_id: envelope.channelId,
                gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
                latitude: payload.latitudeI,
                longitude: payload.longitudeI,
                altitude: payload.altitude,
            },
        });

    } catch (e) {
        console.error(e);
    }

}
|
|
|
|
/**
 * Creates the sending node in the database, or refreshes its names, hardware model,
 * licensed flag and role when it already exists.
 * @param envelope the service envelope the packet arrived in
 * @param payload the decoded payload; longName, shortName, hwModel, isLicensed and role are read from it
 */
async function onNodeInfo(envelope, payload) {

    const nodeId = envelope.packet.from;

    if (logKnownPacketTypes) {
        console.log('NODEINFO_APP', {
            from: nodeId.toString(16),
            user: payload,
        });
    }

    try {
        // the same values are written whether the node is new or already known
        const details = {
            long_name: payload.longName,
            short_name: payload.shortName,
            hardware_model: payload.hwModel,
            is_licensed: payload.isLicensed === true,
            role: payload.role,
        };
        await prisma.node.upsert({
            where: { node_id: nodeId },
            create: { node_id: nodeId, ...details },
            update: { ...details },
        });
    } catch (e) {
        console.error(e);
    }

}
|
|
|
|
/**
 * Stores a received waypoint, when waypoint collection is enabled.
 * @param envelope the service envelope the packet arrived in
 * @param payload the decoded waypoint
 */
async function onWaypoint(envelope, payload) {

    const { packet } = envelope;

    if (logKnownPacketTypes) {
        console.log('WAYPOINT_APP', {
            to: packet.to.toString(16),
            from: packet.from.toString(16),
            waypoint: payload,
        });
    }

    if (!collectorEnabled.waypoints) {
        return;
    }

    try {
        const record = {
            to: packet.to,
            from: packet.from,
            waypoint_id: payload.id,
            latitude: payload.latitudeI,
            longitude: payload.longitudeI,
            expire: payload.expire,
            locked_to: payload.lockedTo,
            name: payload.name,
            description: payload.description,
            icon: payload.icon,
            channel: packet.channel,
            packet_id: packet.id,
            channel_id: envelope.channelId,
            gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
        };
        await prisma.waypoint.create({ data: record });
    } catch (e) {
        console.error(e);
    }

}
|
|
|
|
/**
 * Handles a decoded map report: creates or updates the sending node from the report and,
 * when map report collection is enabled, appends the report to the history table unless a
 * report with the same node and names was already stored within the last 60 seconds.
 * @param envelope the service envelope the packet arrived in
 * @param payload the decoded map report
 */
async function onMapReport(envelope, payload) {

    const nodeId = envelope.packet.from;

    if (logKnownPacketTypes) {
        console.log('MAP_REPORT_APP', {
            from: nodeId.toString(16),
            map_report: payload,
        });
    }

    // create or update the node itself
    try {
        const nodeFields = {
            long_name: payload.longName,
            short_name: payload.shortName,
            hardware_model: payload.hwModel,
            role: payload.role,
            latitude: payload.latitudeI,
            longitude: payload.longitudeI,
            // an altitude of exactly 0 is stored as null on the node
            altitude: payload.altitude !== 0 ? payload.altitude : null,
            firmware_version: payload.firmwareVersion,
            region: payload.region,
            modem_preset: payload.modemPreset,
            has_default_channel: payload.hasDefaultChannel,
            position_precision: payload.positionPrecision,
            num_online_local_nodes: payload.numOnlineLocalNodes,
            position_updated_at: new Date(),
        };
        await prisma.node.upsert({
            where: { node_id: nodeId },
            create: { node_id: nodeId, ...nodeFields },
            update: nodeFields,
        });
    } catch (e) {
        console.error(e);
    }

    // history is optional, the node upsert above always happens
    if (!collectorEnabled.mapReports) {
        return;
    }

    try {

        // same node and names stored within the last 60 seconds counts as a duplicate
        const recentDuplicate = await prisma.mapReport.findFirst({
            where: {
                node_id: nodeId,
                long_name: payload.longName,
                short_name: payload.shortName,
                created_at: {
                    gte: new Date(Date.now() - 60000),
                },
            },
        });
        if (recentDuplicate) {
            return;
        }

        await prisma.mapReport.create({
            data: {
                node_id: nodeId,
                long_name: payload.longName,
                short_name: payload.shortName,
                role: payload.role,
                hardware_model: payload.hwModel,
                firmware_version: payload.firmwareVersion,
                region: payload.region,
                modem_preset: payload.modemPreset,
                has_default_channel: payload.hasDefaultChannel,
                latitude: payload.latitudeI,
                longitude: payload.longitudeI,
                altitude: payload.altitude,
                position_precision: payload.positionPrecision,
                num_online_local_nodes: payload.numOnlineLocalNodes,
            },
        });

    } catch (e) {
        console.error(e);
    }

}
|
|
|
|
/**
 * Handles a decoded NEIGHBORINFO_APP payload.
 *
 * The sending node's neighbour list, broadcast interval and update timestamp are
 * always written to the node table. When neighbour info collection is enabled,
 * a separate history row is created as well.
 *
 * @param {object} envelope - service envelope; `envelope.packet.from` identifies the sending node
 * @param {object} payload - decoded neighbour info (`neighbors`, `nodeBroadcastIntervalSecs`)
 * @returns {Promise<void>} database errors are logged to the console and not rethrown
 */
async function onNeighborInfo(envelope, payload) {

    if (logKnownPacketTypes) {
        const logEntry = {
            from: envelope.packet.from.toString(16),
            neighbor_info: payload,
        };
        console.log('NEIGHBORINFO_APP', logEntry);
    }

    // reduces each neighbour to the two stored fields; only called inside the try
    // blocks below so that a failure while mapping is logged instead of thrown
    const toNeighbourRows = () => payload.neighbors.map((entry) => ({
        node_id: entry.nodeId,
        snr: entry.snr,
    }));

    // refresh the neighbour details held on the node itself
    try {
        await prisma.node.updateMany({
            where: { node_id: envelope.packet.from },
            data: {
                neighbours_updated_at: new Date(),
                neighbour_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
                neighbours: toNeighbourRows(),
            },
        });
    } catch (err) {
        console.error(err);
    }

    // the history row is only written when the neighbour info collector is enabled
    if (collectorEnabled.neighborInfo) {
        try {
            await prisma.neighbourInfo.create({
                data: {
                    node_id: envelope.packet.from,
                    node_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
                    neighbours: toNeighbourRows(),
                },
            });
        } catch (err) {
            console.error(err);
        }
    }

}
|
|
|
|
/**
 * Handles a decoded TRACEROUTE_APP payload by storing one trace route row.
 *
 * @param {object} envelope - service envelope; the packet supplies to/from, channel, id and the want-response flag
 * @param {object} payload - decoded route discovery (`route`, `snrTowards`, `routeBack`, `snrBack`)
 * @returns {Promise<void>} database errors are logged to the console and not rethrown
 */
async function onRouteDiscovery(envelope, payload) {

    if (logKnownPacketTypes) {
        const { packet } = envelope;
        console.log('TRACEROUTE_APP', {
            to: packet.to.toString(16),
            from: packet.from.toString(16),
            want_response: packet.payloadVariant.value.wantResponse,
            route_discovery: payload,
        });
    }

    try {

        const { packet } = envelope;

        const traceRoute = {
            to: packet.to,
            from: packet.from,
            want_response: packet.payloadVariant.value.wantResponse,
            route: payload.route,
            snr_towards: payload.snrTowards,
            route_back: payload.routeBack,
            snr_back: payload.snrBack,
            channel: packet.channel,
            packet_id: packet.id,
            channel_id: envelope.channelId,
            gateway_id: null,
        };

        // the gateway id is converted with convertHexIdToNumericId when present, otherwise it stays null
        if (envelope.gatewayId) {
            traceRoute.gateway_id = convertHexIdToNumericId(envelope.gatewayId);
        }

        await prisma.traceRoute.create({ data: traceRoute });

    } catch (err) {
        console.error(err);
    }

}
|
|
|
|
/**
 * Maps a reading of exactly zero to null; any other value (including undefined)
 * is returned unchanged.
 *
 * @param {*} value - raw metric value from the telemetry payload
 * @returns {*} null when value is 0, otherwise value
 */
function telemetryValueOrNull(value) {
    return value !== 0 ? value : null;
}

/**
 * Returns the oldest created_at timestamp that still counts as a duplicate:
 * 15 seconds before now.
 *
 * @returns {Date}
 */
function telemetryDuplicateCutoff() {
    return new Date(Date.now() - 15000);
}

/**
 * Stores a device metric row unless a row with the same node and the same four
 * readings was created in the last 15 seconds. Errors are logged, not rethrown.
 *
 * @param {object} envelope - service envelope; `envelope.packet.from` is the node id
 * @param {object} data - node update data holding battery_level, voltage, channel_utilization and air_util_tx
 * @returns {Promise<void>}
 */
async function recordDeviceMetric(envelope, data) {
    try {

        const metric = {
            node_id: envelope.packet.from,
            battery_level: data.battery_level,
            voltage: data.voltage,
            channel_utilization: data.channel_utilization,
            air_util_tx: data.air_util_tx,
        };

        // find an existing metric with duplicate information created in the last 15 seconds
        const existingDuplicateDeviceMetric = await prisma.deviceMetric.findFirst({
            where: {
                ...metric,
                created_at: {
                    gte: telemetryDuplicateCutoff(),
                },
            },
        });

        // create metric if no duplicates found
        if (!existingDuplicateDeviceMetric) {
            await prisma.deviceMetric.create({
                data: metric,
            });
        }

    } catch (e) {
        console.error(e);
    }
}

/**
 * Stores a metric row on the given prisma model unless a row for the same node
 * and packet id was created in the last 15 seconds. Errors are logged, not rethrown.
 *
 * @param {string} modelName - prisma model to write to ('environmentMetric' or 'powerMetric')
 * @param {object} envelope - service envelope; `envelope.packet.from` and `envelope.packet.id` key the row
 * @param {object} metric - column values to store alongside node_id and packet_id
 * @returns {Promise<void>}
 */
async function recordTelemetryMetricOncePerPacket(modelName, envelope, metric) {
    try {

        const model = prisma[modelName];

        // find an existing metric for this packet created in the last 15 seconds
        const existingDuplicateMetric = await model.findFirst({
            where: {
                node_id: envelope.packet.from,
                packet_id: envelope.packet.id,
                created_at: {
                    gte: telemetryDuplicateCutoff(),
                },
            },
        });

        // create metric if no duplicates found
        if (!existingDuplicateMetric) {
            await model.create({
                data: {
                    node_id: envelope.packet.from,
                    packet_id: envelope.packet.id,
                    ...metric,
                },
            });
        }

    } catch (e) {
        console.error(e);
    }
}

/**
 * Handles a decoded TELEMETRY_APP payload.
 *
 * Device, environment and power metrics are each stored as history rows (with
 * duplicate suppression), in that order. Device readings plus temperature,
 * relative humidity and barometric pressure are then written to the node row.
 * Power metrics are never written to the node row.
 *
 * @param {object} envelope - service envelope; `envelope.packet.from` is the node id, `envelope.packet.id` the packet id
 * @param {object} payload - decoded telemetry; may contain `deviceMetrics`, `environmentMetrics` and/or `powerMetrics`
 * @returns {Promise<void>} database errors are logged to the console and not rethrown
 */
async function onTelemetry(envelope, payload) {

    if (logKnownPacketTypes) {
        console.log('TELEMETRY_APP', {
            from: envelope.packet.from.toString(16),
            telemetry: payload,
        });
    }

    // data to update on the node row
    const data = {};

    // handle device metrics
    if (payload.deviceMetrics) {
        const deviceMetrics = payload.deviceMetrics;

        data.battery_level = telemetryValueOrNull(deviceMetrics.batteryLevel);
        data.voltage = telemetryValueOrNull(deviceMetrics.voltage);
        data.channel_utilization = telemetryValueOrNull(deviceMetrics.channelUtilization);
        data.air_util_tx = telemetryValueOrNull(deviceMetrics.airUtilTx);
        // uptime is only written to the node row; it is not part of the device metric history row
        data.uptime_seconds = telemetryValueOrNull(deviceMetrics.uptimeSeconds);

        await recordDeviceMetric(envelope, data);
    }

    // handle environment metrics
    if (payload.environmentMetrics) {
        const environmentMetrics = payload.environmentMetrics;

        const metric = {
            temperature: telemetryValueOrNull(environmentMetrics.temperature),
            relative_humidity: telemetryValueOrNull(environmentMetrics.relativeHumidity),
            barometric_pressure: telemetryValueOrNull(environmentMetrics.barometricPressure),
            gas_resistance: telemetryValueOrNull(environmentMetrics.gasResistance),
            voltage: telemetryValueOrNull(environmentMetrics.voltage),
            current: telemetryValueOrNull(environmentMetrics.current),
            iaq: telemetryValueOrNull(environmentMetrics.iaq),
            // wind readings are stored exactly as received, zero included
            wind_direction: environmentMetrics.windDirection,
            wind_speed: environmentMetrics.windSpeed,
            wind_gust: environmentMetrics.windGust,
            wind_lull: environmentMetrics.windLull,
        };

        // only these three environment readings are kept on the node row
        data.temperature = metric.temperature;
        data.relative_humidity = metric.relative_humidity;
        data.barometric_pressure = metric.barometric_pressure;

        await recordTelemetryMetricOncePerPacket('environmentMetric', envelope, metric);
    }

    // handle power metrics
    if (payload.powerMetrics) {
        const powerMetrics = payload.powerMetrics;

        await recordTelemetryMetricOncePerPacket('powerMetric', envelope, {
            ch1_voltage: telemetryValueOrNull(powerMetrics.ch1Voltage),
            ch1_current: telemetryValueOrNull(powerMetrics.ch1Current),
            ch2_voltage: telemetryValueOrNull(powerMetrics.ch2Voltage),
            ch2_current: telemetryValueOrNull(powerMetrics.ch2Current),
            ch3_voltage: telemetryValueOrNull(powerMetrics.ch3Voltage),
            ch3_current: telemetryValueOrNull(powerMetrics.ch3Current),
        });
    }

    // update node telemetry in db, but only when at least one node field was set above
    if (Object.keys(data).length > 0) {
        try {
            await prisma.node.updateMany({
                where: {
                    node_id: envelope.packet.from,
                },
                data: data,
            });
        } catch (e) {
            console.error(e);
        }
    }

}