Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4069): remove 'default' from options for fullDocument field in change stream options #3169

Merged
merged 9 commits into from
Mar 16, 2022
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
"check:socks5": "mocha --config test/manual/mocharc.json test/manual/socks5.test.ts",
"check:csfle": "mocha --config test/mocha_mongodb.json test/integration/client-side-encryption",
"check:snappy": "mocha test/unit/assorted/snappy.test.js",
"fix:eslint": "npm run check:eslint -- --fix",
"prepare": "node etc/prepare.js",
"preview:docs": "ts-node etc/docs/preview.ts",
"release": "standard-version -i HISTORY.md",
Expand Down
76 changes: 43 additions & 33 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,20 @@ const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
);
const CHANGE_STREAM_OPTIONS = [
'resumeAfter',
'startAfter',
'startAtOperationTime',
'fullDocument'
] as const;

const CURSOR_OPTIONS = [
'batchSize',
'maxAwaitTimeMS',
'collation',
'readPreference',
...CHANGE_STREAM_OPTIONS
] as const;

const CHANGE_DOMAIN_TYPES = {
COLLECTION: Symbol('Collection'),
Expand All @@ -68,6 +78,8 @@ export interface ResumeOptions {
maxAwaitTimeMS?: number;
collation?: CollationOptions;
readPreference?: ReadPreference;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
}

/**
Expand All @@ -94,7 +106,7 @@ export interface PipeOptions {
* @public
*/
export interface ChangeStreamOptions extends AggregateOptions {
/** Allowed values: ‘default’, ‘updateLookup. When set to updateLookup, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
/** Allowed values: 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
fullDocument?: string;
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
maxAwaitTimeMS?: number;
Expand Down Expand Up @@ -446,22 +458,18 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
}

get resumeOptions(): ResumeOptions {
const result = {} as ResumeOptions;
for (const optionName of CURSOR_OPTIONS) {
if (Reflect.has(this.options, optionName)) {
Reflect.set(result, optionName, Reflect.get(this.options, optionName));
}
}
const result: ResumeOptions = applyKnownOptions(this.options, CURSOR_OPTIONS);

if (this.resumeToken || this.startAtOperationTime) {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key =>
Reflect.deleteProperty(result, key)
);
for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
Reflect.deleteProperty(result, key);
}

if (this.resumeToken) {
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
Reflect.set(result, resumeKey, this.resumeToken);

result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand Down Expand Up @@ -568,25 +576,37 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
}
changeStream[kMode] = 'iterator';
}

function applyKnownOptions(source: Document, options: ReadonlyArray<string>) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind moving this function back down below createChangeStreamCursor, I'm happy with modernizing the loop I think leaving it in the same spot will make it clearer that it was the only change. Sorry a little pedantic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not at all, done!

const result: Document = {};

for (const option of options) {
if (source[option]) {
result[option] = source[option];
}
}

return result;
}

/**
* Create a new change stream cursor based on self's configuration
* @internal
*/
function createChangeStreamCursor<TSchema>(
changeStream: ChangeStream<TSchema>,
options: ChangeStreamOptions
changeStreamOptions: ChangeStreamOptions | ResumeOptions
): ChangeStreamCursor<TSchema> {
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const changeStreamStageOptions = applyKnownOptions(changeStreamOptions, CHANGE_STREAM_OPTIONS);
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
changeStream.pipeline
);

const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const cursorOptions: ChangeStreamCursorOptions = applyKnownOptions(
changeStreamOptions,
CURSOR_OPTIONS
);

const changeStreamCursor = new ChangeStreamCursor<TSchema>(
getTopology(changeStream.parent),
changeStream.namespace,
Expand All @@ -605,16 +625,6 @@ function createChangeStreamCursor<TSchema>(
return changeStreamCursor;
}

function applyKnownOptions(target: Document, source: Document, optionNames: string[]) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
}
});

return target;
}

interface TopologyWaitOptions {
start?: number;
timeout?: number;
Expand Down
72 changes: 72 additions & 0 deletions test/integration/change-streams/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,78 @@ describe('Change Streams', function () {
});
afterEach(async () => await mock.cleanup());

context('ChangeStreamCursor options', function () {
let client, db, collection;

beforeEach(async function () {
client = await this.configuration.newClient().connect();
db = client.db('db');
collection = db.collection('collection');
});

afterEach(async function () {
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('fullDocument', () => {
it('sets fullDocument to `undefined` if no value is passed', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
undefined
);
});

it('assigns `fullDocument` to the correct value if it is passed as an option', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = client.watch([], { fullDocument: 'updateLookup' });

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.fullDocument',
'updateLookup'
);
});
});

context('allChangesForCluster', () => {
it('assigns `allChangesForCluster` to `true` if the ChangeStream.type is Cluster', function () {
const changeStream = client.watch();

expect(changeStream).to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster',
true
);
});

it('does not assigns `allChangesForCluster` if the ChangeStream.type is Db', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = db.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});

it('does not assign `allChangesForCluster` if the ChangeStream.type is Db', function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const changeStream = collection.watch();

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.allChangesForCluster'
);
});
});

it('ignores any invalid option values', function () {
const changeStream = collection.watch([], { invalidOption: true });

expect(changeStream).not.to.have.nested.property(
'cursor.pipeline[0].$changeStream.invalidOption'
);
});
});

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },

Expand Down
11 changes: 11 additions & 0 deletions test/types/change_streams.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { expectError } from 'tsd';

import type { ChangeStreamOptions } from '../../src';

declare const changeStreamOptions: ChangeStreamOptions;

// TODO(NODE-4076)
// The types of `ChangeStreamOptions.fullDocument` should be strenghened to
// only allow the value `updateLookup` but this cannot be done until node v5.
// At that time, this test can be removed (or reworked if we think that's valuable).
expectError<'updateLookup' | undefined>(changeStreamOptions.fullDocument);