Skip to content

Commit

Permalink
fix: apply message size limit before decoding message (libp2p#231)
Browse files Browse the repository at this point in the history
* fix: apply message size limit before decoding message

If we apply the message size limit after decoding the message it's
too late as we've already processed the bad message.

Instead, if the buffer full of unprocessed messages grows to be
large than the max message size (e.g. we have not recieved a complete
message under the size limit), throw an error which will cause the
stream to be reset.

* fix: add implementation
  • Loading branch information
achingbrain authored Nov 24, 2022
1 parent 9b120c9 commit 279ad47
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 70 deletions.
25 changes: 18 additions & 7 deletions src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Uint8ArrayList } from 'uint8arraylist'
import type { Source } from 'it-stream-types'
import type { Message } from './message-types.js'

export const MAX_MSG_SIZE = 1 << 20 // 1MB

interface MessageHeader {
id: number
type: keyof typeof MessageTypeNames
Expand All @@ -13,10 +15,12 @@ interface MessageHeader {
class Decoder {
private readonly _buffer: Uint8ArrayList
private _headerInfo: MessageHeader | null
private readonly _maxMessageSize: number

constructor () {
constructor (maxMessageSize: number = MAX_MSG_SIZE) {
this._buffer = new Uint8ArrayList()
this._headerInfo = null
this._maxMessageSize = maxMessageSize
}

write (chunk: Uint8Array) {
Expand All @@ -25,6 +29,11 @@ class Decoder {
}

this._buffer.append(chunk)

if (this._buffer.byteLength > this._maxMessageSize) {
throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' })
}

const msgs: Message[] = []

while (this._buffer.length !== 0) {
Expand Down Expand Up @@ -119,14 +128,16 @@ function readVarInt (buf: Uint8ArrayList, offset: number = 0) {
/**
* Decode a chunk and yield an _array_ of decoded messages
*/
export async function * decode (source: Source<Uint8Array>) {
const decoder = new Decoder()
export function decode (maxMessageSize: number = MAX_MSG_SIZE) {
return async function * decodeMessages (source: Source<Uint8Array>): Source<Message> {
const decoder = new Decoder(maxMessageSize)

for await (const chunk of source) {
const msgs = decoder.write(chunk)
for await (const chunk of source) {
const msgs = decoder.write(chunk)

if (msgs.length > 0) {
yield msgs
if (msgs.length > 0) {
yield * msgs
}
}
}
}
4 changes: 1 addition & 3 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
Expand Down Expand Up @@ -204,8 +203,7 @@ export class MplexStreamMuxer implements StreamMuxer {
try {
await pipe(
source,
decode,
restrictSize(this._init.maxMsgSize),
decode(this._init.maxMsgSize),
async source => {
for await (const msg of source) {
await this._handleIncoming(msg)
Expand Down
36 changes: 0 additions & 36 deletions src/restrict-size.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { abortableSource } from 'abortable-iterator'
import { pushable } from 'it-pushable'
import errCode from 'err-code'
import { MAX_MSG_SIZE } from './restrict-size.js'
import { MAX_MSG_SIZE } from './decode.js'
import { anySignal } from 'any-signal'
import { InitiatorMessageTypes, ReceiverMessageTypes } from './message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand Down
15 changes: 6 additions & 9 deletions test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ describe('coder', () => {

it('should decode header', async () => {
const source = [uint8ArrayFromString('8801023137', 'base16')]
for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)

expect(messageWithBytes(msgs[0])).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.deep.equal({ id: 17, type: 0, data: uint8ArrayFromString('17') })
}
})

Expand Down Expand Up @@ -67,8 +65,8 @@ describe('coder', () => {
const source = [uint8ArrayFromString('88010231379801023139a801023231', 'base16')]

const res = []
for await (const msgs of decode(source)) {
res.push(...msgs)
for await (const msg of decode()(source)) {
res.push(msg)
}

expect(res.map(messageWithBytes)).to.deep.equal([
Expand All @@ -89,9 +87,8 @@ describe('coder', () => {
it('should decode zero length body msg', async () => {
const source = [uint8ArrayFromString('880100', 'base16')]

for await (const msgs of decode(source)) {
expect(msgs.length).to.equal(1)
expect(messageWithBytes(msgs[0])).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
for await (const msg of decode()(source)) {
expect(messageWithBytes(msg)).to.be.eql({ id: 17, type: 0, data: new Uint8Array(0) })
}
})
})
16 changes: 8 additions & 8 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ describe('mplex', () => {

await muxer.sink(stream)

const messages = await all(decode(bufs))
const messages = await all(decode()(bufs))

expect(messages).to.have.nested.property('[0][0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
expect(messages).to.have.nested.property('[0].id', 11, 'Did not specify the correct stream id')
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER, 'Did not reset the stream that tipped us over the inbound stream limit')
})

it('should reset a stream that fills the message buffer', async () => {
Expand All @@ -103,7 +103,7 @@ describe('mplex', () => {
const dataMessage: MessageInitiatorMessage = {
id,
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8ArrayList(new Uint8Array(1024 * 1024))
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
}
yield dataMessage

Expand Down Expand Up @@ -144,9 +144,9 @@ describe('mplex', () => {

// collect outgoing mplex messages
const muxerFinished = pDefer()
let messages: Message[][] = []
let messages: Message[] = []
void Promise.resolve().then(async () => {
messages = await all(decode(muxer.source))
messages = await all(decode()(muxer.source))
muxerFinished.resolve()
})

Expand All @@ -159,7 +159,7 @@ describe('mplex', () => {

// should have sent reset message to peer for this stream
await muxerFinished.promise
expect(messages).to.have.nested.property('[0][0].id', id)
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
expect(messages).to.have.nested.property('[0].id', id)
expect(messages).to.have.nested.property('[0].type', MessageTypes.RESET_RECEIVER)
})
})
15 changes: 9 additions & 6 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,28 @@ import all from 'it-all'
import drain from 'it-drain'
import each from 'it-foreach'
import { Message, MessageTypes } from '../src/message-types.js'
import { restrictSize } from '../src/restrict-size.js'
import { encode } from '../src/encode.js'
import { decode } from '../src/decode.js'
import { Uint8ArrayList } from 'uint8arraylist'

describe('restrict-size', () => {
describe('restrict size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }
]

const output: Message[] = []

try {
await pipe(
input,
restrictSize(maxSize),
encode,
decode(maxSize),
(source) => each(source, chunk => {
output.push(chunk)
}),
Expand All @@ -51,7 +53,8 @@ describe('restrict-size', () => {

const output = await pipe(
input,
restrictSize(32),
encode,
decode(32),
async (source) => await all(source)
)
expect(output).to.deep.equal(input)
Expand Down

0 comments on commit 279ad47

Please sign in to comment.