Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-1797): Error when ChangeStream used as iterator and emitter concurrently #2871

Merged
26 changes: 26 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
/** @internal */
const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -204,6 +206,8 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
[kCursorStream]?: Readable;
/** @internal */
[kClosed]: boolean;
/** @internal */
[kMode]: false | 'iterator' | 'emitter';
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

/** @event */
static readonly RESPONSE = 'response' as const;
Expand Down Expand Up @@ -297,6 +301,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch

/** Check if there is any document still available in the Change Stream */
hasNext(callback?: Callback): Promise<void> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand All @@ -311,6 +316,7 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
next(
callback?: Callback<ChangeStreamDocument<TSchema>>
): Promise<ChangeStreamDocument<TSchema>> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand Down Expand Up @@ -365,13 +371,32 @@ export class ChangeStream<TSchema extends Document> extends TypedEventEmitter<Ch
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}

_setIsEmitter(): void {
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
if (this[kMode] === 'iterator') {
throw new MongoDriverError(
'Cannot use ChangeStream as an EventEmitter after using as an iterator'
);
}
this[kMode] = 'emitter';
}

_setIsIterator(): void {
if (this[kMode] === 'emitter') {
throw new MongoDriverError(
'Cannot use ChangeStream as iterator after using as an EventEmitter'
);
}
this[kMode] = 'iterator';
}
}

/** @internal */
Expand Down Expand Up @@ -628,6 +653,7 @@ function streamEvents<TSchema>(
changeStream: ChangeStream<TSchema>,
cursor: ChangeStreamCursor<TSchema>
): void {
changeStream._setIsEmitter();
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on('data', change => processNewChange(changeStream, change));
Expand Down
173 changes: 172 additions & 1 deletion test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const assert = require('assert');
const { Transform, PassThrough } = require('stream');
const { MongoNetworkError } = require('../../src/error');
const { MongoNetworkError, MongoDriverError } = require('../../src/error');
const { delay, setupDatabase, withClient, withCursor } = require('./shared');
const co = require('co');
const mock = require('../tools/mock');
Expand Down Expand Up @@ -1792,6 +1792,176 @@ describe('Change Streams', function () {
}
});

// FIXME: NODE-1797
describe('should error when used as iterator and emitter concurrently', function () {
let client, coll, changeStream, repeatInsert, val;
val = 0;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect().catch(() => expect.fail('Failed to connect to client'));

coll = client.db(this.configuration.db).collection('tester');
changeStream = coll.watch();

repeatInsert = setInterval(async function () {
await coll.insertOne({ c: val }).catch('Failed to insert document');
val++;
}, 75);
});

afterEach(async function () {
if (repeatInsert) {
clearInterval(repeatInsert);
}
if (changeStream && !changeStream.closed) {
await changeStream.close();
}

if (client) {
await client.close();
}

await mock.cleanup();
});

// TODO: Better Errors
it(
'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using promises',
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.on('change', resolve));
try {
await changeStream.hasNext().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using callbacks',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.on('change', resolve));

try {
await new Promise(resolve => changeStream.hasNext(resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
it(
'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using promises',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.hasNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using callbacks',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.hasNext(resolve));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
it(
'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using promises',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.once('change', resolve));
try {
await changeStream.next().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using callbacks',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.once('change', resolve));

try {
await new Promise(resolve => changeStream.next(resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
it(
'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using promises',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.tryNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using callbacks',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.tryNext(resolve));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
});

describe('should properly handle a changeStream event being processed mid-close', function () {
let client, coll, changeStream;

Expand Down Expand Up @@ -1975,6 +2145,7 @@ describe('Change Streams', function () {
this.changeStream.on('resumeTokenChanged', resumeToken => {
this.resumeTokenChangedEvents.push({ resumeToken });
});
this.changeStream.isEmitter = false;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

return this.changeStream;
}
Expand Down