
Commit

wip
williamlardier committed Sep 10, 2024
1 parent 1d94901 commit 45a2e59
Showing 11 changed files with 148 additions and 42 deletions.
75 changes: 53 additions & 22 deletions extensions/oplogPopulator/OplogPopulator.js
@@ -51,7 +51,7 @@ class OplogPopulator {
this._logger = params.logger;
this._changeStreamWrapper = null;
this._allocator = null;
this._connectorsManager = null;
this._connectorsManager = null;
// contains OplogPopulatorUtils class of each supported extension
this._extHelpers = {};
// MongoDB related
@@ -78,9 +78,9 @@ class OplogPopulator {
async _setupMongoClient() {
try {
const client = await MongoClient.connect(this._mongoUrl, {
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
replicaSet: this._replicaSet,
useNewUrlParser: true,
useUnifiedTopology: true,
});
// connect to metadata DB
this._mongoClient = client.db(this._database, {
@@ -242,35 +242,66 @@ class OplogPopulator {
this._changeStreamWrapper.start();
}

_isPipelineImmutable() {
return semver.gte(this._mongoVersion, constants.mongodbVersionWithImmutablePipelines);
}

/**
* Sets up the OplogPopulator
* @returns {Promise|undefined} undefined
* @throws {InternalError}
*/
async setup() {
try {
this._loadOplogHelperClasses();
this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
try {
this._loadOplogHelperClasses();
// initialize mongo client
await this._setupMongoClient();

if (this._isPipelineImmutable()) {
// Here, mongodb does not support reusing a resume token
// from a different pipeline. In other words, we cannot
// alter an existing pipeline. The strategy is therefore
// to allow a maximum of one bucket per kafka connector.
this._maximumBucketsPerConnector = 1;
} else {
// Here, we can have multiple buckets per kafka connector.
// However, we proactively cap their number to ensure that
// the pipeline will be accepted by mongodb.
this._maximumBucketsPerConnector = constants.maxBucketPerConnector;
}
// If the singleChangeStream flag is set to true, we set the
// maximum number of buckets per connector to infinity, and
// the number of connectors to 1.
if (this._config.singleChangeStream) {
this._maximumBucketsPerConnector = Infinity;
this._config.numberOfConnectors = 1;
}

this._connectorsManager = new ConnectorsManager({
nbConnectors: this._config.numberOfConnectors,
singleChangeStream: this._config.singleChangeStream,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
isPipelineImmutable: this._isPipelineImmutable(),
database: this._database,
mongoUrl: this._mongoUrl,
oplogTopic: this._config.topic,
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
logger: this._logger,
});
await this._connectorsManager.initializeConnectors();
this._allocator = new Allocator({
connectorsManager: this._connectorsManager,
metricsHandler: this._metricsHandler,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
logger: this._logger,
});
// initialize mongo client
await this._setupMongoClient();
// get currently valid buckets from mongo
const validBuckets = await this._getBackbeatEnabledBuckets();
// listen to valid buckets
@@ -291,13 +322,13 @@
this._logger.info('OplogPopulator setup complete', {
method: 'OplogPopulator.setup',
});
} catch (err) {
} catch (err) {
this._logger.error('An error occurred when setting up the OplogPopulator', {
method: 'OplogPopulator.setup',
error: err.description || err.message,
});
throw errors.InternalError.customizeDescription(err.description);
}
}
}

/**
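Taken together, the setup() changes above reduce to one choice of per-connector bucket limit. The following is a minimal sketch assuming the constants and config flags introduced in this commit; the helper name is illustrative and not part of the change itself:

const semver = require('semver');

// Sketch: how OplogPopulator.setup() derives the per-connector bucket limit.
// `constants` and `config` mirror the fields shown in this diff.
function computeMaximumBucketsPerConnector(mongoVersion, config, constants) {
    // MongoDB versions with immutable pipelines: a resume token cannot be
    // reused with a different pipeline, so allow one bucket per connector.
    let max = semver.gte(mongoVersion, constants.mongodbVersionWithImmutablePipelines)
        ? 1
        : constants.maxBucketPerConnector;
    // A single change stream listens to all buckets through one connector,
    // so the per-connector limit is lifted.
    if (config.singleChangeStream) {
        max = Infinity;
    }
    return max;
}

// Example: MongoDB 6.0.0 without singleChangeStream => 1 bucket per connector.
// computeMaximumBucketsPerConnector('6.0.0', { singleChangeStream: false },
//     { mongodbVersionWithImmutablePipelines: '6.0.0', maxBucketPerConnector: 260000 }); // => 1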
1 change: 1 addition & 0 deletions extensions/oplogPopulator/OplogPopulatorConfigValidator.js
@@ -10,6 +10,7 @@ const joiSchema = joi.object({
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
heartbeatIntervalMs: joi.number().default(10000),
singleChangeStream: joi.boolean().default(false),
});

function configValidator(backbeatConfig, extConfig) {
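For reference, an oplogPopulator extension config enabling the new flag could look like the sketch below; the field names follow what the validator and OplogPopulator read from the config, while the values are placeholders:

// Hypothetical oplogPopulator extension config (values are examples only):
const oplogPopulatorConfig = {
    topic: 'backbeat-oplog',                    // this._config.topic
    numberOfConnectors: 1,                      // forced to 1 when singleChangeStream is true
    prefix: 'backbeat-',                        // this._config.prefix
    kafkaConnectHost: '127.0.0.1',
    kafkaConnectPort: 8083,
    connectorsUpdateCronRule: '*/1 * * * * *',  // validator default
    heartbeatIntervalMs: 10000,                 // validator default
    singleChangeStream: true,                   // new flag, defaults to false
};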
extensions/oplogPopulator/allocationStrategy/AllocationStrategy.js
@@ -4,10 +4,15 @@ class AllocationStrategy {

/**
* @constructor
* @param {Logger} logger logger object
* @param {Object} params params
* @param {Number} params.maximumBucketsPerConnector maximum number of buckets per connector
* @param {Function} params.addConnector function to add a connector
* @param {Logger} params.logger logger object
*/
constructor(logger) {
this._logger = logger;
constructor(params) {
this._logger = params.logger;
this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
this._addConnector = params.addConnector.bind(this);
}

/**
20 changes: 8 additions & 12 deletions extensions/oplogPopulator/allocationStrategy/LeastFullConnector.js
@@ -6,26 +6,22 @@ const AllocationStrategy = require('./AllocationStrategy');
* @classdesc LeastFullConnector is an allocation
* strategy that assigns buckets to connectors based
* on the number of buckets assigned to each connector.
* Connectors with the fewest buckets are filled first
* Connectors with the fewest buckets are filled first.
* If a connector has reached the maximum number of buckets,
* a new connector is created.
*/
class LeastFullConnector extends AllocationStrategy {

/**
* @constructor
* @param {Object} params params
* @param {Logger} params.logger logger object
*/
constructor(params) {
super(params.logger);
}

/**
* Get best connector for assigning a bucket
* @param {Connector[]} connectors available connectors
* @returns {Connector} connector
*/
getConnector(connectors) {
return connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
const connector = connectors.reduce((prev, elt) => (elt.bucketCount < prev.bucketCount ? elt : prev));
if (connector.buckets.length >= this._maximumBucketsPerConnector) {
return this._addConnector();
}
return connector;
}
}

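A short usage sketch of the updated strategy; the connector objects and the addConnector callback are stubbed here, whereas the real ones come from ConnectorsManager via Allocator:

// Assumes this is run from the allocationStrategy directory.
const LeastFullConnector = require('./LeastFullConnector');

const strategy = new LeastFullConnector({
    logger: console,
    maximumBucketsPerConnector: 2,
    // Stub: the real callback is ConnectorsManager.addConnector, bound by Allocator.
    addConnector: () => ({ name: 'connector-new', buckets: [], bucketCount: 0 }),
});

const connectors = [
    { name: 'connector-1', buckets: ['b1', 'b2'], bucketCount: 2 },
    { name: 'connector-2', buckets: ['b3'], bucketCount: 1 },
];
// connector-2 has the fewest buckets and is under the limit, so it is returned.
console.log(strategy.getConnector(connectors).name); // connector-2

// Once every connector is at the limit, the strategy asks for a new one instead.
connectors[1].buckets.push('b4');
connectors[1].bucketCount = 2;
console.log(strategy.getConnector(connectors).name); // connector-new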
5 changes: 5 additions & 0 deletions extensions/oplogPopulator/constants.js
@@ -1,6 +1,11 @@
const constants = {
bucketMetastore: '__metastore',
defaultConnectorName: 'source-connector',
// The maximum pipeline length is bounded by the MongoDB BSON max document
// size, i.e. 16MB. To leave room for the other parameters in the pipeline,
// we round the cap down: 16MB (16777216B) / 64 (max length of a bucket
// name) ~= 260000
maxBucketPerConnector: 260000,
mongodbVersionWithImmutablePipelines: '6.0.0',
defaultConnectorConfig: {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
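A quick sanity check of the cap above, using the numbers stated in the comment:

// BSON max document size divided by the max bucket name length assumed above:
const bsonMaxDocumentSize = 16 * 1024 * 1024;   // 16777216 bytes
const maxBucketNameLength = 64;
console.log(bsonMaxDocumentSize / maxBucketNameLength);  // 262144
// maxBucketPerConnector is kept slightly lower (260000) to leave headroom for
// the other fields of the connector pipeline document.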
4 changes: 4 additions & 0 deletions extensions/oplogPopulator/modules/Allocator.js
@@ -3,9 +3,11 @@ const { errors } = require('arsenal');

const OplogPopulatorMetrics = require('../OplogPopulatorMetrics');
const LeastFullConnector = require('../allocationStrategy/LeastFullConnector');
const constants = require('../constants');

const paramsJoi = joi.object({
connectorsManager: joi.object().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
@@ -31,6 +33,8 @@ class Allocator {
this._logger = params.logger;
this._allocationStrategy = new LeastFullConnector({
logger: params.logger,
maximumBucketsPerConnector: params.maximumBucketsPerConnector,
addConnector: this._connectorsManager.addConnector.bind(this._connectorsManager),
});
this._metricsHandler = params.metricsHandler;
// Stores connector assigned for each bucket
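The addConnector callback is handed to the strategy pre-bound to the ConnectorsManager instance; a generic illustration of why the bind matters (plain JavaScript, not Backbeat-specific code):

class Manager {
    constructor() { this.count = 0; }
    addConnector() { this.count += 1; return `connector-${this.count}`; }
}
const manager = new Manager();
// const unbound = manager.addConnector;
// unbound();                       // TypeError: `this` is not the manager anymore
const bound = manager.addConnector.bind(manager);
console.log(bound());               // 'connector-1' -- `this` still points to manager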
35 changes: 33 additions & 2 deletions extensions/oplogPopulator/modules/Connector.js
@@ -2,6 +2,7 @@ const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
const constants = require('../constants');

const connectorParams = joi.object({
name: joi.string().required(),
@@ -11,6 +12,9 @@ const connectorParams = joi.object({
logger: joi.object().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
isPipelineImmutable: joi.boolean().default(false),
singleChangeStream: joi.boolean().default(false),
});

/**
@@ -34,6 +38,10 @@ class Connector {
* @param {Logger} params.logger logger object
* @param {string} params.kafkaConnectHost kafka connect host
* @param {number} params.kafkaConnectPort kafka connect port
* @param {number} params.maximumBucketsPerConnector maximum number of
* buckets per connector
* @param {boolean} params.isPipelineImmutable true if the connector
* pipeline cannot be altered once created
* @param {boolean} params.singleChangeStream if true, the connector uses
* an empty pipeline and listens to all buckets
*/
constructor(params) {
joi.attempt(params, connectorParams);
@@ -59,6 +67,9 @@ class Connector {
kafkaConnectPort: params.kafkaConnectPort,
logger: this._logger,
});
this._singleChangeStream = params.singleChangeStream;
this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
this._isPipelineImmutable = params.isPipelineImmutable;
}

/**
@@ -233,6 +244,9 @@
* @throws {InternalError}
*/
async addBucket(bucket, doUpdate = false) {
if (this._buckets.size > this._maximumBucketsPerConnector) {
throw errors.InternalError.customizeDescription('Connector reached maximum number of buckets');
}
this._buckets.add(bucket);
this._updateConnectorState(true);
try {
@@ -260,7 +274,19 @@
this._buckets.delete(bucket);
this._updateConnectorState(true);
try {
await this.updatePipeline(doUpdate);
if (this._isPipelineImmutable && this._buckets.size > 1) {
this._logger.warn('Removing a bucket from an immutable pipeline', {
method: 'Connector.removeBucket',
connector: this._name,
bucket,
});
} else if (this._isPipelineImmutable) {
// If the pipeline is immutable and at most one bucket is left,
// we can destroy the connector; it will be recreated later with
// a new bucket.
return this.destroy();
}
return this.updatePipeline(doUpdate);
} catch (err) {
this._logger.error('Error while removing bucket from connector', {
method: 'Connector.removeBucket',
@@ -274,11 +300,16 @@

/**
* Makes new connector pipeline that includes
* buckets assigned to this connector
* buckets assigned to this connector. If the
* singleChangeStream parameter is set to true,
* returns a pipeline that listens to all collections.
* @param {string[]} buckets list of bucket names
* @returns {string} new connector pipeline
*/
_generateConnectorPipeline(buckets) {
if (this._singleChangeStream) {
return '[]';
}
const pipeline = [
{
$match: {
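The removal policy added to removeBucket can be summarized as a small decision function; a sketch that mirrors the branches above without the Kafka Connect calls (names are illustrative):

// remainingBuckets is the connector's bucket count after the bucket has
// already been deleted from the set, as in Connector.removeBucket.
function removalAction(isPipelineImmutable, remainingBuckets) {
    if (isPipelineImmutable && remainingBuckets > 1) {
        // The pipeline cannot be rewritten in place: warn, then still attempt
        // the pipeline update.
        return 'warn-and-update-pipeline';
    }
    if (isPipelineImmutable) {
        // At most one bucket left: destroy the connector so it can be
        // recreated later with a fresh pipeline.
        return 'destroy-connector';
    }
    // Mutable pipeline: simply update it.
    return 'update-pipeline';
}

// removalAction(true, 3);  // => 'warn-and-update-pipeline'
// removalAction(true, 0);  // => 'destroy-connector'
// removalAction(false, 3); // => 'update-pipeline'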
31 changes: 28 additions & 3 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
@@ -20,6 +20,9 @@ const paramsJoi = joi.object({
heartbeatIntervalMs: joi.number().required(),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
singleChangeStream: joi.boolean().default(false),
isPipelineImmutable: joi.boolean().default(false),
maximumBucketsPerConnector: joi.number().default(constants.maxBucketPerConnector),
metricsHandler: joi.object()
.instance(OplogPopulatorMetrics).required(),
logger: joi.object().required(),
@@ -40,6 +43,12 @@ class ConnectorsManager {
* @constructor
* @param {Object} params params
* @param {number} params.nbConnectors number of connectors to have
* @param {number} params.maximumBucketsPerConnector maximum number of
* buckets per connector
* @param {boolean} params.isPipelineImmutable true if the mongodb pipelines
* are immutable
* @param {boolean} params.singleChangeStream whether to use a single change
* stream for all buckets
* @param {string} params.database MongoDB database to use (for connector)
* @param {string} params.mongoUrl MongoDB connection url
* @param {string} params.oplogTopic topic to use for oplog
@@ -69,6 +78,13 @@
this._connectors = [];
// used for initial clean up of old connector pipelines
this._oldConnectors = [];
this._singleChangeStream = params.singleChangeStream;
this._maximumBucketsPerConnector = params.maximumBucketsPerConnector;
this._isPipelineImmutable = params.isPipelineImmutable;

if (this._singleChangeStream) {
this._nbConnectors = 1;
}
}

/**
@@ -120,6 +136,9 @@
logger: this._logger,
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
maximumBucketsPerConnector: this._maximumBucketsPerConnector,
isPipelineImmutable: this._isPipelineImmutable,
singleChangeStream: this._singleChangeStream,
});
return connector;
}
@@ -129,7 +148,7 @@
* @param {Object} connectorConfig connector config
* @returns {string[]} list of buckets
*/
_extractBucketsFromConfig(connectorConfig) {
_extractBucketsFromConfig(connectorConfig) {
const pipeline = connectorConfig.pipeline ?
JSON.parse(connectorConfig.pipeline) : null;
if (!pipeline || pipeline.length === 0) {
@@ -203,6 +222,12 @@
}
// Add connectors if required number of connectors not reached
const nbConnectorsToAdd = this._nbConnectors - this._connectors.length;
if (nbConnectorsToAdd > 0 && this._singleChangeStream) {
this._logger.warn('Single change stream is enabled but multiple connectors are running', {
method: 'ConnectorsManager.initializeConnectors',
numberOfActiveConnectors: this._connectors.length,
});
}
for (let i = 0; i < nbConnectorsToAdd; i++) {
const newConnector = this.addConnector();
this._connectors.push(newConnector);
@@ -235,15 +260,15 @@
this._metricsHandler.onConnectorDestroyed();
this._logger.info('Successfully destroyed a connector', {
method: 'ConnectorsManager._spawnOrDestroyConnector',
connector: connector.name
connector: connector.name,
});
return true;
} else if (!connector.isRunning && connector.bucketCount > 0) {
await connector.spawn();
this._metricsHandler.onConnectorsInstantiated(false);
this._logger.info('Successfully spawned a connector', {
method: 'ConnectorsManager._spawnOrDestroyConnector',
connector: connector.name
connector: connector.name,
});
return true;
} else if (connector.isRunning) {
