Skip to content

Commit

Permalink
fix: make tryNext and Batch public (#2675)
Browse files Browse the repository at this point in the history
Exposes the `tryNext` iteration helper on cursors and change streams.

NODE-2952
  • Loading branch information
emadum authored Dec 16, 2020
1 parent 6914e87 commit 634ae4f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
35 changes: 26 additions & 9 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = {
const NO_RESUME_TOKEN_ERROR = new MongoError(
'A change stream document has been received that lacks a resume token (_id).'
);
const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor');
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');

/** @public */
Expand Down Expand Up @@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter {
next(callback?: Callback): Promise<void> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
if (!cursor) return cb(new MongoError('Cursor is undefined'));
if (err || !cursor) return cb(err); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
Expand Down Expand Up @@ -330,11 +330,23 @@ export class ChangeStream extends EventEmitter {
*/
stream(options?: CursorStreamOptions): Readable {
this.streamOptions = options;
if (!this.cursor) {
throw new MongoError('ChangeStream has no cursor, unable to stream');
}
if (!this.cursor) throw NO_CURSOR_ERROR;
return this.cursor.stream(options);
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}
}

/** @internal */
Expand Down Expand Up @@ -707,11 +719,16 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream[kClosed] && !err) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
if (!err) {
if (changeStream[kClosed]) {
request(CHANGESTREAM_CLOSED_ERROR);
return;
}
if (!changeStream.cursor) {
request(NO_CURSOR_ERROR);
return;
}
}

request(err, changeStream.cursor);
}
}
1 change: 0 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ export abstract class AbstractCursor extends EventEmitter {

/**
* Try to get the next available document from the cursor or `null` if an empty batch is returned
* @internal
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
Expand Down
3 changes: 1 addition & 2 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1943,8 +1943,7 @@ describe('Change Streams', function () {
changeStream.on('change', () => {
counter += 1;
if (counter === 2) {
changeStream.close();
setTimeout(() => close());
changeStream.close(close);
} else if (counter >= 3) {
close(new Error('should not have received more than 2 events'));
}
Expand Down

0 comments on commit 634ae4f

Please sign in to comment.