Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3393): snapshot time not applied if distinct executed first #2908

Merged
merged 5 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,12 @@ export function updateSessionFromResponse(session: ClientSession, document: Docu
session.transaction._recoveryToken = document.recoveryToken;
}

if (
document.cursor?.atClusterTime &&
session?.[kSnapshotEnabled] &&
session[kSnapshotTime] === undefined
) {
session[kSnapshotTime] = document.cursor.atClusterTime;
if (session?.[kSnapshotEnabled] && session[kSnapshotTime] === undefined) {
// find and aggregate commands return atClusterTime on the cursor
// distinct includes it in the response body
const atClusterTime = document.cursor?.atClusterTime || document.atClusterTime;
if (atClusterTime) {
session[kSnapshotTime] = atClusterTime;
}
}
}
26 changes: 1 addition & 25 deletions test/functional/sessions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,35 +200,11 @@ describe('Sessions - functional', function () {
for (const sessionTests of loadSpecTests(path.join('sessions', 'unified'))) {
expect(sessionTests).to.be.an('object');
context(String(sessionTests.description), function () {
// TODO: NODE-3393 fix test runner to apply session to all operations
const skipTestMap = {
'snapshot-sessions': [
'countDocuments operation with snapshot',
'Distinct operation with snapshot',
'Mixed operation with snapshot'
],
'snapshot-sessions-not-supported-client-error': [
'Client error on distinct with snapshot'
],
'snapshot-sessions-not-supported-server-error': [
'Server returns an error on distinct with snapshot'
],
'snapshot-sessions-unsupported-ops': [
'Server returns an error on listCollections with snapshot',
'Server returns an error on listDatabases with snapshot',
'Server returns an error on listIndexes with snapshot',
'Server returns an error on runCommand with snapshot',
'Server returns an error on findOneAndUpdate with snapshot',
'Server returns an error on deleteOne with snapshot',
'Server returns an error on updateOne with snapshot'
]
};
const testsToSkip = skipTestMap[sessionTests.description] || [];
for (const test of sessionTests.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
await runUnifiedTest(this, sessionTests, test, testsToSkip);
await runUnifiedTest(this, sessionTests, test);
}
});
}
Expand Down
122 changes: 42 additions & 80 deletions test/functional/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import { Collection, Db, GridFSFile, MongoClient, ObjectId, AbstractCursor } fro
import { ReadConcern } from '../../../src/read_concern';
import { ReadPreference } from '../../../src/read_preference';
import { WriteConcern } from '../../../src/write_concern';
import { Document, InsertOneOptions } from '../../../src';
import { Document } from '../../../src';
import { EventCollector } from '../../tools/utils';
import { EntitiesMap } from './entities';
import { expectErrorCheck, resultCheck } from './match';
import type { OperationDescription } from './schema';
import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
import { translateOptions } from './unified-utils';
import { getSymbolFrom } from '../../tools/utils';

interface OperationFunctionParams {
client: MongoClient;
Expand All @@ -22,18 +21,6 @@ interface OperationFunctionParams {
type RunOperationFn = (p: OperationFunctionParams) => Promise<Document | boolean | number | void>;
export const operations = new Map<string, RunOperationFn>();

function executeWithPotentialSession(
entities: EntitiesMap,
operation: OperationDescription,
cursor: AbstractCursor
) {
const session = entities.getEntity('session', operation.arguments.session, false);
if (session) {
cursor.session = session;
}
return cursor.toArray();
}

operations.set('abortTransaction', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.object);
return session.abortTransaction();
Expand All @@ -44,18 +31,9 @@ operations.set('aggregate', async ({ entities, operation }) => {
if (!(dbOrCollection instanceof Db || dbOrCollection instanceof Collection)) {
throw new Error(`Operation object '${operation.object}' must be a db or collection`);
}
const cursor = dbOrCollection.aggregate(operation.arguments.pipeline, {
allowDiskUse: operation.arguments.allowDiskUse,
batchSize: operation.arguments.batchSize,
bypassDocumentValidation: operation.arguments.bypassDocumentValidation,
maxTimeMS: operation.arguments.maxTimeMS,
maxAwaitTimeMS: operation.arguments.maxAwaitTimeMS,
collation: operation.arguments.collation,
hint: operation.arguments.hint,
let: operation.arguments.let,
out: operation.arguments.out
});
return executeWithPotentialSession(entities, operation, cursor);
const { pipeline, ...opts } = operation.arguments;
const cursor = dbOrCollection.aggregate(pipeline, opts);
return cursor.toArray();
});

operations.set('assertCollectionExists', async ({ operation, client }) => {
Expand Down Expand Up @@ -139,27 +117,27 @@ operations.set('assertSameLsidOnLastTwoCommands', async ({ entities, operation }
});

operations.set('assertSessionDirty', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.serverSession.isDirty).to.be.true;
});

operations.set('assertSessionNotDirty', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.serverSession.isDirty).to.be.false;
});

operations.set('assertSessionPinned', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned).to.be.true;
});

operations.set('assertSessionUnpinned', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned).to.be.false;
});

operations.set('assertSessionTransactionState', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;

const transactionStateTranslation = {
none: 'NO_TRANSACTION',
Expand Down Expand Up @@ -235,17 +213,14 @@ operations.set('createChangeStream', async ({ entities, operation }) => {

operations.set('createCollection', async ({ entities, operation }) => {
const db = entities.getEntity('db', operation.object);
const { session, collection, ...opts } = operation.arguments;
await db.createCollection(collection, {
session: entities.getEntity('session', session, false),
...opts
});
const { collection, ...opts } = operation.arguments;
await db.createCollection(collection, opts);
});

operations.set('createFindCursor', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const { filter, sort, batchSize, limit, let: vars } = operation.arguments;
const cursor = collection.find(filter, { sort, batchSize, limit, let: vars });
const { filter, ...opts } = operation.arguments;
const cursor = collection.find(filter, opts);
// The spec dictates that we create the cursor and force the find command
// to execute, but don't move the cursor forward. hasNext() accomplishes
// this.
Expand All @@ -255,11 +230,8 @@ operations.set('createFindCursor', async ({ entities, operation }) => {

operations.set('createIndex', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const session = entities.getEntity('session', operation.arguments.session, false);
await collection.createIndex(operation.arguments.keys, {
session,
name: operation.arguments.name
});
const { keys, ...opts } = operation.arguments;
await collection.createIndex(keys, opts);
});

operations.set('deleteOne', async ({ entities, operation }) => {
Expand All @@ -270,7 +242,8 @@ operations.set('deleteOne', async ({ entities, operation }) => {

operations.set('dropCollection', async ({ entities, operation }) => {
const db = entities.getEntity('db', operation.object);
return await db.dropCollection(operation.arguments.collection);
const { collection, ...opts } = operation.arguments;
return await db.dropCollection(collection, opts);
});

operations.set('endSession', async ({ entities, operation }) => {
Expand All @@ -280,9 +253,8 @@ operations.set('endSession', async ({ entities, operation }) => {

operations.set('find', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
const { filter, sort, batchSize, limit, let: vars } = operation.arguments;
const cursor = collection.find(filter, { sort, batchSize, limit, let: vars });
return executeWithPotentialSession(entities, operation, cursor);
const { filter, ...opts } = operation.arguments;
return collection.find(filter, opts).toArray();
});

operations.set('findOneAndReplace', async ({ entities, operation }) => {
Expand All @@ -304,27 +276,14 @@ operations.set('failPoint', async ({ entities, operation }) => {

operations.set('insertOne', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);

const session = entities.getEntity('session', operation.arguments.session, false);

const options = {
session
} as InsertOneOptions;

return collection.insertOne(operation.arguments.document, options);
const { document, ...opts } = operation.arguments;
return collection.insertOne(document, opts);
});

operations.set('insertMany', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);

const session = entities.getEntity('session', operation.arguments.session, false);

const options = {
session,
ordered: operation.arguments.ordered ?? true
};

return collection.insertMany(operation.arguments.documents, options);
const { documents, ...opts } = operation.arguments;
return collection.insertMany(documents, opts);
});

operations.set('iterateUntilDocumentOrError', async ({ entities, operation }) => {
Expand All @@ -349,7 +308,7 @@ operations.set('listCollections', async ({ entities, operation }) => {

operations.set('listDatabases', async ({ entities, operation }) => {
const client = entities.getEntity('client', operation.object);
return client.db().admin().listDatabases();
return client.db().admin().listDatabases(operation.arguments);
});

operations.set('listIndexes', async ({ entities, operation }) => {
Expand All @@ -359,12 +318,8 @@ operations.set('listIndexes', async ({ entities, operation }) => {

operations.set('replaceOne', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.replaceOne(operation.arguments.filter, operation.arguments.replacement, {
bypassDocumentValidation: operation.arguments.bypassDocumentValidation,
collation: operation.arguments.collation,
hint: operation.arguments.hint,
upsert: operation.arguments.upsert
});
const { filter, replacement, ...opts } = operation.arguments;
return collection.replaceOne(filter, replacement, opts);
});

operations.set('startTransaction', async ({ entities, operation }) => {
Expand All @@ -373,7 +328,7 @@ operations.set('startTransaction', async ({ entities, operation }) => {
});

operations.set('targetedFailPoint', async ({ entities, operation }) => {
const session = entities.getEntity('session', operation.arguments.session);
const session = operation.arguments.session;
expect(session.transaction.isPinned, 'Session must be pinned for a targetedFailPoint').to.be.true;
await entities.failPoints.enableFailPoint(
session.transaction._pinnedServer.s.description.hostAddress,
Expand Down Expand Up @@ -432,20 +387,20 @@ operations.set('withTransaction', async ({ entities, operation, client }) => {

operations.set('countDocuments', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.countDocuments(operation.arguments.filter as Document);
const { filter, ...opts } = operation.arguments;
return collection.countDocuments(filter, opts);
});

operations.set('deleteMany', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.deleteMany(operation.arguments.filter);
const { filter, ...opts } = operation.arguments;
return collection.deleteMany(filter, opts);
});

operations.set('distinct', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.distinct(
operation.arguments.fieldName as string,
operation.arguments.filter as Document
);
const { fieldName, filter, ...opts } = operation.arguments;
return collection.distinct(fieldName, filter, opts);
});

operations.set('estimatedDocumentCount', async ({ entities, operation }) => {
Expand All @@ -455,12 +410,14 @@ operations.set('estimatedDocumentCount', async ({ entities, operation }) => {

operations.set('findOneAndDelete', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.findOneAndDelete(operation.arguments.filter);
const { filter, ...opts } = operation.arguments;
return collection.findOneAndDelete(filter, opts);
});

operations.set('runCommand', async ({ entities, operation }: OperationFunctionParams) => {
const db = entities.getEntity('db', operation.object);
return db.command(operation.arguments.command);
const { command, ...opts } = operation.arguments;
return db.command(command, opts);
});

operations.set('updateMany', async ({ entities, operation }) => {
Expand All @@ -483,6 +440,11 @@ export async function executeOperationAndCheck(
const opFunc = operations.get(operation.name);
expect(opFunc, `Unknown operation: ${operation.name}`).to.exist;

if (operation.arguments?.session) {
const session = entities.getEntity('session', operation.arguments.session, false);
operation.arguments.session = session;
}

let result;

try {
Expand Down