Skip to content

Commit

Permalink
noImplicitAny for Snapshot and Publisher, done change request - 02
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen Kumar Singh committed Feb 5, 2019
1 parent 503b1f2 commit ad83a2c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 50 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import {BatchPublishOptions} from './publisher';
const opts = {} as gax.GrpcClientOptions;
const {grpc} = new gax.GrpcClient(opts);

export type SeekCallback = RequestCallback<google.pubsub.v1.ISeekResponse>;

export interface GetSubscriptionMetadataCallback {
(err: ServiceError|null, res?: google.pubsub.v1.Subscription|null): void;
}
Expand Down
18 changes: 12 additions & 6 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,22 @@ import * as extend from 'extend';
import * as is from 'is';
import {Topic} from './topic';
import {RequestCallback, Attributes} from '.';
import {ServiceError} from 'grpc';

interface Inventory {
callbacks: Array<RequestCallback<string>>;
queued: Array<{}>;
callbacks: QueueCallback[];
queued: google.pubsub.v1.IPubsubMessage[];
bytes: number;
}

export interface PublishCallback {
(err?: null|Error, messageId?: string|null): void;
(err: ServiceError, messageId?: null): void;
(err: null, messageId: string): void;
}

interface QueueCallback {
(err: ServiceError, res?: null): void;
(err: null, res: string): void;
}

/**
Expand Down Expand Up @@ -284,9 +291,8 @@ export class Publisher {
* @param {function} callback The callback function.
*/
queue_(data: Buffer, attrs: Attributes): Promise<string>;
queue_(data: Buffer, attrs: Attributes, callback: RequestCallback<string>):
void;
queue_(data: Buffer, attrs: Attributes, callback?: RequestCallback<string>):
queue_(data: Buffer, attrs: Attributes, callback: QueueCallback): void;
queue_(data: Buffer, attrs: Attributes, callback?: QueueCallback):
void|Promise<string> {
this.inventory_.queued.push({
data,
Expand Down
35 changes: 17 additions & 18 deletions src/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import {promisifyAll} from '@google-cloud/promisify';
import {CallOptions} from 'google-gax';

import {google} from '../proto/pubsub';
import {CreateSnapshotCallback, CreateSnapshotResponse, RequestCallback, Subscription} from '.';

import {CreateSnapshotCallback, CreateSnapshotResponse, RequestCallback, SeekCallback, Subscription} from '.';
import {PubSub} from './index';
import * as util from './util';

Expand Down Expand Up @@ -126,14 +128,13 @@ export class Snapshot {
snapshot: this.name,
};
callback = callback || util.noop;
(this.parent as PubSub)
.request<google.protobuf.Empty>(
{
client: 'SubscriberClient',
method: 'deleteSnapshot',
reqOpts,
},
callback);
this.parent.request<google.protobuf.Empty>(
{
client: 'SubscriberClient',
method: 'deleteSnapshot',
reqOpts,
},
callback);
}
/*@
* Format the name of a snapshot. A snapshot's full name is in the format of
Expand All @@ -153,23 +154,21 @@ export class Snapshot {
gaxOpts?: CallOptions|CreateSnapshotCallback,
callback?: CreateSnapshotCallback): void|Promise<CreateSnapshotResponse> {
if (!(this.parent instanceof Subscription)) {
throw new Error(`Subscription#snapshot`);
throw new Error(
`This is only available if you accessed this object through Subscription#snapshot`);
}
return (this.parent as Subscription)
.createSnapshot(this.name, gaxOpts! as CallOptions, callback!);
}

seek(gaxOpts?: CallOptions): Promise<google.pubsub.v1.SeekResponse>;
seek(callback: google.pubsub.v1.Subscriber.SeekCallback): void;
seek(
gaxOpts: CallOptions,
callback: google.pubsub.v1.Subscriber.SeekCallback): void;
seek(
gaxOpts?: CallOptions|google.pubsub.v1.Subscriber.SeekCallback,
callback?: google.pubsub.v1.Subscriber.SeekCallback):
seek(callback: SeekCallback): void;
seek(gaxOpts: CallOptions, callback: SeekCallback): void;
seek(gaxOpts?: CallOptions|SeekCallback, callback?: SeekCallback):
void|Promise<google.pubsub.v1.SeekResponse> {
if (!(this.parent instanceof Subscription)) {
throw new Error(`Subscription#snapshot`);
throw new Error(
`This is only available if you accessed this object through Subscription#snapshot`);
}
return (this.parent as Subscription)
.seek(this.name, gaxOpts! as CallOptions, callback!);
Expand Down
46 changes: 22 additions & 24 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import * as extend from 'extend';
import {CallOptions} from 'google-gax';
import * as is from 'is';
import * as snakeCase from 'lodash.snakecase';

import {google} from '../proto/pubsub';
import {CreateSnapshotCallback, CreateSnapshotResponse, CreateSubscriptionCallback, CreateSubscriptionResponse, ExistsCallback, GetCallOptions, GetSubscriptionMetadataCallback, Metadata, PubSub, RequestCallback, SubscriptionCallOptions} from '.';

import {CreateSnapshotCallback, CreateSnapshotResponse, CreateSubscriptionCallback, CreateSubscriptionResponse, ExistsCallback, GetCallOptions, GetSubscriptionMetadataCallback, Metadata, PubSub, RequestCallback, SeekCallback, SubscriptionCallOptions} from '.';
import {IAM} from './iam';
import {Snapshot} from './snapshot';
import {Subscriber, SubscriberOptions} from './subscriber';
Expand Down Expand Up @@ -201,15 +203,16 @@ export class Subscription extends EventEmitter {
name: string;

metadata: Metadata;
request: Function;
request: typeof PubSub.prototype.request;
private _subscriber: Subscriber;
constructor(pubsub: PubSub, name: string, options?: SubscriptionCallOptions) {
super();

options = options || {};

this.pubsub = pubsub;
this.request = pubsub.request.bind(pubsub);
// tslint:disable-next-line no-any
this.request = pubsub.request.bind(pubsub) as any;
this.name = Subscription.formatName_(this.projectId, name);

if (options.topic) {
Expand Down Expand Up @@ -365,20 +368,20 @@ export class Subscription extends EventEmitter {
name: snapshot.name,
subscription: this.name,
};
this.request(
this.request<google.pubsub.v1.Snapshot>(
{
client: 'SubscriberClient',
method: 'createSnapshot',
reqOpts,
gaxOpts,
},
(err: Error, resp: google.pubsub.v1.Snapshot) => {
(err, resp) => {
if (err) {
callback!(err, null, resp);
callback!(err, null, resp!);
return;
}
snapshot.metadata = resp;
callback!(null, snapshot, resp);
snapshot.metadata = resp!;
callback!(null, snapshot, resp!);
});
}
/**
Expand Down Expand Up @@ -610,18 +613,18 @@ export class Subscription extends EventEmitter {
const reqOpts = {
subscription: this.name,
};
this.request(
this.request<google.pubsub.v1.Subscription>(
{
client: 'SubscriberClient',
method: 'getSubscription',
reqOpts,
gaxOpts,
},
(err: Error, apiResponse: google.pubsub.v1.Subscription) => {
(err, apiResponse) => {
if (!err) {
this.metadata = apiResponse;
}
callback!(err, apiResponse);
callback!(err!, apiResponse);
});
}
/**
Expand Down Expand Up @@ -700,7 +703,7 @@ export class Subscription extends EventEmitter {
reqOpts,
gaxOpts,
},
callback);
callback!);
}
/**
* Opens the Subscription to receive messages. In general this method
Expand Down Expand Up @@ -767,17 +770,12 @@ export class Subscription extends EventEmitter {
*/
seek(snapshot: string|Date, gaxOpts?: CallOptions):
Promise<google.pubsub.v1.SeekResponse>;
seek(snapshot: string|Date, callback: SeekCallback): void;
seek(snapshot: string|Date, gaxOpts: CallOptions, callback: SeekCallback):
void;
seek(
snapshot: string|Date,
callback: google.pubsub.v1.Subscriber.SeekCallback): void;
seek(
snapshot: string|Date, gaxOpts: CallOptions,
callback: google.pubsub.v1.Subscriber.SeekCallback): void;
seek(
snapshot: string|Date,
gaxOptsOrCallback?: CallOptions|google.pubsub.v1.Subscriber.SeekCallback,
callback?: google.pubsub.v1.Subscriber.SeekCallback):
void|Promise<google.pubsub.v1.SeekResponse> {
snapshot: string|Date, gaxOptsOrCallback?: CallOptions|SeekCallback,
callback?: SeekCallback): void|Promise<google.pubsub.v1.SeekResponse> {
const gaxOpts =
typeof gaxOptsOrCallback === 'object' ? gaxOptsOrCallback : {};
callback =
Expand Down Expand Up @@ -809,7 +807,7 @@ export class Subscription extends EventEmitter {
reqOpts,
gaxOpts,
},
callback);
callback!);
}
/**
* @typedef {array} SetSubscriptionMetadataResponse
Expand Down Expand Up @@ -882,7 +880,7 @@ export class Subscription extends EventEmitter {
reqOpts,
gaxOpts,
},
callback);
callback!);
}
/**
* Sets the Subscription options.
Expand Down
8 changes: 6 additions & 2 deletions test/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,15 @@ describe('Snapshot', () => {
});

it('should throw on create method', () => {
assert.throws(() => snapshot.create(), /Subscription#snapshot/);
assert.throws(
() => snapshot.create(),
/This is only available if you accessed this object through Subscription#snapshot/);
});

it('should throw on seek method', () => {
assert.throws(() => snapshot.seek(), /Subscription#snapshot/);
assert.throws(
() => snapshot.seek(),
/This is only available if you accessed this object through Subscription#snapshot/);
});
});
});
Expand Down

0 comments on commit ad83a2c

Please sign in to comment.