diff --git a/lib/change_stream.js b/lib/change_stream.js index e28bde286d..2c01aab345 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -2,9 +2,7 @@ const EventEmitter = require('events'); const inherits = require('util').inherits; -const MongoNetworkError = require('mongodb-core').MongoNetworkError; -const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol; -const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES; +const isResumableError = require('./error').isResumableError; var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference']; @@ -298,35 +296,6 @@ ChangeStream.prototype.stream = function(options) { return this.cursor.stream(options); }; -// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error: -// -// An error is considered resumable if it meets any of the following criteria: -// - any error encountered which is not a server error (e.g. a timeout error or network error) -// - any server error response from a getMore command excluding those containing the following error codes -// - Interrupted: 11601 -// - CappedPositionLost: 136 -// - CursorKilled: 237 -// - a server error response with an error message containing the substring "not master" or "node is recovering" -// -// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors. - -function isGetMoreError(error) { - return !!error[mongoErrorContextSymbol].isGetMore; -} - -function isResumableError(error) { - if (!isGetMoreError(error)) { - return false; - } - - return !!( - error instanceof MongoNetworkError || - !GET_MORE_NON_RESUMABLE_CODES.has(error.code) || - error.message.match(/not master/) || - error.message.match(/node is recovering/) - ); -} - // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. var processNewChange = function(self, err, change, callback) { // Handle errors diff --git a/lib/error.js b/lib/error.js new file mode 100644 index 0000000000..03b555ad4c --- /dev/null +++ b/lib/error.js @@ -0,0 +1,43 @@ +'use strict'; + +const MongoNetworkError = require('mongodb-core').MongoNetworkError; +const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol; + +const GET_MORE_NON_RESUMABLE_CODES = new Set([ + 136, // CappedPositionLost + 237, // CursorKilled + 11601 // Interrupted +]); + +// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error: +// +// An error is considered resumable if it meets any of the following criteria: +// - any error encountered which is not a server error (e.g. a timeout error or network error) +// - any server error response from a getMore command excluding those containing the following error codes +// - Interrupted: 11601 +// - CappedPositionLost: 136 +// - CursorKilled: 237 +// - a server error response with an error message containing the substring "not master" or "node is recovering" +// +// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors. + +function isGetMoreError(error) { + if (error[mongoErrorContextSymbol]) { + return error[mongoErrorContextSymbol].isGetMore; + } +} + +function isResumableError(error) { + if (!isGetMoreError(error)) { + return false; + } + + return !!( + error instanceof MongoNetworkError || + !GET_MORE_NON_RESUMABLE_CODES.has(error.code) || + error.message.match(/not master/) || + error.message.match(/node is recovering/) + ); +} + +module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError }; diff --git a/lib/error_codes.js b/lib/error_codes.js deleted file mode 100644 index 2b1d1ffa95..0000000000 --- a/lib/error_codes.js +++ /dev/null @@ -1,9 +0,0 @@ -'use strict'; - -const GET_MORE_NON_RESUMABLE_CODES = new Set([ - 136, // CappedPositionLost - 237, // CursorKilled - 11601 // Interrupted -]); - -module.exports = { GET_MORE_NON_RESUMABLE_CODES }; diff --git a/test/unit/change_stream_resume_tests.js b/test/unit/change_stream_resume_tests.js index 599b0def7c..5cd5922549 100644 --- a/test/unit/change_stream_resume_tests.js +++ b/test/unit/change_stream_resume_tests.js @@ -6,7 +6,8 @@ const MongoClient = require('../../lib/mongo_client'); const ObjectId = require('../../index').ObjectId; const Timestamp = require('../../index').Timestamp; const Long = require('../../index').Long; -const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES; +const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error').GET_MORE_NON_RESUMABLE_CODES; +const isResumableError = require('../../lib/error').isResumableError; describe('Change Stream Resume Tests', function() { const test = {}; @@ -126,7 +127,7 @@ describe('Change Stream Resume Tests', function() { secondGetMore: req => req.reply(GET_MORE_RESPONSE) }, { - description: `should resume on an error that says "not master"`, + description: `should resume on an error that says 'not master'`, passing: true, firstAggregate: req => req.reply(AGGREGATE_RESPONSE), secondAggregate: req => req.reply(AGGREGATE_RESPONSE), @@ -134,7 +135,7 @@ describe('Change Stream Resume Tests', function() { secondGetMore: req => req.reply(GET_MORE_RESPONSE) }, { - description: `should resume on an error that says "node is recovering"`, + description: `should resume on an error that says 'node is recovering'`, passing: true, firstAggregate: req => req.reply(AGGREGATE_RESPONSE), secondAggregate: req => req.reply(AGGREGATE_RESPONSE), @@ -175,6 +176,7 @@ describe('Change Stream Resume Tests', function() { test.server = server; }); }); + afterEach(done => changeStream.close(() => client.close(() => mock.cleanup(done)))); configs.forEach(config => { @@ -182,7 +184,9 @@ describe('Change Stream Resume Tests', function() { metadata: { requires: { mongodb: '>=3.6.0' } }, test: function() { test.server.setMessageHandler(makeServerHandler(config)); - client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 }); + client = new MongoClient(`mongodb://${test.server.uri()}`, { + socketTimeoutMS: 300 + }); return client .connect() .then(client => client.db('test')) @@ -210,3 +214,9 @@ describe('Change Stream Resume Tests', function() { }); }); }); + +describe('Change Stream Resume Error Tests', function() { + it('should properly process errors that lack the `mongoErrorContextSymbol`', function() { + expect(() => isResumableError(new Error())).to.not.throw(); + }); +});