diff --git a/src/index.ts b/src/index.ts index 101d9b371..a6b7771b4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,6 +45,8 @@ import {BatchPublishOptions} from './publisher'; const opts = {} as gax.GrpcClientOptions; const {grpc} = new gax.GrpcClient(opts); +export type SeekCallback = RequestCallback; + export interface GetSubscriptionMetadataCallback { (err: ServiceError|null, res?: google.pubsub.v1.Subscription|null): void; } diff --git a/src/publisher.ts b/src/publisher.ts index e600beb63..3ffe3d5f8 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -24,15 +24,22 @@ import * as extend from 'extend'; import * as is from 'is'; import {Topic} from './topic'; import {RequestCallback, Attributes} from '.'; +import {ServiceError} from 'grpc'; interface Inventory { - callbacks: Array>; - queued: Array<{}>; + callbacks: QueueCallback[]; + queued: google.pubsub.v1.IPubsubMessage[]; bytes: number; } export interface PublishCallback { - (err?: null|Error, messageId?: string|null): void; + (err: ServiceError, messageId?: null): void; + (err: null, messageId: string): void; +} + +interface QueueCallback { + (err: ServiceError, res?: null): void; + (err: null, res: string): void; } /** @@ -284,9 +291,8 @@ export class Publisher { * @param {function} callback The callback function. */ queue_(data: Buffer, attrs: Attributes): Promise; - queue_(data: Buffer, attrs: Attributes, callback: RequestCallback): - void; - queue_(data: Buffer, attrs: Attributes, callback?: RequestCallback): + queue_(data: Buffer, attrs: Attributes, callback: QueueCallback): void; + queue_(data: Buffer, attrs: Attributes, callback?: QueueCallback): void|Promise { this.inventory_.queued.push({ data, diff --git a/src/snapshot.ts b/src/snapshot.ts index 2562b8f48..57e4c6bd1 100644 --- a/src/snapshot.ts +++ b/src/snapshot.ts @@ -16,8 +16,10 @@ import {promisifyAll} from '@google-cloud/promisify'; import {CallOptions} from 'google-gax'; + import {google} from '../proto/pubsub'; -import {CreateSnapshotCallback, CreateSnapshotResponse, RequestCallback, Subscription} from '.'; + +import {CreateSnapshotCallback, CreateSnapshotResponse, RequestCallback, SeekCallback, Subscription} from '.'; import {PubSub} from './index'; import * as util from './util'; @@ -126,14 +128,13 @@ export class Snapshot { snapshot: this.name, }; callback = callback || util.noop; - (this.parent as PubSub) - .request( - { - client: 'SubscriberClient', - method: 'deleteSnapshot', - reqOpts, - }, - callback); + this.parent.request( + { + client: 'SubscriberClient', + method: 'deleteSnapshot', + reqOpts, + }, + callback); } /*@ * Format the name of a snapshot. A snapshot's full name is in the format of @@ -153,23 +154,21 @@ export class Snapshot { gaxOpts?: CallOptions|CreateSnapshotCallback, callback?: CreateSnapshotCallback): void|Promise { if (!(this.parent instanceof Subscription)) { - throw new Error(`Subscription#snapshot`); + throw new Error( + `This is only available if you accessed this object through Subscription#snapshot`); } return (this.parent as Subscription) .createSnapshot(this.name, gaxOpts! as CallOptions, callback!); } seek(gaxOpts?: CallOptions): Promise; - seek(callback: google.pubsub.v1.Subscriber.SeekCallback): void; - seek( - gaxOpts: CallOptions, - callback: google.pubsub.v1.Subscriber.SeekCallback): void; - seek( - gaxOpts?: CallOptions|google.pubsub.v1.Subscriber.SeekCallback, - callback?: google.pubsub.v1.Subscriber.SeekCallback): + seek(callback: SeekCallback): void; + seek(gaxOpts: CallOptions, callback: SeekCallback): void; + seek(gaxOpts?: CallOptions|SeekCallback, callback?: SeekCallback): void|Promise { if (!(this.parent instanceof Subscription)) { - throw new Error(`Subscription#snapshot`); + throw new Error( + `This is only available if you accessed this object through Subscription#snapshot`); } return (this.parent as Subscription) .seek(this.name, gaxOpts! as CallOptions, callback!); diff --git a/src/subscription.ts b/src/subscription.ts index 61fe4d8e1..3d41bc479 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -20,8 +20,10 @@ import * as extend from 'extend'; import {CallOptions} from 'google-gax'; import * as is from 'is'; import * as snakeCase from 'lodash.snakecase'; + import {google} from '../proto/pubsub'; -import {CreateSnapshotCallback, CreateSnapshotResponse, CreateSubscriptionCallback, CreateSubscriptionResponse, ExistsCallback, GetCallOptions, GetSubscriptionMetadataCallback, Metadata, PubSub, RequestCallback, SubscriptionCallOptions} from '.'; + +import {CreateSnapshotCallback, CreateSnapshotResponse, CreateSubscriptionCallback, CreateSubscriptionResponse, ExistsCallback, GetCallOptions, GetSubscriptionMetadataCallback, Metadata, PubSub, RequestCallback, SeekCallback, SubscriptionCallOptions} from '.'; import {IAM} from './iam'; import {Snapshot} from './snapshot'; import {Subscriber, SubscriberOptions} from './subscriber'; @@ -201,7 +203,7 @@ export class Subscription extends EventEmitter { name: string; metadata: Metadata; - request: Function; + request: typeof PubSub.prototype.request; private _subscriber: Subscriber; constructor(pubsub: PubSub, name: string, options?: SubscriptionCallOptions) { super(); @@ -209,7 +211,8 @@ export class Subscription extends EventEmitter { options = options || {}; this.pubsub = pubsub; - this.request = pubsub.request.bind(pubsub); + // tslint:disable-next-line no-any + this.request = pubsub.request.bind(pubsub) as any; this.name = Subscription.formatName_(this.projectId, name); if (options.topic) { @@ -365,20 +368,20 @@ export class Subscription extends EventEmitter { name: snapshot.name, subscription: this.name, }; - this.request( + this.request( { client: 'SubscriberClient', method: 'createSnapshot', reqOpts, gaxOpts, }, - (err: Error, resp: google.pubsub.v1.Snapshot) => { + (err, resp) => { if (err) { - callback!(err, null, resp); + callback!(err, null, resp!); return; } - snapshot.metadata = resp; - callback!(null, snapshot, resp); + snapshot.metadata = resp!; + callback!(null, snapshot, resp!); }); } /** @@ -610,18 +613,18 @@ export class Subscription extends EventEmitter { const reqOpts = { subscription: this.name, }; - this.request( + this.request( { client: 'SubscriberClient', method: 'getSubscription', reqOpts, gaxOpts, }, - (err: Error, apiResponse: google.pubsub.v1.Subscription) => { + (err, apiResponse) => { if (!err) { this.metadata = apiResponse; } - callback!(err, apiResponse); + callback!(err!, apiResponse); }); } /** @@ -700,7 +703,7 @@ export class Subscription extends EventEmitter { reqOpts, gaxOpts, }, - callback); + callback!); } /** * Opens the Subscription to receive messages. In general this method @@ -767,17 +770,12 @@ export class Subscription extends EventEmitter { */ seek(snapshot: string|Date, gaxOpts?: CallOptions): Promise; + seek(snapshot: string|Date, callback: SeekCallback): void; + seek(snapshot: string|Date, gaxOpts: CallOptions, callback: SeekCallback): + void; seek( - snapshot: string|Date, - callback: google.pubsub.v1.Subscriber.SeekCallback): void; - seek( - snapshot: string|Date, gaxOpts: CallOptions, - callback: google.pubsub.v1.Subscriber.SeekCallback): void; - seek( - snapshot: string|Date, - gaxOptsOrCallback?: CallOptions|google.pubsub.v1.Subscriber.SeekCallback, - callback?: google.pubsub.v1.Subscriber.SeekCallback): - void|Promise { + snapshot: string|Date, gaxOptsOrCallback?: CallOptions|SeekCallback, + callback?: SeekCallback): void|Promise { const gaxOpts = typeof gaxOptsOrCallback === 'object' ? gaxOptsOrCallback : {}; callback = @@ -809,7 +807,7 @@ export class Subscription extends EventEmitter { reqOpts, gaxOpts, }, - callback); + callback!); } /** * @typedef {array} SetSubscriptionMetadataResponse @@ -882,7 +880,7 @@ export class Subscription extends EventEmitter { reqOpts, gaxOpts, }, - callback); + callback!); } /** * Sets the Subscription options. diff --git a/test/snapshot.ts b/test/snapshot.ts index 4ca24cd12..edc63a530 100644 --- a/test/snapshot.ts +++ b/test/snapshot.ts @@ -155,11 +155,15 @@ describe('Snapshot', () => { }); it('should throw on create method', () => { - assert.throws(() => snapshot.create(), /Subscription#snapshot/); + assert.throws( + () => snapshot.create(), + /This is only available if you accessed this object through Subscription#snapshot/); }); it('should throw on seek method', () => { - assert.throws(() => snapshot.seek(), /Subscription#snapshot/); + assert.throws( + () => snapshot.seek(), + /This is only available if you accessed this object through Subscription#snapshot/); }); }); });