Support a single change stream to listen to all op log events #2540

Open
wants to merge 9 commits into base: development/8.6
41 changes: 20 additions & 21 deletions extensions/oplogPopulator/OplogPopulator.js
@@ -14,6 +14,9 @@ const { mongoJoi } = require('../../lib/config/configItems.joi');
const ImmutableConnector = require('./allocationStrategy/ImmutableConnector');
const RetainBucketsDecorator = require('./allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('./allocationStrategy/LeastFullConnector');
const UniqueConnector = require('./allocationStrategy/UniqueConnector');
const WildcardPipelineFactory = require('./pipeline/WildcardPipelineFactory');
const MultipleBucketsPipelineFactory = require('./pipeline/MultipleBucketsPipelineFactory');

const paramsJoi = joi.object({
config: OplogPopulatorConfigJoiSchema.required(),
@@ -285,18 +288,8 @@ class OplogPopulator {
allocationStrategy: this._allocationStrategy,
logger: this._logger,
});
// For now, we always use the RetainBucketsDecorator
// so, we map the events from the classes
this._connectorsManager.on(constants.connectorUpdatedEvent, connector =>
this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector));
this._allocator.on(constants.bucketRemovedFromConnectorEvent, (bucket, connector) =>
this._allocationStrategy.onBucketRemoved(bucket, connector));
this._connectorsManager.on(constants.connectorsReconciledEvent, bucketsExceedingLimit => {
this._metricsHandler.onConnectorsReconciled(
bucketsExceedingLimit,
this._allocationStrategy.retainedBucketsCount,
);
});
this._allocationStrategy.bindConnectorEvents(
this._connectorsManager, this._allocator, this._metricsHandler);
// get currently valid buckets from mongo
const validBuckets = await this._getBackbeatEnabledBuckets();
// listen to valid buckets
@@ -333,32 +326,38 @@ class OplogPopulator {
*/
initStrategy() {
let strategy;
if (this._arePipelinesImmutable()) {
let pipelineFactory;
if (this._config.numberOfConnectors === 0) {
// If the number of connectors is set to 0, then we
// use a single connector to listen to the whole DB.
pipelineFactory = new WildcardPipelineFactory();
strategy = new UniqueConnector({
logger: this._logger,
pipelineFactory,
});
} else if (this._arePipelinesImmutable()) {
// In this case, mongodb does not support reusing a
// resume token from a different pipeline. In other
// words, we cannot alter an existing pipeline. The
// strategy is therefore to allow a maximum of one
// bucket per kafka connector.
pipelineFactory = new MultipleBucketsPipelineFactory();
strategy = new ImmutableConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
connectorsManager: this._connectorsManager,
pipelineFactory,
});
} else {
// In this case, we can have multiple buckets per
// kafka connector. However, we want to proactively
// ensure that the pipeline will be accepted by
// mongodb.
pipelineFactory = new MultipleBucketsPipelineFactory();
strategy = new LeastFullConnector({
logger: this._logger,
metricsHandler: this._metricsHandler,
connectorsManager: this._connectorsManager,
pipelineFactory,
});
}
return new RetainBucketsDecorator(
strategy,
{ logger: this._logger },
);
return new RetainBucketsDecorator(strategy);
}

/**
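For reference, the new single-connector mode is selected purely through configuration: setting `numberOfConnectors` to 0 (now allowed by the relaxed validator below) makes `initStrategy()` return a `RetainBucketsDecorator` wrapping a `UniqueConnector` backed by a `WildcardPipelineFactory`. A minimal sketch of such a configuration; the field names follow the validator, the values are placeholders:

```js
// Sketch only: placeholder values; the field set follows
// OplogPopulatorConfigValidator.js below.
const oplogPopulatorConfig = {
    topic: 'backbeat-oplog',       // placeholder topic name
    kafkaConnectHost: '127.0.0.1', // placeholder host
    kafkaConnectPort: 8083,        // placeholder port
    // 0 => a single wildcard connector listening to the whole oplog;
    // any value >= 1 keeps the per-bucket pipeline behaviour.
    numberOfConnectors: 0,
};
```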
2 changes: 1 addition & 1 deletion extensions/oplogPopulator/OplogPopulatorConfigValidator.js
@@ -5,7 +5,7 @@ const joiSchema = joi.object({
topic: joi.string().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
numberOfConnectors: joi.number().required().min(1),
numberOfConnectors: joi.number().required().min(0),
prefix: joi.string().optional(),
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
26 changes: 26 additions & 0 deletions extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
@@ -5,9 +5,11 @@ class AllocationStrategy {
/**
* @constructor
* @param {Object} params params
* @param {PipelineFactory} params.pipelineFactory pipeline factory
* @param {Logger} params.logger logger object
*/
constructor(params) {
this._pipelineFactory = params.pipelineFactory;
this._logger = params.logger;
}

@@ -37,6 +39,30 @@
get maximumBucketsPerConnector() {
throw errors.NotImplemented;
}

/**
* Getter for the pipeline factory
* @returns {PipelineFactory} pipeline factory
*/
get pipelineFactory() {
return this._pipelineFactory;
}

/**
* Process an old connector configuration and return
* its bucket list if that list is valid against
* the current pipeline factory.
* @param {Object} oldConfig old configuration
* @returns {string[] | null} bucket list if valid, null otherwise
*/
getOldConnectorBucketList(oldConfig) {
const bucketList = this.pipelineFactory.extractBucketsFromConfig(oldConfig);
if (this.pipelineFactory.isValid(bucketList)) {
return bucketList;
}
return null;
}

}

module.exports = AllocationStrategy;
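`getOldConnectorBucketList()` delegates both bucket extraction and validation to the pipeline factory. The real factories under `extensions/oplogPopulator/pipeline/` are not part of this diff, so the sketch below uses a hypothetical stub that simply mirrors the pipeline shape produced by the removed `Connector._generateConnectorPipeline()`:

```js
// Assuming this sketch sits next to AllocationStrategy.js.
const AllocationStrategy = require('./AllocationStrategy');

// Hypothetical stub of the PipelineFactory interface this base class relies on.
const stubFactory = {
    extractBucketsFromConfig: config => {
        const pipeline = config.pipeline ? JSON.parse(config.pipeline) : null;
        return (pipeline && pipeline.length) ? pipeline[0].$match['ns.coll'].$in : [];
    },
    isValid: buckets => Array.isArray(buckets) && buckets.length > 0,
    getPipeline: buckets => JSON.stringify([{ $match: { 'ns.coll': { $in: buckets } } }]),
};

const strategy = new AllocationStrategy({ logger: console, pipelineFactory: stubFactory });

// Returns ['bucket1'] because the old pipeline matches the factory's expected shape.
strategy.getOldConnectorBucketList({ pipeline: '[{"$match":{"ns.coll":{"$in":["bucket1"]}}}]' });

// Returns null, which ConnectorsManager._processOldConnectors() below treats as
// a signal to delete the stale connector.
strategy.getOldConnectorBucketList({ pipeline: '[]' });
```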
extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js
@@ -1,3 +1,4 @@
const constants = require('../constants');
const AllocationStrategy = require('./AllocationStrategy');

/**
@@ -11,18 +12,35 @@ class RetainBucketsDecorator extends AllocationStrategy {
/**
* @constructor
* @param {AllocationStrategy} strategy the strategy to decorate
* @param {Object} params params
* @param {Logger} params.logger logger object
*/
constructor(strategy, params) {
super(params);
constructor(strategy) {
super({
logger: strategy._logger,
pipelineFactory: strategy._pipelineFactory,
});
this._strategy = strategy;

// Stores buckets that should be removed from the connector
// but still in use
this._retainedBuckets = new Map();
}

/**
* Bind connector lifecycle events from the connectors
* manager and allocator to this strategy, and forward
* reconciliation metrics to the metrics handler
* @param {ConnectorsManager} connectorsManager connectors manager
* @param {EventEmitter} allocator allocator
* @param {Object} metricsHandler metrics handler
* @returns {undefined} undefined
*/
bindConnectorEvents(connectorsManager, allocator, metricsHandler) {
// Bind events from the connector manager to the strategy
connectorsManager.on(constants.connectorUpdatedEvent, connector =>
this.onConnectorUpdatedOrDestroyed(connector));

allocator.on(constants.bucketRemovedFromConnectorEvent, (bucket, connector) =>
this.onBucketRemoved(bucket, connector));

connectorsManager.on(constants.connectorsReconciledEvent, bucketsExceedingLimit => {
metricsHandler.onConnectorsReconciled(
bucketsExceedingLimit,
this.retainedBucketsCount,
);
});
}

/**
* Get the number of retained buckets
* @returns {Number} number of retained buckets
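A minimal usage sketch of the relocated event wiring (not part of the diff): any EventEmitter-based connectorsManager/allocator pair can be bound, a bare base strategy is enough for illustration, and the metrics handler shown here is a hypothetical stub with the signature `bindConnectorEvents()` expects:

```js
const { EventEmitter } = require('events');
// Paths below assume this sketch lives inside extensions/oplogPopulator/.
const constants = require('./constants');
const AllocationStrategy = require('./allocationStrategy/AllocationStrategy');
const RetainBucketsDecorator = require('./allocationStrategy/RetainBucketsDecorator');

// The decorator copies the wrapped strategy's logger and pipeline factory.
const decorator = new RetainBucketsDecorator(
    new AllocationStrategy({ logger: console, pipelineFactory: null }));

const connectorsManager = new EventEmitter();
const allocator = new EventEmitter();
const metricsHandler = {
    // hypothetical stub mirroring the metrics callback used above
    onConnectorsReconciled: (bucketsExceedingLimit, retainedBucketsCount) =>
        console.log({ bucketsExceedingLimit, retainedBucketsCount }),
};

decorator.bindConnectorEvents(connectorsManager, allocator, metricsHandler);
// Events now reach the decorator without OplogPopulator wiring each one itself.
connectorsManager.emit(constants.connectorsReconciledEvent, 0);
```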
44 changes: 44 additions & 0 deletions extensions/oplogPopulator/allocationStrategy/UniqueConnector.js
@@ -0,0 +1,44 @@
const constants = require('../constants');
const AllocationStrategy = require('./AllocationStrategy');

/**
* @class UniqueConnector
*
* @classdesc UniqueConnector is an allocation
* strategy where a single wildcard connector
* listens to all buckets.
*/
class UniqueConnector extends AllocationStrategy {

/**
* Get best connector to assign a bucket to.
* If no connector is available, null is returned.
* @param {Array<Connector>} connectors connectors
* @param {String} bucket bucket name
* @returns {Connector | null} connector
*/
getConnector(connectors, bucket) { // eslint-disable-line no-unused-vars
if (!connectors.length) {
return null;
}
return connectors.find(conn => conn.config.pipeline?.includes(constants.wildCardForAllBuckets)) || null;
}

/**
* Assess if a pipeline can be updated.
* @returns {false} false
*/
canUpdate() {
return false;
}

/**
* Getter for the maximum number of buckets per connector
* @returns {Number} maximum number of buckets per connector
*/
get maximumBucketsPerConnector() {
return 1;
}
}

module.exports = UniqueConnector;
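An illustration of the selection logic (not part of the diff): `getConnector()` ignores the bucket argument and returns whichever connector's serialized pipeline already contains the `'*'` wildcard marker from constants.js. The pipeline strings below are placeholders, not the factories' real output:

```js
// Assuming this sketch sits next to UniqueConnector.js.
const UniqueConnector = require('./UniqueConnector');

const strategy = new UniqueConnector({ logger: console, pipelineFactory: null });

const perBucketConnector = {
    config: { pipeline: '[{"$match":{"ns.coll":{"$in":["bucket1"]}}}]' },
};
const wildcardConnector = {
    config: { pipeline: '["*"]' }, // placeholder string containing the wildcard marker
};

strategy.getConnector([perBucketConnector, wildcardConnector], 'any-bucket'); // => wildcardConnector
strategy.getConnector([perBucketConnector], 'any-bucket');                    // => null
```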
1 change: 1 addition & 0 deletions extensions/oplogPopulator/constants.js
@@ -6,6 +6,7 @@ const constants = {
// to 16 MB (16777216B) / 64 (max length of a bucket name) ~= 260000
maxBucketsPerConnector: 260000,
mongodbVersionWithImmutablePipelines: '6.0.0',
wildCardForAllBuckets: '*',
connectorUpdatedEvent: 'connector-updated',
bucketRemovedFromConnectorEvent: 'bucket-removed',
connectorsReconciledEvent: 'connectors-reconciled',
24 changes: 4 additions & 20 deletions extensions/oplogPopulator/modules/Connector.js
@@ -9,6 +9,7 @@ const connectorParams = joi.object({
name: joi.string().required(),
config: joi.object().required(),
buckets: joi.array().required(),
getPipeline: joi.func().required(),
isRunning: joi.boolean().required(),
logger: joi.object().required(),
kafkaConnectHost: joi.string().required(),
@@ -33,6 +34,7 @@ class Connector extends EventEmitter {
* @param {Object} params.config Kafka-connect MongoDB source
* connector config
* @param {string[]} params.buckets buckets assigned to this connector
* @param {Function} params.getPipeline callback to get connector pipeline
* @param {Logger} params.logger logger object
* @param {string} params.kafkaConnectHost kafka connect host
* @param {number} params.kafkaConnectPort kafka connect port
@@ -44,6 +46,7 @@
this._config = params.config;
this._buckets = new Set(params.buckets);
this._isRunning = params.isRunning;
this._getPipeline = params.getPipeline;
this._state = {
// Used to check if buckets assigned to this connector
// got modified from the last connector update
@@ -276,25 +279,6 @@
}
}

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector
* @param {string[]} buckets list of bucket names
* @returns {string} new connector pipeline
*/
_generateConnectorPipeline(buckets) {
const pipeline = [
{
$match: {
'ns.coll': {
$in: buckets,
}
}
}
];
return JSON.stringify(pipeline);
}

/**
* Handles updating the values of _bucketsGotModified
* @param {boolean} bucketsGotModified value of _state.bucketsGotModified
@@ -338,7 +322,7 @@
if (!this._state.bucketsGotModified || this._state.isUpdating) {
return false;
}
this._config.pipeline = this._generateConnectorPipeline([...this._buckets]);
this._config.pipeline = this._getPipeline([...this._buckets]);
try {
if (doUpdate && this._isRunning) {
const timeBeforeUpdate = Date.now();
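With `_generateConnectorPipeline()` removed, the pipeline text is produced by the injected `getPipeline` callback. For the multiple-buckets strategies the callback is presumably equivalent to the removed helper; a sketch under that assumption (the real MultipleBucketsPipelineFactory is not shown in this diff):

```js
// Assumed shape of the getPipeline callback for the multiple-buckets case,
// reconstructed from the removed _generateConnectorPipeline() helper.
const getPipeline = buckets => JSON.stringify([
    {
        $match: {
            'ns.coll': {
                $in: buckets,
            },
        },
    },
]);

getPipeline(['bucket1', 'bucket2']);
// => '[{"$match":{"ns.coll":{"$in":["bucket1","bucket2"]}}}]'
```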
47 changes: 23 additions & 24 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
@@ -123,6 +123,7 @@ class ConnectorsManager extends EventEmitter {
name: connectorName,
config,
buckets: [],
getPipeline: this._allocationStrategy.pipelineFactory.getPipeline,
isRunning: false,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
@@ -134,33 +135,29 @@
return connector;
}

/**
* Extracts buckets from a connector config pipeline
* @param {Object} connectorConfig connector config
* @returns {string[]} list of buckets
*/
_extractBucketsFromConfig(connectorConfig) {
const pipeline = connectorConfig.pipeline ?
JSON.parse(connectorConfig.pipeline) : null;
if (!pipeline || pipeline.length === 0) {
return [];
}
return pipeline[0].$match['ns.coll'].$in;
}

/**
* Gets old connector configs and initializes connector
* instances
* instances. Deletes connectors whose configuration
* the strategy rejects.
* @param {string[]} connectorNames connector names
* @returns {Promise|Connector[]} list of connectors
*/
async _getOldConnectors(connectorNames) {
async _processOldConnectors(connectorNames) {
try {
const connectors = await Promise.all(connectorNames.map(async connectorName => {
// get old connector config
const oldConfig = await this._kafkaConnect.getConnectorConfig(connectorName);
// extract buckets from old connector config
const buckets = this._extractBucketsFromConfig(oldConfig);
const buckets = this._allocationStrategy.getOldConnectorBucketList(oldConfig);
if (!buckets) {
await this._kafkaConnect.deleteConnector(connectorName);
this._logger.warn('Removed old connector', {
method: 'ConnectorsManager._processOldConnectors',
connector: connectorName,
oldConfig,
});
return undefined;
}
// generating a new config as the old config can be outdated (wrong topic for example)
const config = this._getDefaultConnectorConfiguration(connectorName);
// initializing connector
@@ -170,33 +167,35 @@
// added manually like 'offset.topic.name'
config: { ...oldConfig, ...config },
buckets,
getPipeline: this._allocationStrategy.pipelineFactory.getPipeline,
isRunning: true,
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
});
if (buckets.length > this._allocationStrategy.maximumBucketsPerConnector) {
this._logger.warn('Connector has more buckets than expected', {
method: 'ConnectorsManager._getOldConnectors',
method: 'ConnectorsManager._processOldConnectors',
connector: connector.name,
numberOfBuckets: buckets.length,
allowed: this._allocationStrategy.maximumBucketsPerConnector,
});
}
this._logger.debug('Successfully retrieved old connector', {
method: 'ConnectorsManager._getOldConnectors',
method: 'ConnectorsManager._processOldConnectors',
connector: connector.name
});
return connector;
}));
const validConnectors = connectors.filter(c => !!c);
this._logger.info('Successfully retrieved old connectors', {
method: 'ConnectorsManager._getOldConnectors',
numberOfConnectors: connectors.length
method: 'ConnectorsManager._processOldConnectors',
numberOfConnectors: validConnectors.length,
});
return connectors;
return validConnectors;
} catch (err) {
this._logger.error('An error occurred while getting old connectors', {
method: 'ConnectorsManager._getOldConnectors',
method: 'ConnectorsManager._processOldConnectors',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
@@ -214,7 +213,7 @@
// get and initialize old connectors
const oldConnectorNames = await this._kafkaConnect.getConnectors();
if (oldConnectorNames) {
const oldConnectors = await this._getOldConnectors(oldConnectorNames);
const oldConnectors = await this._processOldConnectors(oldConnectorNames);
this._connectors.push(...oldConnectors);
this._oldConnectors.push(...oldConnectors);
this._metricsHandler.onConnectorsInstantiated(true, oldConnectors.length);