Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pipeline promise and handle errors #21

Merged
merged 12 commits into from
Feb 28, 2020
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ node_modules

# ignore redis dump
dump.rdb

package-lock.json
33 changes: 24 additions & 9 deletions mqemitter-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function MQEmitterRedis (opts) {
}

opts = opts || {}

this._opts = opts

this.subConn = new Redis(opts)
Expand All @@ -33,6 +34,14 @@ function MQEmitterRedis (opts) {

var that = this

function onError (err) {
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
if (err) {
that.state.emit('emitError', err)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why its this not error?

Copy link
Contributor Author

@robertsLando robertsLando Feb 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina Because 'error' throws unhandled rejections on emit, check #15 and #21 (comment)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to track down why this is happening wqithout changing event name.

Copy link
Contributor Author

@robertsLando robertsLando Feb 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina I have no clue Matteo... I have try to debug this but I can only tell you when this happens:

If emit throws an error and cb is not defined the unhandled rejection is thrown after I call this.state.emit('error'), it is like that emit throws an error instead of just emit it

Copy link
Contributor Author

@robertsLando robertsLando Feb 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I have found this in docs: https://nodejs.org/api/events.html#events_class_events_eventemitter

If an EventEmitter does not have at least one listener registered for the 'error' event, and an 'error' event is emitted, the error is thrown, a stack trace is printed, and the Node.js process exits.

So the solutions are: change the event name or create a default event listener for 'error' (noop) like I did in my latest commit 464e7ed

}
}

this._onError = onError

function handler (sub, topic, payload) {
var packet = msgpack.decode(payload)
if (!that._cache.get(packet.id)) {
Expand All @@ -54,15 +63,15 @@ function MQEmitterRedis (opts) {
})

this.subConn.on('error', function (err) {
that.state.emit('error', err)
that.state.emit('connectionError', err)
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
})

this.pubConn.on('connect', function () {
that.state.emit('pubConnect')
})

this.pubConn.on('error', function (err) {
that.state.emit('error', err)
that.state.emit('connectionError', err)
})

MQEmitter.call(this, opts)
Expand All @@ -77,6 +86,14 @@ inherits(MQEmitterRedis, MQEmitter)
})

MQEmitterRedis.prototype.close = function (cb) {
cb = cb || noop

if (this.closed || this.closing) {
return cb()
}

this.closing = true

var count = 2
var that = this

Expand Down Expand Up @@ -129,21 +146,19 @@ MQEmitterRedis.prototype.on = function on (topic, cb, done) {
}

MQEmitterRedis.prototype.emit = function (msg, done) {
done = done || this._onError

if (this.closed) {
return done(new Error('mqemitter-redis is closed'))
var err = new Error('mqemitter-redis is closed')
return done(err)
}

var packet = {
id: hyperid(),
msg: msg
}

const p = this._pipeline.publish(msg.topic, msgpack.encode(packet))
if (done) {
p.then(done, done)
} else {
p.catch(noop)
}
this._pipeline.publish(msg.topic, msgpack.encode(packet)).then(() => done()).catch(done)
}

MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"ioredis": "^4.0.0",
"ioredis-auto-pipeline": "^1.0.1",
"lru-cache": "^5.1.1",
"mqemitter": "^4.0.0",
"mqemitter": "^4.1.2",
"msgpack-lite": "^0.1.14"
},
"devDependencies": {
Expand Down
5 changes: 3 additions & 2 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ test('actual unsubscribe from Redis', function (t) {

e.on('hello', noop)
e.removeListener('hello', noop)
e.emit({ topic: 'hello' }, function () {
e.emit({ topic: 'hello' }, function (err) {
t.notOk(err)
e.close(function () {
t.end()
})
Expand Down Expand Up @@ -57,7 +58,7 @@ test('ioredis error event', function (t) {

t.plan(1)

e.state.once('error', function (err) {
e.state.once('connectionError', function (err) {
t.deepEqual(err.message.substr(0, 7), 'connect')
e.close(function () {
t.end()
Expand Down