diff --git a/packages/grpc-js-xds/interop/xds-interop-client.ts b/packages/grpc-js-xds/interop/xds-interop-client.ts index a414ffe7f..6cd3aeb33 100644 --- a/packages/grpc-js-xds/interop/xds-interop-client.ts +++ b/packages/grpc-js-xds/interop/xds-interop-client.ts @@ -130,6 +130,13 @@ class CallStatsTracker { private subscribers: CallSubscriber[] = []; + private removeSubscriber(subscriber: CallSubscriber) { + const index = this.subscribers.indexOf(subscriber); + if (index >= 0) { + this.subscribers.splice(index, 1); + } + } + getCallStats(callCount: number, timeoutSec: number): Promise { return new Promise((resolve, reject) => { let finished = false; @@ -142,7 +149,7 @@ class CallStatsTracker { setTimeout(() => { if (!finished) { finished = true; - this.subscribers.splice(this.subscribers.indexOf(subscriber), 1); + this.removeSubscriber(subscriber); resolve(subscriber.getFinalStats()); } }, timeoutSec * 1000) @@ -155,7 +162,7 @@ class CallStatsTracker { for (const subscriber of callSubscribers) { subscriber.addCallStarted(); if (!subscriber.needsMoreCalls()) { - this.subscribers.splice(this.subscribers.indexOf(subscriber), 1); + this.removeSubscriber(subscriber); } } return { diff --git a/packages/grpc-js-xds/scripts/xds.sh b/packages/grpc-js-xds/scripts/xds.sh index 131cbd551..a06cde84b 100755 --- a/packages/grpc-js-xds/scripts/xds.sh +++ b/packages/grpc-js-xds/scripts/xds.sh @@ -59,7 +59,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh --gcp_suffix=$(date '+%s') \ --verbose \ ${XDS_V3_OPT-} \ - --client_cmd="$(which node) grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \ + --client_cmd="$(which node) --enable-source-maps grpc-node/packages/grpc-js-xds/build/interop/xds-interop-client \ --server=xds:///{server_uri} \ --stats_port={stats_port} \ --qps={qps} \ diff --git a/packages/grpc-js-xds/src/xds-bootstrap.ts b/packages/grpc-js-xds/src/xds-bootstrap.ts index 64a7bcb94..876b6d958 100644 --- a/packages/grpc-js-xds/src/xds-bootstrap.ts +++ b/packages/grpc-js-xds/src/xds-bootstrap.ts @@ -109,7 +109,7 @@ function validateXdsServerConfig(obj: any): XdsServerConfig { return { serverUri: obj.server_uri, channelCreds: obj.channel_creds.map(validateChannelCredsConfig), - serverFeatures: obj.server_features + serverFeatures: obj.server_features ?? [] }; } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index 41bc27147..2b8fae44c 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -341,6 +341,16 @@ export class XdsClient { return; } trace('Loaded bootstrap info: ' + JSON.stringify(bootstrapInfo, undefined, 2)); + if (bootstrapInfo.xdsServers.length < 1) { + trace('Failed to initialize xDS Client. No servers provided in bootstrap info.'); + // Bubble this error up to any listeners + this.reportStreamError({ + code: status.INTERNAL, + details: 'Failed to initialize xDS Client. No servers provided in bootstrap info.', + metadata: new Metadata(), + }); + return; + } if (bootstrapInfo.xdsServers[0].serverFeatures.indexOf('xds_v3') >= 0) { this.apiVersion = XdsApiVersion.V3; } else { @@ -425,8 +435,7 @@ export class XdsClient { {channelOverride: channel} ); this.maybeStartLrsStream(); - }, - (error) => { + }).catch((error) => { trace('Failed to initialize xDS Client. ' + error.message); // Bubble this error up to any listeners this.reportStreamError({ @@ -507,13 +516,15 @@ export class XdsClient { } } - private handleAdsCallError(error: ServiceError) { + private handleAdsCallStatus(streamStatus: StatusObject) { trace( - 'ADS stream ended. code=' + error.code + ' details= ' + error.details + 'ADS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); this.adsCallV2 = null; this.adsCallV3 = null; - this.reportStreamError(error); + if (streamStatus.code !== status.OK) { + this.reportStreamError(streamStatus); + } /* If the backoff timer is no longer running, we do not need to wait any * more to start the new call. */ if (!this.adsBackoff.isRunning()) { @@ -535,9 +546,10 @@ export class XdsClient { this.adsCallV2.on('data', (message: DiscoveryResponse__Output) => { this.handleAdsResponse(message); }); - this.adsCallV2.on('error', (error: ServiceError) => { - this.handleAdsCallError(error); + this.adsCallV2.on('status', (status: StatusObject) => { + this.handleAdsCallStatus(status); }); + this.adsCallV2.on('error', () => {}); return true; } @@ -555,9 +567,10 @@ export class XdsClient { this.adsCallV3.on('data', (message: DiscoveryResponse__Output) => { this.handleAdsResponse(message); }); - this.adsCallV3.on('error', (error: ServiceError) => { - this.handleAdsCallError(error); + this.adsCallV3.on('status', (status: StatusObject) => { + this.handleAdsCallStatus(status); }); + this.adsCallV3.on('error', () => {}); return true; } @@ -763,9 +776,9 @@ export class XdsClient { this.receivedLrsSettingsForCurrentStream = true; } - private handleLrsCallError(error: ServiceError) { + private handleLrsCallStatus(streamStatus: StatusObject) { trace( - 'LRS stream ended. code=' + error.code + ' details= ' + error.details + 'LRS stream ended. code=' + streamStatus.code + ' details= ' + streamStatus.details ); this.lrsCallV2 = null; this.lrsCallV3 = null; @@ -789,9 +802,10 @@ export class XdsClient { this.lrsCallV2.on('data', (message: LoadStatsResponse__Output) => { this.handleLrsResponse(message); }); - this.lrsCallV2.on('error', (error: ServiceError) => { - this.handleLrsCallError(error); + this.lrsCallV2.on('status', (status: StatusObject) => { + this.handleLrsCallStatus(status); }); + this.lrsCallV2.on('error', () => {}); return true; } @@ -807,9 +821,10 @@ export class XdsClient { this.lrsCallV3.on('data', (message: LoadStatsResponse__Output) => { this.handleLrsResponse(message); }); - this.lrsCallV3.on('error', (error: ServiceError) => { - this.handleLrsCallError(error); + this.lrsCallV3.on('status', (status: StatusObject) => { + this.handleLrsCallStatus(status); }); + this.lrsCallV3.on('error', () => {}); return true; } diff --git a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts index dbf18ef81..3861f4d2a 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/eds-state.ts @@ -163,7 +163,6 @@ export class EdsState implements XdsStreamState { } } trace('Received EDS updates for cluster names ' + Array.from(allClusterNames)); - this.handleMissingNames(allClusterNames); return null; } diff --git a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts index 7e8ec0456..10e71babf 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/lds-state.ts @@ -163,8 +163,13 @@ export class LdsState implements XdsStreamState { this.latestResponses = responses; this.latestIsV2 = isV2; const allTargetNames = new Set(); + const allRouteConfigNames = new Set(); for (const message of responses) { allTargetNames.add(message.name); + const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value); + if (httpConnectionManager.rds) { + allRouteConfigNames.add(httpConnectionManager.rds.route_config_name); + } const watchers = this.watchers.get(message.name) ?? []; for (const watcher of watchers) { watcher.onValidUpdate(message, isV2); @@ -172,6 +177,7 @@ export class LdsState implements XdsStreamState { } trace('Received RDS response with route config names ' + Array.from(allTargetNames)); this.handleMissingNames(allTargetNames); + this.rdsState.handleMissingNames(allRouteConfigNames); return null; } diff --git a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts index 9194529d8..ec7abe55a 100644 --- a/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts +++ b/packages/grpc-js-xds/src/xds-stream-state/rds-state.ts @@ -172,7 +172,7 @@ export class RdsState implements XdsStreamState { return true; } - private handleMissingNames(allRouteConfigNames: Set) { + handleMissingNames(allRouteConfigNames: Set) { for (const [routeConfigName, watcherList] of this.watchers.entries()) { if (!allRouteConfigNames.has(routeConfigName)) { for (const watcher of watcherList) { @@ -200,7 +200,6 @@ export class RdsState implements XdsStreamState { } } trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames)); - this.handleMissingNames(allRouteConfigNames); return null; } diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 4aa5e87a8..d25abb05c 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -192,18 +192,8 @@ export class ChannelImplementation implements Channel { ); } if (options) { - if ( - typeof options !== 'object' || - !Object.values(options).every( - (value) => - typeof value === 'string' || - typeof value === 'number' || - typeof value === 'undefined' - ) - ) { - throw new TypeError( - 'Channel options must be an object with string or number values' - ); + if (typeof options !== 'object') { + throw new TypeError('Channel options must be an object'); } } this.originalTarget = target; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index caaa2fa3d..fb62163b3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -347,6 +347,10 @@ export class Server { }; } + const deferredCallback = (error: Error | null, port: number) => { + process.nextTick(() => callback(error, port)); + } + const setupServer = (): http2.Http2Server | http2.Http2SecureServer => { let http2Server: http2.Http2Server | http2.Http2SecureServer; if (creds._isSecure()) { @@ -388,6 +392,7 @@ export class Server { const http2Server = setupServer(); return new Promise((resolve, reject) => { function onError(err: Error): void { + trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message); resolve(err); } @@ -463,6 +468,7 @@ export class Server { const http2Server = setupServer(); return new Promise((resolve, reject) => { function onError(err: Error): void { + trace('Failed to bind ' + subchannelAddressToString(address) + ' with error ' + err.message); resolve(bindWildcardPort(addressList.slice(1))); } @@ -518,7 +524,7 @@ export class Server { // We only want one resolution result. Discard all future results resolverListener.onSuccessfulResolution = () => {}; if (addressList.length === 0) { - callback(new Error(`No addresses resolved for port ${port}`), 0); + deferredCallback(new Error(`No addresses resolved for port ${port}`), 0); return; } let bindResultPromise: Promise; @@ -541,7 +547,7 @@ export class Server { if (bindResult.count === 0) { const errorString = `No address added out of total ${addressList.length} resolved`; logging.log(LogVerbosity.ERROR, errorString); - callback(new Error(errorString), 0); + deferredCallback(new Error(errorString), 0); } else { if (bindResult.count < addressList.length) { logging.log( @@ -549,18 +555,18 @@ export class Server { `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved` ); } - callback(null, bindResult.port); + deferredCallback(null, bindResult.port); } }, (error) => { const errorString = `No address added out of total ${addressList.length} resolved`; logging.log(LogVerbosity.ERROR, errorString); - callback(new Error(errorString), 0); + deferredCallback(new Error(errorString), 0); } ); }, onError: (error) => { - callback(new Error(error.details), 0); + deferredCallback(new Error(error.details), 0); }, };