Skip to content

Commit

Permalink
fix(NODE-5981): read preference not applied to commands properly (#4010)
Browse files Browse the repository at this point in the history
Co-authored-by: Bailey Pearson <[email protected]>
Co-authored-by: Neal Beeken <[email protected]>
  • Loading branch information
3 people authored Mar 11, 2024
1 parent 31f1eed commit 937c9c8
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 183 deletions.
8 changes: 1 addition & 7 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { BSONSerializeOptions, Document, Long } from '../bson';
import * as BSON from '../bson';
import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { ReadPreference } from '../read_preference';
import { type ReadPreference } from '../read_preference';
import type { ClientSession } from '../sessions';
import type { CommandOptions } from './connection';
import {
Expand Down Expand Up @@ -51,7 +51,6 @@ export interface OpQueryOptions extends CommandOptions {
requestId?: number;
moreToCome?: boolean;
exhaustAllowed?: boolean;
readPreference?: ReadPreference;
}

/**************************************************************
Expand All @@ -77,7 +76,6 @@ export class OpQueryRequest {
awaitData: boolean;
exhaust: boolean;
partial: boolean;
documentsReturnedIn?: string;

constructor(public databaseName: string, public query: Document, options: OpQueryOptions) {
// Basic options needed to be passed in
Expand Down Expand Up @@ -503,10 +501,6 @@ export class OpMsgRequest {
// Basic options
this.command.$db = databaseName;

if (options.readPreference && options.readPreference.mode !== ReadPreference.PRIMARY) {
this.command.$readPreference = options.readPreference.toJSON();
}

// Ensure empty options
this.options = options ?? {};

Expand Down
46 changes: 33 additions & 13 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client'
import { type MongoClientAuthProviders } from '../mongo_client_auth_providers';
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { ReadPreferenceLike } from '../read_preference';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import {
BufferPool,
Expand Down Expand Up @@ -83,6 +84,8 @@ export interface CommandOptions extends BSONSerializeOptions {
willRetryWrite?: boolean;

writeConcern?: WriteConcern;

directConnection?: boolean;
}

/** @public */
Expand Down Expand Up @@ -371,16 +374,34 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
cmd.$clusterTime = clusterTime;
}

if (
isSharded(this) &&
!this.supportsOpMsg &&
readPreference &&
readPreference.mode !== 'primary'
) {
cmd = {
$query: cmd,
$readPreference: readPreference.toJSON()
};
// For standalone, drivers MUST NOT set $readPreference.
if (this.description.type !== ServerType.Standalone) {
if (
!isSharded(this) &&
!this.description.loadBalanced &&
this.supportsOpMsg &&
options.directConnection === true &&
readPreference?.mode === 'primary'
) {
// For mongos and load balancers with 'primary' mode, drivers MUST NOT set $readPreference.
// For all other types with a direct connection, if the read preference is 'primary'
// (driver sets 'primary' as default if no read preference is configured),
// the $readPreference MUST be set to 'primaryPreferred'
// to ensure that any server type can handle the request.
cmd.$readPreference = ReadPreference.primaryPreferred.toJSON();
} else if (isSharded(this) && !this.supportsOpMsg && readPreference?.mode !== 'primary') {
// When sending a read operation via OP_QUERY and the $readPreference modifier,
// the query MUST be provided using the $query modifier.
cmd = {
$query: cmd,
$readPreference: readPreference.toJSON()
};
} else if (readPreference?.mode !== 'primary') {
// For mode 'primary', drivers MUST NOT set $readPreference.
// For all other read preference modes (i.e. 'secondary', 'primaryPreferred', ...),
// drivers MUST set $readPreference
cmd.$readPreference = readPreference.toJSON();
}
}

const commandOptions = {
Expand All @@ -389,8 +410,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
checkKeys: false,
// This value is not overridable
secondaryOk: readPreference.secondaryOk(),
...options,
readPreference // ensure we pass in ReadPreference instance
...options
};

const message = this.supportsOpMsg
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/stream_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface StreamDescriptionOptions {
/** @public */
export class StreamDescription {
address: string;
type: string;
type: ServerType;
minWireVersion?: number;
maxWireVersion?: number;
maxBsonObjectSize: number;
Expand Down
8 changes: 2 additions & 6 deletions src/cmap/wire_protocol/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@ export interface ReadPreferenceOption {
}

export function getReadPreference(options?: ReadPreferenceOption): ReadPreference {
// Default to command version of the readPreference
// Default to command version of the readPreference.
let readPreference = options?.readPreference ?? ReadPreference.primary;
// If we have an option readPreference override the command one
if (options?.readPreference) {
readPreference = options.readPreference;
}

if (typeof readPreference === 'string') {
readPreference = ReadPreference.fromString(readPreference);
Expand All @@ -43,7 +39,7 @@ export function isSharded(topologyOrServer?: Topology | Server | Connection): bo
}

// NOTE: This is incredibly inefficient, and should be removed once command construction
// happens based on `Server` not `Topology`.
// happens based on `Server` not `Topology`.
if (topologyOrServer.description && topologyOrServer.description instanceof TopologyDescription) {
const servers: ServerDescription[] = Array.from(topologyOrServer.description.servers.values());
return servers.some((server: ServerDescription) => server.type === ServerType.Mongos);
Expand Down
1 change: 1 addition & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ export interface MongoOptions
readPreference: ReadPreference;
readConcern: ReadConcern;
loadBalanced: boolean;
directConnection: boolean;
serverApi: ServerApi;
compressors: CompressorName[];
writeConcern: WriteConcern;
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

// Clone the options
const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
const finalOptions = Object.assign({}, options, {
wireProtocolCommand: false,
directConnection: this.topology.s.options.directConnection
});

// There are cases where we need to flag the read preference not to get sent in
// the command, such as pre-5.0 servers attempting to perform an aggregate write
Expand Down
126 changes: 49 additions & 77 deletions test/integration/max-staleness/max_staleness.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Max Staleness', function () {
// Primary server states
const serverIsPrimary = [Object.assign({}, defaultFields)];
server.setMessageHandler(request => {
var doc = request.document;
const doc = request.document;
if (isHello(doc)) {
request.reply(serverIsPrimary[0]);
return;
Expand Down Expand Up @@ -46,71 +46,53 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(
`mongodb://${test.server.uri()}/test?readPreference=secondary&maxStalenessSeconds=250`,
{ serverApi: null } // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
);

client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);

db.collection('test')
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
await client.connect();
const db = client.db(self.configuration.db);
await db.collection('test').find({}).toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});

it('should correctly set maxStalenessSeconds on Mongos query using db level readPreference', {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
test: async function () {
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;

// Get a db with a new readPreference
var db1 = client.db('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
});
await client.connect();

db1
.collection('test')
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
// Get a db with a new readPreference
const db1 = client.db('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
});
await db1.collection('test').find({}).toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});

Expand All @@ -120,35 +102,31 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);

// Get a db with a new readPreference
db.collection('test', {
await client.connect();
const db = client.db(self.configuration.db);

// Get a db with a new readPreference
await db
.collection('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
})
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
.find({})
.toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
}
);
Expand All @@ -157,35 +135,29 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);
var readPreference = new ReadPreference('secondary', null, { maxStalenessSeconds: 250 });

// Get a db with a new readPreference
db.collection('test')
.find({})
.withReadPreference(readPreference)
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
await client.connect();
const db = client.db(self.configuration.db);
const readPreference = new ReadPreference('secondary', null, { maxStalenessSeconds: 250 });

// Get a db with a new readPreference
await db.collection('test').find({}).withReadPreference(readPreference).toArray();

expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});
});
7 changes: 1 addition & 6 deletions test/integration/run-command/run_command.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('RunCommand spec', () => {
runUnifiedSuite(loadSpecTests('run-command'), test => {
if (test.description === 'does not attach $readPreference to given command on standalone') {
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
}
return false;
});
runUnifiedSuite(loadSpecTests('run-command'));
});
Loading

0 comments on commit 937c9c8

Please sign in to comment.