Skip to content
This repository has been archived by the owner on May 10, 2023. It is now read-only.

Commit

Permalink
Drained state added for inactivity handling and bunyan version updated (
Browse files Browse the repository at this point in the history
#140)

Merging after fixing constant event name usages and adding unit test for timeout event & drained state.
  • Loading branch information
stopal-r7 authored Nov 9, 2016
1 parent 2d7346e commit adf61ba
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 30 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"babel-plugin-transform-runtime": "6.6.0",
"babel-preset-es2015": "6.6.0",
"babel-preset-stage-3": "6.5.0",
"bunyan": "1.3.4",
"bunyan": "1.8.4",
"eslint": "3.3.0",
"eslint-config-airbnb": "10.0.0",
"eslint-plugin-import": "1.12.0",
Expand Down
69 changes: 40 additions & 29 deletions src/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ import BunyanStream from './bunyanstream';
const newline = /\n/g;
const tokenPattern = /[a-f\d]{8}-([a-f\d]{4}-){3}[a-f\d]{12}/;

// exposed Logger events
const errorEvent = 'error';
const logEvent = 'log';
const connectedEvent = 'connected';
const disconnectedEvent = 'disconnected';
const timeoutEvent = 'timed out';
const drainWritableEvent = 'drain';
const finishWritableEvent = 'finish';
const pipeWritableEvent = 'pipe';
const unpipeWritableEvent = 'unpipe';
const bufferDrainEvent = 'buffer drain';

/**
* Append log string to provided token.
*
Expand Down Expand Up @@ -170,8 +182,23 @@ class Logger extends Writable {
this.debugLogger.log('Buffer is full, will be shifting records until buffer is drained.');
});

this.on('buffer drain', () => {
this.on(bufferDrainEvent, () => {
this.debugLogger.log('RingBuffer drained.');
this.drained = true;
});

this.on(timeoutEvent, () => {
if (this.drained) {
this.debugLogger.log(
`Socket was inactive for ${this.inactivityTimeout / 1000} seconds. Destroying.`);
this.closeConnection();
} else {
this.debugLogger.log('Inactivity timeout event emitted but buffer was not drained.');
this.once(bufferDrainEvent, () => {
this.debugLogger.log('Buffer drain event emitted for inactivity timeout.');
this.closeConnection();
});
}
});
}

Expand All @@ -181,6 +208,7 @@ class Logger extends Writable {
* to Logentries connection when its available
*/
_write(ch, enc, cb) {
this.drained = false;
this.connection.then(conn => {
const record = this.ringBuffer.read();
if (record) {
Expand All @@ -189,7 +217,7 @@ class Logger extends Writable {
if (this.ringBuffer.isEmpty()) {
conn.write(record, () => {
process.nextTick(() => {
this.emit('buffer drain');
this.emit(bufferDrainEvent);
// this event is DEPRECATED - will be removed in next major release.
// new users should use 'buffer drain' event instead.
this.emit('connection drain');
Expand All @@ -203,7 +231,7 @@ class Logger extends Writable {
}
cb();
}).catch(err => {
this.emit('error', err);
this.emit(errorEvent, err);
this.debugLogger.log(`Error: ${err}`);
cb();
});
Expand Down Expand Up @@ -233,7 +261,7 @@ class Logger extends Writable {

// If lvl is present, it must be recognized
if (!modifiedLevel && modifiedLevel !== 0) {
this.emit('error', new LogentriesError(text.unknownLevel(modifiedLevel)));
this.emit(errorEvent, new LogentriesError(text.unknownLevel(modifiedLevel)));
return;
}

Expand All @@ -248,7 +276,7 @@ class Logger extends Writable {
if (modifiedLog.length) {
for (const $modifiedLog of modifiedLog) this.log(modifiedLevel, $modifiedLog);
} else {
this.emit('error', new LogentriesError(text.noLogMessage()));
this.emit(errorEvent, new LogentriesError(text.noLogMessage()));
}
return;
}
Expand All @@ -272,7 +300,7 @@ class Logger extends Writable {
modifiedLog = this._serialize(modifiedLog);

if (!modifiedLog) {
this.emit('error', new LogentriesError(text.serializedEmpty()));
this.emit(errorEvent, new LogentriesError(text.serializedEmpty()));
return;
}

Expand All @@ -284,7 +312,7 @@ class Logger extends Writable {
if (safeLevel) delete modifiedLog[safeLevel];
} else {
if (_.isEmpty(modifiedLog)) {
this.emit('error', new LogentriesError(text.noLogMessage()));
this.emit(errorEvent, new LogentriesError(text.noLogMessage()));
return;
}

Expand All @@ -305,7 +333,7 @@ class Logger extends Writable {
}
}

this.emit('log', modifiedLog);
this.emit(logEvent, modifiedLog);

// if RingBuffer.write returns false, don't create any other write request for
// the writable stream to avoid memory leak this means there are already 'bufferSize'
Expand All @@ -326,6 +354,7 @@ class Logger extends Writable {
}
// this makes sure retry mechanism and connection will be closed.
this.reconnection.disconnect();
this.connection = null;
}

// Private methods
Expand Down Expand Up @@ -385,17 +414,11 @@ class Logger extends Writable {
// reconnection listeners
this.reconnection.on('connect', (connection) => {
this.debugLogger.log('Connected');
this.emit('connected');
this.emit(connectedEvent);

// connection listeners
connection.on('timeout', () => {
// we owe a lot to inactivity timeout handling with regards to clearing
// unwanted opened connections hanging around.
this.debugLogger.log(
`Socket was inactive for ${this.inactivityTimeout / 1000} seconds. Destroying.`);
this.closeConnection();
this.connection = null;
this.emit('timed out');
this.emit(timeoutEvent);
});
resolve(connection);
});
Expand All @@ -409,7 +432,7 @@ class Logger extends Writable {
this.reconnection.once('disconnect', () => {
this.debugLogger.log('Socket was disconnected');
this.connection = null;
this.emit('disconnected');
this.emit(disconnectedEvent);
});

this.reconnection.on('error', (err) => {
Expand Down Expand Up @@ -806,18 +829,6 @@ const winston2 = requirePeer('winston2x', { optional: true });
if (winston1) Logger.provisionWinston(winston1);
if (winston2) Logger.provisionWinston(winston2);

// exposed Logger events
const errorEvent = 'error';
const logEvent = 'log';
const connectedEvent = 'connected';
const disconnectedEvent = 'disconnected';
const timeoutEvent = 'timed out';
const drainWritableEvent = 'drain';
const finishWritableEvent = 'finish';
const pipeWritableEvent = 'pipe';
const unpipeWritableEvent = 'unpipe';
const bufferDrainEvent = 'buffer drain';

export {
Logger as default,
errorEvent,
Expand Down
33 changes: 33 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,39 @@ tape('Socket gets re-opened as needed.', function (t) {

});

tape('Socket is not closed after inactivity timeout when buffer is not empty.', function (t) {
t.plan(3);
t.timeoutAfter(1000);
const lvl = defaults.levels[3];
const tkn = x;
const logger = new Logger({ token: x , inactivityTimeout: 300});

const mock = mitm();

mock.on('connection', function (socket, opts) {
socket.once('data', function (buffer) {
const log1 = buffer.toString();
const expected1 = [tkn, lvl, 'first log' + '\n'].join(' ');
t.equals(log1, expected1, 'first log received.');
});

logger.on('timed out', function () {
t.true(logger.drained, 'timeout event triggered and logger was drained.');
});

setTimeout(function () {
logger.log(lvl, 'second log');
socket.once('data', function (buffer) {
const log2 = buffer.toString();
const expected2 = [tkn, lvl, 'second log' + '\n'].join(' ');
t.equals(log2, expected2, 'log before inactivity timeout received.');
});
}, 301);
mock.disable();
});
logger.log(lvl, 'first log');
});


tape('RingBuffer buffers and shifts when it is full', function (t) {
t.plan(5);
Expand Down

0 comments on commit adf61ba

Please sign in to comment.