Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mojaloop/#3750): add timer for party lookup in cache #471

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
// Issue with PostCSS library (https://github.com/advisories/GHSA-7fh5-64p2-3v2j)
"GHSA-7fh5-64p2-3v2j",
// SSRF attacks against npm IP (https://github.com/advisories/GHSA-78xj-cgh5-2h22)
"GHSA-78xj-cgh5-2h22"
"GHSA-78xj-cgh5-2h22",
// https://github.com/advisories/GHSA-rm97-x556-q36h
"GHSA-rm97-x556-q36h",
]
}
3 changes: 3 additions & 0 deletions modules/api-svc/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,6 @@ PM4ML_ENABLED=false
# Maximum payload limits
FSPIOP_API_SERVER_MAX_REQUEST_BYTES=209715200
BACKEND_API_SERVER_MAX_REQUEST_BYTES=209715200

# How much time to wait for cache to unsubscribe from a channel
UNSUBSCRIBE_TIMEOUT_MS=5000
16 changes: 8 additions & 8 deletions modules/api-svc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@
"@mojaloop/central-services-error-handling": "^12.0.7",
"@mojaloop/central-services-logger": "^11.2.2",
"@mojaloop/central-services-metrics": "^12.0.8",
"@mojaloop/central-services-shared": "18.2.0",
"@mojaloop/central-services-shared": "18.3.0",
"@mojaloop/event-sdk": "^14.0.0",
"@mojaloop/sdk-scheme-adapter-private-shared-lib": "workspace:^",
"@mojaloop/sdk-standard-components": "v17.4.0",
"@mojaloop/sdk-standard-components": "v18.0.0",
"ajv": "8.12.0",
"axios": "^1.6.7",
"co-body": "^6.1.0",
"dotenv": "^16.4.5",
"env-var": "^7.4.1",
"express": "^4.18.2",
"express": "^4.18.3",
"fast-json-patch": "^3.1.1",
"fast-safe-stringify": "^2.1.1",
"javascript-state-machine": "^3.1.0",
Expand All @@ -96,21 +96,21 @@
"ws": "^8.16.0"
},
"devDependencies": {
"@babel/core": "^7.23.9",
"@babel/preset-env": "^7.23.9",
"@babel/core": "^7.24.0",
"@babel/preset-env": "^7.24.0",
"@redocly/openapi-cli": "^1.0.0-beta.94",
"@types/jest": "^29.5.12",
"babel-jest": "^29.7.0",
"eslint": "^8.56.0",
"eslint": "^8.57.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-plugin-import": "^2.29.1",
"eslint-plugin-jest": "^27.9.0",
"jest": "^29.7.0",
"jest-junit": "^16.0.0",
"nock": "^13.5.3",
"nock": "^13.5.4",
"npm-check-updates": "^16.7.10",
"openapi-response-validator": "^12.1.3",
"openapi-typescript": "^6.7.4",
"openapi-typescript": "^6.7.5",
"redis-mock": "^0.56.3",
"replace": "^1.2.2",
"standard-version": "^9.5.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ module.exports.handleSDKOutboundBulkAcceptPartyInfoRequestedDmEvt = async (
currentState: BulkTransactionState.WAITING_FOR_PARTY_ACCEPTANCE,
});
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptPartyInfoRequested');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptPartyInfoRequested');
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ module.exports.handleSDKOutboundBulkAcceptQuoteRequestedDmEvt = async (
currentState: BulkTransactionState.WAITING_FOR_QUOTE_ACCEPTANCE,
});
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptQuoteRequested');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkAcceptQuoteRequested');
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ module.exports.handleSDKOutboundBulkResponsePreparedDmEvt = async (
});
await options.producer.sendDomainEvent(sdkOutboundBulkResponseSentDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleSDKOutboundBulkResponsePreparedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleSDKOutboundBulkResponsePreparedDmEvt');
}
};
12 changes: 6 additions & 6 deletions modules/api-svc/src/BackendEventHandler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ class BackendEventHandler {

async start() {
const config = this._conf;
this._logger.isInfoEnabled() && this._logger.info('start');
this._logger.isInfoEnabled && this._logger.info('start');

this._consumer = new KafkaDomainEventConsumer(this._messageHandler.bind(this), config.backendEventHandler.domainEventConsumer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);

this._producer = new KafkaDomainEventProducer(config.backendEventHandler.domainEventProducer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
await this._producer.init();

// Create options for handlers
Expand All @@ -79,7 +79,7 @@ class BackendEventHandler {
}

async stop() {
this._logger.isInfoEnabled() && this._logger.info('stop');
this._logger.isInfoEnabled && this._logger.info('stop');
await Promise.all([
this._consumer?.destroy(),
this._producer?.destroy(),
Expand All @@ -88,7 +88,7 @@ class BackendEventHandler {
}

async _messageHandler(message) {
this._logger.isInfoEnabled() && this._logger.info(`Got domain event message: ${message.getName()}`);
this._logger.isInfoEnabled && this._logger.info(`Got domain event message: ${message.getName()}`);
// TODO: Handle errors validation here
switch (message.getName()) {
case SDKOutboundBulkAcceptPartyInfoRequestedDmEvt.name: {
Expand All @@ -104,7 +104,7 @@ class BackendEventHandler {
break;
}
default: {
this._logger.isDebugEnabled() && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
this._logger.isDebugEnabled && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
return;
}
}
Expand Down
14 changes: 7 additions & 7 deletions modules/api-svc/src/ControlAgent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,22 @@ class Client extends ws {

async send(msg) {
const data = typeof msg === 'string' ? msg : serialise(msg);
this._logger.isDebugEnabled() && this._logger.push({ data }).debug('Sending message');
this._logger.isDebugEnabled && this._logger.push({ data }).debug('Sending message');
return new Promise((resolve) => super.send.call(this, data, resolve));
}

// Receive a single message
async receive() {
return new Promise((resolve) => this.once('message', (data) => {
const msg = deserialise(data);
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Received');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Received');
resolve(msg);
}));
}

// Close connection
async stop() {
this._logger.isInfoEnabled() && this._logger.info('Control client shutting down...');
this._logger.isInfoEnabled && this._logger.info('Control client shutting down...');
this.close();
}

Expand All @@ -171,24 +171,24 @@ class Client extends ws {
try {
msg = deserialise(data);
} catch (err) {
this._logger.isErrorEnabled() && this._logger.push({ data }).console.error();('Couldn\'t parse received message');
this._logger.isErrorEnabled && this._logger.push({ data }).console.error();('Couldn\'t parse received message');
this.send(build.ERROR.NOTIFY.JSON_PARSE_ERROR());
}
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Handling received message');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Handling received message');
switch (msg.msg) {
case MESSAGE.CONFIGURATION:
switch (msg.verb) {
case VERB.NOTIFY: {
const dup = JSON.parse(JSON.stringify(this._appConfig)); // fast-json-patch explicitly mutates
_.merge(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
case VERB.PATCH: {
const dup = JSON.parse(JSON.stringify(this._appConfig)); // fast-json-patch explicitly mutates
jsonPatch.applyPatch(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand Down
24 changes: 12 additions & 12 deletions modules/api-svc/src/ControlServer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ class Client extends ws {

async send(msg) {
const data = typeof msg === 'string' ? msg : serialise(msg);
this._logger.isDebugEnabled() && this._logger.push({ data }).debug('Sending message');
this._logger.isDebugEnabled && this._logger.push({ data }).debug('Sending message');
return new Promise((resolve) => super.send.call(this, data, resolve));
}

// Receive a single message
async receive() {
return new Promise((resolve) => this.once('message', (data) => {
const msg = deserialise(data);
this._logger.isDebugEnabled() && this._logger.push({ msg }).debug('Received');
this._logger.isDebugEnabled && this._logger.push({ msg }).debug('Received');
resolve(msg);
}));
}
Expand Down Expand Up @@ -186,7 +186,7 @@ class Server extends ws.Server {
this._clientData = new Map();

this.on('error', err => {
this._logger.isErrorEnabled() && this._logger.push({ err })
this._logger.isErrorEnabled && this._logger.push({ err })
.error('Unhandled websocket error occurred. Shutting down.');
process.exit(1);
});
Expand All @@ -197,18 +197,18 @@ class Server extends ws.Server {
ip: getWsIp(req),
remoteAddress: req.socket.remoteAddress,
});
logger.isInfoEnabled() && logger.info('Websocket connection received');
logger.isInfoEnabled && logger.info('Websocket connection received');
this._clientData.set(socket, { ip: req.connection.remoteAddress, logger });

socket.on('close', (code, reason) => {
logger.isInfoEnabled() && logger.push({ code, reason }).info('Websocket connection closed');
logger.isInfoEnabled && logger.push({ code, reason }).info('Websocket connection closed');
this._clientData.delete(socket);
});

socket.on('message', this._handle(socket, logger));
});

this._logger.isInfoEnabled() && this._logger.push(this.address()).info('running on');
this._logger.isInfoEnabled && this._logger.push(this.address()).info('running on');
}

// Close the server then wait for all the client sockets to close
Expand All @@ -218,14 +218,14 @@ class Server extends ws.Server {
client.terminate();
}
await closing;
this._logger.isInfoEnabled() && this._logger.info('Control server shutdown complete');
this._logger.isInfoEnabled && this._logger.info('Control server shutdown complete');
}


async notifyClientsOfCurrentConfig() {
const updateConfMsg = build.CONFIGURATION.NOTIFY(this._appConfig);
const logError = (socket, message) => (err) =>
this._logger.isErrorEnabled() && this._logger
this._logger.isErrorEnabled && this._logger
.push({ message, ip: this._clientData.get(socket).ip, err })
.error('Error sending reconfigure notification to client');
const sendToAllClients = (msg) => Promise.all(
Expand All @@ -244,10 +244,10 @@ class Server extends ws.Server {
try {
msg = deserialise(data);
} catch (err) {
logger.isErrorEnabled() && logger.push({ data }).error('Couldn\'t parse received message');
logger.isErrorEnabled && logger.push({ data }).error('Couldn\'t parse received message');
client.send(build.ERROR.NOTIFY.JSON_PARSE_ERROR());
}
logger.isDebugEnabled() && logger.push({ msg }).debug('Handling received message');
logger.isDebugEnabled && logger.push({ msg }).debug('Handling received message');
switch (msg.msg) {
case MESSAGE.CONFIGURATION:
switch (msg.verb) {
Expand All @@ -257,7 +257,7 @@ class Server extends ws.Server {
case VERB.NOTIFY: {
const dup = structuredClone(this._appConfig); // fast-json-patch explicitly mutates
_.merge(dup, msg.data);
this._logger.isDebugEnabled() && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this._logger.isDebugEnabled && this._logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand All @@ -266,7 +266,7 @@ class Server extends ws.Server {
// client library?
const dup = structuredClone(this._appConfig); // fast-json-patch explicitly mutates
jsonPatch.applyPatch(dup, msg.data);
logger.isDebugEnabled() && logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
logger.isDebugEnabled && logger.push({ oldConf: this._appConfig, newConf: dup }).debug('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ module.exports.handleBulkQuotesRequestedDmEvt = async (
await options.producer.sendDomainEvent(bulkQuotesCallbackReceivedDmEvt);
}
catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleBulkQuotesRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleBulkQuotesRequestedDmEvt');
const bulkQuotesCallbackReceivedDmEvt = new BulkQuotesCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ module.exports.handleBulkTransfersRequestedDmEvt = async (

await options.producer.sendDomainEvent(bulkTransfersCallbackReceivedDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handleBulkTransfersRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handleBulkTransfersRequestedDmEvt');
const bulkTransfersCallbackReceivedDmEvt = new BulkTransfersCallbackReceivedDmEvt({
bulkId: event.getKey(),
content: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module.exports.handlePartyInfoRequestedDmEvt = async (
});
await options.producer.sendDomainEvent(partyInfoCallbackReceivedDmEvt);
} catch (err) {
logger.isErrorEnabled() && logger.push({ err }).error('Error in handlePartyInfoRequestedDmEvt');
logger.isErrorEnabled && logger.push({ err }).error('Error in handlePartyInfoRequestedDmEvt');
const { code, message } = Errors.MojaloopApiErrorCodes.SERVER_TIMED_OUT;
const partyInfoCallbackReceivedDmEvt = new PartyInfoCallbackReceivedDmEvt({
bulkId: event.getKey(),
Expand Down
12 changes: 6 additions & 6 deletions modules/api-svc/src/FSPIOPEventHandler/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ class FSPIOPEventHandler {

async start() {
const config = this._conf;
this._logger.isInfoEnabled() && this._logger.info('start');
this._logger.isInfoEnabled && this._logger.info('start');

this._consumer = new KafkaDomainEventConsumer(this._messageHandler.bind(this), config.fspiopEventHandler.domainEventConsumer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Consumer of type ${this._consumer.constructor.name}`);

this._producer = new KafkaDomainEventProducer(config.fspiopEventHandler.domainEventProducer, this._loggerFromLoggingBC);
this._logger.isInfoEnabled() && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
this._logger.isInfoEnabled && this._logger.info(`Created Message Producer of type ${this._producer.constructor.name}`);
await this._producer.init();

// Create options for handlers
Expand All @@ -75,15 +75,15 @@ class FSPIOPEventHandler {
}

async stop() {
this._logger.isInfoEnabled() && this._logger.info('stop');
this._logger.isInfoEnabled && this._logger.info('stop');
await Promise.all([
this._consumer?.destroy(),
this._producer?.destroy(),
]);
}

async _messageHandler(message) {
this._logger.isInfoEnabled() && this._logger.info(`Got domain event message: ${message.getName()}`);
this._logger.isInfoEnabled && this._logger.info(`Got domain event message: ${message.getName()}`);
// TODO: Handle errors validation here
switch (message.getName()) {
case PartyInfoRequestedDmEvt.name: {
Expand All @@ -99,7 +99,7 @@ class FSPIOPEventHandler {
break;
}
default: {
this._logger.isDebugEnabled() && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
this._logger.isDebugEnabled && this._logger.debug(`${message?.getName()}:${message?.getKey()} - Skipping unknown domain event`);
return;
}
}
Expand Down
Loading