Skip to content

Commit

Permalink
Added some additional loggers to help perf processing
Browse files Browse the repository at this point in the history
  • Loading branch information
mdebarros committed Nov 8, 2018
1 parent 37bfad0 commit 070db5d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mojaloop/ml-api-adapter",
"version": "3.7.13-snapshot",
"version": "3.7.14-snapshot",
"description": "Convert from ML API to/from internal Central Services messaging format.",
"license": "Apache-2.0",
"private": true,
Expand Down
15 changes: 13 additions & 2 deletions src/handlers/notification/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ const consumeMessage = async (error, message) => {
Logger.info(`[cid=${message.value.id}, fsp=${message.value.from}, source=${message.value.from}, dest=${message.value.to}] ~ Transfer::handler::notification - START`)
Logger.info('Notification::consumeMessage::processMessage')
await processMessage(message)
Logger.info(`[cid=${message.value.id}, fsp=${message.value.from}, source=${message.value.from}, dest=${message.value.to}] ~ Transfer::handler::notification - POST-CALLBACK`)
Logger.info('Committing message back to kafka')
if (!autoCommitEnabled) {
notificationConsumer.commitMessageSync(message)
Expand Down Expand Up @@ -160,46 +161,56 @@ const processMessage = async (msg) => {
const status = state.status

let headers
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage action: ${action}`)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage status: ${status}`)
// Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage action: ${action}`)
// Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage status: ${status}`)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - [action: ${action}, status: ${status}]`)
if (action === 'prepare' && status === 'success') {
let callbackURL = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_TRANSFER_POST, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'post', content.headers, content.payload, id, to)
} else if (action.toLowerCase() === 'prepare' && status.toLowerCase() !== 'success') {
let callbackURL = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'put', content.headers, content.payload, id, from)
} else if (action.toLowerCase() === 'commit' && status.toLowerCase() === 'success') {
let callbackURLFrom = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': from })
await Callback.sendCallback(callbackURLFrom, 'put', headers, content.payload, id, from)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': to })
return Callback.sendCallback(callbackURLTo, 'put', headers, content.payload, id, to)
} else if (action.toLowerCase() === 'commit' && status.toLowerCase() !== 'success') {
let callbackURL = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'put', content.headers, content.payload, id, from)
} else if (action.toLowerCase() === 'reject') {
let callbackURLFrom = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': from })
await Callback.sendCallback(callbackURLFrom, 'put', headers, content.payload, id, from)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': to })
return Callback.sendCallback(callbackURLTo, 'put', headers, content.payload, id, to)
} else if (action.toLowerCase() === 'abort') {
let callbackURLFrom = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, id)
let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': from })
await Callback.sendCallback(callbackURLFrom, 'put', headers, content.payload, id, from)
headers = Object.assign({}, content.headers, { 'FSPIOP-Destination': to })
return Callback.sendCallback(callbackURLTo, 'put', headers, content.payload, id, to)
} else if (action.toLowerCase() === 'timeout-received') {
let callbackURL = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'put', content.headers, content.payload, id, from)
} else if (action === 'prepare-duplicate') {
let callbackURL = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'put', content.headers, content.payload, id, from)
} else if (action === 'get') {
let callbackURL = await Participant.getEndpoint(from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, id)
Logger.info(`[cid=${id}, fsp=${from}, source=${from}, dest=${to}] ~ Notification::processMessage - PRE-CALLBACK`)
return Callback.sendCallback(callbackURL, 'put', content.headers, content.payload, id, from)
} else {
const err = new Error('invalid action received from kafka')
Expand Down

0 comments on commit 070db5d

Please sign in to comment.