Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka Microservice Does not validate DTO #5974

Closed
kodeine opened this issue Dec 17, 2020 · 37 comments · Fixed by #9586
Closed

Kafka Microservice Does not validate DTO #5974

kodeine opened this issue Dec 17, 2020 · 37 comments · Fixed by #9586

Comments

@kodeine
Copy link

kodeine commented Dec 17, 2020

Bug Report

Kafka Microservice does not validate dto's. kafka payload response is incorrect according to docs.

Current behavior

@MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any

it does not output KillDragonMessage interface but interface IncomingMessage

@Controller('account')
export class AccountController {

  @MessagePattern('account_me')
  getMe(
    @Payload() payload: SomeDto
  ) {
      console.log('Received payload', payload);
    return { };
  }

AccountController code above gives an error, [ERROR] 15:45:00 TypeError: The first argument must be of type string or an instance of Buffer, ArrayBuffer, or Array or an Array-like Object. Received an instance of Object

value of payload comes out to IncomingMessage which is

payload {
  magicByte: 2,
  attributes: 0,
  timestamp: '1608237989883',
  offset: '33',
  key: null,
  value: { id: 999, something: 1 },
  headers: {
    kafka_correlationId: '58c52bab-3fad-4b7a-9f96-4d46bb5ceb78',
    kafka_replyTopic: 'account_me.reply',
    kafka_replyPartition: '0'
  },
  isControlRecord: false,
  batchContext: {
    firstOffset: '33',
    firstTimestamp: '1608237989883',
    partitionLeaderEpoch: 0,
    inTransaction: false,
    isControlBatch: false,
    lastOffsetDelta: 0,
    producerId: '-1',
    producerEpoch: 0,
    firstSequence: 0,
    maxTimestamp: '1608237989883',
    timestampType: 0,
    magicByte: 2
  },
  topic: 'account_me',
  partition: 0
}
@kodeine kodeine added the needs triage This issue has not been looked into label Dec 17, 2020
@jmcdo29
Copy link
Member

jmcdo29 commented Dec 17, 2020

Please provide a minimum reproduction

@underfisk
Copy link

@kodeine I had the same issue also for RabbitMQ, what happens is that you need ValidationPipe, since Kafka has a custom object, you need to pass on the payload your Pipe to validate, i ended up creating my own but class-validator will do just fine
Try to import globally validation pipe(if you have an hybrid app) or just use in your method the decorator to test

@enesyalcin
Copy link

enesyalcin commented Dec 22, 2020

@underfisk already said that we need to have a ValidationPipe for Micrsoservices (be it kafka or RabbitMQ). Check this out:

import { Controller, Logger, UseFilters, UsePipes, ValidationPipe } from '@nestjs/common';

@Controller('event-broker')
export class EventBrokerController {

  @UseFilters(new ExceptionFilter())
  @UsePipes(new ValidationPipe())
  @EventPattern('jsontest', Transport.KAFKA)
  async processStream(
    @Payload() message: CommunicationEventDto,
    @Ctx() context: KafkaContext,
  ): Promise<void> {
    const { value, headers, topic, key } = message;

    // here your logic
    });
  }

my validation object

import { IsNumber, IsObject, IsString } from 'class-validator';

interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  magicByte: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

export interface CommunicationEventHeader {
  mvno_id?: number;
  endpoint?: string;
  msisdn?: string;
  email?: string;
  priority?: 'HIGH' | 'MID' | 'LOW';
  allowedChannels?: any;
}

export class CommunicationEventDto implements IncomingMessage {
  @IsString()
  readonly topic: string;

  @IsNumber()
  readonly partition: number;

  @IsString()
  readonly timestamp: string;

  @IsNumber()
  readonly magicByte: number;

  @IsNumber()
  readonly attributes: number;

  @IsString()
  readonly offset: string;

  @IsString()
  readonly key: string;

  @IsObject()
  readonly value: { payload: any };

  @IsObject()
  readonly headers: CommunicationEventHeader | Record<string, any>;
}

@underfisk
Copy link

@enesyalcin Its weird because its probably happening on Kafka, on rabbit mq it does work fine

@kodeine
Copy link
Author

kodeine commented Dec 22, 2020

I just extended regular validation pipe and passed payload.value to it for the validation to work. So a decorator for kafka validation

@underfisk
Copy link

@kodeine it seems to be a good alternative having a custom decorator but i think this might be an internal issue and Nestjs should do this out of the box, if it does for most of the transports, Kafka is no different

@enesyalcin
Copy link

enesyalcin commented Dec 22, 2020

@enesyalcin Its weird because its probably happening on Kafka, on rabbit mq it does work fine

My project is not a hybrid but a Kafka Project (so no REST API is being provided on this application). Maybe that changes anything?

@kodeine it seems to be a good alternative having a custom decorator but i think this might be an internal issue and Nestjs should do this out of the box, if it does for most of the transports, Kafka is no different

I remember that there was some kind of reason for that but I forgot. Does your solution work with the provided example?

@kodeine
Copy link
Author

kodeine commented Dec 22, 2020

@enesyalcin i forgot which method i was using...

you can try doing this first,

export class PayloadDto {
  @Expose()
  @Type(() => SomeInputDto)
  value: SomeInputDto;
}

in controller use like @Payload() payload: PayloadDto validation will start working. make sure to add validation pipe in global or class.

if that doesnt work, following is the pipe that you can use. Problem is kafka payload.value has the dto needed so validator doesnt work as we need to define @type for validator. so if you modify the pipe to use payload.value you will achieve the same.

@Injectable()
export class KafkaValidationPipe extends ValidationPipe {
  public async transform(value: any, metadata: ArgumentMetadata) {
/// change data etc
    return await super.transform(value, metadata);
  }
}

and in your main.ts

    app.useGlobalPipes(
      new KafkaValidationPipe({
        transform: true,
        whitelist: true,
        forbidUnknownValues: true,
        forbidNonWhitelisted: true,
        skipMissingProperties: false,
      }),
    );

@tanyudii
Copy link

any solution from nest.js for this issue?

@underfisk
Copy link

@tanyudii Nestjs uses Payload for convention because its also possible to pass context, its not going to do anything extra. By default if you need a validation you have to perform it by yourself, the solution presented by @kodeine will work just fine.
With this said, Nestjs by default in microservices do not run class-validator even though you may pass the DTO which i'm not sure if will be supported anytime soon
@jmcdo29 Is this the intended or should the package perform the class-validators automatically? I think this was something already predicted but that's also the reason Payload decorators does exist, with or without the decorator we do receive the data fine but i think the decorator is there to identify which is the target payload for future validations or anything related to the payload

@MickL
Copy link
Contributor

MickL commented Feb 23, 2021

Not sure if I get this wrong or the documentation is wrong but it says:

the ValidationPipe works the same for WebSockets and microservices, regardless of the transport method that is used.

I now assumed that it works the .useGlobalPipes(new ValidationPipe()) in my main.ts but my @Payload is not validated (using NATS).

@underfisk
Copy link

@MickL I'm not sure if its a bug but probably the way that microservices are, they rely upon the normal controller flow but if you do enable global validation pipe it should validate but you have to pass the dto
For example if you do

@MessageHandler(Whatever)
handle(@Payload() data) {
}

This will not invoke the validation pipe, its going to be ignored since you omitted but if you pass the type it should validate (but its not related with kafka or nats, its related with the core itself because they are relying on the natural procedures. If you check the source code, you'll see that the reason microservices use controllers like HTTP its because they can inherit pipes,guards, interceptors etc out of the box without having to create a special adjustment for them in the microservices context

As a workaround @kodeine solution by creating a custom pipe just to invoke manually class validator is a solution but i'm looking forward to hearing from @jmcdo29 in order to consider whether its a bug or intended to be this way

@MickL
Copy link
Contributor

MickL commented Feb 23, 2021

@jmcdo29 if this is intended than the documentation is misleading and it might be noted that validation is not working on microservice controllers the same way.

@jmcdo29
Copy link
Member

jmcdo29 commented Feb 23, 2021

I mean, does anyone want to provide a minimum reproduction, as I asked for Dec 17, 2020? I've only done basic work with microservices, and haven't looked too deeply into how the @Payload() decorator works, but without a reproduction, I'm kinda being asked to fire blindly.

@tanyudii
Copy link

@underfisk @jmcdo29

this is my minimum reproduction, i want to validate create request organization entity from api gateway

https://github.com/tanyudii/nestjs-kafka

@jmcdo29
Copy link
Member

jmcdo29 commented Feb 23, 2021

@tanyudii thank you. For reproduction steps, just need a bit more info.

docker-compose up -d
# start the two servers
# send request to what?
# ???
# Profit

@tanyudii
Copy link

tanyudii commented Feb 23, 2021

@tanyudii thank you. For reproduction steps, just need a bit more info.

docker-compose up -d
# start the two servers
# send request to what?
# ???
# Profit

open http://localhost:3000/docs/ (the service api-gateway) and submit the organization post to create organization.

I'm expect validation run in account-service, because i don't wanna manage validation rule in api gateway.
Api gateway only gate for authorization and authentication every request.
i'm overide the validation pipe to throw exception using RpcException not BadRequestException in my microservices. In redis or tcp transport it's work correcly. But in kafka it's doesn't work because payload in kafka is wrap on attribute "value", i have try what @kodeine said, my payload is success to transform but validation in dto keep doesn't work.

Sorry for my english😂

@jmcdo29 jmcdo29 added priority: medium (3) Medium priority issue that needs to be resolved scope: microservices type: bug 😭 type: potential issue 💔 and removed needs triage This issue has not been looked into labels Feb 24, 2021
@jmcdo29
Copy link
Member

jmcdo29 commented Feb 24, 2021

Okay, @kamilmysliwiec I finally had time to confirm this, and can say that unless a custom pipe is created for Kafka, the ValidationPipe does not properly run due to the returned object from @Payload(). If this is the intended functionality, we definitely should make a note of it in the docs, and possibly expose some sort of KafkaValidationPipe. It seems that what's returned by the PARAMS_ARGS_METADATA does not match what is expected.

I'm not sure where this is happening, if it comes from @nestjs/core or @nestjs/microservice, but if someone wants to look into it, or help provide some more context it could be useful. I've added some flags in the meantime, Kamil feel free to remove them or change them if you disagree.

@kamilmysliwiec
Copy link
Member

In v8 (#6349), it will be possible to pass a property name to the @Payload() decorator. With this, you should be able to get the validation pipe working as expected:

@Payload('value') dto: DtoClass

@MickL
Copy link
Contributor

MickL commented Feb 24, 2021

I wonder why it needs a name to work. But anyway, will it work like this then?

const app = await NestFactory.createMicroservice<MicroserviceOptions>(...);
app.useGlobalPipes(new ValidationPipe());

And also with hybrid apps and inheritAppConfig: true?

@jmcdo29
Copy link
Member

jmcdo29 commented Feb 24, 2021

So long as (on V8) you use the @Payload('value') then it would work properly.

@kamilmysliwiec should we maybe provide a specific @KafaPayload() that does this to help minimize confusion? Or maybe make @Payload() map to this value like it does for the others and then have @KafkaPayload() map to the entire Object. Thoughts?

@kamilmysliwiec
Copy link
Member

I wonder why it needs a name to work. But anyway, will it work like this then?

@MickL This needs a name to work because this is how pipes work in general (they run for a specific argument which is the whole Kafka incoming message, so in the case described in the main post, it just can't work properly).

@kamilmysliwiec should we maybe provide a specific @KafaPayload() that does this to help minimize confusion? Or maybe make @payload() map to this value like it does for the others and then have @KafkaPayload() map to the entire Object. Thoughts?

In Kafka, we pass the entire incoming message object because it may contain very important, additional details & information about the message/event (and this isn't the case for most of the other existing, built-in transport strategies, that's why it's different for this one), so imo @KafkaPayload() decorator may look & sound slightly confusing(?). I do agree though that passing 'value' every time might be too repetitive and a nice, dedicated & type-safe abstraction could be useful (especially in preventing typos when someone misspells "value" or sth like that). Also, updating the documentation is def a must and we should do it as soon as possible.

@tanyudii
Copy link

In v8 (#6349), it will be possible to pass a property name to the @Payload() decorator. With this, you should be able to get the validation pipe working as expected:

@Payload('value') dto: DtoClass

i'm update to V8 package:

"@nestjs/common": "^8.0.0-alpha.1",
"@nestjs/core": "^8.0.0-alpha.1",
"@nestjs/microservices": "^8.0.0-alpha.1",
"@nestjs/platform-express": "^8.0.0-alpha.1""

and i getting problem when i register @nestjs/config module in app.module.ts

Nest cannot export a provider/module that is not a part of the currently processed module (ConfigModule). Please verify whether the exported ConfigHostModule is a
vailable in this particular context.

Possible Solutions:

  • Is ConfigHostModule part of the relevant providers/imports within ConfigModule?

@kamilmysliwiec
Copy link
Member

@tanyudii can you please try again with 8.0.0-alpha.2?

@tanyudii
Copy link

tanyudii commented Feb 24, 2021

@tanyudii can you please try again with 8.0.0-alpha.2?

for all package?

@kamilmysliwiec
Copy link
Member

All packages from this repository (core, common, platform-X, microservices, testing, and websockets).

@tanyudii
Copy link

tanyudii commented Feb 24, 2021

All packages from this repository (core, common, platform-X, microservices, testing, and websockets).

It's working, thank you.
and when v8 go to stable? it's good to use in production app?

@jmcdo29
Copy link
Member

jmcdo29 commented Feb 24, 2021

In Kafka, we pass the entire incoming message object because it may contain very important, additional details & information about the message/event (and this isn't the case for most of the other existing, built-in transport strategies, that's why it's different for this one), so imo @KafkaPayload() decorator may look & sound slightly confusing(?). I do agree though that passing 'value' every time might be too repetitive and a nice, dedicated & type-safe abstraction could be useful (especially in preventing typos when someone misspells "value" or sth like that). Also, updating the documentation is def a must and we should do it as soon as possible.

Maybe something like @KafkaMessage() then? Whatever the solution, it absolutely needs to be documented. I feel it's kind of weird that for all other microservices @Payload() gets the incoming body, but for Kafka it doesn't (though your explanation as to why makes sense) so I was trying to see if we could find a way to make it act the same and still give that important Kafka message info.

@kamilmysliwiec
Copy link
Member

@KafkaMessage() might be confusing for event handlers (e.g., why not "KafkaEvent" etc). I've been thinking about @Payload.Value but this, on the other hand, would be confusing for people using other transport strategies (not Kafka). @KafkaPayloadValue() would be appropriate but it seems to be slightly too long.

@MickL
Copy link
Contributor

MickL commented Mar 16, 2021

I think the documentation is misleading because it says:

the ValidationPipe works the same for WebSockets and microservices, regardless of the transport method that is used

So I assumed setting app.useGlobalPipes will apply auto-validation for microservice controllers and websocket gateways, too.

I tried with websocket gateway but they are not auto-validated, I need to add @UsePipes(new ValidationPipe()) to each @SubscribeMessage. Or is this a bug?

@ericmorand
Copy link

@kamilmysliwiec ,

Can you explain how you got your example in the documentation working?

https://docs.nestjs.com/microservices/kafka#outgoing

@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId;
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];
    return items;
  }
}

Because using this exact code doesn't work - dragonId is not a member of message.

From what you are explaining in the previous comments, it seems like it was never intended to work like explained in the documentation:

In Kafka, we pass the entire incoming message object because it may contain very important, additional details & information about the message/event (and this isn't the case for most of the other existing, built-in transport strategies

I'm confused.

@kodeine
Copy link
Author

kodeine commented Mar 20, 2021

@ericmorand payload for kafka is different, docs don’t portray this correctly. You need to use message.payload.dragonid something like that I’m on my phone so cant confirm the syntax exactly but do a console log and im sure ull figure it out.

@ColinMorris83
Copy link

I got this interface from someone on another thread on here and am using that for the @Payload type.

export interface IncomingKafkaMessage<V = unknown, K = unknown, H = Record<string, unknown>> {
  magicByte: number;
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: K;
  value: V;
  headers: H;
  isControlRecord: boolean;
  batchContext: {
    firstOffset: string;
    firstTimestamp: string;
    partitionLeaderEpoch: number;
    inTransaction: boolean;
    isControlBatch: boolean;
    lastOffsetDelta: number;
    producerId: string;
    producerEpoch: number;
    firstSequence: number;
    maxTimestamp: string;
    magicByte: number;
  };
}

So for the example would be:

@Payload() message: IncomingKafkaMessage<KillDragonMessage>

And to access:

message.value.dragonId

Where value will be correctly typed as KillDragonMessage

@vlad-rz
Copy link

vlad-rz commented Dec 17, 2021

Any news on this? What is the best practices to use Kafka payload validation?
Is it @payload('value') or @payload() message: IncomingKafkaMessage?

@Notekunn
Copy link

Any news on this?

@FahmyChaabane
Copy link

FahmyChaabane commented May 9, 2022

I think the documentation is misleading because it says:

the ValidationPipe works the same for WebSockets and microservices, regardless of the transport method that is used

So I assumed setting app.useGlobalPipes will apply auto-validation for microservice controllers and websocket gateways, too.

I tried with websocket gateway but they are not auto-validated, I need to add @UsePipes(new ValidationPipe()) to each @SubscribeMessage. Or is this a bug?

I am facing the same thing. I thought global validation would be applied, but i had to manually apply the @UsePipes on each @EventPattern & @MessagePattern i got inside my controller.

I am not really sure weither if it is a feature or a bug. Anyone got an answer ?

--- Update: setting @UsePipes on the controller level did do the job too.

@kamilmysliwiec
Copy link
Member

@nestjs nestjs locked and limited conversation to collaborators Jul 13, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging a pull request may close this issue.