Skip to content

Commit

Permalink
grpc-js-xds: Use valid resources when NACKing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Oct 19, 2021
1 parent 832afbe commit 2a45b34
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 20 deletions.
14 changes: 9 additions & 5 deletions packages/grpc-js-xds/src/xds-stream-state/cds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,21 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
}

handleResponses(responses: Cluster__Output[], isV2: boolean): string | null {
const validResponses: Cluster__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message)) {
if (this.validateResponse(message)) {
validResponses.push(message);
} else {
trace('CDS validation failed for message ' + JSON.stringify(message));
return 'CDS Error: Cluster validation failed';
errorMessage = 'CDS Error: Cluster validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allEdsServiceNames: Set<string> = new Set<string>();
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allClusterNames.add(message.name);
const edsServiceName = message.eds_cluster_config?.service_name ?? '';
allEdsServiceNames.add(
Expand All @@ -161,7 +165,7 @@ export class CdsState implements XdsStreamState<Cluster__Output> {
trace('Received CDS updates for cluster names ' + Array.from(allClusterNames));
this.handleMissingNames(allClusterNames);
this.edsState.handleMissingNames(allEdsServiceNames);
return null;
return errorMessage;
}

reportStreamError(status: StatusObject): void {
Expand Down
14 changes: 9 additions & 5 deletions packages/grpc-js-xds/src/xds-stream-state/eds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,28 @@ export class EdsState implements XdsStreamState<ClusterLoadAssignment__Output> {
}

handleResponses(responses: ClusterLoadAssignment__Output[], isV2: boolean) {
const validResponses: ClusterLoadAssignment__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message)) {
if (this.validateResponse(message)) {
validResponses.push(message);
} else {
trace('EDS validation failed for message ' + JSON.stringify(message));
return 'EDS Error: ClusterLoadAssignment validation failed';
errorMessage = 'EDS Error: ClusterLoadAssignment validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allClusterNames: Set<string> = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allClusterNames.add(message.cluster_name);
const watchers = this.watchers.get(message.cluster_name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received EDS updates for cluster names ' + Array.from(allClusterNames));
return null;
return errorMessage;
}

reportStreamError(status: StatusObject): void {
Expand Down
14 changes: 9 additions & 5 deletions packages/grpc-js-xds/src/xds-stream-state/lds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,21 @@ export class LdsState implements XdsStreamState<Listener__Output> {
}

handleResponses(responses: Listener__Output[], isV2: boolean): string | null {
const validResponses: Listener__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message, isV2)) {
if (this.validateResponse(message, isV2)) {
validResponses.push(message);
} else {
trace('LDS validation failed for message ' + JSON.stringify(message));
return 'LDS Error: Route validation failed';
errorMessage = 'LDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allTargetNames = new Set<string>();
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allTargetNames.add(message.name);
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL_V3, message.api_listener!.api_listener!.value);
if (httpConnectionManager.rds) {
Expand All @@ -178,7 +182,7 @@ export class LdsState implements XdsStreamState<Listener__Output> {
trace('Received RDS response with route config names ' + Array.from(allTargetNames));
this.handleMissingNames(allTargetNames);
this.rdsState.handleMissingNames(allRouteConfigNames);
return null;
return errorMessage;
}

reportStreamError(status: StatusObject): void {
Expand Down
14 changes: 9 additions & 5 deletions packages/grpc-js-xds/src/xds-stream-state/rds-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,28 @@ export class RdsState implements XdsStreamState<RouteConfiguration__Output> {
}

handleResponses(responses: RouteConfiguration__Output[], isV2: boolean): string | null {
const validResponses: RouteConfiguration__Output[] = [];
let errorMessage: string | null = null;
for (const message of responses) {
if (!this.validateResponse(message, isV2)) {
if (this.validateResponse(message, isV2)) {
validResponses.push(message);
} else {
trace('RDS validation failed for message ' + JSON.stringify(message));
return 'RDS Error: Route validation failed';
errorMessage = 'RDS Error: Route validation failed';
}
}
this.latestResponses = responses;
this.latestResponses = validResponses;
this.latestIsV2 = isV2;
const allRouteConfigNames = new Set<string>();
for (const message of responses) {
for (const message of validResponses) {
allRouteConfigNames.add(message.name);
const watchers = this.watchers.get(message.name) ?? [];
for (const watcher of watchers) {
watcher.onValidUpdate(message, isV2);
}
}
trace('Received RDS response with route config names ' + Array.from(allRouteConfigNames));
return null;
return errorMessage;
}

reportStreamError(status: StatusObject): void {
Expand Down

0 comments on commit 2a45b34

Please sign in to comment.