Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka commitOffsets #9283

Closed
1 task done
davidschuette opened this issue Mar 2, 2022 · 5 comments
Closed
1 task done

Kafka commitOffsets #9283

davidschuette opened this issue Mar 2, 2022 · 5 comments
Labels
needs triage This issue has not been looked into type: enhancement 🐺

Comments

@davidschuette
Copy link
Contributor

davidschuette commented Mar 2, 2022

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

I tried implementing the new commitOffsets function of ClientKafka, I added. This is when I realized that the instance of ClientKafka that will be injected, is only used for producing not consuming messages. As a result the consumer prop is undefined and cannot be used to commit offsets.

To properly use the commitOffsets function of a consumer, access to ServerKafka is needed which is used when connecting or creating a microservice in main.ts.

Describe the solution you'd like

I have thought of two other options to provide access to commitOffsets.

  1. As part of the @Ctx() decorator.
@EventPattern('topic')
async handleEvent(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext): Promise<void> {
  // handle event logic
 await context.commitOffset()
}
  1. As an option when configuring kafka in main.ts
app.connectMicroservice({
  transport: Transport.KAFKA,
  options: {
    commitAfterFunctionCompleted: true, // This is the new option
    client: {
      clientId: 'client-id',
      brokers: ['localhost:9092']
    },
    consumer: {
      groupId: 'group-id',
    },
    run: {
      autoCommit: false,
    },
  },
})

This would await the callback that is provided by the @EventPattern('topic') decorator and then commit the offset automatically without any additional logic needed.
The example from 1. could be simplified as follows:

@EventPattern('topic')
async handleEvent(@Payload() data: IncomingMessage): Promise<void> {
  // handle event logic
}

Teachability, documentation, adoption, migration strategy

This enables a simple but still flexible approach to commit offsets when working with kafka.

What is the motivation / use case for changing the behavior?

The implementation I previously offered did not meet my expectations.

@davidschuette davidschuette added needs triage This issue has not been looked into type: enhancement 🐺 labels Mar 2, 2022
@kamilmysliwiec
Copy link
Member

This would await the callback that is provided by the @EventPattern('topic') decorator and then commit the offset automatically without any additional logic needed.

What's the difference between this approach and the default behavior when autoCommit: true?

As part of the @ctx() decorator.

Sounds good. Would you like to create a PR for this issue? @davidschuette

@davidschuette
Copy link
Contributor Author

What's the difference between this approach and the default behavior when autoCommit: true?

I have to do more investigation but I think kafkajs commits offsets regardless of the result of the callback (e.g. commits offset even though callback threw error).

I will create a PR for the this issue. For now just for the @Ctx() feature.

davidschuette added a commit to davidschuette/nest that referenced this issue Mar 4, 2022
Provide access to kafkajs commitOffset by using @ctx() decorator when handeling an event.

This commit closes Kafka commitOffsets nestjs#9283.
davidschuette added a commit to davidschuette/nest that referenced this issue Mar 4, 2022
Error thrown by event handling method are no longer being caught by RcpExecptionFilter.
Instead errors are passed to kafkajs's eachMessage.
This results in proper interaction with kafka.

This commit closes Kafka commitOffsets nestjs#9283.
@davidschuette
Copy link
Contributor Author

After doing more investigation, I determined that the catching of errors in RpcProxy is problematic when working with kafkajs. Throwing in error in the callback should pass that error to kafkajs's eachMessage. This stops kafkajs from auto-committing the offset.
This is the behavior I expected.
This is also the problem mentioned in issue #9044.

@kamilmysliwiec
Copy link
Member

Let's track this here #9293

@dangoslen
Copy link

As a (very late) follow-up to this, I think there is still value for having a ctx.commitOffset() available - mainly for syntax sugar - for consuming applications that want to have more fine-grained control of offsets.

If that is still valuable, I'm open to making a PR to add that utility method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs triage This issue has not been looked into type: enhancement 🐺
Projects
None yet
Development

No branches or pull requests

3 participants