From c34c5f0f6ca6b7ffb2236bea75bbf234601090ef Mon Sep 17 00:00:00 2001 From: Praveen Kumar Singh Date: Fri, 15 Feb 2019 23:31:45 +0530 Subject: [PATCH] noimplicityAny for index.ts,message-queues,message-stream --- package.json | 1 + src/index.ts | 147 +++++++++++++++++++++++++++++++----------- src/message-queues.ts | 20 +++--- src/message-stream.ts | 3 +- src/topic.ts | 43 +++++++----- system-test/pubsub.ts | 8 +-- 6 files changed, 154 insertions(+), 68 deletions(-) diff --git a/package.json b/package.json index 311d8c1cc..2c47a1a82 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,7 @@ }, "repository": "googleapis/nodejs-pubsub", "main": "./build/src/index.js", + "types": "./build/src/index.d.ts", "files": [ "build/protos", "build/src", diff --git a/src/index.ts b/src/index.ts index e9aebf2d5..20d79d14d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -38,13 +38,59 @@ import {Topic, PublishOptions} from './topic'; import {CallOptions} from 'google-gax'; import {Readable} from 'stream'; import {google} from '../proto/pubsub'; -import {ServiceError} from 'grpc'; +import {ServiceError, ChannelCredentials} from 'grpc'; import {FlowControlOptions} from './lease-manager'; import {BatchPublishOptions} from './publisher'; const opts = {} as gax.GrpcClientOptions; const {grpc} = new gax.GrpcClient(opts); +export interface GetTopicMetadataCallback { + (err?: ServiceError|null, res?: google.pubsub.v1.ITopic|null): void; +} + +export interface ClientConfig { + projectId?: string; + keyFilename?: string; + apiEndpoint?: string; + email?: string; + autoRetry?: boolean; + maxRetries?: number; + promise?: PromiseConstructor; + servicePath?: string; + port?: string; + sslCreds?: ChannelCredentials; +} + +// tslint:disable-next-line no-any +type Arguments = [(Error | null)?,(T| null)?,any?]; + +interface Options { + gaxOpts?: CallOptions; + pageSize?: number; + pageToken?: string; + autoPaginate?: boolean; +} + +export interface GetSnapshotsOptions extends Options {} +export interface GetSnapshotsCallback { + (err?: Error|null, snapshots?: Snapshot[]|null, apiResponse?: object): void; +} + +export interface GetSubscriptionsOptions extends Options { + topic?: Topic; + project?: string; +} +export interface GetSubscriptionsCallback { + (err?: Error|null, subscriptions?: Subscription[]|null, + apiResponse?: object): void; +} + +export interface GetTopicsOptions extends Options {} +export interface GetTopicsCallback { + (err?: Error|null, topics?: Topic[]|null, apiResponse?: object): void; +} + export type SeekCallback = RequestCallback; export interface GetSubscriptionMetadataCallback { @@ -77,7 +123,7 @@ export interface SubscriptionCallOptions { /** - * @callback CreateTopicCallback + * @callback CreateSnapshotCallback * @param {?Error} err Request error, if any. * @param {Snapshot} snapshot * @param {object} apiResponse The full API response. @@ -243,7 +289,7 @@ interface GetClientCallback { * Full quickstart example: */ export class PubSub { - options; + options: ClientConfig; isEmulator: boolean; api: {[key: string]: gax.ClientStub}; auth: GoogleAuth; @@ -255,12 +301,12 @@ export class PubSub { getSnapshotsStream = paginator.streamify('getSnapshots') as() => Readable; getTopicsStream = paginator.streamify('getTopics') as() => Readable; - constructor(options?) { + constructor(options?: ClientConfig) { options = options || {}; // Determine what scopes are needed. // It is the union of the scopes on both clients. const clientClasses = [v1.SubscriberClient, v1.PublisherClient]; - const allScopes = {}; + const allScopes: {[key: string]: boolean} = {}; for (const clientClass of clientClasses) { for (const scope of clientClass.scopes) { allScopes[scope] = true; @@ -498,7 +544,7 @@ export class PubSub { const baseUrl = apiEndpoint || process.env.PUBSUB_EMULATOR_HOST; const leadingProtocol = new RegExp('^https*://'); const trailingSlashes = new RegExp('/*$'); - const baseUrlParts = baseUrl.replace(leadingProtocol, '') + const baseUrlParts = baseUrl!.replace(leadingProtocol, '') .replace(trailingSlashes, '') .split(':'); this.options.servicePath = baseUrlParts[0]; @@ -552,12 +598,20 @@ export class PubSub { * const snapshots = data[0]; * }); */ - getSnapshots(options?, callback?) { + getSnapshots(option?: GetSnapshotsOptions): + Promise; + getSnapshots(callback: GetSnapshotsCallback): void; + getSnapshots(option: GetSnapshotsOptions, callback: GetSnapshotsCallback): + void; + getSnapshots( + optionsOrCallback?: GetSnapshotsOptions|GetSnapshotsCallback, + callback?: GetSnapshotsCallback): + void|Promise { const self = this; - if (is.fn(options)) { - callback = options; - options = {}; - } + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; const reqOpts = Object.assign( { project: 'projects/' + this.projectId, @@ -577,17 +631,16 @@ export class PubSub { reqOpts, gaxOpts, }, - // tslint:disable-next-line only-arrow-functions - function() { - const snapshots = arguments[1]; + (...args: Arguments) => { + const snapshots = args[1]; if (snapshots) { - arguments[1] = snapshots.map(snapshot => { - const snapshotInstance = self.snapshot(snapshot.name); + args[1] = snapshots.map((snapshot: Snapshot) => { + const snapshotInstance = self.snapshot(snapshot.name!); snapshotInstance.metadata = snapshot; return snapshotInstance; }); } - callback.apply(null, arguments); + callback!(...args); }); } /** @@ -647,18 +700,28 @@ export class PubSub { * const subscriptions = data[0]; * }); */ - getSubscriptions(options, callback?) { + getSubscriptions(options?: GetSubscriptionsOptions): + Promise; + getSubscriptions(callback: GetSubscriptionsCallback): void; + getSubscriptions( + options: GetSubscriptionsOptions, + callback: GetSubscriptionsCallback): void; + getSubscriptions( + optionsOrCallback?: GetSubscriptionsOptions|GetSubscriptionsCallback, + callback?: GetSubscriptionsCallback): + void|Promise { const self = this; - if (is.fn(options)) { - callback = options; - options = {}; - } + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; + let topic = options.topic; if (topic) { if (!(topic instanceof Topic)) { topic = this.topic(topic); } - return topic.getSubscriptions(options, callback); + return topic.getSubscriptions(options, callback!); } const reqOpts = Object.assign({}, options); reqOpts.project = 'projects/' + this.projectId; @@ -676,17 +739,16 @@ export class PubSub { reqOpts, gaxOpts, }, - // tslint:disable-next-line only-arrow-functions - function() { - const subscriptions = arguments[1]; + (...args: Arguments) => { + const subscriptions = args[1]; if (subscriptions) { - arguments[1] = subscriptions.map(sub => { + args[1] = subscriptions.map((sub: Subscription) => { const subscriptionInstance = self.subscription(sub.name); subscriptionInstance.metadata = sub; return subscriptionInstance; }); } - callback.apply(null, arguments); + callback!(...args); }); } /** @@ -745,12 +807,20 @@ export class PubSub { * const topics = data[0]; * }); */ - getTopics(options, callback?) { + getTopics(options: GetTopicsOptions): + Promise; + getTopics(callback: GetTopicsCallback): void; + getTopics(options: GetTopicsOptions, callback: GetTopicsCallback): void; + getTopics( + optionsOrCallback?: GetTopicsOptions|GetTopicsCallback, + callback?: GetTopicsCallback): + void|Promise { const self = this; - if (is.fn(options)) { - callback = options; - options = {}; - } + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : callback; + const reqOpts = Object.assign( { project: 'projects/' + this.projectId, @@ -770,17 +840,16 @@ export class PubSub { reqOpts, gaxOpts, }, - // tslint:disable-next-line only-arrow-functions - function() { - const topics = arguments[1]; + (...args: Arguments) => { + const topics = args[1]; if (topics) { - arguments[1] = topics.map(topic => { + args[1] = topics.map((topic: Topic) => { const topicInstance = self.topic(topic.name); topicInstance.metadata = topic; return topicInstance; }); } - callback.apply(null, arguments); + callback!(...args); }); } /** @@ -892,7 +961,7 @@ export class PubSub { * // message.publishTime = Date when Pub/Sub received the message. * }); */ - subscription(name: string, options?) { + subscription(name: string, options?: SubscriptionCallOptions) { if (!name) { throw new Error('A name must be specified for a subscription.'); } diff --git a/src/message-queues.ts b/src/message-queues.ts index da0f9a0d6..9801213f9 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -158,7 +158,7 @@ export abstract class MessageQueue { * @param {BatchOptions} options Batching options. * @private */ - setOptions(options): void { + setOptions(options: BatchOptions): void { const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100}; this._options = Object.assign(defaults, options); @@ -212,14 +212,16 @@ export class ModAckQueue extends MessageQueue { protected async _sendBatch(batch: QueuedMessages): Promise { const client = await this._subscriber.getClient(); const subscription = this._subscriber.name; - const modAckTable = batch.reduce((table, [ackId, deadline]) => { - if (!table[deadline!]) { - table[deadline!] = []; - } - - table[deadline!].push(ackId); - return table; - }, {}); + const modAckTable: {[index: string]: string[]} = batch.reduce( + (table: {[index: string]: string[]}, [ackId, deadline]) => { + if (!table[deadline!]) { + table[deadline!] = []; + } + + table[deadline!].push(ackId); + return table; + }, + {}); const modAckRequests = Object.keys(modAckTable).map(async (deadline) => { const ackIds = modAckTable[deadline]; diff --git a/src/message-stream.ts b/src/message-stream.ts index 58ec3c542..f72f58409 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -15,6 +15,7 @@ */ import {promisify} from '@google-cloud/promisify'; +import {Gaxios} from 'gaxios'; import {ClientStub} from 'google-gax'; import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc'; import * as isStreamEnded from 'is-stream-ended'; @@ -224,7 +225,7 @@ export class MessageStream extends PassThrough { * @returns {Promise} */ private async _fillStreamPool(): Promise { - let client; + let client!: ClientStub; try { client = await this._getClient(); diff --git a/src/topic.ts b/src/topic.ts index 7eda9d188..a0eedd6e3 100644 --- a/src/topic.ts +++ b/src/topic.ts @@ -22,7 +22,7 @@ import {Readable} from 'stream'; import {google} from '../proto/pubsub'; -import {CreateSubscriptionCallback, CreateSubscriptionOptions, CreateSubscriptionResponse, CreateTopicCallback, CreateTopicResponse, ExistsCallback, GetCallOptions, Metadata, PubSub, RequestCallback, SubscriptionCallOptions} from '.'; +import {Attributes, CreateSubscriptionCallback, CreateSubscriptionOptions, CreateSubscriptionResponse, CreateTopicCallback, CreateTopicResponse, ExistsCallback, GetCallOptions, GetTopicMetadataCallback, Metadata, PubSub, RequestCallback, SubscriptionCallOptions} from '.'; import {IAM} from './iam'; import {PublishCallback, Publisher, PublishOptions} from './publisher'; import {Subscription} from './subscription'; @@ -423,14 +423,12 @@ export class Topic { * const apiResponse = data[0]; * }); */ - getMetadata(callback: RequestCallback): void; - getMetadata( - gaxOpts: CallOptions, - callback: RequestCallback): void; + getMetadata(callback: GetTopicMetadataCallback): void; + getMetadata(gaxOpts: CallOptions, callback: GetTopicMetadataCallback): void; getMetadata(gaxOpts?: CallOptions): Promise; getMetadata( - gaxOptsOrCallback?: CallOptions|RequestCallback, - callback?: RequestCallback): + gaxOptsOrCallback?: CallOptions|GetTopicMetadataCallback, + callback?: GetTopicMetadataCallback): void|Promise { const gaxOpts = typeof gaxOptsOrCallback === 'object' ? gaxOptsOrCallback : {}; @@ -580,11 +578,19 @@ export class Topic { * //- * topic.publish(data).then((messageId) => {}); */ - publish(data: Buffer, attributes?: object): Promise; + publish(data: Buffer, attributes?: Attributes): Promise; publish(data: Buffer, callback: PublishCallback): void; - publish(data: Buffer, attributes: object, callback: PublishCallback): void; - publish(data: Buffer, attributes?, callback?): Promise|void { - return this.publisher.publish(data, attributes, callback); + publish(data: Buffer, attributes: Attributes, callback: PublishCallback): + void; + publish( + data: Buffer, attributesOrCallback?: Attributes|PublishCallback, + callback?: PublishCallback): Promise|void { + const attributes = + typeof attributesOrCallback === 'object' ? attributesOrCallback : {}; + callback = typeof attributesOrCallback === 'function' ? + attributesOrCallback : + callback; + return this.publisher.publish(data, attributes, callback!); } /** * Publish the provided JSON. It should be noted that all messages published @@ -634,17 +640,24 @@ export class Topic { * //- * topic.publishJSON(data).then((messageId) => {}); */ - publishJSON(json: object, attributes?: object): Promise; + publishJSON(json: object, attributes?: Attributes): Promise; publishJSON(json: object, callback: PublishCallback): void; - publishJSON(json: object, attributes: object, callback: PublishCallback): + publishJSON(json: object, attributes: Attributes, callback: PublishCallback): void; - publishJSON(json: object, attributes?, callback?): Promise|void { + publishJSON( + json: object, attributesOrCallback?: Attributes|PublishCallback, + callback?: PublishCallback): Promise|void { if (!is.object(json)) { throw new Error('First parameter should be an object.'); } + const attributes = + typeof attributesOrCallback === 'object' ? attributesOrCallback : {}; + callback = typeof attributesOrCallback === 'function' ? + attributesOrCallback : + callback; const data = Buffer.from(JSON.stringify(json)); - return this.publish(data, attributes, callback); + return this.publish(data, attributes, callback!); } /** * Set the publisher options. diff --git a/system-test/pubsub.ts b/system-test/pubsub.ts index 583127efd..2c820ca67 100644 --- a/system-test/pubsub.ts +++ b/system-test/pubsub.ts @@ -81,7 +81,7 @@ describe('pubsub', () => { pubsub.getTopics((err, topics) => { assert.ifError(err); - const results = topics.filter(topic => { + const results = topics!.filter(topic => { const name = getTopicName(topic); return TOPIC_FULL_NAMES.indexOf(name) !== -1; }); @@ -123,7 +123,7 @@ describe('pubsub', () => { }, (err, topics) => { assert.ifError(err); - assert.strictEqual(topics.length, TOPIC_NAMES.length - 1); + assert.strictEqual(topics!.length, TOPIC_NAMES.length - 1); done(); }); }); @@ -616,8 +616,8 @@ describe('pubsub', () => { it('should get a list of snapshots', done => { pubsub.getSnapshots((err, snapshots) => { assert.ifError(err); - assert.strictEqual(snapshots.length, 1); - assert.strictEqual(snapshots[0].name.split('/').pop(), SNAPSHOT_NAME); + assert.strictEqual(snapshots!.length, 1); + assert.strictEqual(snapshots![0].name.split('/').pop(), SNAPSHOT_NAME); done(); }); });