diff --git a/package.json b/package.json index ecdc5042a..8e84bd7bc 100644 --- a/package.json +++ b/package.json @@ -101,6 +101,7 @@ "@google-cloud/nodejs-repo-tools": "^2.3.3", "@types/arrify": "^1.0.4", "@types/async": "^2.0.50", + "@types/duplexify": "^3.6.0", "@types/extend": "^3.0.0", "@types/is": "0.0.20", "@types/mocha": "^5.2.5", diff --git a/src/connection-pool.ts b/src/connection-pool.ts index 471d96d72..a8bfa986a 100644 --- a/src/connection-pool.ts +++ b/src/connection-pool.ts @@ -18,10 +18,14 @@ import {replaceProjectIdToken} from '@google-cloud/projectify'; const duplexify = require('duplexify'); const each = require('async-each'); import {EventEmitter} from 'events'; -import * as is from 'is'; import * as through from 'through2'; import * as uuid from 'uuid'; import * as util from './util'; +import { Subscription } from './subscription'; +import { PubSub } from '.'; +import { Duplex } from 'stream'; +import { StatusObject } from 'grpc'; +import { Subscriber } from './subscriber'; const CHANNEL_READY_EVENT = 'channel.ready'; const CHANNEL_ERROR_EVENT = 'channel.error'; @@ -49,6 +53,16 @@ const RETRY_CODES = [ 15, // dataloss ]; +export interface ConnectionPoolSettings { + maxConnections: number; + ackDeadline: number; +} + +export type ConnectionResponse = [Duplex]; +export interface ConnectionCallback { + (err: Error|null, connection?: Duplex): void; +} + /*! * ConnectionPool is used to manage the stream connections created via * StreamingPull rpc. @@ -62,19 +76,19 @@ const RETRY_CODES = [ * creating a connection. */ export class ConnectionPool extends EventEmitter { - subscription; - pubsub; - connections; - isPaused; - isOpen; - isGettingChannelState; - failedConnectionAttempts; - noConnectionsTime; - settings; - queue; - keepAliveHandle; - client; - constructor(subscription) { + subscription: Subscription; + pubsub: PubSub; + connections: Map; + isPaused: boolean; + isOpen: boolean; + isGettingChannelState: boolean; + failedConnectionAttempts: number; + noConnectionsTime: number; + settings: ConnectionPoolSettings; + queue: NodeJS.Timer[]; + keepAliveHandle?: NodeJS.Timer; + client?: Subscriber|null; + constructor(subscription: Subscription) { super(); this.subscription = subscription; this.pubsub = subscription.pubsub; @@ -103,21 +117,23 @@ export class ConnectionPool extends EventEmitter { * connection. * @param {stream} callback.connection A duplex stream. */ - acquire(id, callback) { - if (is.fn(id)) { - callback = id; - id = null; - } + acquire(id?: string): Promise; + acquire(id: string, callback: ConnectionCallback): void; + acquire(callback: ConnectionCallback): void; + acquire(idOrCallback?: string|ConnectionCallback, cb?: ConnectionCallback): void|Promise { + let id = typeof idOrCallback === 'string' ? idOrCallback : null; + const callback = typeof idOrCallback === 'function' ? idOrCallback : cb!; + if (!this.isOpen) { callback(new Error('No connections available to make request.')); return; } // it's possible that by the time a user acks the connection could have // closed, so in that case we'll just return any connection - if (!this.connections.has(id)) { + if (!this.connections.has(id!)) { id = this.connections.keys().next().value; } - const connection = this.connections.get(id); + const connection = this.connections.get(id!); if (connection) { callback(null, connection); return; @@ -137,7 +153,7 @@ export class ConnectionPool extends EventEmitter { close(callback) { const connections = Array.from(this.connections.values()); callback = callback || util.noop; - clearInterval(this.keepAliveHandle); + clearInterval(this.keepAliveHandle!); this.connections.clear(); this.queue.forEach(clearTimeout); this.queue.length = 0; @@ -345,7 +361,8 @@ export class ConnectionPool extends EventEmitter { const interator = this.connections.values(); let connection = interator.next().value; while (connection) { - if (connection.isConnected) { + // tslint:disable-next-line no-any + if ((connection as any).isConnected) { return true; } connection = interator.next().value; @@ -416,7 +433,7 @@ export class ConnectionPool extends EventEmitter { * * @private */ - resume() { + resume(): void { this.isPaused = false; this.connections.forEach(connection => { connection.resume(); @@ -427,7 +444,7 @@ export class ConnectionPool extends EventEmitter { * * @private */ - sendKeepAlives() { + sendKeepAlives(): void { this.connections.forEach(connection => { connection.write({}); }); @@ -440,7 +457,7 @@ export class ConnectionPool extends EventEmitter { * @param {object} status The gRPC status object. * @return {boolean} */ - shouldReconnect(status) { + shouldReconnect(status: StatusObject): boolean { // If the pool was closed, we should definitely not reconnect if (!this.isOpen) { return false; diff --git a/src/histogram.ts b/src/histogram.ts index 6a058a25a..8c25285a9 100644 --- a/src/histogram.ts +++ b/src/histogram.ts @@ -16,6 +16,11 @@ import * as extend from 'extend'; +export interface HistogramOptions { + min?: number; + max?: number; +} + /*! * The Histogram class is used to capture the lifespan of messages within the * the client. These durations are then used to calculate the 99th percentile @@ -25,10 +30,10 @@ import * as extend from 'extend'; * @class */ export class Histogram { - options; - data; - length; - constructor(options?) { + options: HistogramOptions; + data: Map; + length: number; + constructor(options?: HistogramOptions) { this.options = extend( { min: 10000, @@ -45,14 +50,14 @@ export class Histogram { * @private * @param {numnber} value - The value in milliseconds. */ - add(value) { - value = Math.max(value, this.options.min); - value = Math.min(value, this.options.max); + add(value: number): void { + value = Math.max(value, this.options.min!); + value = Math.min(value, this.options.max!); value = Math.ceil(value / 1000) * 1000; if (!this.data.has(value)) { this.data.set(value, 0); } - const count = this.data.get(value); + const count = this.data.get(value)!; this.data.set(value, count + 1); this.length += 1; } @@ -63,18 +68,18 @@ export class Histogram { * @param {number} percent The requested percentage. * @return {number} */ - percentile(percent) { + percentile(percent: number): number { percent = Math.min(percent, 100); let target = this.length - this.length * (percent / 100); const keys = Array.from(this.data.keys()); let key; for (let i = keys.length - 1; i > -1; i--) { key = keys[i]; - target -= this.data.get(key); + target -= this.data.get(key)!; if (target <= 0) { return key; } } - return this.options.min; + return this.options.min!; } } diff --git a/src/iam.ts b/src/iam.ts index 2c74842b0..0047acdc8 100644 --- a/src/iam.ts +++ b/src/iam.ts @@ -21,6 +21,7 @@ import * as arrify from 'arrify'; import {promisifyAll} from '@google-cloud/promisify'; import * as is from 'is'; +import { PubSub } from '.'; /** * [IAM (Identity and Access Management)](https://cloud.google.com/pubsub/access_control) @@ -61,11 +62,11 @@ import * as is from 'is'; * // subscription.iam */ export class IAM { - Promise; - pubsub; - request; - id; - constructor(pubsub, id) { + Promise?: PromiseConstructor; + pubsub: PubSub; + request: typeof PubSub.prototype.request; + id: string; + constructor(pubsub: PubSub, id: string) { if (pubsub.Promise) { this.Promise = pubsub.Promise; } @@ -282,7 +283,7 @@ export class IAM { * const apiResponse = data[1]; * }); */ - testPermissions(permissions, gaxOpts, callback?) { + testPermissions(permissions: string|string[], gaxOpts, callback?) { if (!is.array(permissions) && !is.string(permissions)) { throw new Error('Permissions are required.'); } @@ -307,7 +308,7 @@ export class IAM { return; } const availablePermissions = arrify(resp.permissions); - const permissionHash = permissions.reduce(function(acc, permission) { + const permissionHash = (permissions as string[]).reduce(function(acc, permission) { acc[permission] = availablePermissions.indexOf(permission) > -1; return acc; }, {}); diff --git a/src/index.ts b/src/index.ts index 579f3f6ce..2eb951647 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,8 +19,7 @@ import {paginator} from '@google-cloud/paginator'; import {promisifyAll} from '@google-cloud/promisify'; import * as extend from 'extend'; import {GoogleAuth} from 'google-auth-library'; -const gax = require('google-gax'); -const {grpc} = new gax.GrpcClient(); +import * as gax from 'google-gax'; import * as is from 'is'; const PKG = require('../../package.json'); @@ -31,6 +30,9 @@ import {Subscription} from './subscription'; import {Topic} from './topic'; import { Readable } from 'stream'; +const opts = {} as gax.GrpcClientOptions; +const {grpc} = new gax.GrpcClient(opts); + /** * @type {string} - Project ID placeholder. * @private @@ -206,14 +208,14 @@ export class PubSub { * const apiResponse = data[1]; * }); */ - createSubscription(topic, name, options, callback) { + createSubscription(topic: Topic|string, name: string, options, callback) { if (!is.string(topic) && !(topic instanceof Topic)) { throw new Error('A Topic is required for a new subscription.'); } if (!is.string(name)) { throw new Error('A subscription name is required.'); } - if (is.string(topic)) { + if (typeof topic === 'string') { topic = this.topic(topic); } if (is.fn(options)) { @@ -286,7 +288,7 @@ export class PubSub { * const apiResponse = data[1]; * }); */ - createTopic(name, gaxOpts, callback?) { + createTopic(name: string, gaxOpts, callback?) { const topic = this.topic(name); const reqOpts = { name: topic.name, @@ -691,7 +693,7 @@ export class PubSub { * * const snapshot = pubsub.snapshot('my-snapshot'); */ - snapshot(name) { + snapshot(name: string) { if (!is.string(name)) { throw new Error('You must supply a valid name for the snapshot.'); } diff --git a/src/publisher.ts b/src/publisher.ts index abc62ca2b..5a9e5db11 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -48,8 +48,8 @@ import { Topic } from './topic'; * const publisher = topic.publisher(); */ export class Publisher { - Promise; - topic; + Promise?: PromiseConstructor; + topic: Topic; inventory_; settings; timeoutHandle_; @@ -148,7 +148,7 @@ export class Publisher { * //- * publisher.publish(data).then((messageId) => {}); */ - publish(data, attributes, callback?) { + publish(data: Buffer, attributes, callback?) { if (!(data instanceof Buffer)) { throw new TypeError('Data must be in the form of a Buffer.'); } diff --git a/src/snapshot.ts b/src/snapshot.ts index 5e46116c2..0434da106 100644 --- a/src/snapshot.ts +++ b/src/snapshot.ts @@ -17,6 +17,7 @@ import * as util from './util'; import {promisifyAll} from '@google-cloud/promisify'; import * as is from 'is'; +import { PubSub } from '.'; /** * A Snapshot object will give you access to your Cloud Pub/Sub snapshot. @@ -84,12 +85,12 @@ import * as is from 'is'; */ export class Snapshot { parent; - name; - Promise; + name: string; + Promise?: PromiseConstructor; create; seek; metadata; - constructor(parent, name) { + constructor(parent, name: string) { if (parent.Promise) { this.Promise = parent.Promise; } @@ -201,7 +202,7 @@ export class Snapshot { * * @private */ - static formatName_(projectId, name) { + static formatName_(projectId: string, name: string) { return 'projects/' + projectId + '/snapshots/' + name.split('/').pop(); } } diff --git a/src/subscriber.ts b/src/subscriber.ts index 43d986298..5ea305776 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -26,6 +26,7 @@ import * as os from 'os'; import {ConnectionPool} from './connection-pool'; import {Histogram} from './histogram'; +import { Subscription } from '.'; /** * @type {number} - The maximum number of ackIds to be sent in acknowledge/modifyAckDeadline @@ -389,7 +390,8 @@ export class Subscriber extends EventEmitter { * @private */ openConnection_() { - const pool = (this.connectionPool = new ConnectionPool(this)); + // TODO: fixup this cast + const pool = (this.connectionPool = new ConnectionPool(this as {} as Subscription)); this.isOpen = true; pool.on('error', err => { this.emit('error', err); @@ -477,7 +479,7 @@ export class Subscriber extends EventEmitter { } // we can ignore any errors that come from this since they'll be // re-emitted later - connection.write(data, err => { + connection!.write(data, err => { if (!err) { this.latency_.add(Date.now() - startTime); } diff --git a/src/subscription.ts b/src/subscription.ts index 8fb23eace..563f16688 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -146,7 +146,7 @@ export class Subscription extends Subscriber { create!: Function; iam: IAM; metadata; - constructor(pubsub, name, options) { + constructor(pubsub: PubSub, name: string, options) { options = options || {}; super(options); if (pubsub.Promise) { @@ -686,7 +686,7 @@ export class Subscription extends Subscriber { * @example * const snapshot = subscription.snapshot('my-snapshot'); */ - snapshot(name) { + snapshot(name: string) { return this.pubsub.snapshot.call(this, name); } /*! @@ -717,7 +717,7 @@ export class Subscription extends Subscriber { * * @private */ - static formatName_(projectId, name) { + static formatName_(projectId: string, name: string) { // Simple check if the name is already formatted. if (name.indexOf('/') > -1) { return name; diff --git a/src/topic.ts b/src/topic.ts index 973022190..f2e060a3b 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -41,9 +41,9 @@ import { Readable } from 'stream'; export class Topic { Promise?: PromiseConstructor; name: string; - parent; + parent: PubSub; pubsub: PubSub; - request; + request: typeof PubSub.prototype.request; iam: IAM; metadata; getSubscriptionsStream = paginator.streamify('getSubscriptions') as () => Readable; @@ -178,7 +178,7 @@ export class Topic { * const apiResponse = data[1]; * }); */ - createSubscription(name, options, callback?) { + createSubscription(name: string, options, callback?) { this.pubsub.createSubscription(this, name, options, callback); } /** diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index b28d458c3..83b1000fa 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -17,7 +17,7 @@ import * as assert from 'assert'; import * as async from 'async'; import * as uuid from 'uuid'; -import {PubSub, Subscription} from '../src'; +import {PubSub, Subscription, Topic} from '../src'; const pubsub = new PubSub(); @@ -48,7 +48,7 @@ describe('pubsub', function() { return 'test-topic-' + uuid.v4(); } - function getTopicName(topic) { + function getTopicName(topic: Topic) { return topic.name.split('/').pop(); }