start work on lora ingest app
All checks were successful
Build Docker containers / Build (push) Successful in 16s
All checks were successful
Build Docker containers / Build (push) Successful in 16s
This commit is contained in:
60
lora/LoraStream.js
Normal file
60
lora/LoraStream.js
Normal file
@ -0,0 +1,60 @@
|
||||
import { Transform } from 'stream';
|
||||
|
||||
export class LoraStream extends Transform {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this.byteBuffer = new Uint8Array([]);
|
||||
this.textDecoder = new TextDecoder();
|
||||
}
|
||||
|
||||
_transform(chunk, encoding, callback) {
|
||||
this.byteBuffer = new Uint8Array([
|
||||
...this.byteBuffer,
|
||||
...chunk,
|
||||
]);
|
||||
let processingExhausted = false;
|
||||
while (this.byteBuffer.length !== 0 && !processingExhausted) {
|
||||
const framingIndex = this.byteBuffer.findIndex((byte) => byte === 0x94);
|
||||
const framingByte2 = this.byteBuffer[framingIndex + 1];
|
||||
if (framingByte2 === 0xc3) {
|
||||
if (this.byteBuffer.subarray(0, framingIndex).length) {
|
||||
this.byteBuffer = this.byteBuffer.subarray(framingIndex);
|
||||
}
|
||||
const msb = this.byteBuffer[2];
|
||||
const lsb = this.byteBuffer[3];
|
||||
if (msb !== undefined && lsb !== undefined && this.byteBuffer.length >= 4 + (msb << 8) + lsb) {
|
||||
const packet = this.byteBuffer.subarray(4, 4 + (msb << 8) + lsb);
|
||||
|
||||
const malformedDetectorIndex = packet.findIndex(
|
||||
(byte) => byte === 0x94,
|
||||
);
|
||||
if (malformedDetectorIndex !== -1 && packet[malformedDetectorIndex + 1] === 0xc3) {
|
||||
// malformed
|
||||
this.byteBuffer = this.byteBuffer.subarray(malformedDetectorIndex);
|
||||
} else {
|
||||
this.byteBuffer = this.byteBuffer.subarray(3 + (msb << 8) + lsb + 1);
|
||||
this.push(packet);
|
||||
}
|
||||
} else {
|
||||
/** Only partioal message in buffer, wait for the rest */
|
||||
processingExhausted = true;
|
||||
}
|
||||
} else {
|
||||
/** Message not complete, only 1 byte in buffer */
|
||||
processingExhausted = true;
|
||||
}
|
||||
}
|
||||
|
||||
callback();
|
||||
}
|
||||
_flush(callback) {
|
||||
try {
|
||||
if (this._buffer) {
|
||||
this.push(this._buffer.trim());
|
||||
}
|
||||
callback();
|
||||
} catch (err) {
|
||||
callback(err);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user