Feature/747-748 Bulk Processing Handler and existing transfer handlers (#289)

* Configs and flow to support bulk-prepare action
* Init BulkProcessingHandler
* Fix unit tests
* Handling data storage models
* Finalize bulkTransfer response retrieval from DB
* WIP #1
* Removed kafkaConf.key
  Switched to message.id=UUID for fulfil and get
  195 unit tests fail
* Fixed unit tests
* Finalizing BulkPrepareHandler
* Unifying mongo schema definitions
* Fixed integration tests and added new endpointTypes
* Fixed tests and added endpoints
* Fix mongoose unique index issue
* Reverting back topicConf.key setting
* Changes as per PR review comments
* Changes related to ml-api-adapter/pull/116 review
ggrg authored Jun 20, 2019
1 parent 94e6a97 commit b84e0e5
Showing 27 changed files with 1,215 additions and 257 deletions.
47 changes: 45 additions & 2 deletions config/default.json
@@ -18,7 +18,7 @@
     "POOL_MAX": 30
   },
   "MONGODB": {
-    "URI": "mongodb://localhost:27017/bulk_transfers"
+    "URI": "mongodb://localhost:27017/mlos"
   },
   "HANDLERS": {
     "DISABLED": false,
@@ -74,7 +74,7 @@
       }
     },
     "CONSUMER": {
-      "BULK-TRANSFER": {
+      "BULK": {
         "PREPARE": {
           "config": {
             "options": {
@@ -97,6 +97,29 @@
               "auto.offset.reset": "earliest"
             }
           }
+        },
+        "PROCESSING": {
+          "config": {
+            "options": {
+              "mode": 2,
+              "batchSize": 1,
+              "pollFrequency": 10,
+              "recursiveTimeout": 100,
+              "messageCharset": "utf8",
+              "messageAsJSON": true,
+              "sync": true,
+              "consumeTimeout": 1000
+            },
+            "rdkafkaConf": {
+              "client.id": "cl-con-bulk-processing",
+              "group.id": "cl-group-bulk-processing",
+              "metadata.broker.list": "localhost:9092",
+              "socket.keepalive.enable": true
+            },
+            "topicConf": {
+              "auto.offset.reset": "earliest"
+            }
+          }
         }
       },
       "TRANSFER": {
@@ -220,6 +243,26 @@
       }
     },
     "PRODUCER": {
+      "BULK": {
+        "PROCESSING": {
+          "config": {
+            "options": {
+              "messageCharset": "utf8"
+            },
+            "rdkafkaConf": {
+              "metadata.broker.list": "localhost:9092",
+              "client.id": "cl-prod-bulk-processing",
+              "event_cb": true,
+              "dr_cb": true,
+              "socket.keepalive.enable": true,
+              "queue.buffering.max.messages": 10000000
+            },
+            "topicConf": {
+              "request.required.acks": "all"
+            }
+          }
+        }
+      },
       "TRANSFER": {
        "PREPARE": {
          "config": {
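For reference, a minimal sketch of how the new CONSUMER.BULK.PROCESSING block can be resolved at runtime. The KAFKA root key and the require path are assumptions based on this file's layout; the handler changes below do the equivalent through Util.getKafkaConfig:

const Config = require('./config/default.json') // path assumed for illustration

// Walks KAFKA.<flow>.<functionality>.<action>.config, e.g. the block added above
const getKafkaConfig = (flow, functionality, action) => {
  const node = (((Config.KAFKA || {})[flow] || {})[functionality] || {})[action]
  if (!node || !node.config) {
    throw new Error(`No Kafka config for ${flow}.${functionality}.${action}`)
  }
  return node.config
}

const bulkProcessingConsumer = getKafkaConfig('CONSUMER', 'BULK', 'PROCESSING')
console.log(bulkProcessingConsumer.rdkafkaConf['group.id']) // "cl-group-bulk-processing"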
12 changes: 12 additions & 0 deletions seeds/endpointType.js
@@ -88,6 +88,18 @@ const endpointTypes = [
   {
     'name': 'FSPIOP_CALLBACK_URL_QUOTES',
     'description': 'Quotes callback URL to which put quotes requests can be sent'
+  },
+  {
+    'name': 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST',
+    'description': 'Participant callback URL to which bulk transfer post can be sent'
+  },
+  {
+    'name': 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT',
+    'description': 'Participant callback URL to which bulk transfer put can be sent'
+  },
+  {
+    'name': 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR',
+    'description': 'Participant callback URL to which bulk transfer error notifications can be sent'
   }
 ]

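The three new rows follow the existing FSPIOP_CALLBACK_URL_* naming. Purely as illustration, a sketch of choosing among them when routing a bulk transfer notification (the helper is hypothetical, not part of this repo):

// Hypothetical helper: maps a bulk notification to the seeded endpoint type
const bulkCallbackEndpointType = (action, isError) => {
  if (isError) return 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR'
  return action === 'post'
    ? 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST'
    : 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT'
}

console.log(bulkCallbackEndpointType('put', false)) // FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT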
93 changes: 90 additions & 3 deletions src/domain/bulkTransfer/index.js
@@ -27,9 +27,15 @@
  * @module src/domain/transfer/
  */

-const BulkTransferFacade = require('../../models/bulkTransfer/facade')
-const BulkTransferDuplicateCheckModel = require('../../models/bulkTransfer/bulkTransferDuplicateCheck')
 const Enum = require('../../lib/enum')
+const BulkTransferAssociationModel = require('../../models/bulkTransfer/bulkTransferAssociation')
+const BulkTransferDuplicateCheckModel = require('../../models/bulkTransfer/bulkTransferDuplicateCheck')
+const BulkTransferExtensionModel = require('../../models/bulkTransfer/bulkTransferExtension')
+const BulkTransferFacade = require('../../models/bulkTransfer/facade')
+const BulkTransferModel = require('../../models/bulkTransfer/bulkTransfer')
+const BulkTransferStateChangeModel = require('../../models/bulkTransfer/bulkTransferStateChange')
+const IndividualTransferModel = require('../../models/bulkTransfer/individualTransfer')
+const IndividualTransferExtensionModel = require('../../models/transfer/transferExtension')

 const checkDuplicate = async (bulkTransferId, hash, bulkTransferFulfilmentId = false) => {
   try {
@@ -50,10 +56,91 @@ const checkDuplicate = async (bulkTransferId, hash, bulkTransferFulfilmentId = f
   }
 }

+const getBulkTransferById = async (id) => {
+  try {
+    let bulkTransfer = await BulkTransferModel.getById(id)
+    let bulkTransferExtensions = await BulkTransferExtensionModel.getByBulkTransferId(id)
+    let individualTransfers = await IndividualTransferModel.getAllById(id)
+    let payeeIndividualTransfers = []
+    individualTransfers = await Promise.all(individualTransfers.map(async (transfer) => {
+      return new Promise(async (resolve, reject) => {
+        let extensions = await IndividualTransferExtensionModel.getByTransferId(transfer.transferId)
+        let extension
+        let result = {
+          transferId: transfer.transferId
+        }
+        if (transfer.fulfilment) {
+          result.fulfilment = transfer.fulfilment
+        }
+        if (transfer.errorCode) {
+          result.errorInformation = {
+            errorCode: transfer.errorCode,
+            errorDescription: transfer.errorDescription
+          }
+        }
+        if (extensions.length > 0) {
+          if (!transfer.fulfilment) {
+            extension = extensions.map(ext => {
+              return { key: ext.key, value: ext.value }
+            })
+          } else {
+            extension = extensions.filter(ext => {
+              return !!ext.transferFulfilmentId
+            }).map(ext => {
+              return { key: ext.key, value: ext.value }
+            })
+          }
+        }
+        if (extension && extension.length > 0) {
+          result.extensionList = { extension }
+        }
+        const allowedPayeeTransfers = [
+          Enum.TransferStateEnum.RESERVED,
+          Enum.TransferStateEnum.COMMITTED
+        ]
+        if (allowedPayeeTransfers.indexOf(transfer.transferStateEnum) !== -1) {
+          payeeIndividualTransfers.push(result)
+        }
+        return resolve(result)
+      })
+    }))
+    let bulkResponse = {
+      bulkTransferId: bulkTransfer.bulkTransferId,
+      bulkTransferState: bulkTransfer.bulkTransferStateId
+    }
+    if (bulkTransfer.completedTimestamp) {
+      bulkResponse.completedTimestamp = bulkTransfer.completedTimestamp
+    }
+    let payerBulkTransfer = { destination: bulkTransfer.payerFsp, ...bulkResponse }
+    let payeeBulkTransfer = { destination: bulkTransfer.payeeFsp, ...bulkResponse }
+    if (bulkTransferExtensions.length > 0) {
+      let bulkExtensionsResponse = bulkTransferExtensions.map(ext => {
+        return { key: ext.key, value: ext.value }
+      })
+      payerBulkTransfer.extensionList = { extension: bulkExtensionsResponse }
+      payeeBulkTransfer.extensionList = { extension: bulkExtensionsResponse }
+    }
+    if (individualTransfers.length > 0) {
+      payerBulkTransfer.individualTransferResults = individualTransfers
+    }
+    if (payeeIndividualTransfers.length > 0) {
+      payeeBulkTransfer.individualTransferResults = payeeIndividualTransfers
+    }
+    return { payerBulkTransfer, payeeBulkTransfer }
+  } catch (err) {
+    throw err
+  }
+}
+
 const BulkTransferService = {
   checkDuplicate,
+  getBulkTransferById,
   bulkPrepare: BulkTransferFacade.saveBulkTransferReceived,
-  bulkTransferAssociationCreate: BulkTransferAssociationModel.create
+  bulkTransferAssociationCreate: BulkTransferAssociationModel.create,
+  bulkTransferAssociationExists: BulkTransferAssociationModel.exists,
+  bulkTransferAssociationUpdate: BulkTransferAssociationModel.update,
+  createBulkTransferState: BulkTransferStateChangeModel.create,
+  getBulkTransferState: BulkTransferStateChangeModel.getByTransferId
 }

 module.exports = BulkTransferService
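A usage sketch for the new getBulkTransferById: it resolves to one view per participant, and the payee view carries only individual transfers in RESERVED or COMMITTED state (the id below is illustrative):

const BulkTransferService = require('./src/domain/bulkTransfer')

const main = async () => {
  const { payerBulkTransfer, payeeBulkTransfer } = await BulkTransferService.getBulkTransferById('some-bulk-transfer-id')
  console.log(payerBulkTransfer.destination) // the payer FSP
  console.log((payeeBulkTransfer.individualTransferResults || []).length) // RESERVED/COMMITTED only
}

main().catch(console.error)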
@@ -31,23 +31,24 @@
 const AwaitifyStream = require('awaitify-stream')
 const Uuid = require('uuid4')
 const Logger = require('@mojaloop/central-services-shared').Logger
-const BulkTransferService = require('../../domain/bulkTransfer')
-const Util = require('../lib/utility')
-const Kafka = require('../lib/kafka')
-const Validator = require('./validator')
-const Enum = require('../../lib/enum')
+const BulkTransferService = require('../../../domain/bulkTransfer')
+const Util = require('../../lib/utility')
+const Kafka = require('../../lib/kafka')
+const Validator = require('../shared/validator')
+const Enum = require('../../../lib/enum')
 const TransferEventType = Enum.transferEventType
 const TransferEventAction = Enum.transferEventAction
 const Metrics = require('@mojaloop/central-services-metrics')
-const Config = require('../../lib/config')
-const Mongoose = require('../../lib/mongodb').Mongoose
-const { IndividualTransferModel, BulkTransferModel } = require('./bulkModels')
+const Config = require('../../../lib/config')
+const Mongoose = require('../../../lib/mongodb').Mongoose
+const { IndividualTransferModel, BulkTransferModel } = require('../../../schema/bulkTransfer')
+const encodePayload = require('@mojaloop/central-services-stream/src/kafka/protocol').encodePayload

 const location = { module: 'BulkPrepareHandler', method: '', path: '' } // var object used as pointer

 const consumerCommit = true
 // const fromSwitch = true
 const toDestination = true

 const prepareHandlerMessageProtocol = {
   value: {
     id: null,
@@ -61,9 +62,9 @@
     metadata: {
       event: {
         id: Uuid(),
-        responseTo: 'dfa',
-        type: 'bulk-prepare',
-        action: 'prepare',
+        responseTo: null,
+        type: 'transfer',
+        action: 'bulk-prepare',
         createdAt: null,
         state: {
           status: 'success',
@@ -87,7 +88,7 @@ const getBulkMessage = async (bulkTransferId) => {
 }

 /**
- * @function TransferBulkPrepareHandler
+ * @function BulkPrepareHandler
  *
  * @async
  * @description This is the consumer callback function that gets registered to a topic. This then gets a list of messages,
@@ -111,7 +112,6 @@ const bulkPrepare = async (error, messages) => {
     ['success', 'fspId']
   ).startTimer()
   if (error) {
-    // Logger.error(error)
     throw error
   }
   let message = {}
@@ -121,7 +121,7 @@
   } else {
     message = messages
   }
-  // decode payload
+  const messageId = message.value.id
   const payload = message.value.content.payload
   const headers = message.value.content.headers
   const action = message.value.metadata.event.action
@@ -138,7 +138,7 @@
     return true
   }
   const actionLetter = action === TransferEventAction.BULK_PREPARE ? Enum.actionLetter.bulkPrepare : Enum.actionLetter.unknown
-  let params = { message, bulkTransferId, kafkaTopic, consumer }
+  let params = { message, kafkaTopic, consumer }

   Logger.info(Util.breadcrumb(location, { path: 'dupCheck' }))
   const { isDuplicateId, isResend } = await BulkTransferService.checkDuplicate(bulkTransferId, payload.hash)
@@ -168,10 +168,11 @@
   try {
     Logger.info(Util.breadcrumb(location, `individualTransfers`))
     // stream initialization
-    let individualTransfersStream = IndividualTransferModel.find({ bulkTransferId }).cursor()
+    let individualTransfersStream = IndividualTransferModel.find({ messageId }).cursor()
     // enable async/await operations for the stream
     let streamReader = AwaitifyStream.createReader(individualTransfersStream)
     let doc
+
     while ((doc = await streamReader.readAsync()) !== null) {
       let individualTransfer = doc.payload
       individualTransfer.payerFsp = payload.payerFsp
@@ -187,18 +188,18 @@
       await BulkTransferService.bulkTransferAssociationCreate(bulkTransferAssociationRecord)

       let dataUri = encodePayload(JSON.stringify(individualTransfer), headers['content-type'])
-      let message = Object.assign({}, prepareHandlerMessageProtocol)
-      message.value.id = doc.payload.transferId
-      message.value.from = payload.payerFsp
-      message.value.to = payload.payeeFsp
-      message.value.content.headers = headers
-      message.value.content.payload = dataUri
-      message.value.metadata.event.createdAt = new Date()
+      let msg = Object.assign({}, prepareHandlerMessageProtocol)
+      msg.value.id = messageId
+      msg.value.from = payload.payerFsp
+      msg.value.to = payload.payeeFsp
+      msg.value.content.headers = headers
+      msg.value.content.payload = dataUri
+      msg.value.metadata.event.id = message.value.metadata.event.id
+      msg.value.metadata.event.createdAt = new Date()

-      Logger.info(Util.breadcrumb(location, JSON.stringify(message)))
-      params = { message, bulkTransferId, kafkaTopic, consumer }
-      const producer = { functionality: TransferEventType.TRANSFER, action: TransferEventAction.PREPARE } // TODO: change to BULK_PREPARE?
-      await Util.proceed(params, { consumerCommit, histTimerEnd, producer, toDestination })
+      params = { message: msg, kafkaTopic, consumer }
+      const producer = { functionality: TransferEventType.PREPARE, action: TransferEventAction.BULK_PREPARE }
+      await Util.proceed(params, { consumerCommit, histTimerEnd, producer })
     }
   } catch (err) { // TODO: handle individual transfers streaming error
     Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
@@ -239,8 +240,8 @@
   await connectMongoose()
   const bulkPrepareHandler = {
     command: bulkPrepare,
-    topicName: Util.transformGeneralTopicName(TransferEventType.BULK_TRANSFER, TransferEventAction.PREPARE),
-    config: Util.getKafkaConfig(Util.ENUMS.CONSUMER, TransferEventType.BULK_TRANSFER.toUpperCase(), TransferEventAction.PREPARE.toUpperCase())
+    topicName: Util.transformGeneralTopicName(TransferEventType.BULK, TransferEventAction.PREPARE),
+    config: Util.getKafkaConfig(Util.ENUMS.CONSUMER, TransferEventType.BULK.toUpperCase(), TransferEventAction.PREPARE.toUpperCase())
   }
   bulkPrepareHandler.config.rdkafkaConf['client.id'] = bulkPrepareHandler.topicName
   await Kafka.Consumer.createHandler(bulkPrepareHandler.topicName, bulkPrepareHandler.config, bulkPrepareHandler.command)
@@ -255,7 +256,7 @@
  * @function RegisterAllHandlers
  *
  * @async
- * @description Registers all handlers in transfers ie: bulkPrepare, bulkFulfil, etc.
+ * @description Registers all module handlers
  *
  * @returns {boolean} - Returns a boolean: true if successful, or throws an error if failed
  */
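The handler above consumes individual transfers from MongoDB as a stream keyed by messageId. A self-contained sketch of that pattern with the same awaitify-stream API (the model and callback are illustrative):

const AwaitifyStream = require('awaitify-stream')

// Streams matching documents one by one instead of loading the whole bulk into memory
const forEachIndividualTransfer = async (IndividualTransferModel, messageId, onDoc) => {
  const stream = IndividualTransferModel.find({ messageId }).cursor() // Mongoose query cursor
  const reader = AwaitifyStream.createReader(stream)
  let doc
  // readAsync() resolves with the next document, or null when the stream ends
  while ((doc = await reader.readAsync()) !== null) {
    await onDoc(doc)
  }
}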