Skip to content

Commit

Permalink
feat: validation and coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 9ea6999 commit bec8862
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 152 deletions.
28 changes: 18 additions & 10 deletions src/channel.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const pushable = require('pull-pushable')
const defaults = require('lodash.defaults')

const consts = require('./consts')
const EE = require('events')
Expand All @@ -11,13 +12,16 @@ const log = debug('pull-plex:chan')
log.err = debug('pull-plex:chan:err')

class Channel extends EE {
constructor (id, name, plex, initiator, open) {
constructor (opts) {
super()
this._id = id
this._name = name
this._plex = plex
this._open = open
this._initiator = initiator

opts = defaults({}, opts, { initiator: false })

this._id = opts.id
this._name = opts.name
this._plex = opts.plex
this._open = opts.open
this._initiator = opts.initiator
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended
this._reset = false
Expand All @@ -41,9 +45,7 @@ class Channel extends EE {
if (err && typeof err !== 'boolean') {
setImmediate(() => this.emit('error', err))
}
if (this._reset) { return } // don't try closing the channel on reset

// this.endChan()
// this.endChan() // TODO: do not uncoment this, it will end the channel too early
})

this._source = this._msgs
Expand Down Expand Up @@ -99,6 +101,10 @@ class Channel extends EE {
return this._name
}

get destroyed () {
return this._endedRemote && this._endedLocal
}

push (data) {
this._log('push', data)
this._msgs.push(data)
Expand All @@ -124,6 +130,8 @@ class Channel extends EE {
openChan () {
this._log('openChan')

if (this.open) { return } // chan already open

let name
if (this._name && !Buffer.isBuffer(this._name)) {
name = Buffer.from(this._name)
Expand All @@ -133,7 +141,7 @@ class Channel extends EE {
this._plex.push([
this._id,
consts.type.NEW,
name != this._id.toString() ? name : null
name !== this._id.toString() ? name : null
])
}

Expand Down
103 changes: 71 additions & 32 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

const pull = require('pull-stream')
const pushable = require('pull-pushable')
const through = require('pull-through')

const defautls = require('lodash.defaults')

const EE = require('events')

Expand All @@ -14,18 +17,31 @@ const debug = require('debug')
const log = debug('pull-plex')
log.err = debug('pull-plex:err')

const MAX_MSG_SIZE = 1024 * 1024 // 1mb

class Plex extends EE {
constructor (initiator, onChan) {
constructor (opts) {
super()

if (typeof initiator === 'function') {
onChan = initiator
initiator = true
if (typeof opts === 'boolean') {
opts = { initiator: opts }
}

this._initiator = !!initiator
opts = defautls({}, opts, {
initiator: true,
onChan: null,
maxChannels: 10000,
maxMsgSize: MAX_MSG_SIZE,
lazy: false
})

this._maxChannels = opts.maxChannels
this._maxMsgSize = opts.maxMsgSize
this._lazy = opts.lazy

this._initiator = !!opts.initiator
this._chanId = this._initiator ? 1 : 0
this._channels = {}
this._channels = new Map()
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended

Expand All @@ -45,16 +61,24 @@ class Plex extends EE {
this.close(err)
})

if (onChan) {
this.on('stream', (chan) => onChan(chan, chan.id))
if (opts.onChan) {
this.on('stream', (chan) => opts.onChan(chan, chan.id))
}

this.source = pull(
this._chandata,
coder.encode()
)

const self = this
this.sink = pull(
through(function (data) {
if (Buffer.byteLength(data) > self._maxMsgSize) {
setImmediate(() => self.emit('error', new Error('message too large!')))
return this.queue(null)
}
this.queue(data)
}),
coder.decode(),
(read) => {
const next = (end, data) => {
Expand Down Expand Up @@ -86,12 +110,9 @@ class Plex extends EE {
this._endedLocal = true

// propagate close to channels
Object
.keys(this._channels)
.forEach((id) => {
const chan = this._channels[id]
if (chan) { return chan.close(err) }
})
for (let chan of this._channels.values()) {
chan.close(err)
}

this.emit('close')
}
Expand All @@ -101,13 +122,18 @@ class Plex extends EE {
}

reset (err) {
err = err || 'Underlying stream has been closed'
err = err || new Error('Underlying stream has been closed')
this._chandata.end(err)
this.close(err)
}

push (data) {
this._log('push', data)
if (data.data
&& Buffer.byteLength(data.data) > this._maxMsgSize) {
this._chandata.end(new Error('message too large!'))
}

this._chandata.push(data)
log('buffer', this._chandata.buffer)
}
Expand All @@ -119,14 +145,20 @@ class Plex extends EE {
}

createStream (name) {
if (typeof name === 'number') {
name = name.toString()
}
return this._newStream(null, this._initiator, false, name)
if (typeof name === 'number') { name = name.toString() }
const chan = this._newStream(null, this._initiator, false, name)
if (!this._lazy) { chan.openChan() }
return chan
}

_newStream (id, initiator, open, name) {
this._log('_newStream', Array.prototype.slice.call(arguments))

if (this._channels.size >= this._maxChannels) {
this.emit('error', new Error('max channels exceeded'))
return
}

if (typeof initiator === 'string') {
name = initiator
initiator = false
Expand All @@ -139,28 +171,32 @@ class Plex extends EE {
}

id = typeof id === 'number' ? id : this._nextChanId(initiator)
const chan = new Channel(id,
const chan = new Channel({
id,
name,
this,
plex: this,
initiator,
open || false)
open: open || false
})

chan.once('close', () => {
const chan = this._channels.get(id)
this._log('deleting channel', JSON.stringify({
channel: this._name,
id: id,
endedLocal: this._channels[id]._endedLocal,
endedRemote: this._channels[id]._endedRemote,
initiator: this._channels[id]._initiator
endedLocal: chan._endedLocal,
endedRemote: chan._endedRemote,
initiator: chan._initiator
}))
delete this._channels[id]
this._channels.delete(id)
})

if (this._channels[id]) {
return this.emit('error', `channel with id ${id} already exist!`)
if (this._channels.has(id)) {
this.emit('error', new Error(`channel with id ${id} already exist!`))
return
}

this._channels[id] = chan
this._channels.set(id, chan)
return chan
}

Expand All @@ -176,7 +212,7 @@ class Plex extends EE {

case consts.type.OUT_MESSAGE:
case consts.type.IN_MESSAGE: {
const chan = this._channels[id]
const chan = this._channels.get(id)
if (chan) {
chan.push(data)
}
Expand All @@ -185,7 +221,7 @@ class Plex extends EE {

case consts.type.OUT_CLOSE:
case consts.type.IN_CLOSE: {
const chan = this._channels[id]
const chan = this._channels.get(id)
if (chan) {
chan.close()
}
Expand All @@ -194,12 +230,15 @@ class Plex extends EE {

case consts.type.OUT_RESET:
case consts.type.IN_RESET: {
const chan = this._channels[id]
const chan = this._channels.get(id)
if (chan) {
chan.reset()
}
return
}

default:
this.emit('error', new Error('Invalid message type'))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions test/channel.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,14 @@ describe('channel', () => {
plex2.on('stream', (stream) => {
pull(
stream,
pull.onEnd((err) => {
pull.collect((err, data) => {
expect(err).to.exist()
expect(data[0].toString()).to.eql('hello there!')
done()
})
)

sndrSrc.push('hello there!') // should be able to write to closed chan
sndrSrc.push(Buffer.from('hello there!'))
aborter.abort(new Error('nasty error!'))
})

Expand Down
2 changes: 1 addition & 1 deletion test/coder.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,4 @@ describe('coder', () => {
})
)
})
})
})
1 change: 1 addition & 0 deletions test/node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require('./old-mplex-interop')
Loading

0 comments on commit bec8862

Please sign in to comment.