diff --git a/lib/operations/aggregate.js b/lib/operations/aggregate.js index 57be1c8bb4..e99015a128 100644 --- a/lib/operations/aggregate.js +++ b/lib/operations/aggregate.js @@ -11,6 +11,8 @@ const toError = require('../utils').toError; const DB_AGGREGATE_COLLECTION = 1; +const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8; + /** * Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information. * @@ -51,8 +53,9 @@ function aggregate(db, coll, pipeline, options, callback) { } const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern; + const ismaster = topology.lastIsMaster() || {}; - if (!hasOutStage) { + if (!hasOutStage || ismaster.maxWireVersion >= MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) { decorateWithReadConcern(command, target, options); } diff --git a/test/functional/readconcern_tests.js b/test/functional/readconcern_tests.js index 601fbb7241..63b03866a4 100644 --- a/test/functional/readconcern_tests.js +++ b/test/functional/readconcern_tests.js @@ -435,7 +435,7 @@ describe('ReadConcern', function() { }); it('Should set majority readConcern aggregate command but ignore due to out', { - metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2' } }, + metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2 < 4.1' } }, test: function(done) { var listener = require('../..').instrument(function(err) { @@ -500,6 +500,64 @@ describe('ReadConcern', function() { } }); + it('Should set majority readConcern aggregate command against server >= 4.1', { + metadata: { requires: { topology: 'replicaset', mongodb: '>= 4.1' } }, + + test: function(done) { + // Contains all the apm events + const started = []; + const succeeded = []; + // Get a new instance + const client = this.configuration.newClient( + { w: 1 }, + { poolSize: 1, readConcern: { level: 'majority' }, monitorCommands: true } + ); + + client + .connect() + .then(() => { + // Get a collection + const collection = client + .db(this.configuration.db) + .collection('readConcernCollectionAggregate1'); + + // Listen to apm events + client.on('commandStarted', event => { + if (event.commandName === 'aggregate') started.push(event); + }); + client.on('commandSucceeded', event => { + if (event.commandName === 'aggregate') succeeded.push(event); + }); + + // Execute find + return collection + .aggregate([{ $match: {} }, { $out: 'readConcernCollectionAggregate1Output' }]) + .toArray() + .then(() => { + expect(started).to.have.a.lengthOf(1); + expect(started[0]).to.have.property('commandName', 'aggregate'); + expect(succeeded[0]).to.have.property('commandName', 'aggregate'); + expect(started[0]).to.have.nested.property('command.readConcern.level', 'majority'); + + // Execute find + return collection + .aggregate([{ $match: {} }], { out: 'readConcernCollectionAggregate2Output' }) + .toArray() + .then(() => { + expect(started).to.have.a.lengthOf(2); + expect(started[1]).to.have.property('commandName', 'aggregate'); + expect(succeeded[1]).to.have.property('commandName', 'aggregate'); + expect(started[1]).to.have.nested.property( + 'command.readConcern.level', + 'majority' + ); + }); + }); + }) + .then(() => client.close(done), e => client.close(() => done(e))); + } + }); + it('Should set majority readConcern mapReduce command but be ignored', { metadata: { requires: { topology: 'replicaset', mongodb: '>= 3.2' } },