Skip to content

Commit

Permalink
refactor(ts): added ts style fix for src/connection-pool.ts (#353)
Browse files Browse the repository at this point in the history
  • Loading branch information
vijay-qlogic authored and JustinBeckwith committed Nov 21, 2018
1 parent b642898 commit 74a91aa
Showing 1 changed file with 66 additions and 71 deletions.
137 changes: 66 additions & 71 deletions src/connection-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import {EventEmitter} from 'events';
import * as through from 'through2';
import * as uuid from 'uuid';
import * as util from './util';
import { Subscription } from './subscription';
import { PubSub } from '.';
import { Duplex } from 'stream';
import { StatusObject } from 'grpc';
import { Subscriber } from './subscriber';
import {Subscription} from './subscription';
import {PubSub} from '.';
import {Duplex} from 'stream';
import {StatusObject} from 'grpc';
import {Subscriber} from './subscriber';

const CHANNEL_READY_EVENT = 'channel.ready';
const CHANNEL_ERROR_EVENT = 'channel.error';
Expand All @@ -42,17 +42,21 @@ const MAX_TIMEOUT = 300000;
* codes to retry streams
*/
const RETRY_CODES = [
0, // ok
1, // canceled
2, // unknown
4, // deadline exceeded
8, // resource exhausted
10, // aborted
13, // internal error
14, // unavailable
15, // dataloss
0, // ok
1, // canceled
2, // unknown
4, // deadline exceeded
8, // resource exhausted
10, // aborted
13, // internal error
14, // unavailable
15, // dataloss
];

class ConnectionError extends Error {
code?: string;
}

export interface ConnectionPoolSettings {
maxConnections: number;
ackDeadline: number;
Expand Down Expand Up @@ -120,7 +124,8 @@ export class ConnectionPool extends EventEmitter {
acquire(id?: string): Promise<ConnectionResponse>;
acquire(id: string, callback: ConnectionCallback): void;
acquire(callback: ConnectionCallback): void;
acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback): void|Promise<ConnectionResponse> {
acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback):
void|Promise<ConnectionResponse> {
let id = typeof idOrCallback === 'string' ? idOrCallback : null;
const callback = typeof idOrCallback === 'function' ? idOrCallback : cb!;

Expand Down Expand Up @@ -160,26 +165,25 @@ export class ConnectionPool extends EventEmitter {
this.isOpen = false;
this.isGettingChannelState = false;
this.removeAllListeners('newListener')
.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);
.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);
this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;
each(
connections,
(connection, onEndCallback) => {
connection.end(err => {
connection.cancel();
onEndCallback(err);
connections,
(connection, onEndCallback) => {
connection.end(err => {
connection.cancel();
onEndCallback(err);
});
},
err => {
if (this.client) {
this.client.close();
this.client = null;
}
callback(err);
});
},
err => {
if (this.client) {
this.client.close();
this.client = null;
}
callback(err);
}
);
}
/*!
* Creates a connection. This is async but instead of providing a callback
Expand All @@ -194,14 +198,12 @@ export class ConnectionPool extends EventEmitter {
return;
}
const requestStream = client.streamingPull();
const readStream = requestStream.pipe(
through.obj((chunk, enc, next) => {
chunk.receivedMessages.forEach(message => {
readStream.push(message);
});
next();
})
);
const readStream = requestStream.pipe(through.obj((chunk, enc, next) => {
chunk.receivedMessages.forEach(message => {
readStream.push(message);
});
next();
}));
const connection = duplexify(requestStream, readStream, {
objectMode: true,
});
Expand Down Expand Up @@ -248,29 +250,23 @@ export class ConnectionPool extends EventEmitter {
if (this.shouldReconnect(status)) {
this.queueConnection();
} else if (this.isOpen && !this.connections.size) {
const error = new Error(status.details);
(error as any).code = status.code;
const error = new ConnectionError(status.details);
error.code = status.code;
this.emit('error', error);
}
};

this.once(CHANNEL_ERROR_EVENT, onChannelError).once(
CHANNEL_READY_EVENT,
onChannelReady
);
requestStream.on('status', status =>
setImmediate(onConnectionStatus, status)
);
connection
.on('error', onConnectionError)
.on('data', onConnectionData)
.write({
subscription: replaceProjectIdToken(
this.subscription.name,
this.pubsub.projectId
),
streamAckDeadlineSeconds: this.settings.ackDeadline / 1000,
});
this.once(CHANNEL_ERROR_EVENT, onChannelError)
.once(CHANNEL_READY_EVENT, onChannelReady);
requestStream.on(
'status', status => setImmediate(onConnectionStatus, status));
connection.on('error', onConnectionError)
.on('data', onConnectionData)
.write({
subscription: replaceProjectIdToken(
this.subscription.name, this.pubsub.projectId),
streamAckDeadlineSeconds: this.settings.ackDeadline / 1000,
});
this.connections.set(id, connection);
});
}
Expand All @@ -284,14 +280,14 @@ export class ConnectionPool extends EventEmitter {
*/
createMessage(connectionId, resp) {
const pt = resp.message.publishTime;
const milliseconds = parseInt(pt.nanos, 10) / 1e6;
const milliseconds = Number(pt.nanos) / 1e6;
const originalDataLength = resp.message.data.length;
const message = {
connectionId: connectionId,
connectionId,
ackId: resp.ackId,
id: resp.message.messageId,
attributes: resp.message.attributes,
publishTime: new Date(parseInt(pt.seconds, 10) * 1000 + milliseconds),
publishTime: new Date(Number(pt.seconds) * 1000 + milliseconds),
received: Date.now(),
data: resp.message.data,
// using get here to prevent user from overwriting data
Expand All @@ -304,7 +300,7 @@ export class ConnectionPool extends EventEmitter {
nack: (delay?: number) => {
this.subscription.nack_(message, delay);
}
}
};
return message;
}
/*!
Expand Down Expand Up @@ -340,8 +336,8 @@ export class ConnectionPool extends EventEmitter {
});
}
/*!
* Gets the Subscriber client. We need to bypass GAX until they allow deadlines
* to be optional.
* Gets the Subscriber client. We need to bypass GAX until they allow
* deadlines to be optional.
*
* @private
* @param {function} callback The callback function.
Expand Down Expand Up @@ -415,9 +411,8 @@ export class ConnectionPool extends EventEmitter {
queueConnection() {
let delay = 0;
if (this.failedConnectionAttempts > 0) {
delay =
Math.pow(2, this.failedConnectionAttempts) * 1000 +
Math.floor(Math.random() * 1000);
delay = Math.pow(2, this.failedConnectionAttempts) * 1000 +
Math.floor(Math.random() * 1000);
}
const createConnection = () => {
setImmediate(() => {
Expand All @@ -429,7 +424,8 @@ export class ConnectionPool extends EventEmitter {
this.queue.push(timeoutHandle);
}
/*!
* Calls resume on each connection, allowing `message` events to fire off again.
* Calls resume on each connection, allowing `message` events to fire off
* again.
*
* @private
*/
Expand Down Expand Up @@ -466,9 +462,8 @@ export class ConnectionPool extends EventEmitter {
if (RETRY_CODES.indexOf(status.code) === -1) {
return false;
}
const exceededRetryLimit =
this.noConnectionsTime &&
Date.now() - this.noConnectionsTime > MAX_TIMEOUT;
const exceededRetryLimit = this.noConnectionsTime &&
Date.now() - this.noConnectionsTime > MAX_TIMEOUT;
if (exceededRetryLimit) {
return false;
}
Expand Down

0 comments on commit 74a91aa

Please sign in to comment.