-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: S3C-1966: KMIP TLS Transport #688
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
'use strict'; // eslint-disable-line | ||
|
||
const assert = require('assert'); | ||
|
||
const DEFAULT_PIPELINE_DEPTH = 8; | ||
const DEFAULT_KMIP_PORT = 5696; | ||
|
||
class TransportTemplate { | ||
/** | ||
* Construct a new object of the TransportTemplate class | ||
* @param {Object} channel - Typically the tls object | ||
* @param {Object} options - Instance options | ||
* @param {Number} options.pipelineDepth - depth of the pipeline | ||
* @param {Object} options.tls - Standard TLS socket initialization | ||
* parameters | ||
* @param {Number} options.tls.port - TLS server port to connect to | ||
*/ | ||
constructor(channel, options) { | ||
this.channel = channel; | ||
this.options = options; | ||
this.pipelineDepth = Math.max(1, options.pipelineDepth || | ||
DEFAULT_PIPELINE_DEPTH); | ||
this.callbackPipeline = []; | ||
this.deferedRequests = []; | ||
this.pipelineDrainedCallback = null; | ||
this.socket = null; | ||
} | ||
|
||
/** | ||
* Drain the outstanding and defered request queues by | ||
* calling the associated callback with an error | ||
* @param {Error} error - the error to call the callback function with. | ||
* @returns {undefined} | ||
*/ | ||
_drainQueuesWithError(error) { | ||
this.callbackPipeline.forEach(queuedCallback => { | ||
queuedCallback(error); | ||
}); | ||
this.deferedRequests.forEach(deferedRequest => { | ||
deferedRequest.cb(error); | ||
}); | ||
this.callbackPipeline = []; | ||
this.deferedRequests = []; | ||
} | ||
|
||
/** | ||
* Create a new conversation (e.g. a socket) between the client | ||
* and the server. | ||
* @param {Object} logger - Werelogs logger object | ||
* @param {Function} readyCallback - callback function to call when the | ||
* conversation is ready to be initiated | ||
* @returns {undefined} | ||
*/ | ||
_createConversation(logger, readyCallback) { | ||
try { | ||
const socket = this.channel.connect( | ||
this.options.tls.port || DEFAULT_KMIP_PORT, | ||
this.options.tls, | ||
() => { | ||
socket.on('data', data => { | ||
const queuedCallback = this.callbackPipeline.shift(); | ||
queuedCallback(null, socket, data); | ||
|
||
if (this.callbackPipeline.length < | ||
this.pipelineDepth && | ||
this.deferedRequests.length > 0) { | ||
const deferedRequest = this.deferedRequests.shift(); | ||
process.nextTick(() => { | ||
this.send(logger, | ||
deferedRequest.encodedMessage, | ||
deferedRequest.cb); | ||
}); | ||
} else if (this.callbackPipeline.length === 0 && | ||
this.deferedRequests.length === 0 && | ||
this.pipelineDrainedCallback) { | ||
this.pipelineDrainedCallback(); | ||
this.pipelineDrainedCallback = null; | ||
} | ||
}); | ||
socket.on('end', () => { | ||
const error = Error('Conversation interrupted'); | ||
this._drainQueuesWithError(error); | ||
this.socket = null; | ||
}); | ||
socket.on('error', err => { | ||
this._drainQueuesWithError(err); | ||
}); | ||
readyCallback(null); | ||
}); | ||
this.socket = socket; | ||
} catch (err) { | ||
logger.error(); | ||
readyCallback(err); | ||
} | ||
} | ||
|
||
_doSend(logger, encodedMessage, cb) { | ||
const socket = this.socket; | ||
if (!socket) { | ||
const error = new Error('Socket to server not available'); | ||
logger.error('TransportTemplate::_doSend', { error }); | ||
return cb(error); | ||
} | ||
this.callbackPipeline.push(cb); | ||
socket.cork(); | ||
socket.write(encodedMessage); | ||
socket.uncork(); | ||
return undefined; | ||
} | ||
|
||
/** | ||
* Send an encoded message to the server | ||
* @param {Object} logger - Werelogs logger object | ||
* @param {Buffer} encodedMessage - the encoded message to send to the | ||
* server | ||
* @param {Function} cb - (err, conversation, rawResponse) | ||
* @returns {undefined} | ||
*/ | ||
send(logger, encodedMessage, cb) { | ||
if (this.callbackPipeline.length >= this.pipelineDepth) { | ||
return this.deferedRequests.push({ encodedMessage, cb }); | ||
} | ||
assert(encodedMessage.length !== 0); | ||
if (this.socket === null) { | ||
return this._createConversation(logger, err => { | ||
if (err) { | ||
return cb(err); | ||
} | ||
return this._doSend(logger, encodedMessage, cb); | ||
}); | ||
} | ||
return this._doSend(logger, encodedMessage, cb); | ||
} | ||
|
||
/** | ||
* Gracefuly interrupt the conversation. If the caller keeps sending | ||
* message after calling this function, the conversation won't | ||
* converge to its end. | ||
* @returns {undefined} | ||
*/ | ||
end() { | ||
if (!this.socket) { | ||
return; | ||
} | ||
if (this.callbackPipeline.length !== 0 || | ||
this.deferedRequests.length !== 0) { | ||
this.pipelineDrainedCallback = this.socket.end.bind(this.socket); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any scenario where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, after a misusage of the API during shutdown (calling twice, or race reaping the pending errors). |
||
} else { | ||
this.socket.end(); | ||
} | ||
} | ||
|
||
/** | ||
* Abruptly interrupt the conversation and cancel the outstanding and | ||
* defered requests | ||
* @param {Object} conversation - the conversation to abort | ||
* @returns {undefined} | ||
*/ | ||
abortPipeline(conversation) { | ||
conversation.end(); | ||
} | ||
} | ||
|
||
module.exports = TransportTemplate; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
'use strict'; // eslint-disable-line | ||
|
||
const tls = require('tls'); | ||
const TransportTemplate = require('./TransportTemplate.js'); | ||
|
||
class TlsTransport extends TransportTemplate { | ||
constructor(options) { | ||
super(tls, options); | ||
} | ||
} | ||
|
||
module.exports = TlsTransport; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
'use strict'; // eslint-disable-line | ||
|
||
const async = require('async'); | ||
const assert = require('assert'); | ||
const TransportTemplate = | ||
require('../../../lib/network/kmip/transport/TransportTemplate.js'); | ||
const { logger, EchoChannel } = require('../../utils/kmip/ersatz.js'); | ||
|
||
describe('KMIP Transport Template Class', () => { | ||
const pipelineDepths = [1, 2, 4, 8, 16, 32]; | ||
const requestNumbers = [1, 37, 1021, 8191]; | ||
|
||
pipelineDepths.forEach(pipelineDepth => { | ||
requestNumbers.forEach(iterations => { | ||
it(`should survive ${iterations} iterations` + | ||
` with ${pipelineDepth}way pipeline`, | ||
done => { | ||
const transport = new TransportTemplate( | ||
new EchoChannel, | ||
{ | ||
pipelineDepth, | ||
tls: { | ||
port: 5696, | ||
}, | ||
}); | ||
const request = Buffer.alloc(10).fill(6); | ||
async.times(iterations, (n, next) => { | ||
transport.send(logger, request, | ||
(err, conversation, response) => { | ||
if (err) { | ||
return next(err); | ||
} | ||
if (request.compare(response) !== 0) { | ||
return next(Error('arg')); | ||
} | ||
return next(); | ||
}); | ||
}, err => { | ||
transport.end(); | ||
done(err); | ||
}); | ||
}); | ||
|
||
[true, false].forEach(doEmit => { | ||
it('should report errors to outstanding requests.' + | ||
` w:${pipelineDepth}, i:${iterations}, e:${doEmit}`, | ||
done => { | ||
const echoChannel = new EchoChannel; | ||
echoChannel.clog(); | ||
const transport = new TransportTemplate( | ||
echoChannel, | ||
{ | ||
pipelineDepth, | ||
tls: { | ||
port: 5696, | ||
}, | ||
}); | ||
const request = Buffer.alloc(10).fill(6); | ||
/* Using a for loop here instead of anything | ||
* asynchronous, the callbacks get stuck in | ||
* the conversation queue and are unwind with | ||
* an error. It is the purpose of this test */ | ||
for (let i = 0; i < iterations; ++i) { | ||
transport.send( | ||
logger, request, | ||
(err, conversation, response) => { | ||
assert(err); | ||
assert(!response); | ||
}); | ||
} | ||
if (doEmit) { | ||
echoChannel.emit('error', new Error('awesome')); | ||
} else { | ||
transport.abortPipeline(echoChannel); | ||
} | ||
transport.end(); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,12 +7,7 @@ const KMIP = require('../../../lib/network/kmip'); | |
const ttlvFixtures = require('../../utils/kmip/ttlvFixtures'); | ||
const badTtlvFixtures = require('../../utils/kmip/badTtlvFixtures'); | ||
const messageFixtures = require('../../utils/kmip/messageFixtures'); | ||
|
||
const logger = { | ||
info: () => {}, | ||
debug: () => {}, | ||
error: () => {}, | ||
}; | ||
const { logger } = require('../../utils/kmip/ersatz.js'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the file name here ✅ |
||
|
||
describe('KMIP TTLV Codec', () => { | ||
it('should map, encode and decode an extension', done => { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
'use strict'; // eslint-disable-line | ||
|
||
|
||
const logger = { | ||
info: () => {}, | ||
debug: () => {}, | ||
warn: () => {}, | ||
error: () => {}, | ||
}; | ||
|
||
/* Fake tls AND socket objects, duck type */ | ||
class EchoChannel { | ||
constructor() { | ||
this.clogged = false; | ||
this.eventHandler = {}; | ||
this.deferedSignal = {}; | ||
} | ||
|
||
/* tls object members substitutes */ | ||
|
||
connect(port, options, cb) { | ||
process.nextTick(cb); | ||
return this; | ||
} | ||
|
||
on(event, cb) { | ||
this.eventHandler[event] = cb; | ||
if (this.deferedSignal[event] && | ||
this.deferedSignal[event].length > 0) { | ||
this.deferedSignal[event].forEach(this.eventHandler[event]); | ||
this.deferedSignal[event] = undefined; | ||
} | ||
return this; | ||
} | ||
|
||
/* socket object members substitutes */ | ||
|
||
cork() { | ||
return this; | ||
} | ||
|
||
uncork() { | ||
return this; | ||
} | ||
|
||
write(data) { | ||
if (!this.clogged) { | ||
return this.emit('data', data); | ||
} | ||
return this; | ||
} | ||
|
||
end() { | ||
return this.emit('end'); | ||
} | ||
|
||
/* Instrumentation member functions */ | ||
|
||
emit(event, data) { | ||
if (this.eventHandler[event]) { | ||
this.eventHandler[event](data); | ||
} else { | ||
if (!this.deferedSignal[event]) { | ||
this.deferedSignal[event] = []; | ||
} | ||
this.deferedSignal[event].push(data); | ||
} | ||
return this; | ||
} | ||
|
||
clog() { | ||
this.clogged = true; | ||
return this; | ||
} | ||
|
||
} | ||
|
||
module.exports = { logger, EchoChannel }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to confirm that the 0 case is intended to go to the default rather than the max of 1. For example:
options.pipelineDepth
is < 0this.pipelineDepth
is 1options.pipelineDepth
is equal to 0this.pipelineDepth
is 8There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exact! (actually, 1 is the min)