Skip to content

Commit

Permalink
Add documentation and error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Aug 29, 2019
1 parent fb2e763 commit 01977e6
Show file tree
Hide file tree
Showing 17 changed files with 759 additions and 174 deletions.
101 changes: 101 additions & 0 deletions packages/grpc-js/src/backoff-timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

const INITIAL_BACKOFF_MS = 1000;
const BACKOFF_MULTIPLIER = 1.6;
const MAX_BACKOFF_MS = 120000;
const BACKOFF_JITTER = 0.2;

/**
* Get a number uniformly at random in the range [min, max)
* @param min
* @param max
*/
function uniformRandom(min: number, max: number) {
return Math.random() * (max - min) + min;
}

export interface BackoffOptions {
initialDelay?: number;
multiplier?: number;
jitter?: number;
maxDelay?: number;
}

export class BackoffTimeout {
private initialDelay: number = INITIAL_BACKOFF_MS;
private multiplier: number = BACKOFF_MULTIPLIER;
private maxDelay: number = MAX_BACKOFF_MS;
private jitter: number = BACKOFF_JITTER;
private nextDelay: number;
private timerId: NodeJS.Timer;
private running: boolean = false;

constructor(private callback: () => void, options?: BackoffOptions) {
if (options) {
if (options.initialDelay) {
this.initialDelay = options.initialDelay;
}
if (options.multiplier) {
this.multiplier = options.multiplier;
}
if (options.jitter) {
this.jitter = options.jitter;
}
if (options.maxDelay) {
this.maxDelay = options.maxDelay;
}
}
this.nextDelay = this.initialDelay;
this.timerId = setTimeout(() => {}, 0);
clearTimeout(this.timerId);
}

/**
* Call the callback after the current amount of delay time
*/
runOnce() {
this.running = true;
this.timerId = setTimeout(() => {
this.callback();
this.running = false;
}, this.nextDelay);
const nextBackoff = Math.min(this.nextDelay * this.multiplier, this.maxDelay);
const jitterMagnitude = nextBackoff * this.jitter;
this.nextDelay = nextBackoff + uniformRandom(-jitterMagnitude, jitterMagnitude);
}

/**
* Stop the timer. The callback will not be called until `runOnce` is called
* again.
*/
stop() {
clearTimeout(this.timerId);
this.running = false;
}

/**
* Reset the delay time to its initial value.
*/
reset() {
this.nextDelay = this.initialDelay;
}

isRunning() {
return this.running;
}
}
12 changes: 9 additions & 3 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import { SubchannelPool, getSubchannelPool } from "./subchannel-pool";
import { ChannelControlHelper } from "./load-balancer";
import { UnavailablePicker, Picker, PickResultType } from "./picker";
import { Metadata } from "./metadata";
import { SubchannelConnectivityState } from "./subchannel";
import { Status } from "./constants";
import { FilterStackFactory } from "./filter-stack";
import { CallCredentialsFilterFactory } from "./call-credentials-filter";
import { DeadlineFilterFactory } from "./deadline-filter";
import { MetadataStatusFilterFactory } from "./metadata-status-filter";
import { CompressionFilterFactory } from "./compression-filter";
import { getDefaultAuthority } from "./resolver";
import { LoadBalancingConfig } from "./load-balancing-config";
import { ServiceConfig } from "./service-config";

export enum ConnectivityState {
CONNECTING,
Expand Down Expand Up @@ -136,7 +137,11 @@ export class ChannelImplementation implements Channel {
}
};
// TODO: check channel arg for default service config
this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, null);
const defaultServiceConfig: ServiceConfig = {
loadBalancingConfig: [],
methodConfig: []
}
this.resolvingLoadBalancer = new ResolvingLoadBalancer(target, channelControlHelper, defaultServiceConfig);
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
Expand All @@ -163,12 +168,13 @@ export class ChannelImplementation implements Channel {
switch(pickResult.pickResultType) {
case PickResultType.COMPLETE:
if (pickResult.subchannel === null) {
callStream.cancelWithStatus(Status.UNAVAILABLE, "Request dropped by load balancing policy");
// End the call with an error
} else {
/* If the subchannel disconnects between calling pick and getting
* the filter stack metadata, the call will end with an error. */
callStream.filterStack.sendMetadata(Promise.resolve(new Metadata())).then((finalMetadata) => {
if (pickResult.subchannel!.getConnectivityState() === SubchannelConnectivityState.READY) {
if (pickResult.subchannel!.getConnectivityState() === ConnectivityState.READY) {
pickResult.subchannel!.startCallStream(callMetadata, callStream);
} else {
callStream.cancelWithStatus(Status.UNAVAILABLE, 'Connection dropped while starting call');
Expand Down
4 changes: 2 additions & 2 deletions packages/grpc-js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from './call';
import { CallCredentials } from './call-credentials';
import { Call, Deadline, StatusObject, WriteObject } from './call-stream';
import { Channel, ConnectivityState, Http2Channel } from './channel';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { ChannelOptions } from './channel-options';
import { Status } from './constants';
Expand Down Expand Up @@ -78,7 +78,7 @@ export class Client {
options
);
} else {
this[CHANNEL_SYMBOL] = new Http2Channel(address, credentials, options);
this[CHANNEL_SYMBOL] = new ChannelImplementation(address, credentials, options);
}
}

Expand Down
14 changes: 12 additions & 2 deletions packages/grpc-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
} from './call';
import { CallCredentials } from './call-credentials';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ConnectivityState, Http2Channel } from './channel';
import { Channel, ConnectivityState, ChannelImplementation } from './channel';
import { ChannelCredentials } from './channel-credentials';
import { CallOptions, Client } from './client';
import { LogVerbosity, Status } from './constants';
Expand Down Expand Up @@ -183,7 +183,7 @@ export {
loadPackageDefinition,
makeClientConstructor,
makeClientConstructor as makeGenericClientConstructor,
Http2Channel as Channel,
ChannelImplementation as Channel,
};

/**
Expand Down Expand Up @@ -283,3 +283,13 @@ export const InterceptingCall = () => {
};

export { GrpcObject } from './make-client';

import * as resolver from './resolver';
import * as load_balancer from './load-balancer';

function setup() {
resolver.registerAll();
load_balancer.registerAll();
}

setup();
Loading

0 comments on commit 01977e6

Please sign in to comment.