diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 6242fc3c7..e1db479c1 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -37,7 +37,7 @@ import { CompressionFilterFactory } from './compression-filter'; import { getDefaultAuthority } from './resolver'; import { LoadBalancingConfig } from './load-balancing-config'; import { ServiceConfig, validateServiceConfig } from './service-config'; -import { trace } from './logging'; +import { trace, log } from './logging'; import { SubchannelAddress } from './subchannel'; export enum ConnectivityState { @@ -212,6 +212,18 @@ export class ChannelImplementation implements Channel { */ private tryPick(callStream: Http2CallStream, callMetadata: Metadata) { const pickResult = this.currentPicker.pick({ metadata: callMetadata }); + trace( + LogVerbosity.DEBUG, + 'channel', + 'Pick result: ' + + PickResultType[pickResult.pickResultType] + + ' subchannel: ' + + pickResult.subchannel?.getAddress() + + ' status: ' + + pickResult.status?.code + + ' ' + + pickResult.status?.details + ); switch (pickResult.pickResultType) { case PickResultType.COMPLETE: if (pickResult.subchannel === null) { @@ -221,16 +233,29 @@ export class ChannelImplementation implements Channel { ); // End the call with an error } else { - /* If the subchannel disconnects between calling pick and getting - * the filter stack metadata, the call will end with an error. */ + /* If the subchannel is not in the READY state, that indicates a bug + * somewhere in the load balancer or picker. So, we log an error and + * queue the pick to be tried again later. */ + if ( + pickResult.subchannel!.getConnectivityState() !== + ConnectivityState.READY + ) { + log( + LogVerbosity.ERROR, + 'Error: COMPLETE pick result subchannel ' + + pickResult.subchannel!.getAddress() + + ' has state ' + + ConnectivityState[pickResult.subchannel!.getConnectivityState()] + ); + this.pickQueue.push({ callStream, callMetadata }); + break; + } callStream.filterStack .sendMetadata(Promise.resolve(callMetadata)) .then( finalMetadata => { - if ( - pickResult.subchannel!.getConnectivityState() === - ConnectivityState.READY - ) { + const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); + if (subchannelState === ConnectivityState.READY) { try { pickResult.subchannel!.startCallStream( finalMetadata, @@ -250,11 +275,27 @@ export class ChannelImplementation implements Channel { * the stream because the correct behavior may be * re-queueing instead, based on the logic in the rest of * tryPick */ + trace( + LogVerbosity.INFO, + 'channel', + 'Failed to start call on picked subchannel ' + + pickResult.subchannel!.getAddress() + + '. Retrying pick' + ); this.tryPick(callStream, callMetadata); } } else { /* The logic for doing this here is the same as in the catch * block above */ + trace( + LogVerbosity.INFO, + 'channel', + 'Picked subchannel ' + + pickResult.subchannel!.getAddress() + + ' has state ' + + ConnectivityState[subchannelState] + + ' after metadata filters. Retrying pick' + ); this.tryPick(callStream, callMetadata); } },