Skip to content

Commit

Permalink
Merge branch 'bugfix/ARSN-4-exceptionWhenKMSIsDown' into tmp/octopus/…
Browse files Browse the repository at this point in the history
…w/8.1/bugfix/ARSN-4-exceptionWhenKMSIsDown
  • Loading branch information
bert-e committed Jul 23, 2021
2 parents 856a163 + 6fdfbcb commit 0bdcd86
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 72 deletions.
83 changes: 38 additions & 45 deletions lib/network/kmip/transport/TransportTemplate.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,58 +70,59 @@ class TransportTemplate {
this.options.tls.port || DEFAULT_KMIP_PORT,
this.options.tls,
() => {
socket.on('data', data => {
const queuedCallback = this.callbackPipeline.shift();
queuedCallback(null, socket, data);

if (this.callbackPipeline.length <
this.pipelineDepth &&
this.deferedRequests.length > 0) {
const deferedRequest = this.deferedRequests.shift();
process.nextTick(() => {
this.send(logger,
deferedRequest.encodedMessage,
deferedRequest.cb);
});
} else if (this.callbackPipeline.length === 0 &&
this.deferedRequests.length === 0 &&
this.pipelineDrainedCallback) {
this.pipelineDrainedCallback();
this.pipelineDrainedCallback = null;
}
});
socket.on('end', () => {
const error = Error('Conversation interrupted');
this._drainQueuesWithError(error);
this.socket = null;
});
socket.on('error', err => {
this._drainQueuesWithError(err);
});
if (this.handshakeFunction) {
this.handshakeFunction(logger, readyCallback);
} else {
readyCallback(null);
}
});
socket.on('data', data => {
const queuedCallback = this.callbackPipeline.shift();
queuedCallback(null, socket, data);

if (this.callbackPipeline.length <
this.pipelineDepth &&
this.deferedRequests.length > 0) {
const deferedRequest = this.deferedRequests.shift();
process.nextTick(() => {
this.send(logger,
deferedRequest.encodedMessage,
deferedRequest.cb);
});
} else if (this.callbackPipeline.length === 0 &&
this.deferedRequests.length === 0 &&
this.pipelineDrainedCallback) {
this.pipelineDrainedCallback();
this.pipelineDrainedCallback = null;
}
});
socket.on('end', () => {
const error = Error('Conversation interrupted');
this.socket = null;
this._drainQueuesWithError(error);
});
socket.on('error', err => {
this._drainQueuesWithError(err);
});
this.socket = socket;
} catch (err) {
logger.error();
logger.error(err);
this._drainQueuesWithError(err);
readyCallback(err);
}
}

_doSend(logger, encodedMessage, cb) {
this.callbackPipeline.push(cb);
if (this.socket === null || this.socket.destroyed) {
this._createConversation(logger, () => {});
}
const socket = this.socket;
if (!socket || socket.destroyed) {
const error = new Error('Socket to server not available');
logger.error('TransportTemplate::_doSend', { error });
return cb(error);
if (socket) {
socket.cork();
socket.write(encodedMessage);
socket.uncork();
}
this.callbackPipeline.push(cb);
socket.cork();
socket.write(encodedMessage);
socket.uncork();
return undefined;
}

Expand All @@ -138,14 +139,6 @@ class TransportTemplate {
return this.deferedRequests.push({ encodedMessage, cb });
}
assert(encodedMessage.length !== 0);
if (this.socket === null || this.socket.destroyed) {
return this._createConversation(logger, err => {
if (err) {
return cb(err);
}
return this._doSend(logger, encodedMessage, cb);
});
}
return this._doSend(logger, encodedMessage, cb);
}

Expand Down
43 changes: 43 additions & 0 deletions tests/functional/kmip/tls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const assert = require('assert');
const net = require('net');
const tls = require('tls');
const TransportTemplate =
require('../../../lib/network/kmip/transport/TransportTemplate.js');
const { logger } = require('../../utils/kmip/ersatz.js');

describe('KMIP Connection Management', () => {
let server;
before(done => {
server = net.createServer(conn => {
// abort the connection as soon as it is accepted
conn.destroy();
});
server.listen(5696);
server.on('listening', done);
});
after(done => {
server.close(done);
});

it('should gracefully handle connection errors', done => {
const transport = new TransportTemplate(
tls,
{
pipelineDepth: 1,
tls: {
port: 5696,
},
});
const request = Buffer.alloc(10).fill(6);
/* Using a for loop here instead of anything
* asynchronous, the callbacks get stuck in
* the conversation queue and are unwind with
* an error. It is the purpose of this test */
transport.send(logger, request, (err, conversation, response) => {
assert(err);
assert(!response);
done();
});
transport.end();
});
});
31 changes: 4 additions & 27 deletions tests/utils/kmip/ersatz.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'; // eslint-disable-line
/* eslint new-cap: "off" */

const { EventEmitter } = require('events');

const logger = {
info: () => {},
Expand All @@ -10,11 +10,10 @@ const logger = {
};

/* Fake tls AND socket objects, duck type */
class EchoChannel {
class EchoChannel extends EventEmitter {
constructor() {
super();
this.clogged = false;
this.eventHandler = {};
this.deferedSignal = {};
}

/* tls object members substitutes */
Expand All @@ -24,16 +23,6 @@ class EchoChannel {
return this;
}

on(event, cb) {
this.eventHandler[event] = cb;
if (this.deferedSignal[event] &&
this.deferedSignal[event].length > 0) {
this.deferedSignal[event].forEach(this.eventHandler[event]);
this.deferedSignal[event] = undefined;
}
return this;
}

/* socket object members substitutes */

cork() {
Expand All @@ -46,7 +35,7 @@ class EchoChannel {

write(data) {
if (!this.clogged) {
return this.emit('data', data);
return process.nextTick(() => this.emit('data', data));
}
return this;
}
Expand All @@ -57,18 +46,6 @@ class EchoChannel {

/* Instrumentation member functions */

emit(event, data) {
if (this.eventHandler[event]) {
this.eventHandler[event](data);
} else {
if (!this.deferedSignal[event]) {
this.deferedSignal[event] = [];
}
this.deferedSignal[event].push(data);
}
return this;
}

clog() {
this.clogged = true;
return this;
Expand Down

0 comments on commit 0bdcd86

Please sign in to comment.