diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 1a8c858101..d5cc560cb1 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -213,7 +213,7 @@ export abstract class AbstractCursor extends EventEmitter { return done(undefined, true); } - next(this, (err, doc) => { + next(this, true, (err, doc) => { if (err) return done(err); if (doc) { @@ -236,7 +236,23 @@ export abstract class AbstractCursor extends EventEmitter { return done(new MongoError('Cursor is exhausted')); } - next(this, done); + next(this, true, done); + }); + } + + /** + * Try to get the next available document from the cursor or `null` if an empty batch is returned + * @internal + */ + tryNext(): Promise; + tryNext(callback: Callback): void; + tryNext(callback?: Callback): Promise | void { + return maybePromise(callback, done => { + if (this[kId] === Long.ZERO) { + return done(new MongoError('Cursor is exhausted')); + } + + next(this, false, done); }); } @@ -259,7 +275,7 @@ export abstract class AbstractCursor extends EventEmitter { return maybePromise(callback, done => { const transform = this[kTransform]; const fetchDocs = () => { - next(this, (err, doc) => { + next(this, true, (err, doc) => { if (err || doc == null) return done(err); if (doc == null) return done(); @@ -350,7 +366,7 @@ export abstract class AbstractCursor extends EventEmitter { const transform = this[kTransform]; const fetchDocs = () => { // NOTE: if we add a `nextBatch` then we should use it here - next(this, (err, doc) => { + next(this, true, (err, doc) => { if (err) return done(err); if (doc == null) return done(undefined, docs); @@ -518,7 +534,11 @@ function nextDocument(cursor: AbstractCursor): Document | null | undefined { return null; } -function next(cursor: AbstractCursor, callback: Callback): void { +function next( + cursor: AbstractCursor, + blocking: boolean, + callback: Callback +): void { const cursorId = cursor[kId]; if (cursor.closed) { return callback(undefined, null); @@ -577,7 +597,7 @@ function next(cursor: AbstractCursor, callback: Callback): void return cleanupCursor(cursor, () => callback(err, nextDocument(cursor))); } - next(cursor, callback); + next(cursor, blocking, callback); }); return; @@ -604,7 +624,11 @@ function next(cursor: AbstractCursor, callback: Callback): void return cleanupCursor(cursor, () => callback(err, nextDocument(cursor))); } - next(cursor, callback); + if (cursor[kDocuments].length === 0 && blocking === false) { + return callback(undefined, null); + } + + next(cursor, blocking, callback); }); } @@ -666,7 +690,7 @@ function makeCursorStream(cursor: AbstractCursor) { function readNext() { needToClose = false; - next(cursor, (err, result) => { + next(cursor, true, (err, result) => { needToClose = err ? !cursor.closed : result !== null; if (err) { diff --git a/test/functional/abstract_cursor.test.js b/test/functional/abstract_cursor.test.js index 52230215b8..61fccc91d6 100644 --- a/test/functional/abstract_cursor.test.js +++ b/test/functional/abstract_cursor.test.js @@ -23,7 +23,8 @@ describe('AbstractCursor', function () { withClientV2((client, done) => { const docs = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }, { a: 6 }]; const coll = client.db().collection('find_cursor'); - coll.drop(() => coll.insertMany(docs, done)); + const tryNextColl = client.db().collection('try_next'); + coll.drop(() => tryNextColl.drop(() => coll.insertMany(docs, done))); }) ); @@ -124,4 +125,38 @@ describe('AbstractCursor', function () { }) ); }); + + context('#tryNext', function () { + it( + 'should return control to the user if an empty batch is returned', + withClientV2(function (client, done) { + const db = client.db(); + db.createCollection('try_next', { capped: true, size: 10000000 }, () => { + const coll = db.collection('try_next'); + coll.insertMany([{}, {}], err => { + expect(err).to.not.exist; + + const cursor = coll.find({}, { tailable: true, awaitData: true }); + this.defer(() => cursor.close()); + + cursor.tryNext((err, doc) => { + expect(err).to.not.exist; + expect(doc).to.exist; + + cursor.tryNext((err, doc) => { + expect(err).to.not.exist; + expect(doc).to.exist; + + cursor.tryNext((err, doc) => { + expect(err).to.not.exist; + expect(doc).to.be.null; + done(); + }); + }); + }); + }); + }); + }) + ); + }); });