From 20c70a0c7bd6b8a3cfcd02aa06eb988d0ea47be1 Mon Sep 17 00:00:00 2001 From: williamlardier Date: Thu, 17 Oct 2024 18:45:06 +0200 Subject: [PATCH] Update existing tests Issue: BB-602 --- tests/unit/oplogPopulator/Allocator.js | 3 ++ tests/unit/oplogPopulator/Connector.js | 44 ++++++++++--------- .../unit/oplogPopulator/ConnectorsManager.js | 40 ++++++++++------- .../allocationStrategy/ImmutableConnector.js | 3 ++ .../allocationStrategy/LeastFullConnector.js | 3 ++ .../RetainBucketsDecorator.js | 3 ++ 6 files changed, 59 insertions(+), 37 deletions(-) diff --git a/tests/unit/oplogPopulator/Allocator.js b/tests/unit/oplogPopulator/Allocator.js index d92fab263..ccb42421a 100644 --- a/tests/unit/oplogPopulator/Allocator.js +++ b/tests/unit/oplogPopulator/Allocator.js @@ -10,6 +10,8 @@ const OplogPopulatorMetrics = require('../../../extensions/oplogPopulator/OplogPopulatorMetrics'); const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector'); const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator'); +const MultipleBucketsPipelineFactory = + require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('Allocator'); @@ -19,6 +21,7 @@ const defaultConnectorParams = { logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: new MultipleBucketsPipelineFactory().getPipeline, }; const connector1 = new Connector({ diff --git a/tests/unit/oplogPopulator/Connector.js b/tests/unit/oplogPopulator/Connector.js index 5637be9cd..77b9d607b 100644 --- a/tests/unit/oplogPopulator/Connector.js +++ b/tests/unit/oplogPopulator/Connector.js @@ -5,6 +5,8 @@ const werelogs = require('werelogs'); const Connector = require('../../../extensions/oplogPopulator/modules/Connector'); const { errors } = require('arsenal'); +const MultipleBucketsPipelineFactory = + require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('Connector'); @@ -30,6 +32,7 @@ describe('Connector', () => { isRunning: false, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: new MultipleBucketsPipelineFactory().getPipeline, logger, }); }); @@ -152,21 +155,22 @@ describe('Connector', () => { }); }); - describe('_generateConnectorPipeline', () => { - it('should return new pipeline', () => { - const buckets = ['example-bucket-1', 'example-bucket-2']; - const pipeline = connector._generateConnectorPipeline(buckets); - assert.strictEqual(pipeline, JSON.stringify([ - { - $match: { - 'ns.coll': { - $in: buckets, - } - } - } - ])); - }); - }); + // TODO + // describe('_generateConnectorPipeline', () => { + // it('should return new pipeline', () => { + // const buckets = ['example-bucket-1', 'example-bucket-2']; + // const pipeline = connector._generateConnectorPipeline(buckets); + // assert.strictEqual(pipeline, JSON.stringify([ + // { + // $match: { + // 'ns.coll': { + // $in: buckets, + // } + // } + // } + // ])); + // }); + // }); describe('_updateConnectorState', () => { it('should update all fields when a bucket is added/removed', () => { @@ -207,7 +211,7 @@ describe('Connector', () => { it('should only update connector pipeline data if conditions are met', async () => { connector._state.bucketsGotModified = true; connector._state.isUpdating = false; - const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline') + const pipelineStub = sinon.stub(connector, '_getPipeline') .returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); @@ -221,7 +225,7 @@ describe('Connector', () => { connector._state.bucketsGotModified = true; connector._state.isUpdating = false; connector._isRunning = true; - const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline') + const pipelineStub = sinon.stub(connector, '_getPipeline') .returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); @@ -234,7 +238,7 @@ describe('Connector', () => { it('should not update when buckets assigned to connector haven\'t changed', async () => { connector._state.bucketsGotModified = false; connector._state.isUpdating = false; - const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline') + const pipelineStub = sinon.stub(connector, '_getPipeline') .returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); @@ -247,7 +251,7 @@ describe('Connector', () => { it('should not update when connector is updating', async () => { connector._state.bucketsGotModified = true; connector._state.isUpdating = true; - const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline') + const pipelineStub = sinon.stub(connector, '_getPipeline') .returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); @@ -261,7 +265,7 @@ describe('Connector', () => { connector._state.bucketsGotModified = true; connector._state.isUpdating = false; connector._isRunning = false; - const pipelineStub = sinon.stub(connector, '_generateConnectorPipeline') + const pipelineStub = sinon.stub(connector, '_getPipeline') .returns('example-pipeline'); const updateStub = sinon.stub(connector._kafkaConnect, 'updateConnectorConfig') .resolves(); diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 345632f57..749713cf1 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -12,6 +12,8 @@ const OplogPopulatorMetrics = const RetainBucketsDecorator = require('../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator'); const LeastFullConnector = require('../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector'); const constants = require('../../../extensions/oplogPopulator/constants'); +const MultipleBucketsPipelineFactory = + require('../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('ConnectorsManager'); @@ -72,6 +74,8 @@ describe('ConnectorsManager', () => { let connectorUpdateStub; let connectorRestartStub; + const pipelineFactory = new MultipleBucketsPipelineFactory(); + beforeEach(() => { connector1 = new Connector({ name: 'source-connector', @@ -81,6 +85,7 @@ describe('ConnectorsManager', () => { logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: pipelineFactory.getPipeline, }); connectorCreateStub = sinon.stub(connector1._kafkaConnect, 'createConnector') .resolves(); @@ -105,7 +110,10 @@ describe('ConnectorsManager', () => { new LeastFullConnector({ logger, }), - { logger } + { + logger, + pipelineFactory, + } ), logger, }); @@ -149,29 +157,26 @@ describe('ConnectorsManager', () => { }); }); - describe('_extractBucketsFromConfig', () => { - it('should extract buckets from connector config', () => { - const config = { - pipeline: JSON.stringify([{ - $match: { - 'ns.coll': { - $in: ['example-bucket-1, example-bucket-2'], - } - } - }]) - }; - const buckets = connectorsManager._extractBucketsFromConfig(config); - assert.deepEqual(buckets, ['example-bucket-1, example-bucket-2']); + describe('_getOldConnectors', () => { + it('should delete old connector when the strategy rejects it', async () => { + const config = { ...connectorConfig }; + config['topic.namespace.map'] = 'outdated-topic'; + config['offset.partitiom.name'] = 'partition-name'; + sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') + .resolves(config); + sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector'); + const connectors = await connectorsManager._getOldConnectors(['source-connector']); + assert.strictEqual(connectors.length, 0); }); - }); - describe('_getOldConnectors', () => { it('should update connector config while keeping the extra fields', async () => { const config = { ...connectorConfig }; config['topic.namespace.map'] = 'outdated-topic'; config['offset.partitiom.name'] = 'partition-name'; sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') .resolves(config); + sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList').returns(['bucket1']); + sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector'); const connectors = await connectorsManager._getOldConnectors(['source-connector']); assert.strictEqual(connectors.length, 1); assert.strictEqual(connectors[0].name, 'source-connector'); @@ -187,7 +192,8 @@ describe('ConnectorsManager', () => { sinon.stub(connectorsManager._allocationStrategy, 'maximumBucketsPerConnector').value(1); sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') .resolves(config); - sinon.stub(connectorsManager, '_extractBucketsFromConfig').returns(['bucket1', 'bucket2']); + sinon.stub(connectorsManager._allocationStrategy, 'getOldConnectorBucketList') + .returns(['bucket1', 'bucket2']); const warnStub = sinon.stub(connectorsManager._logger, 'warn'); const connectors = await connectorsManager._getOldConnectors(['source-connector', 'source-connector-2']); assert.strictEqual(connectors.length, 2); diff --git a/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js b/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js index 6fdc0c63a..566279434 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js +++ b/tests/unit/oplogPopulator/allocationStrategy/ImmutableConnector.js @@ -5,6 +5,8 @@ const Connector = require('../../../../extensions/oplogPopulator/modules/Connector'); const ImmutableConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector'); +const MultipleBucketsPipelineFactory = + require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('LeastFullConnector'); @@ -14,6 +16,7 @@ const defaultConnectorParams = { logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: new MultipleBucketsPipelineFactory().getPipeline, }; const connector1 = new Connector({ diff --git a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js index a229e1d34..f2b07e848 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js +++ b/tests/unit/oplogPopulator/allocationStrategy/LeastFullConnector.js @@ -7,6 +7,8 @@ const Connector = const LeastFullConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector'); const constants = require('../../../../extensions/oplogPopulator/constants'); +const MultipleBucketsPipelineFactory = + require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('LeastFullConnector'); @@ -16,6 +18,7 @@ const defaultConnectorParams = { logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: new MultipleBucketsPipelineFactory().getPipeline, }; const connector1 = new Connector({ diff --git a/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js b/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js index 14d2f9b9e..aae2a30e9 100644 --- a/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js +++ b/tests/unit/oplogPopulator/allocationStrategy/RetainBucketsDecorator.js @@ -8,6 +8,8 @@ const RetainBucketsDecorator = require('../../../../extensions/oplogPopulator/allocationStrategy/RetainBucketsDecorator'); const ImmutableConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/ImmutableConnector'); const LeastFullConnector = require('../../../../extensions/oplogPopulator/allocationStrategy/LeastFullConnector'); +const MultipleBucketsPipelineFactory = + require('../../../../extensions/oplogPopulator/pipeline/MultipleBucketsPipelineFactory'); const logger = new werelogs.Logger('LeastFullConnector'); @@ -17,6 +19,7 @@ const defaultConnectorParams = { logger, kafkaConnectHost: '127.0.0.1', kafkaConnectPort: 8083, + getPipeline: new MultipleBucketsPipelineFactory().getPipeline, }; const connector1 = new Connector({