Skip to content

Commit

Permalink
sdk/js: do not allow for backtracking upon backpressure in monte socket
Browse files Browse the repository at this point in the history
  • Loading branch information
lithdew committed Jun 12, 2020
1 parent 4caf26c commit c7ea1cc
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions ts/monte.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class MonteSocket extends Duplex {
this.counter = 0;

this._paused = false;
this._state = 0;

this.conn.on('close', had_error => this.emit('close', had_error));
this.conn.on('connect', () => this.emit('connect'));
Expand Down Expand Up @@ -196,37 +197,37 @@ class MonteSocket extends Duplex {

_readable() {
while (!this._paused) {
const header = this.conn.read(4);
if (!header) return;

const length = header.readUInt32BE();

let frame = this.conn.read(length);
if (!frame) {
console.log(`Trying to read ${length} byte(s), only ${this.conn.readableLength} byte(s) available.`)
this.conn.unshift(header);
return;
}

frame = this.decrypt(frame);
if (frame.byteLength <= 4) {
this.conn.destroy(new Error(`Packet is too small: no sequence number field found in packet.`));
return;
}
if (this._state === 0) {
const header = this.conn.read(4);
if (!header) return;

this._length = header.readUInt32BE();
this._state = 1;
} else if (this._state === 1) {
let frame = this.conn.read(this._length);
if (!frame) return;

frame = this.decrypt(frame);
if (frame.byteLength <= 4) {
this.conn.destroy(new Error(`Packet is too small: no sequence number field found in packet.`));
return;
}

const seq = frame.readUInt32BE();
frame = frame.slice(4);
const seq = frame.readUInt32BE();
frame = frame.slice(4);

if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) {
try {
if (!this.push({seq, frame})) {
this._paused = true;
if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) {
try {
if (!this.push({seq, frame})) {
this._paused = true;
}
} catch (err) {
this.emit("error", err);
}
} catch (err) {
this.emit("error", err);
} else {
this.pending.emit(`${seq}`, frame);
}
} else {
this.pending.emit(`${seq}`, frame);
this._state = 0;
}
}
}
Expand Down

0 comments on commit c7ea1cc

Please sign in to comment.