Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ts): added ts style fix for src/connection-pool.ts #353

Merged
merged 3 commits into from
Nov 21, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 66 additions & 71 deletions src/connection-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import {EventEmitter} from 'events';
import * as through from 'through2';
import * as uuid from 'uuid';
import * as util from './util';
import { Subscription } from './subscription';
import { PubSub } from '.';
import { Duplex } from 'stream';
import { StatusObject } from 'grpc';
import { Subscriber } from './subscriber';
import {Subscription} from './subscription';
import {PubSub} from '.';
import {Duplex} from 'stream';
import {StatusObject} from 'grpc';
import {Subscriber} from './subscriber';

const CHANNEL_READY_EVENT = 'channel.ready';
const CHANNEL_ERROR_EVENT = 'channel.error';
Expand All @@ -42,17 +42,21 @@ const MAX_TIMEOUT = 300000;
* codes to retry streams
*/
const RETRY_CODES = [
0, // ok
1, // canceled
2, // unknown
4, // deadline exceeded
8, // resource exhausted
10, // aborted
13, // internal error
14, // unavailable
15, // dataloss
0, // ok
1, // canceled
2, // unknown
4, // deadline exceeded
8, // resource exhausted
10, // aborted
13, // internal error
14, // unavailable
15, // dataloss
];

class ConnectionError extends Error {
code?: string;
}

export interface ConnectionPoolSettings {
maxConnections: number;
ackDeadline: number;
Expand Down Expand Up @@ -120,7 +124,8 @@ export class ConnectionPool extends EventEmitter {
acquire(id?: string): Promise<ConnectionResponse>;
acquire(id: string, callback: ConnectionCallback): void;
acquire(callback: ConnectionCallback): void;
acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback): void|Promise<ConnectionResponse> {
acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback):
void|Promise<ConnectionResponse> {
let id = typeof idOrCallback === 'string' ? idOrCallback : null;
const callback = typeof idOrCallback === 'function' ? idOrCallback : cb!;

Expand Down Expand Up @@ -160,26 +165,25 @@ export class ConnectionPool extends EventEmitter {
this.isOpen = false;
this.isGettingChannelState = false;
this.removeAllListeners('newListener')
.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);
.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);
this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;
each(
connections,
(connection, onEndCallback) => {
connection.end(err => {
connection.cancel();
onEndCallback(err);
connections,
(connection, onEndCallback) => {
connection.end(err => {
connection.cancel();
onEndCallback(err);
});
},
err => {
if (this.client) {
this.client.close();
this.client = null;
}
callback(err);
});
},
err => {
if (this.client) {
this.client.close();
this.client = null;
}
callback(err);
}
);
}
/*!
* Creates a connection. This is async but instead of providing a callback
Expand All @@ -194,14 +198,12 @@ export class ConnectionPool extends EventEmitter {
return;
}
const requestStream = client.streamingPull();
const readStream = requestStream.pipe(
through.obj((chunk, enc, next) => {
chunk.receivedMessages.forEach(message => {
readStream.push(message);
});
next();
})
);
const readStream = requestStream.pipe(through.obj((chunk, enc, next) => {
chunk.receivedMessages.forEach(message => {
readStream.push(message);
});
next();
}));
const connection = duplexify(requestStream, readStream, {
objectMode: true,
});
Expand Down Expand Up @@ -248,29 +250,23 @@ export class ConnectionPool extends EventEmitter {
if (this.shouldReconnect(status)) {
this.queueConnection();
} else if (this.isOpen && !this.connections.size) {
const error = new Error(status.details);
(error as any).code = status.code;
const error = new ConnectionError(status.details);
error.code = status.code;
this.emit('error', error);
}
};

this.once(CHANNEL_ERROR_EVENT, onChannelError).once(
CHANNEL_READY_EVENT,
onChannelReady
);
requestStream.on('status', status =>
setImmediate(onConnectionStatus, status)
);
connection
.on('error', onConnectionError)
.on('data', onConnectionData)
.write({
subscription: replaceProjectIdToken(
this.subscription.name,
this.pubsub.projectId
),
streamAckDeadlineSeconds: this.settings.ackDeadline / 1000,
});
this.once(CHANNEL_ERROR_EVENT, onChannelError)
.once(CHANNEL_READY_EVENT, onChannelReady);
requestStream.on(
'status', status => setImmediate(onConnectionStatus, status));
connection.on('error', onConnectionError)
.on('data', onConnectionData)
.write({
subscription: replaceProjectIdToken(
this.subscription.name, this.pubsub.projectId),
streamAckDeadlineSeconds: this.settings.ackDeadline / 1000,
});
this.connections.set(id, connection);
});
}
Expand All @@ -284,14 +280,14 @@ export class ConnectionPool extends EventEmitter {
*/
createMessage(connectionId, resp) {
const pt = resp.message.publishTime;
const milliseconds = parseInt(pt.nanos, 10) / 1e6;
const milliseconds = Number(pt.nanos) / 1e6;
const originalDataLength = resp.message.data.length;
const message = {
connectionId: connectionId,
connectionId,
ackId: resp.ackId,
id: resp.message.messageId,
attributes: resp.message.attributes,
publishTime: new Date(parseInt(pt.seconds, 10) * 1000 + milliseconds),
publishTime: new Date(Number(pt.seconds) * 1000 + milliseconds),
received: Date.now(),
data: resp.message.data,
// using get here to prevent user from overwriting data
Expand All @@ -304,7 +300,7 @@ export class ConnectionPool extends EventEmitter {
nack: (delay?: number) => {
this.subscription.nack_(message, delay);
}
}
};
return message;
}
/*!
Expand Down Expand Up @@ -340,8 +336,8 @@ export class ConnectionPool extends EventEmitter {
});
}
/*!
* Gets the Subscriber client. We need to bypass GAX until they allow deadlines
* to be optional.
* Gets the Subscriber client. We need to bypass GAX until they allow
* deadlines to be optional.
*
* @private
* @param {function} callback The callback function.
Expand Down Expand Up @@ -415,9 +411,8 @@ export class ConnectionPool extends EventEmitter {
queueConnection() {
let delay = 0;
if (this.failedConnectionAttempts > 0) {
delay =
Math.pow(2, this.failedConnectionAttempts) * 1000 +
Math.floor(Math.random() * 1000);
delay = Math.pow(2, this.failedConnectionAttempts) * 1000 +
Math.floor(Math.random() * 1000);
}
const createConnection = () => {
setImmediate(() => {
Expand All @@ -429,7 +424,8 @@ export class ConnectionPool extends EventEmitter {
this.queue.push(timeoutHandle);
}
/*!
* Calls resume on each connection, allowing `message` events to fire off again.
* Calls resume on each connection, allowing `message` events to fire off
* again.
*
* @private
*/
Expand Down Expand Up @@ -466,9 +462,8 @@ export class ConnectionPool extends EventEmitter {
if (RETRY_CODES.indexOf(status.code) === -1) {
return false;
}
const exceededRetryLimit =
this.noConnectionsTime &&
Date.now() - this.noConnectionsTime > MAX_TIMEOUT;
const exceededRetryLimit = this.noConnectionsTime &&
Date.now() - this.noConnectionsTime > MAX_TIMEOUT;
if (exceededRetryLimit) {
return false;
}
Expand Down