Skip to content

Commit

Permalink
feat: correct channel closing
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 1843747 commit e7409ec
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 35 deletions.
14 changes: 9 additions & 5 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ class Channel extends EE {
this._plex = plex
this._open = open
this._initiator = initiator
this._msgs = pushable((err) => {
this._endedLocal = err || true
this.emit('end', err)
})
this._cb = null // queue cb for async data
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended

Expand All @@ -41,6 +36,11 @@ class Channel extends EE {

this._log('new channel', this._name)

this._msgs = pushable((err) => {
if (err) { log.err(err) }
setImmediate(() => this.emit('end', err))
})

this.source = this._msgs

this.sink = (read) => {
Expand Down Expand Up @@ -73,6 +73,10 @@ class Channel extends EE {
}
}

get id () {
return this._id
}

get open () {
return this._open
}
Expand Down
33 changes: 18 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,14 @@ class Mplex extends EE {
this._initiator = initiator || false
this._chanId = this._initiator ? 0 : 1
this._channels = {}
this._chandata = pushable()
this._cb = null
this._chandata = pushable((err) => {
setImmediate(() => this.emit('close'))
})

this.source = this._chandata

this.sink = (read) => {
const next = (end, data) => {
// if (end) {
// // propagate close to channels
// Object
// .keys(this._channels)
// .forEach((id) => {
// this._channels[id].end(end)
// delete this._channels[id]
// })
// }

if (end === true) { return }
if (end) { return this.emit('error', end) }
return this._handle(data, (err) => {
Expand All @@ -42,6 +33,16 @@ class Mplex extends EE {
}
}

end () {
// propagate close to channels
Object
.keys(this._channels)
.forEach((id) => {
this._channels[id].end(end)
delete this._channels[id]
})
}

push (data) {
this._chandata.push(data)
// this._drain()
Expand Down Expand Up @@ -76,6 +77,11 @@ class Mplex extends EE {
this,
initiator,
open || false)

chan.once('end', () => {
delete this._channels[id]
})

this._channels[id] = chan
return chan
}
Expand All @@ -94,9 +100,6 @@ class Mplex extends EE {
}

const chan = this._newStream(id, this._initiator, true, data.toString())
chan.once('end', () => {
delete this._channels[id]
})
setImmediate(() => this.emit('stream', chan))
return cb()
}
Expand Down
1 change: 1 addition & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ exports.decodeMsg = (msg, cb) => {
through(function (h) {
const header = varint.decode(h)
this.queue({ id: header >> 3, type: header & 7 })
this.queue(null)
})
),
pull(
Expand Down
28 changes: 13 additions & 15 deletions test/channel.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,14 @@ describe('channel', () => {
it('receiver should be able to send data', (done) => {
const p = pair()

const plex1 = new Mplex()
const plex2 = new Mplex()
const plex1 = new Mplex(true)
const plex2 = new Mplex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

const id = plex1.nextChanId(true)
const chan1 = plex1._newStream(id, true, true, 'stream 1')

const chan2 = plex2._newStream(id, false, true, 'stream 2')

pull(
Expand All @@ -142,8 +141,6 @@ describe('channel', () => {
stream,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(stream._endedRemote).to.be.ok()
expect(stream._endedLocal).to.be.ok()
expect(data[0]).to.deep.eql(Buffer.from('hellooooooooooooo'))
done()
})
Expand Down Expand Up @@ -171,7 +168,7 @@ describe('channel', () => {
})
})

it('closing channel should allow reading but not writing', (done) => {
it('should echo', (done) => {
const p = pair()

const plex1 = new Mplex(true)
Expand All @@ -182,23 +179,24 @@ describe('channel', () => {

const chan1 = plex1.newStream('stream 1')

plex2.once('stream', (stream) => {
pull(
stream,
stream
)
})

pull(
pull.values([Buffer.from('hello')]),
chan1,
pull.through(d => console.dir(d.toString())),
pull.through((data) => {
console.dir(data)
}),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0]).to.deep.eql(Buffer.from('hello'))
done()
})
)

plex2.on('stream', (stream) => {
pull(
stream,
pull.through(d => console.dir(d.toString())),
stream
)
})
})
})

0 comments on commit e7409ec

Please sign in to comment.