Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ts): introduce a round of types #319

Merged
merged 2 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"@google-cloud/nodejs-repo-tools": "^2.3.3",
"@types/arrify": "^1.0.4",
"@types/async": "^2.0.50",
"@types/duplexify": "^3.6.0",
"@types/extend": "^3.0.0",
"@types/is": "0.0.20",
"@types/mocha": "^5.2.5",
Expand Down
69 changes: 43 additions & 26 deletions src/connection-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ import {replaceProjectIdToken} from '@google-cloud/projectify';
const duplexify = require('duplexify');
const each = require('async-each');
import {EventEmitter} from 'events';
import * as is from 'is';
import * as through from 'through2';
import * as uuid from 'uuid';
import * as util from './util';
import { Subscription } from './subscription';
import { PubSub } from '.';
import { Duplex } from 'stream';
import { StatusObject } from 'grpc';
import { Subscriber } from './subscriber';

const CHANNEL_READY_EVENT = 'channel.ready';
const CHANNEL_ERROR_EVENT = 'channel.error';
Expand Down Expand Up @@ -49,6 +53,16 @@ const RETRY_CODES = [
15, // dataloss
];

export interface ConnectionPoolSettings {
maxConnections: number;
ackDeadline: number;
}

export type ConnectionResponse = [Duplex];
export interface ConnectionCallback {
(err: Error|null, connection?: Duplex): void;
}

/*!
* ConnectionPool is used to manage the stream connections created via
* StreamingPull rpc.
Expand All @@ -62,19 +76,19 @@ const RETRY_CODES = [
* creating a connection.
*/
export class ConnectionPool extends EventEmitter {
subscription;
pubsub;
connections;
isPaused;
isOpen;
isGettingChannelState;
failedConnectionAttempts;
noConnectionsTime;
settings;
queue;
keepAliveHandle;
client;
constructor(subscription) {
subscription: Subscription;
pubsub: PubSub;
connections: Map<string, Duplex>;
isPaused: boolean;
isOpen: boolean;
isGettingChannelState: boolean;
failedConnectionAttempts: number;
noConnectionsTime: number;
settings: ConnectionPoolSettings;
queue: NodeJS.Timer[];
keepAliveHandle?: NodeJS.Timer;
client?: Subscriber|null;
constructor(subscription: Subscription) {
super();
this.subscription = subscription;
this.pubsub = subscription.pubsub;
Expand Down Expand Up @@ -103,21 +117,23 @@ export class ConnectionPool extends EventEmitter {
* connection.
* @param {stream} callback.connection A duplex stream.
*/
acquire(id, callback) {
if (is.fn(id)) {
callback = id;
id = null;
}
acquire(id?: string): Promise<ConnectionResponse>;
acquire(id: string, callback: ConnectionCallback): void;
acquire(callback: ConnectionCallback): void;
acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback): void|Promise<ConnectionResponse> {
let id = typeof idOrCallback === 'string' ? idOrCallback : null;
const callback = typeof idOrCallback === 'function' ? idOrCallback : cb!;

if (!this.isOpen) {
callback(new Error('No connections available to make request.'));
return;
}
// it's possible that by the time a user acks the connection could have
// closed, so in that case we'll just return any connection
if (!this.connections.has(id)) {
if (!this.connections.has(id!)) {
id = this.connections.keys().next().value;
}
const connection = this.connections.get(id);
const connection = this.connections.get(id!);
if (connection) {
callback(null, connection);
return;
Expand All @@ -137,7 +153,7 @@ export class ConnectionPool extends EventEmitter {
close(callback) {
const connections = Array.from(this.connections.values());
callback = callback || util.noop;
clearInterval(this.keepAliveHandle);
clearInterval(this.keepAliveHandle!);
this.connections.clear();
this.queue.forEach(clearTimeout);
this.queue.length = 0;
Expand Down Expand Up @@ -345,7 +361,8 @@ export class ConnectionPool extends EventEmitter {
const interator = this.connections.values();
let connection = interator.next().value;
while (connection) {
if (connection.isConnected) {
// tslint:disable-next-line no-any
if ((connection as any).isConnected) {
return true;
}
connection = interator.next().value;
Expand Down Expand Up @@ -416,7 +433,7 @@ export class ConnectionPool extends EventEmitter {
*
* @private
*/
resume() {
resume(): void {
this.isPaused = false;
this.connections.forEach(connection => {
connection.resume();
Expand All @@ -427,7 +444,7 @@ export class ConnectionPool extends EventEmitter {
*
* @private
*/
sendKeepAlives() {
sendKeepAlives(): void {
this.connections.forEach(connection => {
connection.write({});
});
Expand All @@ -440,7 +457,7 @@ export class ConnectionPool extends EventEmitter {
* @param {object} status The gRPC status object.
* @return {boolean}
*/
shouldReconnect(status) {
shouldReconnect(status: StatusObject): boolean {
// If the pool was closed, we should definitely not reconnect
if (!this.isOpen) {
return false;
Expand Down
27 changes: 16 additions & 11 deletions src/histogram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

import * as extend from 'extend';

export interface HistogramOptions {
min?: number;
max?: number;
}

/*!
* The Histogram class is used to capture the lifespan of messages within the
* the client. These durations are then used to calculate the 99th percentile
Expand All @@ -25,10 +30,10 @@ import * as extend from 'extend';
* @class
*/
export class Histogram {
options;
data;
length;
constructor(options?) {
options: HistogramOptions;
data: Map<number, number>;
length: number;
constructor(options?: HistogramOptions) {
this.options = extend(
{
min: 10000,
Expand All @@ -45,14 +50,14 @@ export class Histogram {
* @private
* @param {numnber} value - The value in milliseconds.
*/
add(value) {
value = Math.max(value, this.options.min);
value = Math.min(value, this.options.max);
add(value: number): void {
value = Math.max(value, this.options.min!);
value = Math.min(value, this.options.max!);
value = Math.ceil(value / 1000) * 1000;
if (!this.data.has(value)) {
this.data.set(value, 0);
}
const count = this.data.get(value);
const count = this.data.get(value)!;
this.data.set(value, count + 1);
this.length += 1;
}
Expand All @@ -63,18 +68,18 @@ export class Histogram {
* @param {number} percent The requested percentage.
* @return {number}
*/
percentile(percent) {
percentile(percent: number): number {
percent = Math.min(percent, 100);
let target = this.length - this.length * (percent / 100);
const keys = Array.from(this.data.keys());
let key;
for (let i = keys.length - 1; i > -1; i--) {
key = keys[i];
target -= this.data.get(key);
target -= this.data.get(key)!;
if (target <= 0) {
return key;
}
}
return this.options.min;
return this.options.min!;
}
}
15 changes: 8 additions & 7 deletions src/iam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import * as arrify from 'arrify';
import {promisifyAll} from '@google-cloud/promisify';
import * as is from 'is';
import { PubSub } from '.';

/**
* [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control)
Expand Down Expand Up @@ -61,11 +62,11 @@ import * as is from 'is';
* // subscription.iam
*/
export class IAM {
Promise;
pubsub;
request;
id;
constructor(pubsub, id) {
Promise?: PromiseConstructor;
pubsub: PubSub;
request: typeof PubSub.prototype.request;
id: string;
constructor(pubsub: PubSub, id: string) {
if (pubsub.Promise) {
this.Promise = pubsub.Promise;
}
Expand Down Expand Up @@ -282,7 +283,7 @@ export class IAM {
* const apiResponse = data[1];
* });
*/
testPermissions(permissions, gaxOpts, callback?) {
testPermissions(permissions: string|string[], gaxOpts, callback?) {
if (!is.array(permissions) && !is.string(permissions)) {
throw new Error('Permissions are required.');
}
Expand All @@ -307,7 +308,7 @@ export class IAM {
return;
}
const availablePermissions = arrify(resp.permissions);
const permissionHash = permissions.reduce(function(acc, permission) {
const permissionHash = (permissions as string[]).reduce(function(acc, permission) {
acc[permission] = availablePermissions.indexOf(permission) > -1;
return acc;
}, {});
Expand Down
14 changes: 8 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import {paginator} from '@google-cloud/paginator';
import {promisifyAll} from '@google-cloud/promisify';
import * as extend from 'extend';
import {GoogleAuth} from 'google-auth-library';
const gax = require('google-gax');
const {grpc} = new gax.GrpcClient();
import * as gax from 'google-gax';
import * as is from 'is';

const PKG = require('../../package.json');
Expand All @@ -31,6 +30,9 @@ import {Subscription} from './subscription';
import {Topic} from './topic';
import { Readable } from 'stream';

const opts = {} as gax.GrpcClientOptions;
const {grpc} = new gax.GrpcClient(opts);

/**
* @type {string} - Project ID placeholder.
* @private
Expand Down Expand Up @@ -206,14 +208,14 @@ export class PubSub {
* const apiResponse = data[1];
* });
*/
createSubscription(topic, name, options, callback) {
createSubscription(topic: Topic|string, name: string, options, callback) {
if (!is.string(topic) && !(topic instanceof Topic)) {
throw new Error('A Topic is required for a new subscription.');
}
if (!is.string(name)) {
throw new Error('A subscription name is required.');
}
if (is.string(topic)) {
if (typeof topic === 'string') {
topic = this.topic(topic);
}
if (is.fn(options)) {
Expand Down Expand Up @@ -286,7 +288,7 @@ export class PubSub {
* const apiResponse = data[1];
* });
*/
createTopic(name, gaxOpts, callback?) {
createTopic(name: string, gaxOpts, callback?) {
const topic = this.topic(name);
const reqOpts = {
name: topic.name,
Expand Down Expand Up @@ -691,7 +693,7 @@ export class PubSub {
*
* const snapshot = pubsub.snapshot('my-snapshot');
*/
snapshot(name) {
snapshot(name: string) {
if (!is.string(name)) {
throw new Error('You must supply a valid name for the snapshot.');
}
Expand Down
6 changes: 3 additions & 3 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import { Topic } from './topic';
* const publisher = topic.publisher();
*/
export class Publisher {
Promise;
topic;
Promise?: PromiseConstructor;
topic: Topic;
inventory_;
settings;
timeoutHandle_;
Expand Down Expand Up @@ -148,7 +148,7 @@ export class Publisher {
* //-
* publisher.publish(data).then((messageId) => {});
*/
publish(data, attributes, callback?) {
publish(data: Buffer, attributes, callback?) {
if (!(data instanceof Buffer)) {
throw new TypeError('Data must be in the form of a Buffer.');
}
Expand Down
9 changes: 5 additions & 4 deletions src/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as util from './util';
import {promisifyAll} from '@google-cloud/promisify';
import * as is from 'is';
import { PubSub } from '.';

/**
* A Snapshot object will give you access to your Cloud Pub/Sub snapshot.
Expand Down Expand Up @@ -84,12 +85,12 @@ import * as is from 'is';
*/
export class Snapshot {
parent;
name;
Promise;
name: string;
Promise?: PromiseConstructor;
create;
seek;
metadata;
constructor(parent, name) {
constructor(parent, name: string) {
if (parent.Promise) {
this.Promise = parent.Promise;
}
Expand Down Expand Up @@ -201,7 +202,7 @@ export class Snapshot {
*
* @private
*/
static formatName_(projectId, name) {
static formatName_(projectId: string, name: string) {
return 'projects/' + projectId + '/snapshots/' + name.split('/').pop();
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import * as os from 'os';

import {ConnectionPool} from './connection-pool';
import {Histogram} from './histogram';
import { Subscription } from '.';

/**
* @type {number} - The maximum number of ackIds to be sent in acknowledge/modifyAckDeadline
Expand Down Expand Up @@ -389,7 +390,8 @@ export class Subscriber extends EventEmitter {
* @private
*/
openConnection_() {
const pool = (this.connectionPool = new ConnectionPool(this));
// TODO: fixup this cast
const pool = (this.connectionPool = new ConnectionPool(this as {} as Subscription));
this.isOpen = true;
pool.on('error', err => {
this.emit('error', err);
Expand Down Expand Up @@ -477,7 +479,7 @@ export class Subscriber extends EventEmitter {
}
// we can ignore any errors that come from this since they'll be
// re-emitted later
connection.write(data, err => {
connection!.write(data, err => {
if (!err) {
this.latency_.add(Date.now() - startTime);
}
Expand Down
Loading