From c7ea1cc2acd93c8e56eee463b52e1be826c5d100 Mon Sep 17 00:00:00 2001 From: Kenta Iwasaki Date: Sat, 13 Jun 2020 04:23:02 +0900 Subject: [PATCH] sdk/js: do not allow for backtracking upon backpressure in monte socket --- ts/monte.js | 55 +++++++++++++++++++++++++++-------------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/ts/monte.js b/ts/monte.js index aace08b..4ea4623 100644 --- a/ts/monte.js +++ b/ts/monte.js @@ -56,6 +56,7 @@ class MonteSocket extends Duplex { this.counter = 0; this._paused = false; + this._state = 0; this.conn.on('close', had_error => this.emit('close', had_error)); this.conn.on('connect', () => this.emit('connect')); @@ -196,37 +197,37 @@ class MonteSocket extends Duplex { _readable() { while (!this._paused) { - const header = this.conn.read(4); - if (!header) return; - - const length = header.readUInt32BE(); - - let frame = this.conn.read(length); - if (!frame) { - console.log(`Trying to read ${length} byte(s), only ${this.conn.readableLength} byte(s) available.`) - this.conn.unshift(header); - return; - } - - frame = this.decrypt(frame); - if (frame.byteLength <= 4) { - this.conn.destroy(new Error(`Packet is too small: no sequence number field found in packet.`)); - return; - } + if (this._state === 0) { + const header = this.conn.read(4); + if (!header) return; + + this._length = header.readUInt32BE(); + this._state = 1; + } else if (this._state === 1) { + let frame = this.conn.read(this._length); + if (!frame) return; + + frame = this.decrypt(frame); + if (frame.byteLength <= 4) { + this.conn.destroy(new Error(`Packet is too small: no sequence number field found in packet.`)); + return; + } - const seq = frame.readUInt32BE(); - frame = frame.slice(4); + const seq = frame.readUInt32BE(); + frame = frame.slice(4); - if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) { - try { - if (!this.push({seq, frame})) { - this._paused = true; + if (seq === 0 || this.pending.listenerCount(`${seq}`) === 0) { + try { + if (!this.push({seq, frame})) { + this._paused = true; + } + } catch (err) { + this.emit("error", err); } - } catch (err) { - this.emit("error", err); + } else { + this.pending.emit(`${seq}`, frame); } - } else { - this.pending.emit(`${seq}`, frame); + this._state = 0; } } }