Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(typescript): noImplicityAny for index.ts #482

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
},
"repository": "googleapis/nodejs-pubsub",
"main": "./build/src/index.js",
"types": "./build/src/index.d.ts",
"files": [
"build/protos",
"build/src",
Expand Down
147 changes: 108 additions & 39 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,59 @@ import {Topic, PublishOptions} from './topic';
import {CallOptions} from 'google-gax';
import {Readable} from 'stream';
import {google} from '../proto/pubsub';
import {ServiceError} from 'grpc';
import {ServiceError, ChannelCredentials} from 'grpc';
import {FlowControlOptions} from './lease-manager';
import {BatchPublishOptions} from './publisher';

const opts = {} as gax.GrpcClientOptions;
const {grpc} = new gax.GrpcClient(opts);

export interface GetTopicMetadataCallback {
(err?: ServiceError|null, res?: google.pubsub.v1.ITopic|null): void;
}

export interface ClientConfig {
projectId?: string;
keyFilename?: string;
apiEndpoint?: string;
email?: string;
autoRetry?: boolean;
maxRetries?: number;
promise?: PromiseConstructor;
servicePath?: string;
port?: string;
sslCreds?: ChannelCredentials;
}

// tslint:disable-next-line no-any
type Arguments<T> = [(Error | null)?,(T| null)?,any?];

interface Options {
gaxOpts?: CallOptions;
pageSize?: number;
pageToken?: string;
autoPaginate?: boolean;
}

export interface GetSnapshotsOptions extends Options {}
export interface GetSnapshotsCallback {
(err?: Error|null, snapshots?: Snapshot[]|null, apiResponse?: object): void;
}

export interface GetSubscriptionsOptions extends Options {
topic?: Topic;
project?: string;
}
export interface GetSubscriptionsCallback {
(err?: Error|null, subscriptions?: Subscription[]|null,
apiResponse?: object): void;
}

export interface GetTopicsOptions extends Options {}
export interface GetTopicsCallback {
(err?: Error|null, topics?: Topic[]|null, apiResponse?: object): void;
}

export type SeekCallback = RequestCallback<google.pubsub.v1.ISeekResponse>;

export interface GetSubscriptionMetadataCallback {
Expand Down Expand Up @@ -77,7 +123,7 @@ export interface SubscriptionCallOptions {


/**
* @callback CreateTopicCallback
* @callback CreateSnapshotCallback
* @param {?Error} err Request error, if any.
* @param {Snapshot} snapshot
* @param {object} apiResponse The full API response.
Expand Down Expand Up @@ -243,7 +289,7 @@ interface GetClientCallback {
* Full quickstart example:
*/
export class PubSub {
options;
options: ClientConfig;
isEmulator: boolean;
api: {[key: string]: gax.ClientStub};
auth: GoogleAuth;
Expand All @@ -255,12 +301,12 @@ export class PubSub {
getSnapshotsStream = paginator.streamify('getSnapshots') as() => Readable;
getTopicsStream = paginator.streamify('getTopics') as() => Readable;

constructor(options?) {
constructor(options?: ClientConfig) {
options = options || {};
// Determine what scopes are needed.
// It is the union of the scopes on both clients.
const clientClasses = [v1.SubscriberClient, v1.PublisherClient];
const allScopes = {};
const allScopes: {[key: string]: boolean} = {};
for (const clientClass of clientClasses) {
for (const scope of clientClass.scopes) {
allScopes[scope] = true;
Expand Down Expand Up @@ -498,7 +544,7 @@ export class PubSub {
const baseUrl = apiEndpoint || process.env.PUBSUB_EMULATOR_HOST;
const leadingProtocol = new RegExp('^https*://');
const trailingSlashes = new RegExp('/*$');
const baseUrlParts = baseUrl.replace(leadingProtocol, '')
const baseUrlParts = baseUrl!.replace(leadingProtocol, '')
.replace(trailingSlashes, '')
.split(':');
this.options.servicePath = baseUrlParts[0];
Expand Down Expand Up @@ -552,12 +598,20 @@ export class PubSub {
* const snapshots = data[0];
* });
*/
getSnapshots(options?, callback?) {
getSnapshots(option?: GetSnapshotsOptions):
Promise<google.pubsub.v1.IListSnapshotsResponse>;
getSnapshots(callback: GetSnapshotsCallback): void;
getSnapshots(option: GetSnapshotsOptions, callback: GetSnapshotsCallback):
void;
getSnapshots(
optionsOrCallback?: GetSnapshotsOptions|GetSnapshotsCallback,
callback?: GetSnapshotsCallback):
void|Promise<google.pubsub.v1.IListSnapshotsResponse> {
const self = this;
if (is.fn(options)) {
callback = options;
options = {};
}
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;
const reqOpts = Object.assign(
{
project: 'projects/' + this.projectId,
Expand All @@ -577,17 +631,16 @@ export class PubSub {
reqOpts,
gaxOpts,
},
// tslint:disable-next-line only-arrow-functions
function() {
const snapshots = arguments[1];
(...args: Arguments<Snapshot[]>) => {
const snapshots = args[1];
if (snapshots) {
arguments[1] = snapshots.map(snapshot => {
const snapshotInstance = self.snapshot(snapshot.name);
args[1] = snapshots.map((snapshot: Snapshot) => {
const snapshotInstance = self.snapshot(snapshot.name!);
snapshotInstance.metadata = snapshot;
return snapshotInstance;
});
}
callback.apply(null, arguments);
callback!(...args);
});
}
/**
Expand Down Expand Up @@ -647,18 +700,28 @@ export class PubSub {
* const subscriptions = data[0];
* });
*/
getSubscriptions(options, callback?) {
getSubscriptions(options?: GetSubscriptionsOptions):
Promise<google.pubsub.v1.IListSubscriptionsResponse>;
getSubscriptions(callback: GetSubscriptionsCallback): void;
getSubscriptions(
options: GetSubscriptionsOptions,
callback: GetSubscriptionsCallback): void;
getSubscriptions(
optionsOrCallback?: GetSubscriptionsOptions|GetSubscriptionsCallback,
callback?: GetSubscriptionsCallback):
void|Promise<google.pubsub.v1.IListSubscriptionsResponse> {
const self = this;
if (is.fn(options)) {
callback = options;
options = {};
}
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;

let topic = options.topic;
if (topic) {
if (!(topic instanceof Topic)) {
topic = this.topic(topic);
}
return topic.getSubscriptions(options, callback);
return topic.getSubscriptions(options, callback!);
}
const reqOpts = Object.assign({}, options);
reqOpts.project = 'projects/' + this.projectId;
Expand All @@ -676,17 +739,16 @@ export class PubSub {
reqOpts,
gaxOpts,
},
// tslint:disable-next-line only-arrow-functions
function() {
const subscriptions = arguments[1];
(...args: Arguments<Subscription[]>) => {
const subscriptions = args[1];
if (subscriptions) {
arguments[1] = subscriptions.map(sub => {
args[1] = subscriptions.map((sub: Subscription) => {
const subscriptionInstance = self.subscription(sub.name);
subscriptionInstance.metadata = sub;
return subscriptionInstance;
});
}
callback.apply(null, arguments);
callback!(...args);
});
}
/**
Expand Down Expand Up @@ -745,12 +807,20 @@ export class PubSub {
* const topics = data[0];
* });
*/
getTopics(options, callback?) {
getTopics(options: GetTopicsOptions):
Promise<google.pubsub.v1.IListTopicsResponse>;
getTopics(callback: GetTopicsCallback): void;
getTopics(options: GetTopicsOptions, callback: GetTopicsCallback): void;
getTopics(
optionsOrCallback?: GetTopicsOptions|GetTopicsCallback,
callback?: GetTopicsCallback):
void|Promise<google.pubsub.v1.IListTopicsResponse> {
const self = this;
if (is.fn(options)) {
callback = options;
options = {};
}
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : callback;

const reqOpts = Object.assign(
{
project: 'projects/' + this.projectId,
Expand All @@ -770,17 +840,16 @@ export class PubSub {
reqOpts,
gaxOpts,
},
// tslint:disable-next-line only-arrow-functions
function() {
const topics = arguments[1];
(...args: Arguments<Topic[]>) => {
const topics = args[1];
if (topics) {
arguments[1] = topics.map(topic => {
args[1] = topics.map((topic: Topic) => {
const topicInstance = self.topic(topic.name);
topicInstance.metadata = topic;
return topicInstance;
});
}
callback.apply(null, arguments);
callback!(...args);
});
}
/**
Expand Down Expand Up @@ -892,7 +961,7 @@ export class PubSub {
* // message.publishTime = Date when Pub/Sub received the message.
* });
*/
subscription(name: string, options?) {
subscription(name: string, options?: SubscriptionCallOptions) {
if (!name) {
throw new Error('A name must be specified for a subscription.');
}
Expand Down
20 changes: 11 additions & 9 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export abstract class MessageQueue {
* @param {BatchOptions} options Batching options.
* @private
*/
setOptions(options): void {
setOptions(options: BatchOptions): void {
const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100};

this._options = Object.assign(defaults, options);
Expand Down Expand Up @@ -212,14 +212,16 @@ export class ModAckQueue extends MessageQueue {
protected async _sendBatch(batch: QueuedMessages): Promise<void> {
const client = await this._subscriber.getClient();
const subscription = this._subscriber.name;
const modAckTable = batch.reduce((table, [ackId, deadline]) => {
if (!table[deadline!]) {
table[deadline!] = [];
}

table[deadline!].push(ackId);
return table;
}, {});
const modAckTable: {[index: string]: string[]} = batch.reduce(
(table: {[index: string]: string[]}, [ackId, deadline]) => {
if (!table[deadline!]) {
table[deadline!] = [];
}

table[deadline!].push(ackId);
return table;
},
{});

const modAckRequests = Object.keys(modAckTable).map(async (deadline) => {
const ackIds = modAckTable[deadline];
Expand Down
3 changes: 2 additions & 1 deletion src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {promisify} from '@google-cloud/promisify';
import {Gaxios} from 'gaxios';
import {ClientStub} from 'google-gax';
import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc';
import * as isStreamEnded from 'is-stream-ended';
Expand Down Expand Up @@ -224,7 +225,7 @@ export class MessageStream extends PassThrough {
* @returns {Promise}
*/
private async _fillStreamPool(): Promise<void> {
let client;
let client!: ClientStub;

try {
client = await this._getClient();
Expand Down
Loading