Skip to content

Commit

Permalink
Merge pull request #9586 from nestjs/fix/propagate-kafka-errors
Browse files Browse the repository at this point in the history
feat(microservices): add kafka retriable exception, auto-unwrap payloads
  • Loading branch information
kamilmysliwiec authored May 17, 2022
2 parents 1f5a5b4 + 4ca738d commit 66da12a
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 18 deletions.
4 changes: 3 additions & 1 deletion packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ export const RQM_DEFAULT_NOACK = true;
export const RQM_DEFAULT_PERSISTENT = false;
export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';

export const NO_EVENT_HANDLER = (text: TemplateStringsArray, pattern: string) =>
`There is no matching event handler defined in the remote service. Event pattern: ${pattern}`;
export const NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`;
export const NO_EVENT_HANDLER = `There is no matching event handler defined in the remote service.`;

export const DISCONNECTED_RMQ_MESSAGE = `Disconnected from RMQ. Trying to reconnect.`;

export const KAFKA_DEFAULT_CLIENT = 'nestjs-consumer';
Expand Down
2 changes: 1 addition & 1 deletion packages/microservices/context/rpc-proxy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { Observable, isObservable } from 'rxjs';
import { isObservable, Observable } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';

Expand Down
16 changes: 14 additions & 2 deletions packages/microservices/ctx-host/kafka.context.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import { KafkaMessage } from '../external/kafka.interface';
import { Consumer, KafkaMessage } from '../external/kafka.interface';
import { BaseRpcContext } from './base-rpc.context';

type KafkaContextArgs = [KafkaMessage, number, string];
type KafkaContextArgs = [
message: KafkaMessage,
partition: number,
topic: string,
consumer: Consumer,
];

export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
constructor(args: KafkaContextArgs) {
Expand All @@ -28,4 +33,11 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
getTopic() {
return this.args[2];
}

/**
* Returns the Kafka consumer reference.
*/
getConsumer() {
return this.args[3];
}
}
21 changes: 21 additions & 0 deletions packages/microservices/deserializers/kafka-request.deserializer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IncomingEvent, IncomingRequest } from '../interfaces';
import { KafkaRequest } from '../serializers/kafka-request.serializer';
import { IncomingRequestDeserializer } from './incoming-request.deserializer';

export class KafkaRequestDeserializer extends IncomingRequestDeserializer {
mapToSchema(
data: KafkaRequest,
options?: Record<string, any>,
): IncomingRequest | IncomingEvent {
if (!options) {
return {
pattern: undefined,
data: undefined,
};
}
return {
pattern: options.channel,
data: data?.value ?? data,
};
}
}
1 change: 1 addition & 0 deletions packages/microservices/exceptions/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './base-rpc-exception-filter';
export * from './kafka-retriable-exception';
export * from './rpc-exception';
17 changes: 17 additions & 0 deletions packages/microservices/exceptions/kafka-retriable-exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { RpcException } from './rpc-exception';

/**
* Exception that instructs Kafka driver to instead of introspecting
* error processing flow and sending serialized error message to the consumer,
* force bubble it up to the "eachMessage" callback of the underlying "kafkajs" package
* (even if interceptors are applied, or an observable stream is returned from the message handler).
*
* A transient exception that if retried may succeed.
*
* @publicApi
*/
export class KafkaRetriableException extends RpcException {
public getError(): string | object {
return this;
}
}
3 changes: 1 addition & 2 deletions packages/microservices/microservices-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ export class MicroservicesModule {
private listenersController: ListenersController;

public register(container: NestContainer, config: ApplicationConfig) {
const rpcProxy = new RpcProxy();
const exceptionFiltersContext = new ExceptionFiltersContext(
container,
config,
);
const contextCreator = new RpcContextCreator(
rpcProxy,
new RpcProxy(),
exceptionFiltersContext,
new PipesContextCreator(container, config),
new PipesConsumer(),
Expand Down
67 changes: 62 additions & 5 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { Logger } from '@nestjs/common/services/logger.service';
import { isNil } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
import {
KAFKA_DEFAULT_BROKER,
KAFKA_DEFAULT_CLIENT,
KAFKA_DEFAULT_GROUP,
NO_EVENT_HANDLER,
NO_MESSAGE_HANDLER,
} from '../constants';
import { KafkaContext } from '../ctx-host';
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
import { KafkaHeaders, Transport } from '../enums';
import { KafkaRetriableException } from '../exceptions';
import {
BrokersFunction,
Consumer,
Expand All @@ -26,6 +29,7 @@ import {
CustomTransportStrategy,
KafkaOptions,
OutgoingResponse,
ReadPacket,
} from '../interfaces';
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';
Expand Down Expand Up @@ -162,6 +166,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
rawMessage,
payload.partition,
payload.topic,
this.consumer,
]);
const handler = this.getHandlerByPattern(packet.pattern);
// if the correlation id or reply topic is not set
Expand All @@ -186,7 +191,37 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
const response$ = this.transformToObservable(
await handler(packet.data, kafkaContext),
);
response$ && this.send(response$, publish);

const replayStream$ = new ReplaySubject();
await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);

this.send(replayStream$, publish);
}

private combineStreamsAndThrowIfRetriable(
response$: Observable<any>,
replayStream$: ReplaySubject<unknown>,
) {
return new Promise<void>((resolve, reject) => {
let isPromiseResolved = false;
response$.subscribe({
next: val => {
replayStream$.next(val);
if (!isPromiseResolved) {
isPromiseResolved = true;
resolve();
}
},
error: err => {
if (err instanceof KafkaRetriableException && !isPromiseResolved) {
isPromiseResolved = true;
reject(err);
}
replayStream$.error(err);
},
complete: () => replayStream$.complete(),
});
});
}

public async sendMessage(
Expand Down Expand Up @@ -228,9 +263,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
if (!outgoingResponse.err) {
return;
}
outgoingMessage.headers[KafkaHeaders.NEST_ERR] = Buffer.from(
outgoingResponse.err,
);
const stringifiedError =
typeof outgoingResponse.err === 'object'
? JSON.stringify(outgoingResponse.err)
: outgoingResponse.err;
outgoingMessage.headers[KafkaHeaders.NEST_ERR] =
Buffer.from(stringifiedError);
}

public assignCorrelationIdHeader(
Expand All @@ -251,8 +289,27 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
outgoingMessage.partition = parseFloat(replyPartition);
}

public async handleEvent(
pattern: string,
packet: ReadPacket,
context: KafkaContext,
): Promise<any> {
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
}
const resultOrStream = await handler(packet.data, context);
if (isObservable(resultOrStream)) {
await lastValueFrom(resultOrStream);
}
}

protected initializeSerializer(options: KafkaOptions['options']) {
this.serializer =
(options && options.serializer) || new KafkaRequestSerializer();
}

protected initializeDeserializer(options: KafkaOptions['options']) {
this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
}
}
4 changes: 1 addition & 3 deletions packages/microservices/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ export abstract class Server {
): Promise<any> {
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
return this.logger.error(
`${NO_EVENT_HANDLER} Event pattern: ${JSON.stringify(pattern)}.`,
);
return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
}
const resultOrStream = await handler(packet.data, context);
if (isObservable(resultOrStream)) {
Expand Down
13 changes: 10 additions & 3 deletions packages/microservices/test/ctx-host/kafka.context.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { expect } from 'chai';
import { KafkaContext } from '../../ctx-host';
import { KafkaMessage } from '../../external/kafka.interface';
import { Consumer, KafkaMessage } from '../../external/kafka.interface';

describe('KafkaContext', () => {
const args = ['test', { test: true }];
const args = ['test', { test: true }, undefined, { test: 'consumer' }];
let context: KafkaContext;

beforeEach(() => {
context = new KafkaContext(args as [KafkaMessage, number, string]);
context = new KafkaContext(
args as [KafkaMessage, number, string, Consumer],
);
});
describe('getTopic', () => {
it('should return topic', () => {
Expand All @@ -24,4 +26,9 @@ describe('KafkaContext', () => {
expect(context.getMessage()).to.be.eql(args[0]);
});
});
describe('getConsumer', () => {
it('should return consumer instance', () => {
expect(context.getConsumer()).to.deep.eq({ test: 'consumer' });
});
});
});
40 changes: 39 additions & 1 deletion packages/microservices/test/server/server-kafka.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Logger } from '@nestjs/common';
import { expect } from 'chai';
import { AssertionError, expect } from 'chai';
import * as sinon from 'sinon';
import { NO_MESSAGE_HANDLER } from '../../constants';
import { KafkaHeaders } from '../../enums';
Expand Down Expand Up @@ -277,6 +277,7 @@ describe('ServerKafka', () => {

sinon.stub(server, 'getPublisher').callsFake(() => getPublisherSpy);
});

it('should call "handleEvent" if correlation identifier is not present', async () => {
const handleEventSpy = sinon.spy(server, 'handleEvent');
await server.handleMessage(eventPayload);
Expand All @@ -289,6 +290,42 @@ describe('ServerKafka', () => {
expect(handleEventSpy.called).to.be.true;
});

it('should call event handler when "handleEvent" is called', async () => {
const messageHandler = sinon.mock();
const context = { test: true } as any;
const messageData = 'some data';
sinon.stub(server, 'getHandlerByPattern').callsFake(() => messageHandler);

await server.handleEvent(
topic,
{ data: messageData, pattern: topic },
context,
);
expect(messageHandler.calledWith(messageData, context)).to.be.true;
});

it('should not catch error thrown by event handler as part of "handleEvent"', async () => {
const error = new Error('handler error');
const messageHandler = sinon.mock().throwsException(error);
sinon.stub(server, 'getHandlerByPattern').callsFake(() => messageHandler);

try {
await server.handleEvent(
topic,
{ data: 'some data', pattern: topic },
{} as any,
);

// code should not be executed
expect(true).to.be.false;
} catch (e) {
if (e instanceof AssertionError) {
throw e;
}
expect(e).to.be.eq(error);
}
});

it('should call "handleEvent" if correlation identifier and reply topic are present but the handler is of type eventHandler', async () => {
const handler = sinon.spy();
(handler as any).isEventHandler = true;
Expand Down Expand Up @@ -320,6 +357,7 @@ describe('ServerKafka', () => {
}),
).to.be.true;
});

it(`should call handler with expected arguments`, async () => {
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
Expand Down

0 comments on commit 66da12a

Please sign in to comment.