Skip to content

Commit

Permalink
Merge pull request #14163 from nestjs/fix/grpc-client-streaming-bug
Browse files Browse the repository at this point in the history
fix(microservices): grpc client streaming bugs
  • Loading branch information
kamilmysliwiec authored Nov 20, 2024
2 parents 6094701 + b65c41c commit 71e8ace
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 17 deletions.
42 changes: 36 additions & 6 deletions packages/microservices/decorators/message-pattern.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import {
isObject,
isNumber,
isNil,
isNumber,
isObject,
isSymbol,
} from '@nestjs/common/utils/shared.utils';
/* eslint-disable @typescript-eslint/no-use-before-define */
import {
PATTERN_EXTRAS_METADATA,
PATTERN_HANDLER_METADATA,
PATTERN_METADATA,
TRANSPORT_METADATA,
PATTERN_EXTRAS_METADATA,
} from '../constants';
import { PatternHandler } from '../enums/pattern-handler.enum';
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
import { Transport } from '../enums';
import { PatternHandler } from '../enums/pattern-handler.enum';
import {
InvalidGrpcDecoratorException,
RpcDecoratorMetadata,
} from '../errors/invalid-grpc-message-decorator.exception';
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';

export enum GrpcMethodStreamingType {
NO_STREAMING = 'no_stream',
Expand Down Expand Up @@ -141,7 +141,37 @@ export function GrpcStreamMethod(
method,
GrpcMethodStreamingType.RX_STREAMING,
);
return MessagePattern(metadata, Transport.GRPC)(target, key, descriptor);

MessagePattern(metadata, Transport.GRPC)(target, key, descriptor);

const originalMethod = descriptor.value;

// Override original method to call the "drainBuffer" method on the first parameter
// This is required to avoid premature message emission
descriptor.value = async function (
this: any,
observable: any,
...args: any[]
) {
const result = await Promise.resolve(
originalMethod.apply(this, [observable, ...args]),
);

// Drain buffer if "drainBuffer" method is available
if (observable && observable.drainBuffer) {
process.nextTick(() => {
observable.drainBuffer();
});
}
return result;
};

// Copy all metadata from the original method to the new one
const metadataKeys = Reflect.getMetadataKeys(originalMethod);
metadataKeys.forEach(metadataKey => {
const metadataValue = Reflect.getMetadata(metadataKey, originalMethod);
Reflect.defineMetadata(metadataKey, metadataValue, descriptor.value);
});
};
}

Expand Down
100 changes: 89 additions & 11 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import {
EMPTY,
Observable,
ReplaySubject,
Subject,
Subscription,
defaultIfEmpty,
Expand Down Expand Up @@ -165,7 +166,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
if (!methodHandler) {
continue;
}
service[methodName] = await this.createServiceMethod(
service[methodName] = this.createServiceMethod(
methodHandler,
grpcService.prototype[methodName],
streamingType,
Expand All @@ -174,7 +175,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
return service;
}

getMessageHandler(
public getMessageHandler(
serviceName: string,
methodName: string,
streaming: GrpcMethodStreamingType,
Expand Down Expand Up @@ -278,7 +279,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
source: Observable<T>,
call: GrpcCall<T>,
): Promise<void> {
// this promise should **not** reject, as we're handling errors in the observable for the Call
// This promise should **not** reject, as we're handling errors in the observable for the Call
// the promise is only needed to signal when writing/draining has been completed
return new Promise((resolve, _doNotUse) => {
const valuesWaitingToBeDrained: T[] = [];
Expand Down Expand Up @@ -380,8 +381,11 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
call: GrpcCall,
callback: (err: unknown, value: unknown) => void,
) => {
const req = new Subject<any>();
call.on('data', (m: any) => req.next(m));
// Needs to be a Proxy in order to buffer messages that come before handler is executed
// This could happen if handler has any async guards or interceptors registered that would delay
// the execution.
const { subject, next, error, complete } = this.bufferUntilDrained();
call.on('data', (m: any) => next(m));
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
const isCancelledError = String(e).toLowerCase().indexOf('cancelled');
Expand All @@ -391,11 +395,15 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
return;
}
// If another error then just pass it along
req.error(e);
error(e);
});
call.on('end', () => req.complete());
call.on('end', () => complete());

const handler = methodHandler(req.asObservable(), call.metadata, call);
const handler = methodHandler(
subject.asObservable(),
call.metadata,
call,
);
const res = this.transformToObservable(await handler);
if (isResponseStream) {
await this.writeObservableToGrpc(res, call);
Expand Down Expand Up @@ -426,11 +434,15 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
call: GrpcCall,
callback: (err: unknown, value: unknown) => void,
) => {
let handlerStream: Observable<any>;
if (isResponseStream) {
methodHandler(call);
handlerStream = this.transformToObservable(await methodHandler(call));
} else {
methodHandler(call, callback);
handlerStream = this.transformToObservable(
await methodHandler(call, callback),
);
}
await lastValueFrom(handlerStream);
};
}

Expand Down Expand Up @@ -469,7 +481,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
this.messageHandlers.set(route, callback);
}

public async createClient(): Promise<any> {
public async createClient() {
const channelOptions: ChannelOptions =
this.options && this.options.channelOptions
? this.options.channelOptions
Expand Down Expand Up @@ -612,4 +624,70 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
);
}
}

private bufferUntilDrained<T>() {
type DrainableSubject<T> = Subject<T> & { drainBuffer: () => void };

const subject = new Subject<T>();
const replayBuffer = new ReplaySubject<T>();
let hasDrained = false;

function drainBuffer(this: DrainableSubject<T>) {
if (hasDrained) {
return;
}
hasDrained = true;

// Replay buffered values to the new subscriber
replayBuffer.subscribe({
next: val => console.log('emitted', val),
});
replayBuffer.complete();
}

return {
subject: new Proxy<DrainableSubject<T>>(subject as DrainableSubject<T>, {
get(target, prop, receiver) {
if (prop === 'asObservable') {
return () => {
const stream = subject.asObservable();

// "drainBuffer" will be called before the evaluation of the handler
// but after any enhancers have been applied (e.g., `interceptors`)
Object.defineProperty(stream, drainBuffer.name, {
value: drainBuffer,
});
return stream;
};
}
if (hasDrained) {
return Reflect.get(target, prop, receiver);
}
return Reflect.get(replayBuffer, prop, receiver);
},
}),
next: (value: T) => {
if (!hasDrained) {
replayBuffer.next(value);
}
subject.next(value);
},
error: (err: any) => {
if (!hasDrained) {
replayBuffer.error(err);
}
subject.error(err);
},
complete: () => {
if (!hasDrained) {
replayBuffer.complete();
// Replay buffer is no longer needed
// Return early to allow subject to complete later, after the replay buffer
// has been drained
return;
}
subject.complete();
},
};
}
}

0 comments on commit 71e8ace

Please sign in to comment.