Skip to content

Commit

Permalink
Ensure no race condition when handling connector events
Browse files Browse the repository at this point in the history
Issue: BB-601
  • Loading branch information
williamlardier committed Sep 19, 2024
1 parent 93efc96 commit bcf782e
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class OplogPopulator {
});
// For now, we always use the RetainBucketsDecorator
// so, we map the events from the classes
this._connectorsManager.on('connector-updated', connector =>
this._connectorsManager.on(constants.connectorUpdatedEvent, connector =>
this._allocationStrategy.onConnectorUpdatedOrDestroyed(connector));
this._allocator.on('bucket-removed', (bucket, connector) =>
this._allocationStrategy.onBucketRemoved(bucket, connector));
Expand Down
1 change: 1 addition & 0 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const constants = {
// to 16 MB (16777216B) / 64 (max length of a bucket name) ~= 260000
maxBucketsPerConnector: 260000,
mongodbVersionWithImmutablePipelines: '6.0.0',
connectorUpdatedEvent: 'connector-updated',
defaultConnectorConfig: {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
Expand Down
2 changes: 1 addition & 1 deletion extensions/oplogPopulator/modules/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class Allocator extends EventEmitter {
try {
const connector = this._bucketsToConnectors.get(bucket);
if (connector) {
await connector.removeBucket(bucket);
this.emit('bucket-removed', bucket, connector);
await connector.removeBucket(bucket);
this._bucketsToConnectors.delete(bucket);
this._metricsHandler.onConnectorConfigured(connector, 'delete');
this._logger.info('Stopped listening to bucket', {
Expand Down
7 changes: 6 additions & 1 deletion extensions/oplogPopulator/modules/Connector.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const joi = require('joi');
const uuid = require('uuid');
const { errors } = require('arsenal');
const { EventEmitter } = require('stream');
const KafkaConnectWrapper = require('../../../lib/wrappers/KafkaConnectWrapper');
const constants = require('../constants');

const connectorParams = joi.object({
name: joi.string().required(),
Expand All @@ -21,7 +23,7 @@ const connectorParams = joi.object({
* destroy and update the config of the connector when adding
* or removing buckets from it
*/
class Connector {
class Connector extends EventEmitter {

/**
* @constructor
Expand All @@ -36,6 +38,7 @@ class Connector {
* @param {number} params.kafkaConnectPort kafka connect port
*/
constructor(params) {
super();
joi.attempt(params, connectorParams);
this._name = params.name;
this._config = params.config;
Expand Down Expand Up @@ -185,6 +188,7 @@ class Connector {
}
try {
await this._kafkaConnect.deleteConnector(this._name);
this.emit(constants.connectorUpdatedEvent, this);
this._isRunning = false;
// resetting the resume point to set a new one on creation of the connector
delete this._config['startup.mode.timestamp.start.at.operation.time'];
Expand Down Expand Up @@ -339,6 +343,7 @@ class Connector {
if (doUpdate && this._isRunning) {
const timeBeforeUpdate = Date.now();
this._state.isUpdating = true;
this.emit(constants.connectorUpdatedEvent, this);
await this._kafkaConnect.updateConnectorConfig(this._name, this._config);
this._updateConnectorState(false, timeBeforeUpdate);
this._state.isUpdating = false;
Expand Down
12 changes: 5 additions & 7 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class ConnectorsManager extends EventEmitter {
kafkaConnectHost: this._kafkaConnectHost,
kafkaConnectPort: this._kafkaConnectPort,
});
connector.on(constants.connectorUpdatedEvent, connector => {

Check warning on line 131 in extensions/oplogPopulator/modules/ConnectorsManager.js

View workflow job for this annotation

GitHub Actions / tests

Unexpected block statement surrounding arrow body; move the returned value immediately after the `=>`
return this.emit(constants.connectorUpdatedEvent, connector);

Check warning on line 132 in extensions/oplogPopulator/modules/ConnectorsManager.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

extensions/oplogPopulator/modules/ConnectorsManager.js#L132

Added line #L132 was not covered by tests
});
this._connectors.push(connector);
return connector;
}
Expand Down Expand Up @@ -237,7 +240,7 @@ class ConnectorsManager extends EventEmitter {
}

/**
* Spawns a connector when buckets are configured for it and is not running,
* Spawns a connector when buckets aremit(e configured for it and is not running,
* or destroys connector with no buckets configured
* @param {Connector} connector connector instance
* @returns {Promise<Boolean>} true if connector state changed
Expand All @@ -246,7 +249,6 @@ class ConnectorsManager extends EventEmitter {
async _spawnOrDestroyConnector(connector) {
try {
if (connector.isRunning && connector.bucketCount === 0) {
this.emit('connector-updated', connector);
await connector.destroy();
this._metricsHandler.onConnectorDestroyed();
this._logger.info('Successfully destroyed a connector', {
Expand All @@ -263,11 +265,7 @@ class ConnectorsManager extends EventEmitter {
});
return true;
} else if (connector.isRunning && this._allocationStrategy.canUpdate()) {
const isPipelineUpdated = connector.updatePipeline(true);
if (isPipelineUpdated) {
this.emit('connector-updated', connector);
}
return isPipelineUpdated;
return connector.updatePipeline(true);
}

return false;
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const OplogPopulatorMetrics =
require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../extensions/oplogPopulator/constants');

const logger = new werelogs.Logger('ConnectorsManager');

Expand Down Expand Up @@ -238,9 +239,9 @@ describe('ConnectorsManager', () => {
connector1._isRunning = true;
connector1._state.bucketsGotModified = false;
connector1._buckets = new Set();
const emitStub = sinon.stub(connectorsManager, 'emit');
const emitStub = sinon.stub(connector1, 'emit');
await connectorsManager._spawnOrDestroyConnector(connector1);
assert(emitStub.calledOnceWith('connector-updated', connector1));
assert(emitStub.calledOnceWith(constants.connectorUpdatedEvent, connector1));
});

it('should spawn a non running connector when buckets are configured', async () => {
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/oplogPopulator/oplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/alloc
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const ImmutableConnector = require('../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
const AllocationStrategy = require('../../../extensions/oplogPopulator/allocationStrategy/AllocationStrategy');
const constants = require('../../../extensions/oplogPopulator/constants');

const oplogPopulatorConfig = {
topic: 'oplog',
Expand Down Expand Up @@ -209,7 +210,7 @@ describe('OplogPopulator', () => {
assert(initializeConnectorsManagerStub.calledOnce);
const onConnectorUpdatedOrDestroyedStub =
sinon.stub(oplogPopulator._allocationStrategy, 'onConnectorUpdatedOrDestroyed');
oplogPopulator._connectorsManager.emit('connector-updated');
oplogPopulator._connectorsManager.emit(constants.connectorUpdatedEvent);
assert(onConnectorUpdatedOrDestroyedStub.calledOnce);
});

Expand Down

0 comments on commit bcf782e

Please sign in to comment.