Skip to content

Commit

Permalink
feat: use two lists to avoid clashes
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 7f81430 commit a0c3e1c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 32 deletions.
2 changes: 1 addition & 1 deletion profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function marker (n, done) {
}


spawn(1000, 1000, (err) => {
spawn(1000, 10000, (err) => {
if (err) {
throw err
}
Expand Down
12 changes: 6 additions & 6 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.IN_MESSAGE
: consts.type.OUT_MESSAGE,
? consts.type.OUT_MESSAGE
: consts.type.IN_MESSAGE,
data
])
}
Expand All @@ -173,8 +173,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.IN_CLOSE
: consts.type.OUT_CLOSE,
? consts.type.OUT_CLOSE
: consts.type.IN_CLOSE,
Buffer.from([0])
])
}
Expand All @@ -189,8 +189,8 @@ class Channel extends EE {
this._plex.push([
this._id,
this._initiator
? consts.type.IN_RESET
: consts.type.OUT_RESET
? consts.type.OUT_RESET
: consts.type.IN_RESET
])
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/consts.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
exports.type = {
NEW: 0,
OUT_MESSAGE: 1,
IN_MESSAGE: 2,
OUT_CLOSE: 3,
IN_CLOSE: 4,
OUT_RESET: 5,
IN_RESET: 6
IN_MESSAGE: 1,
OUT_MESSAGE: 2,
IN_CLOSE: 3,
OUT_CLOSE: 4,
IN_RESET: 5,
OUT_RESET: 6
}
45 changes: 27 additions & 18 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class Mplex extends EE {

this._initiator = !!opts.initiator
this._chanId = this._initiator ? 0 : 1
this._channels = new Map()
this._inChannels = new Map()
this._outChannels = new Map()
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended

Expand Down Expand Up @@ -110,7 +111,9 @@ class Mplex extends EE {
this._endedLocal = true

// propagate close to channels
for (let chan of this._channels.values()) {
const chans = new Map(this._outChannels,
this._inChannels)
for (let chan of chans.values()) {
chan.close(err)
}

Expand Down Expand Up @@ -146,15 +149,15 @@ class Mplex extends EE {

createStream (name) {
if (typeof name === 'number') { name = name.toString() }
const chan = this._newStream(null, this._initiator, false, name)
const chan = this._newStream(null, true, false, name, this._outChannels)
if (!this._lazy) { chan.openChan() }
return chan
}

_newStream (id, initiator, open, name) {
_newStream (id, initiator, open, name, list) {
this._log('_newStream', Array.prototype.slice.call(arguments))

if (this._channels.size >= this._maxChannels) {
if (this.chanSize >= this._maxChannels) {
this.emit('error', new Error('max channels exceeded'))
return
}
Expand All @@ -171,7 +174,7 @@ class Mplex extends EE {
}

id = typeof id === 'number' ? id : this._nextChanId(initiator)
if (this._channels.has(id)) {
if (list.has(id)) {
this.emit('error', new Error(`channel with id ${id} already exist!`))
return
}
Expand All @@ -183,40 +186,44 @@ class Mplex extends EE {
open: open || false
})

return this._addChan(id, chan, list)
}

_addChan (id, chan, list) {
chan.once('close', () => {
const chan = this._channels.get(id)
const chan = list.get(id)
this._log('deleting channel', JSON.stringify({
channel: this._name,
id: id,
endedLocal: chan._endedLocal,
endedRemote: chan._endedRemote,
initiator: chan._initiator
}))
this._channels.delete(id)
list.delete(id)
})

this._channels.set(id, chan)
list.set(id, chan)
return chan
}

get chanSize () {
return this._inChannels.size + this._outChannels.size
}

_handle (msg) {
this._log('_handle', msg)
const { id, type, data } = msg
switch (type) {
case consts.type.NEW: {
// if (this._initiator && (id & 1) === 1) {
// this.emit('error', new Error('two initiators detected'))
// return
// }

const chan = this._newStream(id, false, true, data.toString())
const chan = this._newStream(id, false, true, data.toString(), this._inChannels)
setImmediate(() => this.emit('stream', chan, id))
return
}

case consts.type.OUT_MESSAGE:
case consts.type.IN_MESSAGE: {
const chan = this._channels.get(id)
const list = type & 1 ? this._outChannels : this._inChannels
const chan = list.get(id)
if (chan) {
chan.push(data)
}
Expand All @@ -225,7 +232,8 @@ class Mplex extends EE {

case consts.type.OUT_CLOSE:
case consts.type.IN_CLOSE: {
const chan = this._channels.get(id)
const list = type & 1 ? this._outChannels : this._inChannels
const chan = list.get(id)
if (chan) {
chan.close()
}
Expand All @@ -234,7 +242,8 @@ class Mplex extends EE {

case consts.type.OUT_RESET:
case consts.type.IN_RESET: {
const chan = this._channels.get(id)
const list = type & 1 ? this._outChannels : this._inChannels
const chan = list.get(id)
if (chan) {
chan.reset()
}
Expand Down
2 changes: 1 addition & 1 deletion test/node.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
require('./old-mplex-interop')
require('./stream-mplex-interop')

0 comments on commit a0c3e1c

Please sign in to comment.