Skip to content

Commit

Permalink
fix(NODE-6418): change stream resumes infinitely after failed aggrega…
Browse files Browse the repository at this point in the history
…tes (#4267)
  • Loading branch information
W-A-James authored Oct 9, 2024
1 parent 8785132 commit 6ecf198
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 22 deletions.
7 changes: 5 additions & 2 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ export class ChangeStream<
// If the change stream has been closed explicitly, do not process error.
if (this[kClosed]) return;

if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
this._endStream();

this.cursor.close().then(undefined, squashError);
Expand Down Expand Up @@ -975,7 +975,10 @@ export class ChangeStream<
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
}

if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
if (
this.cursor.id == null ||
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
) {
try {
await this.close();
} catch (error) {
Expand Down
148 changes: 148 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2061,6 +2061,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.next().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#hasNext', function () {
Expand Down Expand Up @@ -2225,6 +2253,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.hasNext().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#tryNext', function () {
Expand Down Expand Up @@ -2401,6 +2457,34 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
const resumableErrorCode = 7; // Host not found
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Fail twice to account for retry attempt in executeOperation which is separate from the change stream's resume attempt
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

changeStream = collection.watch([]);

await collection.insertOne({ name: 'bailey' });

const maybeError = await changeStream.tryNext().catch(e => e);

expect(maybeError).to.be.instanceof(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

context('#asyncIterator', function () {
Expand Down Expand Up @@ -2551,6 +2635,41 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurs on the aggregate command', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }];
await collection.insertMany(docs);

const resumableErrorCode = 7;
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // Account for retry in executeOperation which is separate from change stream's resume
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const change of changeStream) {
expect.fail('Change stream produced events on an unresumable error');
}
expect.fail('Change stream did not iterate and did not throw an error');
} catch (error) {
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
}
);
});
});
});

Expand Down Expand Up @@ -2721,6 +2840,35 @@ describe('ChangeStream resumability', function () {
}
);
});

context('when the error occurred on the aggregate', function () {
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const resumableErrorCode = 7;
await client.db('admin').command({
configureFailPoint: 'failCommand',
mode: { times: 2 }, // account for retry attempt in executeOperation which is separate from change stream's retry
data: {
failCommands: ['aggregate'],
errorCode: resumableErrorCode
}
} as FailPoint);

const willBeError = once(changeStream, 'change').catch(error => error);
await collection.insertOne({ name: 'bailey' });

const error = await willBeError;

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(2);
expect(changeStream.closed).to.be.true;
}
);
});
});

it(
Expand Down
47 changes: 27 additions & 20 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { setTimeout } from 'timers';

import {
type ChangeStream,
type Collection,
type CommandFailedEvent,
type CommandStartedEvent,
type CommandSucceededEvent,
type Document,
isHello,
LEGACY_HELLO_COMMAND,
Long,
type MongoClient,
MongoNetworkError,
ObjectId,
Timestamp
Expand Down Expand Up @@ -840,8 +842,8 @@ describe('Change Stream prose tests', function () {
// 15 - 16 removed by spec

describe('Change Stream prose 17-18', function () {
let client;
let coll;
let client: MongoClient;
let coll: Collection;
let startAfter;

function recordEvent(events, e) {
Expand Down Expand Up @@ -886,31 +888,36 @@ describe('Change Stream prose tests', function () {
// when resuming a change stream.
it('$changeStream without results must include startAfter and not resumeAfter', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } },
test: function (done) {
test: async function () {
const events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
this.defer(() => changeStream.close());

changeStream.once('change', change => {
expect(change).to.containSubset({
operationType: 'insert',
fullDocument: { x: 2 }
});

expect(events).to.be.an('array').with.lengthOf(3);
expect(events[0]).nested.property('$changeStream.startAfter').to.exist;
expect(events[1]).to.equal('error');
expect(events[2]).nested.property('$changeStream.startAfter').to.exist;
done();
changeStream.on('error', async e => {
await changeStream.close(e);
});

waitForStarted(changeStream, () => {
triggerResumableError(changeStream, () => {
events.push('error');
coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
});
const changePromise = once(changeStream, 'change');
await once(changeStream.cursor, 'init');

const stub = sinon.stub(changeStream.cursor, 'close');

stub.callsFake(async function () {
stub.wrappedMethod.call(this);
stub.restore();
events.push('error');
await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
});

changeStream.cursorStream.emit('error', new MongoNetworkError('error triggered from test'));

const [change] = await changePromise;
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { x: 2 } });
expect(events).to.be.an('array').with.lengthOf(3);

expect(events[0]).nested.property('$changeStream.startAfter').to.exist;
expect(events[1]).to.equal('error');
expect(events[2]).nested.property('$changeStream.startAfter').to.exist;
}
});

Expand Down

0 comments on commit 6ecf198

Please sign in to comment.