Skip to content

Commit

Permalink
Update existing tests
Browse files Browse the repository at this point in the history
Issue: BB-602
  • Loading branch information
williamlardier committed Oct 17, 2024
1 parent 2c10ae6 commit 20c70a0
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 37 deletions.
3 changes: 3 additions & 0 deletions tests/unit/oplogPopulator/Allocator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const OplogPopulatorMetrics =
require('../../../extensions/oplogPopulator/OplogPopulatorMetrics');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const MultipleBucketsPipelineFactory =
require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('Allocator');

Expand All @@ -19,6 +21,7 @@ const defaultConnectorParams = {
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: new MultipleBucketsPipelineFactory().getPipeline,
};

const connector1 = new Connector({
Expand Down
44 changes: 24 additions & 20 deletions tests/unit/oplogPopulator/Connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const werelogs = require('werelogs');
const Connector =
require('../../../extensions/oplogPopulator/modules/Connector');
const { errors } = require('arsenal');
const MultipleBucketsPipelineFactory =
require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('Connector');

Expand All @@ -30,6 +32,7 @@ describe('Connector', () => {
isRunning: false,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: new MultipleBucketsPipelineFactory().getPipeline,
logger,
});
});
Expand Down Expand Up @@ -152,21 +155,22 @@ describe('Connector', () => {
});
});

describe('_generateConnectorPipeline', () => {
it('should return new pipeline', () => {
const buckets = ['example-bucket-1', 'example-bucket-2'];
const pipeline = connector._generateConnectorPipeline(buckets);
assert.strictEqual(pipeline, JSON.stringify([
{
$match: {
'ns.coll': {
$in: buckets,
}
}
}
]));
});
});
// TODO
// describe('_generateConnectorPipeline', () => {
// it('should return new pipeline', () => {
// const buckets = ['example-bucket-1', 'example-bucket-2'];
// const pipeline = connector._generateConnectorPipeline(buckets);
// assert.strictEqual(pipeline, JSON.stringify([
// {
// $match: {
// 'ns.coll': {
// $in: buckets,
// }
// }
// }
// ]));
// });
// });

describe('_updateConnectorState', () => {
it('should update all fields when a bucket is added/removed', () => {
Expand Down Expand Up @@ -207,7 +211,7 @@ describe('Connector', () => {
it('should only update connector pipeline data if conditions are met', async () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = false;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
const pipelineStub = sinon.stub(connector, '_getPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
Expand All @@ -221,7 +225,7 @@ describe('Connector', () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = false;
connector._isRunning = true;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
const pipelineStub = sinon.stub(connector, '_getPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
Expand All @@ -234,7 +238,7 @@ describe('Connector', () => {
it('should not update when buckets assigned to connector haven\'t changed', async () => {
connector._state.bucketsGotModified = false;
connector._state.isUpdating = false;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
const pipelineStub = sinon.stub(connector, '_getPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
Expand All @@ -247,7 +251,7 @@ describe('Connector', () => {
it('should not update when connector is updating', async () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = true;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
const pipelineStub = sinon.stub(connector, '_getPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
Expand All @@ -261,7 +265,7 @@ describe('Connector', () => {
connector._state.bucketsGotModified = true;
connector._state.isUpdating = false;
connector._isRunning = false;
const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline')
const pipelineStub = sinon.stub(connector, '_getPipeline')
.returns('example-pipeline');
const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig')
.resolves();
Expand Down
40 changes: 23 additions & 17 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const OplogPopulatorMetrics =
const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../extensions/oplogPopulator/constants');
const MultipleBucketsPipelineFactory =
require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('ConnectorsManager');

Expand Down Expand Up @@ -72,6 +74,8 @@ describe('ConnectorsManager', () => {
let connectorUpdateStub;
let connectorRestartStub;

const pipelineFactory = new MultipleBucketsPipelineFactory();

beforeEach(() => {
connector1 = new Connector({
name: 'source-connector',
Expand All @@ -81,6 +85,7 @@ describe('ConnectorsManager', () => {
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: pipelineFactory.getPipeline,
});
connectorCreateStub = sinon.stub(connector1._kafkaConnect, 'createConnector')
.resolves();
Expand All @@ -105,7 +110,10 @@ describe('ConnectorsManager', () => {
new LeastFullConnector({
logger,
}),
{ logger }
{
logger,
pipelineFactory,
}
),
logger,
});
Expand Down Expand Up @@ -149,29 +157,26 @@ describe('ConnectorsManager', () => {
});
});

describe('_extractBucketsFromConfig', () => {
it('should extract buckets from connector config', () => {
const config = {
pipeline: JSON.stringify([{
$match: {
'ns.coll': {
$in: ['example-bucket-1, example-bucket-2'],
}
}
}])
};
const buckets = connectorsManager._extractBucketsFromConfig(config);
assert.deepEqual(buckets, ['example-bucket-1, example-bucket-2']);
describe('_getOldConnectors', () => {
it('should delete old connector when the strategy rejects it', async () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
config['offset.partitiom.name'] = 'partition-name';
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector');
const connectors = await connectorsManager._getOldConnectors(['source-connector']);
assert.strictEqual(connectors.length, 0);
});
});

describe('_getOldConnectors', () => {
it('should update connector config while keeping the extra fields', async () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
config['offset.partitiom.name'] = 'partition-name';
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList').returns(['bucket1']);
sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector');
const connectors = await connectorsManager._getOldConnectors(['source-connector']);
assert.strictEqual(connectors.length, 1);
assert.strictEqual(connectors[0].name, 'source-connector');
Expand All @@ -187,7 +192,8 @@ describe('ConnectorsManager', () => {
sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1);
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager, '_extractBucketsFromConfig').returns(['bucket1', 'bucket2']);
sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList')
.returns(['bucket1', 'bucket2']);
const warnStub = sinon.stub(connectorsManager._logger, 'warn');
const connectors = await connectorsManager._getOldConnectors(['source-connector', 'source-connector-2']);
assert.strictEqual(connectors.length, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const Connector =
require('../../../../extensions/oplogPopulator/modules/Connector');
const ImmutableConnector =
require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
const MultipleBucketsPipelineFactory =
require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('LeastFullConnector');

Expand All @@ -14,6 +16,7 @@ const defaultConnectorParams = {
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: new MultipleBucketsPipelineFactory().getPipeline,
};

const connector1 = new Connector({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const Connector =
const LeastFullConnector =
require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const constants = require('../../../../extensions/oplogPopulator/constants');
const MultipleBucketsPipelineFactory =
require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('LeastFullConnector');

Expand All @@ -16,6 +18,7 @@ const defaultConnectorParams = {
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: new MultipleBucketsPipelineFactory().getPipeline,
};

const connector1 = new Connector({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const RetainBucketsDecorator =
require('../../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator');
const ImmutableConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector');
const LeastFullConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector');
const MultipleBucketsPipelineFactory =
require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory');

const logger = new werelogs.Logger('LeastFullConnector');

Expand All @@ -17,6 +19,7 @@ const defaultConnectorParams = {
logger,
kafkaConnectHost: '127.0.0.1',
kafkaConnectPort: 8083,
getPipeline: new MultipleBucketsPipelineFactory().getPipeline,
};

const connector1 = new Connector({
Expand Down

0 comments on commit 20c70a0

Please sign in to comment.