diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index 5f4a56994..0bd83cab9 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -243,6 +243,9 @@ class OplogPopulator { } _isPipelineImmutable() { + if (process.env.FORCE_MONGO6) { + return true; + } return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines); } @@ -261,7 +264,11 @@ class OplogPopulator { // initialize mongo client await this._setupMongoClient(); - if (this._isPipelineImmutable()) { + if (this._config.numberOfConnectors === 0) { + // If the number of connector is set to 0, then we + // use a single connector to listen to the whole DB. + this._maximumBucketsPerConnector = Infinity; + } else if (this._isPipelineImmutable()) { // In this case, mongodb does not support reusing a // resume token from a different pipeline. In other // words, we cannot alter an existing pipeline. In this diff --git a/extensions/oplogPopulator/modules/Allocator.js b/extensions/oplogPopulator/modules/Allocator.js index e43cc5b4d..85a0dfeb7 100644 --- a/extensions/oplogPopulator/modules/Allocator.js +++ b/extensions/oplogPopulator/modules/Allocator.js @@ -10,6 +10,7 @@ const paramsJoi = joi.object({ connectorsManager: joi.object().required(), maximumBucketsPerConnector: joi.alternatives().try( joi.number().integer(), + joi.any().valid(Infinity), ).default(constants.maxBucketPerConnector), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), diff --git a/extensions/oplogPopulator/modules/Connector.js b/extensions/oplogPopulator/modules/Connector.js index 9ea3e5ecd..384e4460e 100644 --- a/extensions/oplogPopulator/modules/Connector.js +++ b/extensions/oplogPopulator/modules/Connector.js @@ -1,6 +1,6 @@ const joi = require('joi'); const uuid = require('uuid'); -const { errors } = require('arsenal'); +const { errors, constants } = require('arsenal'); const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper'); const oplogPopulatorConstants = require('../constants'); @@ -14,6 +14,7 @@ const connectorParams = joi.object({ kafkaConnectPort: joi.number().required(), maximumBucketsPerConnector: joi.alternatives().try( joi.number().integer(), + joi.any().valid(Infinity), ).default(oplogPopulatorConstants.maxBucketPerConnector), }); @@ -311,11 +312,26 @@ class Connector { /** * Makes new connector pipeline that includes - * buckets assigned to this connector. + * buckets assigned to this connector. If the + * max number of buckets per connector is infinite, + * we listen to all non-special collections. * @param {string[]} buckets buckets assigned to this connector * @returns {string} new connector pipeline */ _generateConnectorPipeline(buckets) { + if (this._maximumBucketsPerConnector === Infinity) { + return JSON.stringify([ + { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, + } + } + } + ]); + } const pipeline = [ { $match: { diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index f5a163ef5..5ee1f341b 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -22,6 +22,7 @@ const paramsJoi = joi.object({ kafkaConnectPort: joi.number().required(), maximumBucketsPerConnector: joi.alternatives().try( joi.number().integer(), + joi.any().valid(Infinity), ).default(constants.maxBucketPerConnector), metricsHandler: joi.object() .instance(OplogPopulatorMetrics).required(), @@ -220,6 +221,12 @@ class ConnectorsManager { } // Add connectors if required number of connectors not reached const nbConnectorsToAdd = this._nbConnectors - this._connectors.length; + if (nbConnectorsToAdd > 0 && this._maximumBucketsPerConnector === Infinity) { + this._logger.warn('Single change stream is enabled but multiple connectors are running', { + method: 'ConnectorsManager.initializeConnectors', + numberOfActiveConnectors: this._connectors.length, + }); + } for (let i = 0; i < nbConnectorsToAdd; i++) { this.addConnector(); } diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index 1acdb1c27..835aef82d 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -4,7 +4,7 @@ const werelogs = require('werelogs'); const Connector = require('../../../extensions/oplogPopulator/modules/Connector'); -const { errors } = require('arsenal'); +const { errors, constants } = require('arsenal'); const logger = new werelogs.Logger('Connector'); @@ -190,6 +190,26 @@ describe('Connector', () => { } ])); }); + + it('should listen to everything if the single change stream mode is set', () => { + connector = new Connector({ + ...baseConnectorOptions, + maximumBucketsPerConnector: Infinity, + }); + + const pipeline = connector._generateConnectorPipeline(); + assert.strictEqual(pipeline, JSON.stringify([ + { + $match: { + 'ns.coll': { + $not: { + $regex: `^(${constants.mpuBucketPrefix}|__).*`, + }, + } + } + } + ])); + }); }); describe('_updateConnectorState', () => { diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 4a524a046..b02284d6b 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -108,6 +108,16 @@ describe('ConnectorsManager', () => { sinon.reset(); }); + it('should set the number of connectors to 1 if the single change stream mode is set', () => { + const options = { + ...connectorsManagerBaseOptions, + maximumBucketsPerConnector: Infinity, + }; + + const manager = new ConnectorsManager(options); + assert.strictEqual(manager._nbConnectors, 1); + }); + describe('_getDefaultConnectorConfiguration', () => { it('should return default configuration', () => { const config = connectorsManager._getDefaultConnectorConfiguration( @@ -168,6 +178,8 @@ describe('ConnectorsManager', () => { }); describe('_getOldConnectors', () => { + afterEach(() => sinon.restore()); + it('Should update connector config while keeping the extra fields', async () => { const config = { ...connectorConfig }; config['topic.namespace.map'] = 'outdated-topic'; @@ -224,6 +236,21 @@ describe('ConnectorsManager', () => { assert.deepEqual(connectorsManager._connectors, [connector1]); assert.deepEqual(connectorsManager._oldConnectors, []); }); + + it('should warn if the number of connector is higher than 1 with the single change stream mode', async () => { + connectorsManager._nbConnectors = 2; + connectorsManager._maximumBucketsPerConnector = Infinity; + sinon.stub(connectorsManager._kafkaConnect, 'getConnectors') + .resolves([]); + sinon.stub(connectorsManager, 'addConnector') + .callsFake(() => { + connectorsManager._connectors.push(connector1); + return connector1; + }); + const warnStub = sinon.stub(logger, 'warn'); + await connectorsManager.initializeConnectors(); + assert(warnStub.calledOnce); + }); }); describe('_spawnOrDestroyConnector', () => { diff --git a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js index 29f92f99d..f90d95521 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js +++ b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js @@ -50,5 +50,19 @@ describe('LeastFullConnector', () => { const connector = strategy.getConnector([connector2]); assert.strictEqual(connector, null); }); + + it('Should return connector with fewest buckets (single stream case)', () => { + strategy = new LeastFullConnector({ + maximumBucketsPerConnector: Infinity, + addConnector: () => new Connector({ + name: 'example-connector-3', + buckets: [], + ...defaultConnectorParams, + }), + logger, + }); + const connector = strategy.getConnector([connector1, connector2]); + assert.strictEqual(connector.name, connector1.name); + }); }); }); diff --git a/tests/unit/oplogPopulator/oplogPopulator.js b/tests/unit/oplogPopulator/oplogPopulator.js index c39ff3d5d..0dbbc42d4 100644 --- a/tests/unit/oplogPopulator/oplogPopulator.js +++ b/tests/unit/oplogPopulator/oplogPopulator.js @@ -155,6 +155,22 @@ describe('OplogPopulator', () => { assert(initializeConnectorsManagerStub.calledOnce); }); + it('should setup oplog populator with single change stream', async () => { + const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves(); + const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream'); + const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager'); + const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]); + + oplogPopulator._config.maximumBucketsPerConnector = Infinity; + + await oplogPopulator.setup(); + + assert(setupMongoClientStub.calledOnce); + assert(getBackbeatEnabledBucketsStub.calledOnce); + assert(setMetastoreChangeStreamStub.calledOnce); + assert(initializeConnectorsManagerStub.calledOnce); + }); + it('should setup oplog populator with immutable pipelines', async () => { const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves(); const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');