Skip to content

Commit

Permalink
feat(microservices): Add grpc metadata to server responce
Browse files Browse the repository at this point in the history
example of usage:

import { Metadata } from 'grpc';

@GrpcMethod('Service', 'Method')
public method(request: any, metadata: Metadata, sendMetadata: any): any {
   // 1. nothing to do with request that is empty in this case
   // 2. may something with metadata from client if needed
   // 3. send metadata with responce........
   const srvMetadata = new Metadata();

   srvMetadata.add('Set-Cookie', 'yummy_cookie=choco');

   sendMetadata(srvMetadata);

   return 'Hello World!!!';
}
  • Loading branch information
alex committed Jul 24, 2020
1 parent 1b43bac commit b766c74
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 4 deletions.
5 changes: 5 additions & 0 deletions integration/microservices/e2e/orders-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ describe('Advanced GRPC transport', () => {
it('GRPC Sending and receiving Stream from RX handler', async () => {
const callHandler = client.sync();

// Get Set-Cookie from Metadata
callHandler.on('metadata', (metadata: GRPC.Metadata) => {
expect(metadata.get('Set-Cookie')[0]).to.eq('test_cookie=abcd')
});

callHandler.on('data', (msg: number) => {
// Do deep comparison (to.eql)
expect(msg).to.eql({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of, ReplaySubject, Subject } from 'rxjs';
import { Metadata } from 'grpc';

@Controller()
export class AdvancedGrpcController {
Expand Down Expand Up @@ -84,7 +85,12 @@ export class AdvancedGrpcController {
* @param messages
*/
@GrpcStreamMethod('orders.OrderService')
async sync(messages: Observable<any>): Promise<any> {
async sync(messages: Observable<any>, metadata: Metadata, sendMetadata: Function): Promise<any> {
// Set Set-Cookie from Metadata
const srvMetadata = new Metadata();
srvMetadata.add('Set-Cookie', 'test_cookie=abcd');
sendMetadata(srvMetadata);

const s = new Subject();
const o = s.asObservable();
messages.subscribe(msg => {
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/context/rpc-metadata-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ export const DEFAULT_CALLBACK_METADATA = {
};
export const DEFAULT_GRPC_CALLBACK_METADATA = {
[`${RpcParamtype.CONTEXT}:1`]: { index: 1, data: undefined, pipes: [] },
[`${RpcParamtype.GRPCCALL}:2`]: { index: 2, data: undefined, pipes: [] },
...DEFAULT_CALLBACK_METADATA,
};
1 change: 1 addition & 0 deletions packages/microservices/enums/rpc-paramtype.enum.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export enum RpcParamtype {
PAYLOAD = 3,
CONTEXT = 6,
GRPCCALL = 9,
}
2 changes: 2 additions & 0 deletions packages/microservices/factories/rpc-params-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export class RpcParamsFactory {
return args[0];
case RpcParamtype.CONTEXT:
return args[1];
case RpcParamtype.GRPCCALL:
return args[2];
default:
return null;
}
Expand Down
20 changes: 17 additions & 3 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from '@nestjs/common/utils/shared.utils';
import { EMPTY, fromEvent, Subject } from 'rxjs';
import { catchError, takeUntil } from 'rxjs/operators';
import { Metadata } from 'grpc';
import {
CANCEL_EVENT,
GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH,
Expand All @@ -26,6 +27,7 @@ let grpcProtoLoaderPackage: any = {};
interface GrpcCall<TRequest = any, TMetadata = any> {
request: TRequest;
metadata: TMetadata;
sendMetadata: Function;
end: Function;
write: Function;
on: Function;
Expand Down Expand Up @@ -206,7 +208,11 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {

public createUnaryServiceMethod(methodHandler: Function): Function {
return async (call: GrpcCall, callback: Function) => {
const handler = methodHandler(call.request, call.metadata);
const handler = methodHandler(
call.request,
call.metadata,
(meta: Metadata) => call.sendMetadata(meta),
);
this.transformToObservable(await handler).subscribe(
data => callback(null, data),
(err: any) => callback(err),
Expand All @@ -216,7 +222,11 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {

public createStreamServiceMethod(methodHandler: Function): Function {
return async (call: GrpcCall, callback: Function) => {
const handler = methodHandler(call.request, call.metadata);
const handler = methodHandler(
call.request,
call.metadata,
(meta: Metadata) => call.sendMetadata(meta),
);
const result$ = this.transformToObservable(await handler);
await result$
.pipe(
Expand Down Expand Up @@ -254,7 +264,11 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
});
call.on('end', () => req.complete());

const handler = methodHandler(req.asObservable(), call.metadata);
const handler = methodHandler(
req.asObservable(),
call.metadata,
(meta: Metadata) => call.sendMetadata(meta),
);
const res = this.transformToObservable(await handler);
if (isResponseStream) {
await res
Expand Down
2 changes: 2 additions & 0 deletions packages/microservices/test/server/server-grpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,13 @@ describe('ServerGrpc', () => {
metadata: {
test: '123',
},
sendMetadata: sinon.spy(),
};
fn(call as any, sinon.spy());

expect(handler.called).to.be.true;
expect(handler.args[0][1]).to.eq(call.metadata);
expect(handler.args[0][2]).to.be.an.instanceof(Function);
});
describe('when response is not a stream', () => {
it('should call callback', async () => {
Expand Down

0 comments on commit b766c74

Please sign in to comment.