Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mp 785 fix redis connection leak #103

Merged
merged 11 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,9 @@ jobs:
command: cd src && mkdir -p /test/results
- run:
name: Execute unit tests
command: cd src && (npm run test > /test/results/results.txt)
- store_artifacts:
path: /test/results
command: cd src && npm run test
- store_test_results:
path: /test/results
path: /src/junit.xml

lint:
<<: *defaults_working_directory
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
node_modules/
.swp
12 changes: 6 additions & 6 deletions src/InboundServer/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ const putParticipantsById = async (ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`ac_${ctx.state.path.params.ID}`, {
type: 'accountsCreationSuccessfulResponse',
data: ctx.request.body
});
Expand All @@ -232,7 +232,7 @@ const putParticipantsByIdError = async (ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`ac_${ctx.state.path.params.ID}`, {
type: 'accountsCreationErrorResponse',
data: ctx.request.body
});
Expand Down Expand Up @@ -292,7 +292,7 @@ const putQuoteById = async (ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`qt_${ctx.state.path.params.ID}`, {
type: 'quoteResponse',
data: ctx.request.body,
headers: ctx.request.headers
Expand All @@ -317,7 +317,7 @@ const putTransfersById = async (ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`tf_${ctx.state.path.params.ID}`, {
type: 'transferFulfil',
data: ctx.request.body
});
Expand Down Expand Up @@ -369,7 +369,7 @@ const putQuotesByIdError = async(ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`qt_${ctx.state.path.params.ID}`, {
type: 'quoteResponseError',
data: ctx.request.body
});
Expand All @@ -394,7 +394,7 @@ const putTransfersByIdError = async (ctx) => {
}

// publish an event onto the cache for subscribers to action
await ctx.state.cache.publish(`${ctx.state.path.params.ID}`, {
await ctx.state.cache.publish(`tf_${ctx.state.path.params.ID}`, {
type: 'transferError',
data: ctx.request.body
});
Expand Down
130 changes: 122 additions & 8 deletions src/lib/cache/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

'use strict';

const util = require('util');
const redis = require('redis');


/**
* A shard cache abstraction over a REDIS distributed key/value store
* A shared cache abstraction over a REDIS distributed key/value store
*/
class Cache {
constructor(config) {
Expand All @@ -25,13 +26,117 @@ class Cache {
}

this.logger = this.config.logger;

// a redis connection to handle get, set and publish operations
this.client = null;

// a redis connection to handle subscribe operations and published message routing
// Note that REDIS docs suggest a client that is in SUBSCRIBE mode
// should not have any other commands executed against it.
// see: https://redis.io/topics/pubsub
this.subscriptionClient = null;

// a 'hashmap like' callback map
this.callbacks = {};

// tag each callback with an Id so we can gracefully unsubscribe and not leak resources
this.callbackId = 0;
}


/**
* Connects to a redis server and waits for the ready event
* Connects to a redis server and waits for ready events
* Note: We create two connections. One for get, set and publish commands
* and another for subscribe commands. We do this as we are not supposed
* to issue any non-pub/sub related commands on a connection used for sub
* See: https://redis.io/topics/pubsub
*/
async connect() {
this.client = await this.getClient();
this.client = await this._getClient();
this.subscriptionClient = await this._getClient();

// hook up our sub message handler
this.subscriptionClient.on('message', this._onMessage.bind(this));
}


/**
* Subscribes to a channel
*
* @param channel {string} - The channel name to subscribe to
* @param callback {function} - Callback function to be executed when messages arrive on the specified channel
* @returns {Promise} - Promise that resolves with an integer callback Id to submit in unsubscribe request
*/
async subscribe(channel, callback) {
return new Promise((resolve, reject) => {
this.subscriptionClient.subscribe(channel, (err) => {
if(err) {
this.logger.log(`Error subscribing to channel ${channel}: ${err.stack || util.inspect(err)}`);
return reject(err);
}

this.logger.log(`Subscribed to cache pub/sub channel ${channel}`);

if(!this.callbacks[channel]) {
// if this is the first subscriber for this channel we init the hashmap
this.callbacks[channel] = {};
}

// get an id for this callback
const id = this.callbackId++;

// store the callback against the channel/id
this.callbacks[channel][id] = callback;

// return the id we gave the callback
return resolve(id);
});
});
}


/**
* Unsubscribes a callback from a channel
*
* @param channel {string} - name of the channel to unsubscribe from
* @param callbackId {integer} - id of the callback to remove
*/
async unsubscribe(channel, callbackId) {
return new Promise((resolve, reject) => {
if(this.callbacks[channel] && this.callbacks[channel][callbackId]) {
delete this.callbacks[channel][callbackId];
this.logger.log(`Cache unsubscribed callbackId ${callbackId} from channel ${channel}`);

if(Object.keys(this.callbacks[channel]).length < 1) {
//no more callbacks for this channel
delete this.callbacks[channel];
}

return resolve();
}

// we should not be asked to unsubscribe from a subscription we do not have. Raise this as a promise
// rejection so it can be spotted. It may indiate a logic bug somewhere else
this.logger.log(`Cache not subscribed to channel ${channel} for callbackId ${callbackId}`);
return reject(new Error(`Channel ${channel} does not have a callback with id ${callbackId} subscribed`));
});
}


/**
* Handler for published messages
*/
async _onMessage(channel, msg) {
if(this.callbacks[channel]) {
// we have some callbacks to make
Object.keys(this.callbacks[channel]).forEach(k => {
this.logger.log(`Cache message received on channel ${channel}. Making callback with id ${k}`);

// call the callback with the channel name, message and callbackId...
// ...(which is useful for unsubscribe)
this.callbacks[channel][k](channel, msg, k);
});
}
}


Expand All @@ -40,29 +145,38 @@ class Cache {
*
* @returns {object} - a connected REDIS client
* */
async getClient() {
async _getClient() {
return new Promise((resolve, reject) => {
let sub = redis.createClient(this.config);
const client = redis.createClient(this.config);

sub.on('error', (err) => {
client.on('error', (err) => {
this.logger.push({ err }).log('Error from REDIS client getting subscriber');
return reject(err);
});

sub.on('ready', () => {
client.on('ready', () => {
this.logger.log(`Connected to REDIS at: ${this.config.host}:${this.config.port}`);
return resolve(sub);
return resolve(client);
});
});
}


/**
* Publishes the specified message to the specified channel
*
* @param channelName {string} - channel name to publish to
* @param value - any type that will be converted to a JSON string (unless it is already a string) and published as the message
* @returns {Promise} - Promise that will resolve with redis replies or reject with an error
*/
async publish(channelName, value) {
return new Promise((resolve, reject) => {
if(typeof(value) !== 'string') {
// ALWAYS publish string values
value = JSON.stringify(value);
}

// note that we publish on the non-SUBSCRIBE connection
this.client.publish(channelName, value, (err, replies) => {
if(err) {
this.logger.push({ channelName, err }).log(`Error publishing to channel ${channelName}`);
Expand Down
45 changes: 29 additions & 16 deletions src/lib/model/AccountsModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ class AccountsModel {
}

this._initStateMachine(this.data.currentState);

// set up a cache pub/sub subscriber
this.subscriber = await this.cache.getClient();
}


Expand All @@ -131,18 +128,12 @@ class AccountsModel {
}
}


async _executeCreateAccountsRequest(request) {
return new Promise(async (resolve, reject) => {
// set up a timeout for the request
const timeout = setTimeout(() => {
const err = new Error(`Timeout waiting for account creation request ${request.requestId}`);
return reject(err);
}, this.requestProcessingTimeoutSeconds * 1000);
const requestKey = `ac_${request.requestId}`;

const requestKey = `${request.requestId}`;

this.subscriber.subscribe(requestKey);
this.subscriber.on('message', async (cn, msg) => {
const subId = this.cache.subscribe(requestKey, async (cn, msg, subId) => {
try {
let error;
let message = JSON.parse(msg);
Expand All @@ -161,9 +152,11 @@ class AccountsModel {
// cancel the timeout handler
clearTimeout(timeout);

// stop listening for account creation response messages
this.subscriber.unsubscribe(requestKey, () => {
this.logger.log('Account creation subscriber unsubscribed');
// stop listening for account creation response messages.
// no need to await for the unsubscribe to complete.
// we dont really care if the unsubscribe fails but we should log it regardless
this.cache.unsubscribe(requestKey, subId).catch(e => {
this.logger.log(`Error unsubscribing (in callback) ${requestKey} ${subId}: ${e.stack || util.inspect(e)}`);
});

if (error) {
Expand All @@ -179,13 +172,33 @@ class AccountsModel {
}
});

// set up a timeout for the request
const timeout = setTimeout(() => {
const err = new Error(`Timeout waiting for response to account creation request ${request.requestId}`);

// we dont really care if the unsubscribe fails but we should log it regardless
this.cache.unsubscribe(requestKey, subId).catch(e => {
this.logger.log(`Error unsubscribing (in timeout handler) ${requestKey} ${subId}: ${e.stack || util.inspect(e)}`);
});

return reject(err);
}, this.requestProcessingTimeoutSeconds * 1000);

// now we have a timeout handler and a cache subscriber hooked up we can fire off
// a POST /participants request to the switch
try {
const res = await this.requests.postParticipants(request);
this.logger.push({ res }).log('Account creation request sent to ALS');
this.logger.push({ res }).log('Account creation request sent to peer');
}
catch(err) {
// cancel the timout and unsubscribe before rejecting the promise
clearTimeout(timeout);

// we dont really care if the unsubscribe fails but we should log it regardless
this.cache.unsubscribe(requestKey, subId).catch(e => {
this.logger.log(`Error unsubscribing (in error handler) ${requestKey} ${subId}: ${e.stack || util.inspect(e)}`);
});

return reject(err);
}
});
Expand Down
Loading