Skip to content

Commit

Permalink
Updated kafka client-id's to be unique, and also ensured the kafka gr…
Browse files Browse the repository at this point in the history
…oup for notifications is aligned to standards.
  • Loading branch information
mdebarros committed Nov 8, 2018
1 parent 9417e53 commit 37bfad0
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 7 deletions.
2 changes: 1 addition & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
},
"rdkafkaConf": {
"client.id": "ml-con-notification-event",
"group.id": "kafka-ml-api-adapter",
"group.id": "ml-group-notification-event",
"metadata.broker.list": "localhost:9092",
"socket.blocking.max.ms": 1,
"fetch.wait.max.ms": 1,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/ml-api-adapter",
"version": "3.7.12-snapshot",
"version": "3.7.13-snapshot",
"description": "Convert from ML API to/from internal Central Services messaging format.",
"license": "Apache-2.0",
"private": true,
Expand Down
8 changes: 8 additions & 0 deletions src/handlers/notification/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const Logger = require('@mojaloop/central-services-shared').Logger
const Participant = require('../../domain/participant')
const Utility = require('../../lib/utility')
const Callback = require('./callbacks.js')
const uuid = require('uuid4')

const NOTIFICATION = 'notification'
const EVENT = 'event'
Expand Down Expand Up @@ -61,6 +62,13 @@ const startConsumer = async () => {
if (config.rdkafkaConf['enable.auto.commit'] !== undefined) {
autoCommitEnabled = config.rdkafkaConf['enable.auto.commit']
}

if (config.rdkafkaConf['client.id'] !== undefined) {
config.rdkafkaConf['client.id'] = `${config.rdkafkaConf['client.id']}-${uuid()}`
} else {
config.rdkafkaConf['client.id'] = `default-client-id-${uuid()}`
}

notificationConsumer = new Consumer([topicName], config)
Logger.info('Notification::startConsumer::Consumer: new')

Expand Down
6 changes: 6 additions & 0 deletions src/lib/kafka/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

const Producer = require('@mojaloop/central-services-shared').Kafka.Producer
const Logger = require('@mojaloop/central-services-shared').Logger
const uuid = require('uuid4')

let listOfProducers = {}

Expand All @@ -53,6 +54,11 @@ let listOfProducers = {}
const produceMessage = async (messageProtocol, topicConf, config) => {
try {
let producer

if (config.rdkafkaConf !== undefined && config.rdkafkaConf['client.id'] !== undefined) {
config.rdkafkaConf['client.id'] = `${config.rdkafkaConf['client.id']}-${uuid()}`
}

if (listOfProducers[topicConf.topicName]) {
producer = listOfProducers[topicConf.topicName]
} else {
Expand Down
9 changes: 8 additions & 1 deletion test/unit/api/transfers/handler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const createRequest = (payload) => {
const requestPayload = payload || {}
return {
payload: requestPayload,
headers: {
'fspiop-source': 'payer',
'fspiop-destination': 'payee'
},
server: {
log: () => { }
}
Expand All @@ -44,6 +48,10 @@ const createPutRequest = (params, payload) => {
const requestParams = params || {}
return {
params: requestParams,
headers: {
'fspiop-source': 'payer',
'fspiop-destination': 'payee'
},
payload: requestPayload,
server: {
log: () => { }
Expand Down Expand Up @@ -199,7 +207,6 @@ Test('transfer handler', handlerTest => {
}
}
}

Handler.fulfilTransfer(request, reply)
})

Expand Down
12 changes: 8 additions & 4 deletions test/unit/handlers/notification/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -665,10 +665,12 @@ Test('Notification Service tests', notificationTest => {
}
try {
await Notification.consumeMessage(null, [msg])
test.fail('Error should be thrown')
test.pass()
// test.fail('Error should be thrown')
test.end()
} catch (e) {
test.pass()
// test.pass()
test.fail('Error should NOT be thrown')
test.end()
}
})
Expand All @@ -695,10 +697,12 @@ Test('Notification Service tests', notificationTest => {
try {
test.ok(await Notification.startConsumer())
await Notification.consumeMessage(null, [msg])
test.fail('Error should be thrown')
test.pass()
// test.fail('Error should be thrown')
test.end()
} catch (e) {
test.pass()
// test.pass()
test.fail('Error should NOT be thrown')
test.end()
}
Config.KAFKA_CONFIG.CONSUMER.NOTIFICATION.EVENT.config.rdkafkaConf['enable.auto.commit'] = false
Expand Down

0 comments on commit 37bfad0

Please sign in to comment.