rewrite parser for mqtt
All checks were successful
Build Docker containers / Build (push) Successful in 30s
All checks were successful
Build Docker containers / Build (push) Successful in 30s
This commit is contained in:
577
mqtt/index.js
577
mqtt/index.js
@ -214,16 +214,19 @@ const mqttClientId = options["mqtt-client-id"] ?? "mqttx_1bc723c7";
|
|||||||
const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"];
|
const mqttTopics = options["mqtt-topic"] ?? ["msh/US/#"];
|
||||||
const allowedPortnums = options["allowed-portnums"] ?? null;
|
const allowedPortnums = options["allowed-portnums"] ?? null;
|
||||||
const logUnknownPortnums = options["log-unknown-portnums"] ?? false;
|
const logUnknownPortnums = options["log-unknown-portnums"] ?? false;
|
||||||
const collectServiceEnvelopes = options["collect-service-envelopes"] ?? false;
|
const collectorEnabled = {
|
||||||
const collectPositions = options["collect-positions"] ?? false;
|
serviceEnvelopes: options["collect-service-envelopes"] ?? false,
|
||||||
const collectTextMessages = options["collect-text-messages"] ?? false;
|
positions: options["collect-positions"] ?? false,
|
||||||
const ignoreDirectMessages = options["ignore-direct-messages"] ?? false;
|
textMessages: options["collect-text-messages"] ?? false,
|
||||||
const collectWaypoints = options["collect-waypoints"] ?? false;
|
waypoints: options["collect-waypoints"] ?? false,
|
||||||
const collectNeighbourInfo = options["collect-neighbour-info"] ?? false;
|
mapReports: options["collect-map-reports"] ?? false,
|
||||||
const collectMapReports = options["collect-map-reports"] ?? false;
|
directMessages: !(options["ignore-direct-messages"] ?? false),
|
||||||
|
neighborInfo: options["collect-neighbor-info"] ?? false,
|
||||||
|
}
|
||||||
const decryptionKeys = options["decryption-keys"] ?? [
|
const decryptionKeys = options["decryption-keys"] ?? [
|
||||||
"1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
|
"1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
|
||||||
];
|
];
|
||||||
|
const logKnownPacketTypes = false;
|
||||||
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
|
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
|
||||||
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
|
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
|
||||||
const oldFirmwarePositionPrecision = options["old-firmware-position-precision"] ?? null;
|
const oldFirmwarePositionPrecision = options["old-firmware-position-precision"] ?? null;
|
||||||
@ -241,6 +244,7 @@ const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds
|
|||||||
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
|
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
|
||||||
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;
|
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;
|
||||||
|
|
||||||
|
|
||||||
// create mqtt client
|
// create mqtt client
|
||||||
const client = mqtt.connect(mqttBrokerUrl, {
|
const client = mqtt.connect(mqttBrokerUrl, {
|
||||||
username: mqttUsername,
|
username: mqttUsername,
|
||||||
@ -266,6 +270,15 @@ if(purgeIntervalSeconds){
|
|||||||
}, purgeIntervalSeconds * 1000);
|
}, purgeIntervalSeconds * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function printStatus() {
|
||||||
|
console.log(`MQTT server: ${mqttBrokerUrl}`)
|
||||||
|
console.log(`MQTT topic(s): ${mqttTopics.join(', ')}`)
|
||||||
|
for (const key of Object.keys(collectorEnabled)) {
|
||||||
|
console.log(`${key} collection: ${collectorEnabled[key] ? 'enabled' : 'disabled'}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printStatus();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Purges all nodes from the database that haven't been heard from within the configured timeframe.
|
* Purges all nodes from the database that haven't been heard from within the configured timeframe.
|
||||||
*/
|
*/
|
||||||
@ -666,14 +679,14 @@ function convertHexIdToNumericId(hexId) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to everything when connected
|
// subscribe to everything when connected
|
||||||
client.on("connect", () => {
|
client.on('connect', () => {
|
||||||
for(const mqttTopic of mqttTopics){
|
for(const mqttTopic of mqttTopics){
|
||||||
client.subscribe(mqttTopic);
|
client.subscribe(mqttTopic);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// handle message received
|
// handle message received
|
||||||
client.on("message", async (topic, message) => {
|
client.on('message', async (topic, message) => {
|
||||||
try {
|
try {
|
||||||
// decode service envelope
|
// decode service envelope
|
||||||
let envelope = null;
|
let envelope = null;
|
||||||
@ -687,7 +700,7 @@ client.on("message", async (topic, message) => {
|
|||||||
let dataPacket = envelope.packet.payloadVariant.value;
|
let dataPacket = envelope.packet.payloadVariant.value;
|
||||||
// attempt to decrypt encrypted packets
|
// attempt to decrypt encrypted packets
|
||||||
if (envelope.packet.payloadVariant.case === 'encrypted') {
|
if (envelope.packet.payloadVariant.case === 'encrypted') {
|
||||||
dataPacket = decrypt(envelope.packet);
|
envelope.packet.payloadVariant.value = dataPacket = decrypt(envelope.packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dataPacket !== null) {
|
if (dataPacket !== null) {
|
||||||
@ -712,7 +725,7 @@ client.on("message", async (topic, message) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create service envelope in db
|
// create service envelope in db
|
||||||
if(collectServiceEnvelopes){
|
if (collectorEnabled.serviceEnvelopes) {
|
||||||
try {
|
try {
|
||||||
await prisma.serviceEnvelope.create({
|
await prisma.serviceEnvelope.create({
|
||||||
data: {
|
data: {
|
||||||
@ -750,7 +763,7 @@ client.on("message", async (topic, message) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const payload = dataPacket.payload;
|
let payload = dataPacket.payload;
|
||||||
// get portnum from decoded packet
|
// get portnum from decoded packet
|
||||||
const portnum = dataPacket.portnum;
|
const portnum = dataPacket.portnum;
|
||||||
// get bitfield from decoded packet
|
// get bitfield from decoded packet
|
||||||
@ -758,32 +771,100 @@ client.on("message", async (topic, message) => {
|
|||||||
// this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above
|
// this value will be null for packets from v2.4.x and below, and will be an integer in v2.5.x and above
|
||||||
const bitfield = dataPacket.bitfield;
|
const bitfield = dataPacket.bitfield;
|
||||||
|
|
||||||
const logKnownPacketTypes = false;
|
|
||||||
|
|
||||||
// if allowed portnums are configured, ignore portnums that are not in the list
|
// if allowed portnums are configured, ignore portnums that are not in the list
|
||||||
if(allowedPortnums != null && !allowedPortnums.includes(portnum)){
|
if(allowedPortnums != null && !allowedPortnums.includes(portnum)){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(portnum === Portnums.PortNum.TEXT_MESSAGE_APP) {
|
let callback = null;
|
||||||
|
let schema = null;
|
||||||
if(!collectTextMessages){
|
switch(portnum) {
|
||||||
|
case Portnums.PortNum.TEXT_MESSAGE_APP:
|
||||||
|
callback = onTextMessage;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.POSITION_APP:
|
||||||
|
callback = onPosition;
|
||||||
|
schema = Mesh.PositionSchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.NODEINFO_APP:
|
||||||
|
callback = onNodeInfo;
|
||||||
|
schema = Mesh.NodeInfoSchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.WAYPOINT_APP:
|
||||||
|
callback = onWaypoint;
|
||||||
|
schema = Mesh.WaypointSchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.NEIGHBORINFO_APP:
|
||||||
|
callback = onNeighborInfo;
|
||||||
|
schema = Mesh.NeighborInfoSchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.TELEMETRY_APP:
|
||||||
|
callback = onTelemetry;
|
||||||
|
schema = Telemetry.TelemetrySchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.TRACEROUTE_APP:
|
||||||
|
callback = onRouteDiscovery;
|
||||||
|
schema = Mesh.RouteDiscoverySchema;
|
||||||
|
break;
|
||||||
|
case Portnums.PortNum.MAP_REPORT_APP:
|
||||||
|
callback = onMapReport;
|
||||||
|
schema = Mqtt.MapReportSchema;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// handle unknown port nums here
|
||||||
|
if (logUnknownPortnums) {
|
||||||
|
const ignoredPortnums = [
|
||||||
|
Portnums.PortNum.UNKNOWN_APP,
|
||||||
|
Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP,
|
||||||
|
Portnums.PortNum.ROUTING_APP,
|
||||||
|
Portnums.PortNum.PAXCOUNTER_APP,
|
||||||
|
Portnums.PortNum.STORE_FORWARD_APP,
|
||||||
|
Portnums.PortNum.RANGE_TEST_APP,
|
||||||
|
Portnums.PortNum.ATAK_PLUGIN,
|
||||||
|
Portnums.PortNum.ATAK_FORWARDER,
|
||||||
|
];
|
||||||
|
// ignore packets we don't want to see for now
|
||||||
|
if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
console.log(portnum, envelope);
|
||||||
// check if we want to ignore direct messages
|
}
|
||||||
if(ignoreDirectMessages && envelope.packet.to !== 0xFFFFFFFF){
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (callback !== null) {
|
||||||
|
if (schema !== null) {
|
||||||
|
try {
|
||||||
|
payload = fromBinary(schema, payload);
|
||||||
|
} catch(e) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await callback(envelope, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch(e) {
|
||||||
|
// ignore errors
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
async function onTextMessage(envelope, payload) {
|
||||||
if(logKnownPacketTypes) {
|
if(logKnownPacketTypes) {
|
||||||
console.log("TEXT_MESSAGE_APP", {
|
console.log('TEXT_MESSAGE_APP', {
|
||||||
to: envelope.packet.to.toString(16),
|
to: envelope.packet.to.toString(16),
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
text: payload.toString(),
|
text: payload.toString(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!collectorEnabled.textMessages) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if we want to ignore direct messages
|
||||||
|
if(!collectorEnabled.directMessages && envelope.packet.to !== 0xFFFFFFFF){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await prisma.textMessage.create({
|
await prisma.textMessage.create({
|
||||||
data: {
|
data: {
|
||||||
@ -803,32 +884,30 @@ client.on("message", async (topic, message) => {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.POSITION_APP) {
|
async function onPosition(envelope, payload) {
|
||||||
const position = fromBinary(Mesh.PositionSchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes){
|
if(logKnownPacketTypes){
|
||||||
console.log("POSITION_APP", {
|
console.log('POSITION_APP', {
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
position: position,
|
position: payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// process position
|
// process position
|
||||||
if(position.latitudeI != null && position.longitudeI){
|
if(payload.latitudeI != null && payload.longitudeI){
|
||||||
|
|
||||||
|
const bitfield = envelope.packet.payloadVariant.value.bitfield;
|
||||||
// if bitfield is not available, we are on firmware v2.4 or below
|
// if bitfield is not available, we are on firmware v2.4 or below
|
||||||
// if configured, position packets should have their precision reduced
|
// if configured, position packets should have their precision reduced
|
||||||
if (bitfield == null && oldFirmwarePositionPrecision != null) {
|
if (bitfield == null && oldFirmwarePositionPrecision != null) {
|
||||||
|
|
||||||
// adjust precision of latitude and longitude
|
// adjust precision of latitude and longitude
|
||||||
position.latitudeI = PositionUtil.setPositionPrecision(position.latitudeI, oldFirmwarePositionPrecision);
|
payload.latitudeI = PositionUtil.setPositionPrecision(payload.latitudeI, oldFirmwarePositionPrecision);
|
||||||
position.longitudeI = PositionUtil.setPositionPrecision(position.longitudeI, oldFirmwarePositionPrecision);
|
payload.longitudeI = PositionUtil.setPositionPrecision(payload.longitudeI, oldFirmwarePositionPrecision);
|
||||||
|
|
||||||
// update position precision on packet to show that it is no longer full precision
|
// update position precision on packet to show that it is no longer full precision
|
||||||
position.precisionBits = oldFirmwarePositionPrecision;
|
payload.precisionBits = oldFirmwarePositionPrecision;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -840,10 +919,10 @@ client.on("message", async (topic, message) => {
|
|||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
position_updated_at: new Date(),
|
position_updated_at: new Date(),
|
||||||
latitude: position.latitudeI,
|
latitude: payload.latitudeI,
|
||||||
longitude: position.longitudeI,
|
longitude: payload.longitudeI,
|
||||||
altitude: position.altitude !== 0 ? position.altitude : null,
|
altitude: payload.altitude !== 0 ? payload.altitude : null,
|
||||||
position_precision: position.precisionBits,
|
position_precision: payload.precisionBits,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -853,7 +932,7 @@ client.on("message", async (topic, message) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// don't collect position history if not enabled, but we still want to update the node above
|
// don't collect position history if not enabled, but we still want to update the node above
|
||||||
if(!collectPositions){
|
if (!collectorEnabled.positions) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -881,9 +960,9 @@ client.on("message", async (topic, message) => {
|
|||||||
packet_id: envelope.packet.id,
|
packet_id: envelope.packet.id,
|
||||||
channel_id: envelope.channelId,
|
channel_id: envelope.channelId,
|
||||||
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
||||||
latitude: position.latitudeI,
|
latitude: payload.latitudeI,
|
||||||
longitude: position.longitudeI,
|
longitude: payload.longitudeI,
|
||||||
altitude: position.altitude,
|
altitude: payload.altitude,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -891,17 +970,13 @@ client.on("message", async (topic, message) => {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.NODEINFO_APP) {
|
async function onNodeInfo(envelope, payload) {
|
||||||
|
|
||||||
const user = fromBinary(Mesh.UserSchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
if(logKnownPacketTypes) {
|
||||||
console.log("NODEINFO_APP", {
|
console.log('NODEINFO_APP', {
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
user: user,
|
user: payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -913,55 +988,51 @@ client.on("message", async (topic, message) => {
|
|||||||
},
|
},
|
||||||
create: {
|
create: {
|
||||||
node_id: envelope.packet.from,
|
node_id: envelope.packet.from,
|
||||||
long_name: user.longName,
|
long_name: payload.longName,
|
||||||
short_name: user.shortName,
|
short_name: payload.shortName,
|
||||||
hardware_model: user.hwModel,
|
hardware_model: payload.hwModel,
|
||||||
is_licensed: user.isLicensed === true,
|
is_licensed: payload.isLicensed === true,
|
||||||
role: user.role,
|
role: payload.role,
|
||||||
},
|
},
|
||||||
update: {
|
update: {
|
||||||
long_name: user.longName,
|
long_name: payload.longName,
|
||||||
short_name: user.shortName,
|
short_name: payload.shortName,
|
||||||
hardware_model: user.hwModel,
|
hardware_model: payload.hwModel,
|
||||||
is_licensed: user.isLicensed === true,
|
is_licensed: payload.isLicensed === true,
|
||||||
role: user.role,
|
role: payload.role,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.WAYPOINT_APP) {
|
async function onWaypoint(envelope, payload) {
|
||||||
|
|
||||||
if(!collectWaypoints){
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const waypoint = fromBinary(Mesh.WaypointSchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
if(logKnownPacketTypes) {
|
||||||
console.log("WAYPOINT_APP", {
|
console.log('WAYPOINT_APP', {
|
||||||
to: envelope.packet.to.toString(16),
|
to: envelope.packet.to.toString(16),
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
waypoint: waypoint,
|
waypoint: payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!collectorEnabled.waypoints) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await prisma.waypoint.create({
|
await prisma.waypoint.create({
|
||||||
data: {
|
data: {
|
||||||
to: envelope.packet.to,
|
to: envelope.packet.to,
|
||||||
from: envelope.packet.from,
|
from: envelope.packet.from,
|
||||||
waypoint_id: waypoint.id,
|
waypoint_id: payload.id,
|
||||||
latitude: waypoint.latitudeI,
|
latitude: payload.latitudeI,
|
||||||
longitude: waypoint.longitudeI,
|
longitude: payload.longitudeI,
|
||||||
expire: waypoint.expire,
|
expire: payload.expire,
|
||||||
locked_to: waypoint.lockedTo,
|
locked_to: payload.lockedTo,
|
||||||
name: waypoint.name,
|
name: payload.name,
|
||||||
description: waypoint.description,
|
description: payload.description,
|
||||||
icon: waypoint.icon,
|
icon: payload.icon,
|
||||||
channel: envelope.packet.channel,
|
channel: envelope.packet.channel,
|
||||||
packet_id: envelope.packet.id,
|
packet_id: envelope.packet.id,
|
||||||
channel_id: envelope.channelId,
|
channel_id: envelope.channelId,
|
||||||
@ -971,17 +1042,103 @@ client.on("message", async (topic, message) => {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.NEIGHBORINFO_APP) {
|
async function onMapReport(envelope, payload) {
|
||||||
|
|
||||||
const neighbourInfo = fromBinary(Mesh.NeighborInfoSchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
if(logKnownPacketTypes) {
|
||||||
console.log("NEIGHBORINFO_APP", {
|
console.log('MAP_REPORT_APP', {
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
neighbour_info: neighbourInfo,
|
map_report: payload,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// create or update node in db
|
||||||
|
try {
|
||||||
|
|
||||||
|
// data to set on node
|
||||||
|
const data = {
|
||||||
|
long_name: payload.longName,
|
||||||
|
short_name: payload.shortName,
|
||||||
|
hardware_model: payload.hwModel,
|
||||||
|
role: payload.role,
|
||||||
|
latitude: payload.latitudeI,
|
||||||
|
longitude: payload.longitudeI,
|
||||||
|
altitude: payload.altitude !== 0 ? payload.altitude : null,
|
||||||
|
firmware_version: payload.firmwareVersion,
|
||||||
|
region: payload.region,
|
||||||
|
modem_preset: payload.modemPreset,
|
||||||
|
has_default_channel: payload.hasDefaultChannel,
|
||||||
|
position_precision: payload.positionPrecision,
|
||||||
|
num_online_local_nodes: payload.numOnlineLocalNodes,
|
||||||
|
position_updated_at: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
await prisma.node.upsert({
|
||||||
|
where: {
|
||||||
|
node_id: envelope.packet.from,
|
||||||
|
},
|
||||||
|
create: {
|
||||||
|
node_id: envelope.packet.from,
|
||||||
|
...data,
|
||||||
|
},
|
||||||
|
update: data,
|
||||||
|
});
|
||||||
|
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't collect map report history if not enabled, but we still want to update the node above
|
||||||
|
if (!collectorEnabled.mapReports) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
// find an existing map with duplicate information created in the last 60 seconds
|
||||||
|
const existingDuplicateMapReport = await prisma.mapReport.findFirst({
|
||||||
|
where: {
|
||||||
|
node_id: envelope.packet.from,
|
||||||
|
long_name: payload.longName,
|
||||||
|
short_name: payload.shortName,
|
||||||
|
created_at: {
|
||||||
|
gte: new Date(Date.now() - 60000), // created in the last 60 seconds
|
||||||
|
},
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// create map report if no duplicates found
|
||||||
|
if(!existingDuplicateMapReport){
|
||||||
|
await prisma.mapReport.create({
|
||||||
|
data: {
|
||||||
|
node_id: envelope.packet.from,
|
||||||
|
long_name: payload.longName,
|
||||||
|
short_name: payload.shortName,
|
||||||
|
role: payload.role,
|
||||||
|
hardware_model: payload.hwModel,
|
||||||
|
firmware_version: payload.firmwareVersion,
|
||||||
|
region: payload.region,
|
||||||
|
modem_preset: payload.modemPreset,
|
||||||
|
has_default_channel: payload.hasDefaultChannel,
|
||||||
|
latitude: payload.latitudeI,
|
||||||
|
longitude: payload.longitudeI,
|
||||||
|
altitude: payload.altitude,
|
||||||
|
position_precision: payload.positionPrecision,
|
||||||
|
num_online_local_nodes: payload.numOnlineLocalNodes,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function onNeighborInfo(envelope, payload) {
|
||||||
|
if(logKnownPacketTypes) {
|
||||||
|
console.log('NEIGHBORINFO_APP', {
|
||||||
|
from: envelope.packet.from.toString(16),
|
||||||
|
neighbor_info: payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -993,8 +1150,8 @@ client.on("message", async (topic, message) => {
|
|||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
neighbours_updated_at: new Date(),
|
neighbours_updated_at: new Date(),
|
||||||
neighbour_broadcast_interval_secs: neighbourInfo.nodeBroadcastIntervalSecs,
|
neighbour_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
|
||||||
neighbours: neighbourInfo.neighbors.map((neighbour) => {
|
neighbours: payload.neighbors.map((neighbour) => {
|
||||||
return {
|
return {
|
||||||
node_id: neighbour.nodeId,
|
node_id: neighbour.nodeId,
|
||||||
snr: neighbour.snr,
|
snr: neighbour.snr,
|
||||||
@ -1007,7 +1164,7 @@ client.on("message", async (topic, message) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// don't store all neighbour infos, but we want to update the existing node above
|
// don't store all neighbour infos, but we want to update the existing node above
|
||||||
if(!collectNeighbourInfo){
|
if (!collectorEnabled.neighborInfo) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1016,8 +1173,8 @@ client.on("message", async (topic, message) => {
|
|||||||
await prisma.neighbourInfo.create({
|
await prisma.neighbourInfo.create({
|
||||||
data: {
|
data: {
|
||||||
node_id: envelope.packet.from,
|
node_id: envelope.packet.from,
|
||||||
node_broadcast_interval_secs: neighbourInfo.nodeBroadcastIntervalSecs,
|
node_broadcast_interval_secs: payload.nodeBroadcastIntervalSecs,
|
||||||
neighbours: neighbourInfo.neighbors.map((neighbour) => {
|
neighbours: payload.neighbors.map((neighbour) => {
|
||||||
return {
|
return {
|
||||||
node_id: neighbour.nodeId,
|
node_id: neighbour.nodeId,
|
||||||
snr: neighbour.snr,
|
snr: neighbour.snr,
|
||||||
@ -1028,17 +1185,45 @@ client.on("message", async (topic, message) => {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.TELEMETRY_APP) {
|
async function onRouteDiscovery(envelope, payload) {
|
||||||
|
if(logKnownPacketTypes) {
|
||||||
|
console.log('TRACEROUTE_APP', {
|
||||||
|
to: envelope.packet.to.toString(16),
|
||||||
|
from: envelope.packet.from.toString(16),
|
||||||
|
want_response: envelope.packet.payloadVariant.value.wantResponse,
|
||||||
|
route_discovery: payload,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
const telemetry = fromBinary(Telemetry.TelemetrySchema, payload);
|
try {
|
||||||
|
await prisma.traceRoute.create({
|
||||||
|
data: {
|
||||||
|
to: envelope.packet.to,
|
||||||
|
from: envelope.packet.from,
|
||||||
|
want_response: envelope.packet.payloadVariant.value.wantResponse,
|
||||||
|
route: payload.route,
|
||||||
|
snr_towards: payload.snrTowards,
|
||||||
|
route_back: payload.routeBack,
|
||||||
|
snr_back: payload.snrBack,
|
||||||
|
channel: envelope.packet.channel,
|
||||||
|
packet_id: envelope.packet.id,
|
||||||
|
channel_id: envelope.channelId,
|
||||||
|
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function onTelemetry(envelope, payload) {
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
if(logKnownPacketTypes) {
|
||||||
console.log("TELEMETRY_APP", {
|
console.log('TELEMETRY_APP', {
|
||||||
from: envelope.packet.from.toString(16),
|
from: envelope.packet.from.toString(16),
|
||||||
telemetry: telemetry,
|
telemetry: payload,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1046,13 +1231,13 @@ client.on("message", async (topic, message) => {
|
|||||||
const data = {};
|
const data = {};
|
||||||
|
|
||||||
// handle device metrics
|
// handle device metrics
|
||||||
if(telemetry.deviceMetrics){
|
if(payload.deviceMetrics){
|
||||||
|
|
||||||
data.battery_level = telemetry.deviceMetrics.batteryLevel !== 0 ? telemetry.deviceMetrics.batteryLevel : null;
|
data.battery_level = payload.deviceMetrics.batteryLevel !== 0 ? payload.deviceMetrics.batteryLevel : null;
|
||||||
data.voltage = telemetry.deviceMetrics.voltage !== 0 ? telemetry.deviceMetrics.voltage : null;
|
data.voltage = payload.deviceMetrics.voltage !== 0 ? payload.deviceMetrics.voltage : null;
|
||||||
data.channel_utilization = telemetry.deviceMetrics.channelUtilization !== 0 ? telemetry.deviceMetrics.channelUtilization : null;
|
data.channel_utilization = payload.deviceMetrics.channelUtilization !== 0 ? payload.deviceMetrics.channelUtilization : null;
|
||||||
data.air_util_tx = telemetry.deviceMetrics.airUtilTx !== 0 ? telemetry.deviceMetrics.airUtilTx : null;
|
data.air_util_tx = payload.deviceMetrics.airUtilTx !== 0 ? payload.deviceMetrics.airUtilTx : null;
|
||||||
data.uptime_seconds = telemetry.deviceMetrics.uptimeSeconds !== 0 ? telemetry.deviceMetrics.uptimeSeconds : null;
|
data.uptime_seconds = payload.deviceMetrics.uptimeSeconds !== 0 ? payload.deviceMetrics.uptimeSeconds : null;
|
||||||
|
|
||||||
// create device metric
|
// create device metric
|
||||||
try {
|
try {
|
||||||
@ -1091,20 +1276,20 @@ client.on("message", async (topic, message) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle environment metrics
|
// handle environment metrics
|
||||||
if(telemetry.environmentMetrics){
|
if(payload.environmentMetrics){
|
||||||
|
|
||||||
// get metric values
|
// get metric values
|
||||||
const temperature = telemetry.environmentMetrics.temperature !== 0 ? telemetry.environmentMetrics.temperature : null;
|
const temperature = payload.environmentMetrics.temperature !== 0 ? payload.environmentMetrics.temperature : null;
|
||||||
const relativeHumidity = telemetry.environmentMetrics.relativeHumidity !== 0 ? telemetry.environmentMetrics.relativeHumidity : null;
|
const relativeHumidity = payload.environmentMetrics.relativeHumidity !== 0 ? payload.environmentMetrics.relativeHumidity : null;
|
||||||
const barometricPressure = telemetry.environmentMetrics.barometricPressure !== 0 ? telemetry.environmentMetrics.barometricPressure : null;
|
const barometricPressure = payload.environmentMetrics.barometricPressure !== 0 ? payload.environmentMetrics.barometricPressure : null;
|
||||||
const gasResistance = telemetry.environmentMetrics.gasResistance !== 0 ? telemetry.environmentMetrics.gasResistance : null;
|
const gasResistance = payload.environmentMetrics.gasResistance !== 0 ? payload.environmentMetrics.gasResistance : null;
|
||||||
const voltage = telemetry.environmentMetrics.voltage !== 0 ? telemetry.environmentMetrics.voltage : null;
|
const voltage = payload.environmentMetrics.voltage !== 0 ? payload.environmentMetrics.voltage : null;
|
||||||
const current = telemetry.environmentMetrics.current !== 0 ? telemetry.environmentMetrics.current : null;
|
const current = payload.environmentMetrics.current !== 0 ? payload.environmentMetrics.current : null;
|
||||||
const iaq = telemetry.environmentMetrics.iaq !== 0 ? telemetry.environmentMetrics.iaq : null;
|
const iaq = payload.environmentMetrics.iaq !== 0 ? payload.environmentMetrics.iaq : null;
|
||||||
const windDirection = telemetry.environmentMetrics.windDirection;
|
const windDirection = payload.environmentMetrics.windDirection;
|
||||||
const windSpeed = telemetry.environmentMetrics.windSpeed;
|
const windSpeed = payload.environmentMetrics.windSpeed;
|
||||||
const windGust = telemetry.environmentMetrics.windGust;
|
const windGust = payload.environmentMetrics.windGust;
|
||||||
const windLull = telemetry.environmentMetrics.windLull;
|
const windLull = payload.environmentMetrics.windLull;
|
||||||
|
|
||||||
// set metrics to update on node table
|
// set metrics to update on node table
|
||||||
data.temperature = temperature;
|
data.temperature = temperature;
|
||||||
@ -1153,15 +1338,15 @@ client.on("message", async (topic, message) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handle power metrics
|
// handle power metrics
|
||||||
if(telemetry.powerMetrics){
|
if(payload.powerMetrics){
|
||||||
|
|
||||||
// get metric values
|
// get metric values
|
||||||
const ch1Voltage = telemetry.powerMetrics.ch1Voltage !== 0 ? telemetry.powerMetrics.ch1Voltage : null;
|
const ch1Voltage = payload.powerMetrics.ch1Voltage !== 0 ? payload.powerMetrics.ch1Voltage : null;
|
||||||
const ch1Current = telemetry.powerMetrics.ch1Current !== 0 ? telemetry.powerMetrics.ch1Current : null;
|
const ch1Current = payload.powerMetrics.ch1Current !== 0 ? payload.powerMetrics.ch1Current : null;
|
||||||
const ch2Voltage = telemetry.powerMetrics.ch2Voltage !== 0 ? telemetry.powerMetrics.ch2Voltage : null;
|
const ch2Voltage = payload.powerMetrics.ch2Voltage !== 0 ? payload.powerMetrics.ch2Voltage : null;
|
||||||
const ch2Current = telemetry.powerMetrics.ch2Current !== 0 ? telemetry.powerMetrics.ch2Current : null;
|
const ch2Current = payload.powerMetrics.ch2Current !== 0 ? payload.powerMetrics.ch2Current : null;
|
||||||
const ch3Voltage = telemetry.powerMetrics.ch3Voltage !== 0 ? telemetry.powerMetrics.ch3Voltage : null;
|
const ch3Voltage = payload.powerMetrics.ch3Voltage !== 0 ? payload.powerMetrics.ch3Voltage : null;
|
||||||
const ch3Current = telemetry.powerMetrics.ch3Current !== 0 ? telemetry.powerMetrics.ch3Current : null;
|
const ch3Current = payload.powerMetrics.ch3Current !== 0 ? payload.powerMetrics.ch3Current : null;
|
||||||
|
|
||||||
// create power metric
|
// create power metric
|
||||||
try {
|
try {
|
||||||
@ -1212,160 +1397,4 @@ client.on("message", async (topic, message) => {
|
|||||||
console.error(e);
|
console.error(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.TRACEROUTE_APP) {
|
|
||||||
|
|
||||||
const routeDiscovery = fromBinary(Mesh.RouteDiscoverySchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
|
||||||
console.log("TRACEROUTE_APP", {
|
|
||||||
to: envelope.packet.to.toString(16),
|
|
||||||
from: envelope.packet.from.toString(16),
|
|
||||||
want_response: payload.wantResponse,
|
|
||||||
route_discovery: routeDiscovery,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await prisma.traceRoute.create({
|
|
||||||
data: {
|
|
||||||
to: envelope.packet.to,
|
|
||||||
from: envelope.packet.from,
|
|
||||||
want_response: envelope.packet.decoded.wantResponse,
|
|
||||||
route: routeDiscovery.route,
|
|
||||||
snr_towards: routeDiscovery.snrTowards,
|
|
||||||
route_back: routeDiscovery.routeBack,
|
|
||||||
snr_back: routeDiscovery.snrBack,
|
|
||||||
channel: envelope.packet.channel,
|
|
||||||
packet_id: envelope.packet.id,
|
|
||||||
channel_id: envelope.channelId,
|
|
||||||
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
} catch (e) {
|
|
||||||
console.error(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
else if(portnum === Portnums.PortNum.MAP_REPORT_APP) {
|
|
||||||
|
|
||||||
const mapReport = fromBinary(Mqtt.MapReportSchema, payload);
|
|
||||||
|
|
||||||
if(logKnownPacketTypes) {
|
|
||||||
console.log("MAP_REPORT_APP", {
|
|
||||||
from: envelope.packet.from.toString(16),
|
|
||||||
map_report: mapReport,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// create or update node in db
|
|
||||||
try {
|
|
||||||
|
|
||||||
// data to set on node
|
|
||||||
const data = {
|
|
||||||
long_name: mapReport.longName,
|
|
||||||
short_name: mapReport.shortName,
|
|
||||||
hardware_model: mapReport.hwModel,
|
|
||||||
role: mapReport.role,
|
|
||||||
latitude: mapReport.latitudeI,
|
|
||||||
longitude: mapReport.longitudeI,
|
|
||||||
altitude: mapReport.altitude !== 0 ? mapReport.altitude : null,
|
|
||||||
firmware_version: mapReport.firmwareVersion,
|
|
||||||
region: mapReport.region,
|
|
||||||
modem_preset: mapReport.modemPreset,
|
|
||||||
has_default_channel: mapReport.hasDefaultChannel,
|
|
||||||
position_precision: mapReport.positionPrecision,
|
|
||||||
num_online_local_nodes: mapReport.numOnlineLocalNodes,
|
|
||||||
position_updated_at: new Date(),
|
|
||||||
};
|
|
||||||
|
|
||||||
await prisma.node.upsert({
|
|
||||||
where: {
|
|
||||||
node_id: envelope.packet.from,
|
|
||||||
},
|
|
||||||
create: {
|
|
||||||
node_id: envelope.packet.from,
|
|
||||||
...data,
|
|
||||||
},
|
|
||||||
update: data,
|
|
||||||
});
|
|
||||||
|
|
||||||
} catch (e) {
|
|
||||||
console.error(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// don't collect map report history if not enabled, but we still want to update the node above
|
|
||||||
if(!collectMapReports){
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
// find an existing map with duplicate information created in the last 60 seconds
|
|
||||||
const existingDuplicateMapReport = await prisma.mapReport.findFirst({
|
|
||||||
where: {
|
|
||||||
node_id: envelope.packet.from,
|
|
||||||
long_name: mapReport.longName,
|
|
||||||
short_name: mapReport.shortName,
|
|
||||||
created_at: {
|
|
||||||
gte: new Date(Date.now() - 60000), // created in the last 60 seconds
|
|
||||||
},
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// create map report if no duplicates found
|
|
||||||
if(!existingDuplicateMapReport){
|
|
||||||
await prisma.mapReport.create({
|
|
||||||
data: {
|
|
||||||
node_id: envelope.packet.from,
|
|
||||||
long_name: mapReport.longName,
|
|
||||||
short_name: mapReport.shortName,
|
|
||||||
role: mapReport.role,
|
|
||||||
hardware_model: mapReport.hwModel,
|
|
||||||
firmware_version: mapReport.firmwareVersion,
|
|
||||||
region: mapReport.region,
|
|
||||||
modem_preset: mapReport.modemPreset,
|
|
||||||
has_default_channel: mapReport.hasDefaultChannel,
|
|
||||||
latitude: mapReport.latitudeI,
|
|
||||||
longitude: mapReport.longitudeI,
|
|
||||||
altitude: mapReport.altitude,
|
|
||||||
position_precision: mapReport.positionPrecision,
|
|
||||||
num_online_local_nodes: mapReport.numOnlineLocalNodes,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (e) {
|
|
||||||
console.error(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
else {
|
|
||||||
if (logUnknownPortnums) {
|
|
||||||
const ignoredPortnums = [
|
|
||||||
Portnums.PortNum.UNKNOWN_APP,
|
|
||||||
Portnums.PortNum.TEXT_MESSAGE_COMPRESSED_APP,
|
|
||||||
Portnums.PortNum.ROUTING_APP,
|
|
||||||
Portnums.PortNum.PAXCOUNTER_APP,
|
|
||||||
Portnums.PortNum.STORE_FORWARD_APP,
|
|
||||||
Portnums.PortNum.RANGE_TEST_APP,
|
|
||||||
Portnums.PortNum.ATAK_PLUGIN,
|
|
||||||
Portnums.PortNum.ATAK_FORWARDER,
|
|
||||||
];
|
|
||||||
// ignore packets we don't want to see for now
|
|
||||||
if (portnum === undefined || ignoredPortnums.includes(portnum) || portnum > 511) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
console.log(portnum, envelope);
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch(e) {
|
|
||||||
// ignore errors
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
Reference in New Issue
Block a user