Skip to content

Commit

Permalink
Support using a single change stream
Browse files Browse the repository at this point in the history
- Listen to all collections but the special ones and the
  mpu shadow buckets.
- Adapt the existing logic to handle a unique connector.

Issue: BB-602
  • Loading branch information
williamlardier committed Sep 13, 2024
1 parent f0a136a commit 4c761c4
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 4 deletions.
6 changes: 5 additions & 1 deletion extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ class OplogPopulator {
// initialize mongo client
await this._setupMongoClient();

if (this._isPipelineImmutable()) {
if (this._config.numberOfConnectors === 0) {
// If the number of connector is set to 0, then we
// use a single connector to listen to the whole DB.
this._maximumBucketsPerConnector = Infinity;
} else if (this._isPipelineImmutable()) {
// In this case, mongodb does not support reusing a
// resume token from a different pipeline. In other
// words, we cannot alter an existing pipeline. In this
Expand Down
1 change: 1 addition & 0 deletions extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
Expand Down
20 changes: 18 additions & 2 deletions extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const { errors, constants } = require('arsenal');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
const oplogPopulatorConstants = require('../constants');

Expand All @@ -14,6 +14,7 @@ const connectorParams = joi.object({
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(oplogPopulatorConstants.maxBucketPerConnector),
});

Expand Down Expand Up @@ -311,11 +312,26 @@ class Connector {

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector.
* buckets assigned to this connector. If the
* max number of buckets per connector is infinite,
* we listen to all non-special collections.
* @param {string[]} buckets buckets assigned to this connector
* @returns {string} new connector pipeline
*/
_generateConnectorPipeline(buckets) {
if (this._maximumBucketsPerConnector === Infinity) {
return JSON.stringify([
{
$match: {
'ns.coll': {
$not: {
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
},
}
}
}
]);
}
const pipeline = [
{
$match: {
Expand Down
7 changes: 7 additions & 0 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const paramsJoi = joi.object({
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.alternatives().try(
joi.number().integer(),
joi.any().valid(Infinity),
).default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
Expand Down Expand Up @@ -220,6 +221,12 @@ class ConnectorsManager {
}
// Add connectors if required number of connectors not reached
const nbConnectorsToAdd = this._nbConnectors - this._connectors.length;
if (nbConnectorsToAdd > 0 && this._maximumBucketsPerConnector === Infinity) {
this._logger.warn('Single change stream is enabled but multiple connectors are running', {
method: 'ConnectorsManager.initializeConnectors',
numberOfActiveConnectors: this._connectors.length,
});
}
for (let i = 0; i < nbConnectorsToAdd; i++) {
this.addConnector();
}
Expand Down
22 changes: 21 additions & 1 deletion tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const werelogs = require('werelogs');

const Connector =
require('../../../extensions/oplogPopulator/modules/Connector');
const { errors } = require('arsenal');
const { errors, constants } = require('arsenal');

const logger = new werelogs.Logger('Connector');

Expand Down Expand Up @@ -190,6 +190,26 @@ describe('Connector', () => {
}
]));
});

it('should listen to everything if the single change stream mode is set', () => {
connector = new Connector({
...baseConnectorOptions,
maximumBucketsPerConnector: Infinity,
});

const pipeline = connector._generateConnectorPipeline();
assert.strictEqual(pipeline, JSON.stringify([
{
$match: {
'ns.coll': {
$not: {
$regex: `^(${constants.mpuBucketPrefix}|__).*`,
},
}
}
}
]));
});
});

describe('_updateConnectorState', () => {
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ describe('ConnectorsManager', () => {
sinon.reset();
});

it('should set the number of connectors to 1 if the single change stream mode is set', () => {
const options = {
...connectorsManagerBaseOptions,
maximumBucketsPerConnector: Infinity,
};

const manager = new ConnectorsManager(options);
assert.strictEqual(manager._nbConnectors, 1);
});

describe('_getDefaultConnectorConfiguration', () => {
it('should return default configuration', () => {
const config = connectorsManager._getDefaultConnectorConfiguration(
Expand Down Expand Up @@ -168,6 +178,8 @@ describe('ConnectorsManager', () => {
});

describe('_getOldConnectors', () => {
afterEach(() => sinon.restore());

it('Should update connector config while keeping the extra fields', async () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
Expand Down Expand Up @@ -224,6 +236,21 @@ describe('ConnectorsManager', () => {
assert.deepEqual(connectorsManager._connectors, [connector1]);
assert.deepEqual(connectorsManager._oldConnectors, []);
});

it('should warn if the number of connector is higher than 1 with the single change stream mode', async () => {
connectorsManager._nbConnectors = 2;
connectorsManager._maximumBucketsPerConnector = Infinity;
sinon.stub(connectorsManager._kafkaConnect, 'getConnectors')
.resolves([]);
sinon.stub(connectorsManager, 'addConnector')
.callsFake(() => {
connectorsManager._connectors.push(connector1);
return connector1;
});
const warnStub = sinon.stub(logger, 'warn');
await connectorsManager.initializeConnectors();
assert(warnStub.calledOnce);
});
});

describe('_spawnOrDestroyConnector', () => {
Expand Down
14 changes: 14 additions & 0 deletions tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,19 @@ describe('LeastFullConnector', () => {
const connector = strategy.getConnector([connector2]);
assert.strictEqual(connector, null);
});

it('Should return connector with fewest buckets (single stream case)', () => {
strategy = new LeastFullConnector({
maximumBucketsPerConnector: Infinity,
addConnector: () => new Connector({
name: 'example-connector-3',
buckets: [],
...defaultConnectorParams,
}),
logger,
});
const connector = strategy.getConnector([connector1, connector2]);
assert.strictEqual(connector.name, connector1.name);
});
});
});
16 changes: 16 additions & 0 deletions tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,22 @@ describe('OplogPopulator', () => {
assert(initializeConnectorsManagerStub.calledOnce);
});

it('should setup oplog populator with single change stream', async () => {
const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
const initializeConnectorsManagerStub = sinon.stub(oplogPopulator, '_initializeConnectorsManager');
const getBackbeatEnabledBucketsStub = sinon.stub(oplogPopulator, '_getBackbeatEnabledBuckets').resolves([]);

oplogPopulator._config.maximumBucketsPerConnector = Infinity;

await oplogPopulator.setup();

assert(setupMongoClientStub.calledOnce);
assert(getBackbeatEnabledBucketsStub.calledOnce);
assert(setMetastoreChangeStreamStub.calledOnce);
assert(initializeConnectorsManagerStub.calledOnce);
});

it('should setup oplog populator with immutable pipelines', async () => {
const setupMongoClientStub = sinon.stub(oplogPopulator, '_setupMongoClient').resolves();
const setMetastoreChangeStreamStub = sinon.stub(oplogPopulator, '_setMetastoreChangeStream');
Expand Down

0 comments on commit 4c761c4

Please sign in to comment.