From efcf7c4d19cd5f8a73036c8abe5d6336d07fd057 Mon Sep 17 00:00:00 2001 From: Georgi Georgiev Date: Thu, 20 Jun 2019 22:12:06 +0300 Subject: [PATCH] Feature/Bulk Transfers POC - Bulk prepare (#116) * package-lock.json * Fix issues with object store usage * Additional changes to Object Store and Kafka config * Add MONGODB.URI config * Updated package.json * Add license info * Updated integration tests run config * Init BulkProcessingHandler * WIP #1 * message.id=uuid() during Transfer Prepare * Removed kafkaConf.key Switched to message.id=UUID for fulfil and get 28 unit tests fail * Fixed unit tests * Unifying mongo schema definitions * Finilize ml-api-adapter, but unit tests hang * Fix mongoose unique index issue * Fixed integration tests: uriParams added * Changes as per PR review comments * Putting central-object-store library - wip * Re-factored objStore lib to be re-usable between both ml-api and central-ledger * Fixing issue with unit tests * Fix standard issues * Add mdebarros as contributor and remove unused code * Removing local objectStoreLib * updated objectstore connection server setup to handle bad connection --- config/default.json | 6 +- package-lock.json | 75 +++++++ package.json | 21 +- src/bulkApi/handlers/bulkTransfers.js | 16 +- src/bulkApi/handlers/bulkTransfers/{id}.js | 6 +- src/bulkApi/lib/mongodb/db.js | 43 ---- .../models/bulkTransfers/bulkModels.js | 123 ----------- src/bulkApi/package-lock.json | 43 +++- src/bulkApi/package.json | 19 +- src/bulkApi/server.js | 11 +- src/domain/transfer/index.js | 76 +++---- src/handlers/notification/index.js | 28 ++- src/lib/enum.js | 6 +- src/lib/kafka/index.js | 1 - src/lib/utility.js | 4 +- src/models/bulkTransfers/facade.js | 0 src/shared/setup.js | 15 ++ test/integration-config.json | 2 +- .../handlers/notification/index.test.js | 20 +- test/unit/handlers/notification/index.test.js | 191 ++++++++++-------- test/unit/shared/setup.test.js | 7 + 21 files changed, 378 insertions(+), 335 deletions(-) delete mode 100644 src/bulkApi/lib/mongodb/db.js delete mode 100644 src/bulkApi/models/bulkTransfers/bulkModels.js create mode 100644 src/models/bulkTransfers/facade.js diff --git a/config/default.json b/config/default.json index 7abebc94..31b3d492 100644 --- a/config/default.json +++ b/config/default.json @@ -10,7 +10,7 @@ "generateTimeout": 30000 }, "MONGODB": { - "URI": "mongodb://localhost:27017/bulk_transfers" + "URI": "mongodb://localhost:27017/mlos" }, "ENDPOINT_SECURITY":{ "TLS": { @@ -87,7 +87,7 @@ } }, "PRODUCER": { - "BULK-TRANSFER": { + "BULK": { "PREPARE": { "config": { "options": { @@ -101,7 +101,7 @@ "socket.keepalive.enable": true, "queue.buffering.max.messages": 10000000 }, - "topicConf": { + "topicConf": { "request.required.acks": "all" } } diff --git a/package-lock.json b/package-lock.json index 6835710a..e2e16d26 100644 --- a/package-lock.json +++ b/package-lock.json @@ -351,6 +351,42 @@ "@hapi/hoek": "6.x.x" } }, + "@mojaloop/central-object-store": { + "version": "6.4.0-snapshot", + "resolved": "https://registry.npmjs.org/@mojaloop/central-object-store/-/central-object-store-6.4.0-snapshot.tgz", + "integrity": "sha512-i31FpiwN0cAeZqZ6q9SM27hiWaacnlO5k5Kkf6Ioig0caQTaeMRqs8Tu9lwvDVQzOsmB+hNhtuhfGv6OTCeT8Q==", + "requires": { + "@mojaloop/central-services-shared": "5.2.0", + "mongoose": "5.6.0" + }, + "dependencies": { + "mongoose": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/mongoose/-/mongoose-5.6.0.tgz", + "integrity": "sha512-bhevx8u4NfZf2Un+CcKWRsiNekrLH7dSI8mBC49FcY2SUXQPZf3w+Yby+cgDrpZA46nkqRW9Qaqhs7PT0XCtYQ==", + "requires": { + "async": "2.6.2", + "bson": "~1.1.1", + "kareem": "2.3.0", + "mongodb": "3.2.7", + "mongodb-core": "3.2.7", + "mongoose-legacy-pluralize": "1.0.2", + "mpath": "0.6.0", + "mquery": "3.2.1", + "ms": "2.1.2", + "regexp-clone": "1.0.0", + "safe-buffer": "5.1.2", + "sift": "7.0.1", + "sliced": "1.0.1" + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + } + } + }, "@mojaloop/central-services-auth": { "version": "5.2.1", "resolved": "https://registry.npmjs.org/@mojaloop/central-services-auth/-/central-services-auth-5.2.1.tgz", @@ -1103,11 +1139,50 @@ "bulk-api": { "version": "file:src/bulkApi", "requires": { + "@mojaloop/central-object-store": "6.4.0-snapshot", "@mojaloop/central-services-shared": "^5.2.0", "boom": "^7.1.1", "hapi": "^17.0.0", "hapi-openapi": "^1.0.0", "mongoose": "^5.5.14" + }, + "dependencies": { + "@mojaloop/central-object-store": { + "version": "6.4.0-snapshot", + "resolved": "https://registry.npmjs.org/@mojaloop/central-object-store/-/central-object-store-6.4.0-snapshot.tgz", + "integrity": "sha512-i31FpiwN0cAeZqZ6q9SM27hiWaacnlO5k5Kkf6Ioig0caQTaeMRqs8Tu9lwvDVQzOsmB+hNhtuhfGv6OTCeT8Q==", + "requires": { + "@mojaloop/central-services-shared": "5.2.0", + "mongoose": "5.6.0" + }, + "dependencies": { + "mongoose": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/mongoose/-/mongoose-5.6.0.tgz", + "integrity": "sha512-bhevx8u4NfZf2Un+CcKWRsiNekrLH7dSI8mBC49FcY2SUXQPZf3w+Yby+cgDrpZA46nkqRW9Qaqhs7PT0XCtYQ==", + "requires": { + "async": "2.6.2", + "bson": "~1.1.1", + "kareem": "2.3.0", + "mongodb": "3.2.7", + "mongodb-core": "3.2.7", + "mongoose-legacy-pluralize": "1.0.2", + "mpath": "0.6.0", + "mquery": "3.2.1", + "ms": "2.1.2", + "regexp-clone": "1.0.0", + "safe-buffer": "5.1.2", + "sift": "7.0.1", + "sliced": "1.0.1" + } + } + } + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + } } }, "bytes": { diff --git a/package.json b/package.json index 1fd3d244..eb6fd3d2 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,15 @@ "generate-docs": "node_modules/.bin/jsdoc -c jsdoc.json" }, "dependencies": { - "bulk-api": "file:src/bulkApi", + "@hapi/basic": "5.1.0", + "@hapi/boom": "7.4.2", + "@hapi/good": "8.2.0", + "@hapi/hapi": "18.3.1", + "@hapi/inert": "5.2.0", + "@hapi/joi": "15.0.3", + "@hapi/joi-date": "1.3.0", + "@hapi/vision": "5.5.2", + "@mojaloop/central-object-store": "6.4.0-snapshot", "@mojaloop/central-services-auth": "5.2.1", "@mojaloop/central-services-database": "5.2.1", "@mojaloop/central-services-error-handling": "5.2.0", @@ -70,23 +78,17 @@ "@now-ims/hapi-now-auth": "1.3.1", "blipp": "3.1.2", "bluebird": "3.5.3", - "@hapi/boom": "7.4.2", + "bulk-api": "file:src/bulkApi", "catbox": "10.0.6", "catbox-memory": "4.0.1", "commander": "2.19.0", "docdash": "1.0.3", "flat": "4.1.0", "glob": "7.1.3", - "@hapi/good": "8.2.0", - "@hapi/hapi": "18.3.1", - "@hapi/basic": "5.1.0", "hapi-auth-bearer-token": "6.1.1", "hapi-swagger": "9.4.2", "immutable": "3.8.2", - "@hapi/inert": "5.2.0", - "@hapi/joi": "15.0.3", "joi-currency-code": "2.0.2", - "@hapi/joi-date": "1.3.0", "jsdoc": "3.6.2", "knex": "0.16.3", "lodash": "4.17.11", @@ -95,8 +97,7 @@ "rc": "1.2.8", "request": "2.88.0", "urlsafe-base64": "1.0.0", - "uuid4": "1.1.4", - "@hapi/vision": "5.5.2" + "uuid4": "1.1.4" }, "devDependencies": { "async-request": "1.2.0", diff --git a/src/bulkApi/handlers/bulkTransfers.js b/src/bulkApi/handlers/bulkTransfers.js index 9e4cf81b..e2c3420a 100644 --- a/src/bulkApi/handlers/bulkTransfers.js +++ b/src/bulkApi/handlers/bulkTransfers.js @@ -23,6 +23,7 @@ - Name Surname * Georgi Georgiev + * Miguel de Barros * Valentin Genev -------------- ******/ @@ -31,8 +32,9 @@ const TransferService = require('../../domain/transfer') const Logger = require('@mojaloop/central-services-shared').Logger const Boom = require('boom') -const { BulkTransferModel } = require('../models/bulkTransfers/bulkModels') +const BulkTransferModels = require('@mojaloop/central-object-store').Models.BulkTransfer const Util = require('../../lib/util') +const Uuid = require('uuid4') /** * Operations on /bulkTransfers @@ -48,12 +50,14 @@ module.exports = { post: async function postBulkTransfers (request, h) { try { Logger.debug('create::payload(%s)', JSON.stringify(request.payload)) - let { bulkTransferId, bulkQuoteId, payerFsp, payeeFsp, expiration, extensionList } = request.payload - let hash = Util.createHash(JSON.stringify(request.payload)) - let newBulk = new BulkTransferModel(Object.assign({}, { headers: request.headers }, request.payload)) + const { bulkTransferId, bulkQuoteId, payerFsp, payeeFsp, expiration, extensionList } = request.payload + const hash = Util.createHash(JSON.stringify(request.payload)) + const messageId = Uuid() + let BulkTransferModel = BulkTransferModels.getBulkTransferModel() + const newBulk = new BulkTransferModel(Object.assign({}, { messageId, headers: request.headers }, request.payload)) await newBulk.save() - let message = { bulkTransferId, bulkQuoteId, payerFsp, payeeFsp, expiration, extensionList, hash } - await TransferService.bulkPrepare(request.headers, message) + const message = { bulkTransferId, bulkQuoteId, payerFsp, payeeFsp, expiration, extensionList, hash } + await TransferService.bulkPrepare(messageId, request.headers, message) return h.response().code(202) } catch (err) { Logger.error(err) diff --git a/src/bulkApi/handlers/bulkTransfers/{id}.js b/src/bulkApi/handlers/bulkTransfers/{id}.js index 8fb4f874..9f4758b3 100644 --- a/src/bulkApi/handlers/bulkTransfers/{id}.js +++ b/src/bulkApi/handlers/bulkTransfers/{id}.js @@ -22,13 +22,14 @@ * Gates Foundation - Name Surname + * Georgi Georgiev * Valentin Genev -------------- ******/ 'use strict' const Boom = require('boom') -const { IndividualTransferModel } = require('../../models/bulkTransfers/bulkModels') +const BulkTransferModels = require('@mojaloop/central-object-store').Models.BulkTransfer /** * Operations on /bulkTransfers/{id} @@ -43,10 +44,11 @@ module.exports = { */ get: async function getBulkTransfersId (request, h) { let { id } = request.params + let IndividualTransferModel = BulkTransferModels.getIndividualTransferModel() try { let indvidualTransfers = await IndividualTransferModel .find({ bulkTransferId: id }, '-dataUri -_id') - .populate('bulkDocument', 'headers -_id') // TODO in bulk-handler first get only headers, then compose each individual transfer without population + .populate('_id_bulkTransfers', 'headers -_id') // TODO in bulk-handler first get only headers, then compose each individual transfer without population return h.response(indvidualTransfers) } catch (e) { throw e diff --git a/src/bulkApi/lib/mongodb/db.js b/src/bulkApi/lib/mongodb/db.js deleted file mode 100644 index 88187d81..00000000 --- a/src/bulkApi/lib/mongodb/db.js +++ /dev/null @@ -1,43 +0,0 @@ -/***** - License - -------------- - Copyright © 2017 Bill & Melinda Gates Foundation - The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - Contributors - -------------- - This is the official list of the Mojaloop project contributors for this file. - Names of the original copyright holders (individuals or organizations) - should be listed with a '*' in the first column. People who have - contributed from an organization can be listed under the organization - that actually holds the copyright for their contributions (see the - Gates Foundation organization for an example). Those individuals should have - their names indented and be marked with a '-'. Email address can be added - optionally within square brackets . - - * Gates Foundation - - Name Surname - - * Valentin Genev - -------------- - ******/ -'use strict' - -const Mongoose = require('mongoose') -const Logger = require('@mojaloop/central-services-shared').Logger - -Mongoose.connection.on('error', (err) => { Logger.info('connection error ', err) }) -Mongoose.connection.once('open', function callback () { - Logger.info('MongoDB succesfully connected') -}) - -Mongoose.set('useFindAndModify', false) -Mongoose.set('useNewUrlParser', true) -Mongoose.set('useCreateIndex', true) - -exports.Mongoose = Mongoose -exports.db = Mongoose.connection diff --git a/src/bulkApi/models/bulkTransfers/bulkModels.js b/src/bulkApi/models/bulkTransfers/bulkModels.js deleted file mode 100644 index 5d1e7c9b..00000000 --- a/src/bulkApi/models/bulkTransfers/bulkModels.js +++ /dev/null @@ -1,123 +0,0 @@ -/***** - License - -------------- - Copyright © 2017 Bill & Melinda Gates Foundation - The Mojaloop files are made available by the Bill & Melinda Gates Foundation under the Apache License, Version 2.0 (the "License") and you may not use these files except in compliance with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - Contributors - -------------- - This is the official list of the Mojaloop project contributors for this file. - Names of the original copyright holders (individuals or organizations) - should be listed with a '*' in the first column. People who have - contributed from an organization can be listed under the organization - that actually holds the copyright for their contributions (see the - Gates Foundation organization for an example). Those individuals should have - their names indented and be marked with a '-'. Email address can be added - optionally within square brackets . - - * Gates Foundation - - Name Surname - - * Georgi Georgiev - * Valentin Genev - -------------- - ******/ -'use strict' - -const mongoose = require('mongoose') - -// single transfer model -const transfer = { - transferId: { - type: String, required: true, unique: true, index: true - }, - transferAmount: { - currency: { - type: String, - required: true - }, - amount: { - type: Number, - required: true - } - }, - ilpPacket: { - type: String, - required: true - }, - condition: { - type: String, - required: true - }, - extensionList: { - extension: [{ - key: String, - value: String - }] - } -} - -// schema for individual transfer with bulkQuoteId reference -const individualTransferSchema = new mongoose.Schema(Object.assign({}, { payload: transfer }, - { bulkDocument: { type: mongoose.Schema.Types.ObjectId, ref: 'bulktransfers' }, - bulkTransferId: { type: mongoose.Schema.Types.String }, - payload: { type: Object, required: true } - })) - -// schema for bulkquotes -const bulkTransferSchema = new mongoose.Schema({ - headers: { - type: Object, required: true - }, - bulkQuoteId: { - type: String, required: true, unique: true - }, - bulkTransferId: { - type: String, required: true, index: true, unique: true - }, - payerFsp: { - type: String, required: true - }, - payeeFsp: { - type: String, required: true - }, - expiration: { - type: Date - }, - individualTransfers: [new mongoose.Schema(Object.assign({ - _id: false - }, transfer))], - extensionList: [{ - key: String, - value: String - }] -}) - -const IndividualTransferModel = mongoose.model('transfers', individualTransferSchema) - -// after the bulk object is created, before its save, single transfers are created and saved in the transfers collection with the bulk reference -bulkTransferSchema.pre('save', function () { - try { - this.individualTransfers.forEach(async transfer => { - try { - let individualTransfer = new IndividualTransferModel({ payload: transfer._doc }) - individualTransfer.bulkDocument = this._id - individualTransfer.bulkTransferId = this.bulkTransferId - individualTransfer.payload = transfer._doc - await individualTransfer.save() - } catch (e) { - throw e - } - }) - } catch (e) { - throw (e) - } -}) - -const BulkTransferModel = mongoose.model('bulktransfers', bulkTransferSchema) - -module.exports = { BulkTransferModel, IndividualTransferModel } diff --git a/src/bulkApi/package-lock.json b/src/bulkApi/package-lock.json index a71f46ab..cb24e459 100644 --- a/src/bulkApi/package-lock.json +++ b/src/bulkApi/package-lock.json @@ -1,9 +1,48 @@ { - "name": "bulk", - "version": "1.0.0", + "name": "@mojaloop/bulk-transfers-api-adapter", + "version": "0.9.0", "lockfileVersion": 1, "requires": true, "dependencies": { + "@mojaloop/central-object-store": { + "version": "6.4.0-snapshot", + "resolved": "https://registry.npmjs.org/@mojaloop/central-object-store/-/central-object-store-6.4.0-snapshot.tgz", + "integrity": "sha512-i31FpiwN0cAeZqZ6q9SM27hiWaacnlO5k5Kkf6Ioig0caQTaeMRqs8Tu9lwvDVQzOsmB+hNhtuhfGv6OTCeT8Q==", + "requires": { + "@mojaloop/central-services-shared": "5.2.0", + "mongoose": "5.6.0" + }, + "dependencies": { + "async": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/async/-/async-2.6.2.tgz", + "integrity": "sha512-H1qVYh1MYhEEFLsP97cVKqCGo7KfCyTt6uEWqsTBr9SO84oK9Uwbyd/yCW+6rKJLHksBNUVWZDAjfS+Ccx0Bbg==", + "requires": { + "lodash": "^4.17.11" + } + }, + "mongoose": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/mongoose/-/mongoose-5.6.0.tgz", + "integrity": "sha512-bhevx8u4NfZf2Un+CcKWRsiNekrLH7dSI8mBC49FcY2SUXQPZf3w+Yby+cgDrpZA46nkqRW9Qaqhs7PT0XCtYQ==", + "requires": { + "async": "2.6.2", + "bson": "~1.1.1", + "kareem": "2.3.0", + "mongodb": "3.2.7", + "mongodb-core": "3.2.7", + "mongoose-legacy-pluralize": "1.0.2", + "mpath": "0.6.0", + "mquery": "3.2.1", + "ms": "2.1.2", + "regexp-clone": "1.0.0", + "safe-buffer": "5.1.2", + "sift": "7.0.1", + "sliced": "1.0.1" + } + } + } + }, "@mojaloop/central-services-shared": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/@mojaloop/central-services-shared/-/central-services-shared-5.2.0.tgz", diff --git a/src/bulkApi/package.json b/src/bulkApi/package.json index 458407ce..82c0930b 100644 --- a/src/bulkApi/package.json +++ b/src/bulkApi/package.json @@ -1,18 +1,23 @@ { - "name": "bulk", - "description": "", - "version": "1.0.0", - "author": "v ", - "contributors": [], + "name": "@mojaloop/bulk-transfers-api-adapter", + "version": "0.9.0", + "description": "Mojaloop Bulk transfers api adapter", + "license": "Apache-2.0", + "author": "ModusBox", + "contributors": [ + "Georgi Georgiev ", + "Miguel de Barros ", + "Valentin Genev " + ], "repository": { "type": "git", - "url": "git://github.com/v/bulk.git" + "url": "git://github.com/mojaloop/bulk-transfers-api.git" }, - "bugs": "http://github.com/v/bulk/issues", "publishConfig": { "registry": "https://registry.npmjs.org" }, "dependencies": { + "@mojaloop/central-object-store": "6.4.0-snapshot", "@mojaloop/central-services-shared": "^5.2.0", "boom": "^7.1.1", "hapi": "^17.0.0", diff --git a/src/bulkApi/server.js b/src/bulkApi/server.js index 4b2673f2..d7c1c074 100644 --- a/src/bulkApi/server.js +++ b/src/bulkApi/server.js @@ -31,13 +31,20 @@ const Hapi = require('hapi') const HapiOpenAPI = require('hapi-openapi') const Path = require('path') -const Mongoose = require('./lib/mongodb/db').Mongoose const Boom = require('@hapi/boom') const Logger = require('@mojaloop/central-services-shared').Logger +const ObjStoreDb = require('@mojaloop/central-object-store').Db +// const Mongoose = require('@mojaloop/central-object-store').Db.Mongoose const Config = require('../lib/config') +// const connectMongoose = async () => { +// let db = await Mongoose.connect(Config.MONGODB_URI, { +// promiseLibrary: global.Promise +// }) +// return db +// } const connectMongoose = async () => { - let db = await Mongoose.connect(Config.MONGODB_URI, { + let db = await ObjStoreDb.connect(Config.MONGODB_URI, { promiseLibrary: global.Promise }) return db diff --git a/src/domain/transfer/index.js b/src/domain/transfer/index.js index 0808a359..505e9438 100644 --- a/src/domain/transfer/index.js +++ b/src/domain/transfer/index.js @@ -34,7 +34,7 @@ const TRANSFER = 'transfer' const PREPARE = 'prepare' const FULFIL = 'fulfil' const GET = 'get' -const BULK_TRANSFER = 'bulk-transfer' +const BULK_TRANSFER = 'bulk' /** * @module src/domain/transfer @@ -53,9 +53,9 @@ const BULK_TRANSFER = 'bulk-transfer' const prepare = async (headers, message, dataUri) => { Logger.debug('domain::transfer::prepare::start(%s, %s)', headers, message) try { - const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), PREPARE.toUpperCase()) + const messageId = Uuid() const messageProtocol = { - id: message.transferId, + id: messageId, to: message.payeeFsp, from: message.payerFsp, type: 'application/json', @@ -76,7 +76,8 @@ const prepare = async (headers, message, dataUri) => { } } } - const topicConfig = Utility.createGeneralTopicConf(TRANSFER, PREPARE, message.transferId) + const topicConfig = Utility.createGeneralTopicConf(TRANSFER, PREPARE/* , messageId */) + const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), PREPARE.toUpperCase()) Logger.debug(`domain::transfer::prepare::messageProtocol - ${messageProtocol}`) Logger.debug(`domain::transfer::prepare::topicConfig - ${topicConfig}`) Logger.debug(`domain::transfer::prepare::kafkaConfig - ${kafkaConfig}`) @@ -88,13 +89,12 @@ const prepare = async (headers, message, dataUri) => { } } -const bulkPrepare = async (headers, message) => { +const bulkPrepare = async (messageId, headers, message) => { Logger.debug('domain::bulk-transfer::prepare::start(%s, %s)', headers, message) try { - let { bulkTransferId, payerFsp, payeeFsp } = message - const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, BULK_TRANSFER.toUpperCase(), PREPARE.toUpperCase()) + let { payerFsp, payeeFsp } = message const messageProtocol = { - id: bulkTransferId, + id: messageId, to: payeeFsp, from: payerFsp, type: 'application/json', @@ -115,14 +115,15 @@ const bulkPrepare = async (headers, message) => { } } } - const topicConfig = Utility.createGeneralTopicConf(BULK_TRANSFER, PREPARE, message.objectId) - Logger.debug(`domain::transfer::prepare::messageProtocol - ${messageProtocol}`) - Logger.debug(`domain::transfer::prepare::topicConfig - ${topicConfig}`) - Logger.debug(`domain::transfer::prepare::kafkaConfig - ${kafkaConfig}`) + const topicConfig = Utility.createGeneralTopicConf(BULK_TRANSFER, PREPARE/* , message.objectId */) + const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, BULK_TRANSFER.toUpperCase(), PREPARE.toUpperCase()) + Logger.debug(`domain::bulkTransfer::prepare::messageProtocol - ${messageProtocol}`) + Logger.debug(`domain::bulkTransfer::prepare::topicConfig - ${topicConfig}`) + Logger.debug(`domain::bulkTransfer::prepare::kafkaConfig - ${kafkaConfig}`) await Kafka.Producer.produceMessage(messageProtocol, topicConfig, kafkaConfig) return true } catch (err) { - Logger.error(`domain::transfer::prepare::Kafka error:: ERROR:'${err}'`) + Logger.error(`domain::bulkTransfer::prepare::Kafka error:: ERROR:'${err}'`) throw err } } @@ -132,24 +133,25 @@ const bulkPrepare = async (headers, message) => { * @async * @description This will produce a transfer fulfil message to transfer fulfil kafka topic. It gets the kafka configuration from config. It constructs the message and published to kafka * -* @param {string} id - the transferId +* @param {string} transferId - transferId * @param {object} headers - the http header from the request * @param {object} message - the transfer fulfil message * * @returns {boolean} Returns true on successful publishing of message to kafka, throws error on falires */ -const fulfil = async (id, headers, message, dataUri) => { - Logger.debug('domain::transfer::fulfil::start(%s, %s, %s)', id, headers, message) +const fulfil = async (transferId, headers, message, dataUri) => { + Logger.debug('domain::transfer::fulfil::start(%s, %s, %s)', transferId, headers, message) try { + const messageId = Uuid() const action = message.transferState === 'ABORTED' ? 'reject' : 'commit' - const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), FULFIL.toUpperCase()) const messageProtocol = { - id, + id: messageId, to: headers['fspiop-destination'], from: headers['fspiop-source'], type: 'application/json', content: { headers: headers, + uriParams: { id: transferId }, payload: dataUri }, metadata: { @@ -165,10 +167,8 @@ const fulfil = async (id, headers, message, dataUri) => { } } } - // const topicConfig = { - // topicName: Utility.getFulfilTopicName() // `topic-${message.payerFsp}-transfer-prepare` - // } - const topicConfig = Utility.createGeneralTopicConf(TRANSFER, FULFIL, id) + const topicConfig = Utility.createGeneralTopicConf(TRANSFER, FULFIL/* , messageId */) + const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), FULFIL.toUpperCase()) Logger.debug(`domain::transfer::fulfil::messageProtocol - ${messageProtocol}`) Logger.debug(`domain::transfer::fulfil::topicConfig - ${topicConfig}`) Logger.debug(`domain::transfer::fulfil::kafkaConfig - ${kafkaConfig}`) @@ -192,17 +192,18 @@ const fulfil = async (id, headers, message, dataUri) => { * * @returns {boolean} Returns true on successful publishing of message to kafka, throws error on falires */ -const getTransferById = async (id, headers) => { - Logger.info('domain::transfer::transferById::start(%s, %s, %s)', id, headers) +const getTransferById = async (transferId, headers) => { + Logger.info('domain::transfer::transferById::start(%s, %s, %s)', transferId, headers) try { - const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), GET.toUpperCase()) + const messageId = Uuid() const messageProtocol = { - id, + id: messageId, to: headers['fspiop-destination'], from: headers['fspiop-source'], type: 'application/json', content: { headers: headers, + uriParams: { id: transferId }, payload: {} }, metadata: { @@ -218,9 +219,8 @@ const getTransferById = async (id, headers) => { } } } - const topicConfig = { - topicName: Utility.getTransferByIdTopicName() - } + const topicConfig = { topicName: Utility.getTransferByIdTopicName() } + const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), GET.toUpperCase()) Logger.info(`domain::transfer::get::messageProtocol - ${messageProtocol}`) Logger.info(`domain::transfer::get::topicConfig - ${topicConfig}`) Logger.info(`domain::transfer::get::kafkaConfig - ${kafkaConfig}`) @@ -243,17 +243,18 @@ const getTransferById = async (id, headers) => { * * @returns {boolean} Returns true on successful publishing of message to kafka, throws error on falires */ -const transferError = async (id, headers, message, dataUri) => { - Logger.debug('domain::transfer::abort::start(%s, %s, %s)', id, headers, message) +const transferError = async (transferId, headers, message, dataUri) => { + Logger.debug('domain::transfer::abort::start(%s, %s, %s)', transferId, headers, message) try { - const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), FULFIL.toUpperCase()) + const messageId = Uuid() const messageProtocol = { - id, + id: messageId, to: headers['fspiop-destination'], from: headers['fspiop-source'], type: 'application/json', content: { headers: headers, + uriParams: { id: transferId }, payload: dataUri }, metadata: { @@ -269,7 +270,8 @@ const transferError = async (id, headers, message, dataUri) => { } } } - const topicConfig = Utility.createGeneralTopicConf(TRANSFER, FULFIL, id) + const topicConfig = Utility.createGeneralTopicConf(TRANSFER, FULFIL/* , messageId */) + const kafkaConfig = Utility.getKafkaConfig(Utility.ENUMS.PRODUCER, TRANSFER.toUpperCase(), FULFIL.toUpperCase()) Logger.debug(`domain::transfer::abort::messageProtocol - ${messageProtocol}`) Logger.debug(`domain::transfer::abort::topicConfig - ${topicConfig}`) Logger.debug(`domain::transfer::abort::kafkaConfig - ${kafkaConfig}`) @@ -281,9 +283,9 @@ const transferError = async (id, headers, message, dataUri) => { } } module.exports = { - prepare, + bulkPrepare, fulfil, getTransferById, - transferError, - bulkPrepare + prepare, + transferError } diff --git a/src/handlers/notification/index.js b/src/handlers/notification/index.js index 2097ae78..2f112e1d 100644 --- a/src/handlers/notification/index.js +++ b/src/handlers/notification/index.js @@ -36,11 +36,15 @@ const EVENT = 'event' const FSPIOP_CALLBACK_URL_TRANSFER_POST = 'FSPIOP_CALLBACK_URL_TRANSFER_POST' const FSPIOP_CALLBACK_URL_TRANSFER_PUT = 'FSPIOP_CALLBACK_URL_TRANSFER_PUT' const FSPIOP_CALLBACK_URL_TRANSFER_ERROR = 'FSPIOP_CALLBACK_URL_TRANSFER_ERROR' +const FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST = 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST' +// const FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT = 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT' +const FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR = 'FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR' let notificationConsumer = {} let autoCommitEnabled = true const Metrics = require('@mojaloop/central-services-metrics') const ENUM = require('../../lib/enum') const decodePayload = require('@mojaloop/central-services-stream').Kafka.Protocol.decodePayload +const BulkTransfer = require('@mojaloop/central-object-store').Models.BulkTransfer // note that incoming headers shoud be lowercased by node // const jwsHeaders = ['fspiop-signature', 'fspiop-http-method', 'fspiop-uri'] @@ -148,8 +152,9 @@ const processMessage = async (msg) => { throw new Error('Invalid message received from kafka') } - const { metadata, from, to, content, id } = msg.value + const { metadata, from, to, content } = msg.value const { action, state } = metadata.event + const messageId = msg.value.id const status = state.status const actionLower = action.toLowerCase() @@ -158,7 +163,9 @@ const processMessage = async (msg) => { Logger.info('Notification::processMessage action: ' + action) Logger.info('Notification::processMessage status: ' + status) let decodedPayload = decodePayload(content.payload, { asParsed: false }) + let id = JSON.parse(decodedPayload.body.toString()).transferId || (content.uriParams && content.uriParams.id) let payloadForCallback = decodedPayload.body.toString() + if (actionLower === ENUM.transferEventAction.PREPARE && statusLower === ENUM.messageStatus.SUCCESS) { let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_TRANSFER_POST, id) let methodTo = ENUM.methods.FSPIOP_CALLBACK_URL_TRANSFER_POST @@ -292,6 +299,25 @@ const processMessage = async (msg) => { return Callback.sendCallback(callbackURLTo, methodTo, content.headers, payloadForCallback, id, from, to) } + if (actionLower === ENUM.transferEventAction.BULK_PREPARE && statusLower === ENUM.messageStatus.SUCCESS) { + let responsePayload = JSON.parse(payloadForCallback) + id = responsePayload.bulkTransferId + let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST, id) + let methodTo = ENUM.methods.FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST + Logger.debug(`Notification::processMessage - Callback.sendCallback(${callbackURLTo}, ${methodTo}, ${JSON.stringify(content.headers)}, ${payloadForCallback}, ${id}, ${from}, ${to})`) + let bulkResponseMessage = await BulkTransfer.getBulkTransferResponseByMessageIdDestination(messageId, to) + responsePayload.individualTransferResults = bulkResponseMessage.individualTransferResults + return Callback.sendCallback(callbackURLTo, methodTo, content.headers, JSON.stringify(responsePayload), id, from, to) + } + + if (actionLower === ENUM.transferEventAction.BULK_PREPARE && statusLower !== ENUM.messageStatus.SUCCESS) { + id = JSON.parse(payloadForCallback).bulkTransferId + let callbackURLTo = await Participant.getEndpoint(to, FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR, id) + let methodFrom = ENUM.methods.FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR + Logger.debug(`Notification::processMessage - Callback.sendCallback(${callbackURLTo}, ${methodFrom}, ${JSON.stringify(content.headers)}, ${payloadForCallback}, ${id}, ${from}, ${to})`) + return Callback.sendCallback(callbackURLTo, methodFrom, content.headers, payloadForCallback, id, from, to) + } + const err = new Error('Unknown action received from kafka') Logger.error(`Error sending notification - ${err}`) throw err diff --git a/src/lib/enum.js b/src/lib/enum.js index 872996f9..06eddee7 100644 --- a/src/lib/enum.js +++ b/src/lib/enum.js @@ -51,7 +51,10 @@ const headers = { const methods = { FSPIOP_CALLBACK_URL_TRANSFER_POST: 'post', FSPIOP_CALLBACK_URL_TRANSFER_ERROR: 'put', - FSPIOP_CALLBACK_URL_TRANSFER_PUT: 'put' + FSPIOP_CALLBACK_URL_TRANSFER_PUT: 'put', + FSPIOP_CALLBACK_URL_BULK_TRANSFER_POST: 'post', + FSPIOP_CALLBACK_URL_BULK_TRANSFER_ERROR: 'put', + FSPIOP_CALLBACK_URL_BULK_TRANSFER_PUT: 'put' } // Code specific (non-DB) enumerations sorted alphabetically @@ -66,6 +69,7 @@ const transferEventType = { } const transferEventAction = { + BULK_PREPARE: 'bulk-prepare', PREPARE: 'prepare', PREPARE_DUPLICATE: 'prepare-duplicate', FULFIL_DUPLICATE: 'fulfil-duplicate', diff --git a/src/lib/kafka/index.js b/src/lib/kafka/index.js index a081607f..661c0679 100644 --- a/src/lib/kafka/index.js +++ b/src/lib/kafka/index.js @@ -19,7 +19,6 @@ - Name Surname -------------- ******/ - 'use strict' const Producer = require('./producer') diff --git a/src/lib/utility.js b/src/lib/utility.js index 7a603d73..9eb9090d 100644 --- a/src/lib/utility.js +++ b/src/lib/utility.js @@ -159,7 +159,7 @@ const fulfilTopicTemplate = () => { * * @returns {string} - Returns topic name to be created, throws error if failure occurs */ -const getTransferByidTopicTemplate = () => { +const getTransferByIdTopicTemplate = () => { try { return Mustache.render(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GET_TRANSFERS_TOPIC_TEMPLATE.TEMPLATE) } catch (e) { @@ -318,7 +318,7 @@ const getFulfilTopicName = () => { */ const getTransferByIdTopicName = () => { try { - return getTransferByidTopicTemplate() + return getTransferByIdTopicTemplate() } catch (e) { throw e } diff --git a/src/models/bulkTransfers/facade.js b/src/models/bulkTransfers/facade.js new file mode 100644 index 00000000..e69de29b diff --git a/src/shared/setup.js b/src/shared/setup.js index edc1fbe6..c57697cb 100644 --- a/src/shared/setup.js +++ b/src/shared/setup.js @@ -33,6 +33,7 @@ const RegisterHandlers = require('../handlers/register') const Config = require('../lib/config') const ParticipantEndpointCache = require('../domain/participant/lib/cache/participantEndpoint') const Metrics = require('@mojaloop/central-services-metrics') +const Mongoose = require('@mojaloop/central-object-store').Db.Mongoose /** * @module src/shared/setup @@ -48,6 +49,18 @@ const Metrics = require('@mojaloop/central-services-metrics') * @returns {Promise} Returns the Server object */ +const connectMongoose = async () => { + try { + let db = await Mongoose.connect(Config.MONGODB_URI, { + promiseLibrary: global.Promise + }) + return db + } catch (error) { + Logger.error(`error - ${error}`) // TODO: ADD PROPER ERROR HANDLING HERE POST-POC + return null + } +} + const createServer = async (port, modules) => { const server = await new Hapi.Server({ port, @@ -65,6 +78,8 @@ const createServer = async (port, modules) => { } }) + await connectMongoose() + await Plugins.registerPlugins(server) await server.register(modules) diff --git a/test/integration-config.json b/test/integration-config.json index eb3dc6d3..8f0017a7 100644 --- a/test/integration-config.json +++ b/test/integration-config.json @@ -10,7 +10,7 @@ "generateTimeout": 30000 }, "MONGODB": { - "URI": "mongodb://localhost:27017/bulk_transfers" + "URI": "mongodb://localhost:27017/mlos" }, "ENDPOINT_SECURITY":{ "TLS": { diff --git a/test/integration/handlers/notification/index.test.js b/test/integration/handlers/notification/index.test.js index bfaab8ee..560126f8 100644 --- a/test/integration/handlers/notification/index.test.js +++ b/test/integration/handlers/notification/index.test.js @@ -128,6 +128,9 @@ Test('Notification Handler', notificationHandlerTest => { 'fspiop-source': 'switch', 'fspiop-destination': 'dfsp1' }, + uriParams: { + id: transferId + }, payload: { errorInformation: { errorCode: '3100', @@ -137,7 +140,7 @@ Test('Notification Handler', notificationHandlerTest => { }, from: 'switch', to: 'dfsp1', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -198,7 +201,7 @@ Test('Notification Handler', notificationHandlerTest => { }, to: 'dfsp2', from: 'dfsp1', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -255,11 +258,14 @@ Test('Notification Handler', notificationHandlerTest => { errorCode: '3000', errorDescription: 'Generic error' } + }, + uriParams: { + id: transferId } }, from: 'dfsp2', to: 'dfsp1', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -320,7 +326,7 @@ Test('Notification Handler', notificationHandlerTest => { }, to: 'dfsp2', from: 'dfsp1', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -385,7 +391,7 @@ Test('Notification Handler', notificationHandlerTest => { }, to: 'dfsp2', from: 'dfsp1', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -450,7 +456,7 @@ Test('Notification Handler', notificationHandlerTest => { }, from: 'dfsp1', to: 'dfsp2', - id: transferId, + id: Uuid(), type: 'application/json' } @@ -512,7 +518,7 @@ Test('Notification Handler', notificationHandlerTest => { }, to: 'dfsp1', from: 'switch', - id: transferId, + id: Uuid(), type: 'application/json' } diff --git a/test/unit/handlers/notification/index.test.js b/test/unit/handlers/notification/index.test.js index 83e25d41..67612179 100644 --- a/test/unit/handlers/notification/index.test.js +++ b/test/unit/handlers/notification/index.test.js @@ -37,6 +37,7 @@ const P = require('bluebird') const Config = require(`${src}/lib/config.js`) const Participant = require(`${src}/domain/participant`) const ENUM = require('../../../../src/lib/enum') +const Uuid = require('uuid4') Test('Notification Service tests', notificationTest => { let sandbox @@ -67,6 +68,7 @@ Test('Notification Service tests', notificationTest => { notificationTest.test('processMessage should', async processMessageTest => { processMessageTest.test('process the message received from kafka and send out a transfer post callback', async test => { + const uuid = Uuid() const msg = { value: { metadata: { @@ -81,7 +83,9 @@ Test('Notification Service tests', notificationTest => { }, content: { headers: {}, - payload: {} + payload: { + transferId: uuid + } }, to: 'dfsp2', from: 'dfsp1', @@ -89,22 +93,23 @@ Test('Notification Service tests', notificationTest => { } } - const url = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_POST, msg.value.id) + const url = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_POST, msg.value.content.payload.transferId) const method = 'post' const headers = {} - const message = {} + const message = { transferId: uuid } const expected = 200 - Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the message received from kafka and send out a transfer error notification to the sender', async test => { + const uuid = Uuid() const msg = { value: { metadata: { @@ -119,29 +124,30 @@ Test('Notification Service tests', notificationTest => { }, content: { headers: {}, - payload: {} + payload: { transferId: uuid } }, to: 'dfsp2', from: 'dfsp1', id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' const headers = {} - const message = {} + const message = { transferId: uuid } const expected = 200 - Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('throw error if not able to post the transfer to the receiver', async test => { + const uuid = Uuid() const msg = { value: { metadata: { @@ -156,19 +162,19 @@ Test('Notification Service tests', notificationTest => { }, content: { headers: {}, - payload: {} + payload: { transferId: uuid } }, to: 'dfsp2', from: 'dfsp1', id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const url = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_POST, msg.value.id) + const url = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_POST, msg.value.content.payload.transferId) const method = 'post' const headers = {} - const message = {} + const message = { transferId: uuid } - Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).throws(new Error()) + Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).throws(new Error()) try { await Notification.processMessage(msg) @@ -181,6 +187,7 @@ Test('Notification Service tests', notificationTest => { }) processMessageTest.test('throw error if not able to send the notification to the sender', async test => { + const uuid = Uuid() const msg = { value: { metadata: { @@ -195,19 +202,19 @@ Test('Notification Service tests', notificationTest => { }, content: { headers: {}, - payload: {} + payload: { transferId: uuid } }, to: 'dfsp2', from: 'dfsp1', id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' const headers = {} - const message = {} + const message = { transferId: uuid } - Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).throws(new Error()) + Callback.sendCallback.withArgs(url, method, headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).throws(new Error()) try { await Notification.processMessage(msg) @@ -222,7 +229,7 @@ Test('Notification Service tests', notificationTest => { processMessageTest.test('process the message received from kafka and send out a transfer post callback', async test => { const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' - + const uuid = Uuid() const msg = { value: { metadata: { @@ -240,7 +247,7 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, @@ -248,20 +255,20 @@ Test('Notification Service tests', notificationTest => { } } - const urlPayer = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) - const urlPayee = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) + const urlPayer = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) + const urlPayee = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(urlPayer, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) - // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${ENUM.headers.FSPIOP.SWITCH.value}, ${msg.value.from}`) - Callback.sendCallback.withArgs(urlPayee, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) + // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(urlPayer, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${ENUM.headers.FSPIOP.SWITCH.value}, ${msg.value.from}`) + Callback.sendCallback.withArgs(urlPayee, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(urlPayee, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) - test.ok(Callback.sendCallback.calledWith(urlPayer, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(urlPayee, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) + test.ok(Callback.sendCallback.calledWith(urlPayer, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) @@ -269,6 +276,7 @@ Test('Notification Service tests', notificationTest => { processMessageTest.test('throw error if not able to send the notification to the sender', async test => { const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' + const uuid = Uuid() const msg = { value: { metadata: { @@ -286,18 +294,18 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const url = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } - Callback.sendCallback.withArgs(url, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).throws(new Error()) + Callback.sendCallback.withArgs(url, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).throws(new Error()) try { await Notification.processMessage(msg) @@ -310,6 +318,7 @@ Test('Notification Service tests', notificationTest => { }) processMessageTest.test('throw error if invalid action received from kafka', async test => { + const uuid = Uuid() const msg = { value: { metadata: { @@ -324,7 +333,7 @@ Test('Notification Service tests', notificationTest => { }, content: { headers: {}, - payload: {} + payload: { transferId: uuid } }, to: 'dfsp2', from: 'dfsp1', @@ -356,6 +365,7 @@ Test('Notification Service tests', notificationTest => { }) processMessageTest.test('process the reject message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' const msg = { @@ -375,7 +385,7 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, @@ -383,24 +393,25 @@ Test('Notification Service tests', notificationTest => { } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) - const toUrl = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) + const toUrl = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) - Callback.sendCallback.withArgs(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) + Callback.sendCallback.withArgs(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) - test.ok(Callback.sendCallback.calledWith(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) + test.ok(Callback.sendCallback.calledWith(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the abort message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' const msg = { @@ -420,32 +431,33 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) - const toUrl = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) + const toUrl = await Participant.getEndpoint(msg.value.to, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - Callback.sendCallback.withArgs(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) - // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${ENUM.headers.FSPIOP.SWITCH.value}, ${msg.value.from}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) + Callback.sendCallback.withArgs(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${urlPayer}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${ENUM.headers.FSPIOP.SWITCH.value}, ${msg.value.from}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) - test.ok(Callback.sendCallback.calledWith(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, ENUM.headers.FSPIOP.SWITCH.value, msg.value.from)) + test.ok(Callback.sendCallback.calledWith(toUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the fulfil-duplicate message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp1' const payeeFsp = 'dfsp2' @@ -466,29 +478,30 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payerFsp, 'FSPIOP-Source': payeeFsp }, - payload: {} + payload: { transferId: uuid } }, to: payerFsp, from: payeeFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the fulfil-duplicate message received from kafka and send out a transfer error callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp1' const payeeFsp = 'dfsp2' @@ -509,29 +522,30 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payerFsp, 'FSPIOP-Source': payeeFsp }, - payload: {} + payload: { transferId: uuid } }, to: payerFsp, from: payeeFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the abort-duplicate message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp1' const payeeFsp = 'dfsp2' @@ -552,29 +566,30 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payerFsp, 'FSPIOP-Source': payeeFsp }, - payload: {} + payload: { transferId: uuid } }, to: payerFsp, from: payeeFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the abort-duplicate message received from kafka and send out a transfer error callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp1' const payeeFsp = 'dfsp2' @@ -595,29 +610,30 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payerFsp, 'FSPIOP-Source': payeeFsp }, - payload: {} + payload: { transferId: uuid } }, to: payerFsp, from: payeeFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the timeout-received message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' @@ -638,29 +654,30 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_ERROR, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.to}, ${msg.value.from}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.to}, ${msg.value.from}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) processMessageTest.test('process the prepare-duplicate message received from kafka and send out a transfer put callback', async test => { + const uuid = Uuid() const payerFsp = 'dfsp2' const payeeFsp = 'dfsp1' @@ -681,24 +698,24 @@ Test('Notification Service tests', notificationTest => { 'FSPIOP-Destination': payeeFsp, 'FSPIOP-Source': payerFsp }, - payload: {} + payload: { transferId: uuid } }, to: payeeFsp, from: payerFsp, id: 'b51ec534-ee48-4575-b6a9-ead2955b8098' } } - const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.id) + const fromUrl = await Participant.getEndpoint(msg.value.from, FSPIOP_CALLBACK_URL_TRANSFER_PUT, msg.value.content.payload.transferId) const method = 'put' - const message = {} + const message = { transferId: uuid } const expected = 200 - // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.id}, ${msg.value.from}, ${msg.value.to}`) - Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to).returns(P.resolve(200)) + // console.log(`${fromUrl}, ${method}, ${JSON.stringify(msg.value.content.headers)}, ${JSON.stringify(message)}, ${msg.value.content.payload.transferId}, ${msg.value.from}, ${msg.value.to}`) + Callback.sendCallback.withArgs(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to).returns(P.resolve(200)) let result = await Notification.processMessage(msg) - test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.id, msg.value.from, msg.value.to)) + test.ok(Callback.sendCallback.calledWith(fromUrl, method, msg.value.content.headers, JSON.stringify(message), msg.value.content.payload.transferId, msg.value.from, msg.value.to)) test.equal(result, expected) test.end() }) diff --git a/test/unit/shared/setup.test.js b/test/unit/shared/setup.test.js index ce915ab1..1dfba7fd 100644 --- a/test/unit/shared/setup.test.js +++ b/test/unit/shared/setup.test.js @@ -8,6 +8,7 @@ const Config = require(`${src}/lib/config`) const Proxyquire = require('proxyquire') const ParticipantEndpointCache = require(`${src}/domain/participant/lib/cache/participantEndpoint`) const Boom = require('@hapi/boom') +// require('leaked-handles').set({ fullStack: true, timeout: 15000, debugSockets: true }) Test('setup', setupTest => { let sandbox @@ -18,6 +19,7 @@ Test('setup', setupTest => { let PluginsStub let HapiStub let serverStub + // let MongooseStub setupTest.beforeEach(test => { sandbox = Sinon.createSandbox() @@ -47,11 +49,16 @@ Test('setup', setupTest => { registerNotificationHandler: sandbox.stub().returns(P.resolve()) } + // MongooseStub = { + // connect: sandbox.stub().returns(P.resolve(true)) + // } + Setup = Proxyquire('../../../src/shared/setup', { '../handlers/register': RegisterHandlersStub, './plugins': PluginsStub, '@hapi/hapi': HapiStub, '../lib/config': Config + // , '../bulkApi/lib/mongodb': MongooseStub }) oldHostName = Config.HOSTNAME