Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration option to allow ignoring command reply publishing errors #441

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions components/requestreply/backend_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type PubSubBackendModifyNotificationMessageFn func(msg *message.Message, params

type PubSubBackendOnListenForReplyFinishedFn func(ctx context.Context, params PubSubBackendSubscribeParams)

type ReplyPublishErrorHandler func(replyTopic string, notificationMsg *message.Message, err error) error

type PubSubBackendConfig struct {
Publisher message.Publisher
SubscriberConstructor PubSubBackendSubscriberConstructorFn
Expand All @@ -85,10 +87,15 @@ type PubSubBackendConfig struct {
OnListenForReplyFinished PubSubBackendOnListenForReplyFinishedFn

// AckCommandErrors determines if the command should be acked or nacked when handler returns an error.
// Command will be always nacked, when sending reply fails.
// Command will be nacked by default when sending reply fails, you can control this behaviour with the
// ReplyPublishErrorHandler config option.
// You should use this option instead of cqrs.CommandProcessorConfig.AckCommandHandlingErrors, as it's aware
// if error was returned by handler or sending reply failed.
AckCommandErrors bool

// ReplyPublishErrorHandler if not nil will be invoked when sending the reply fails. If it returns an error
// the command will ba nacked.
ReplyPublishErrorHandler ReplyPublishErrorHandler
}

func (p *PubSubBackendConfig) setDefaults() {
Expand Down Expand Up @@ -245,7 +252,13 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba
return errors.Wrap(err, "cannot generate request/reply notify topic")
}

if err := p.config.Publisher.Publish(replyTopic, notificationMsg); err != nil {
err = p.config.Publisher.Publish(replyTopic, notificationMsg)
if err != nil {
if p.config.ReplyPublishErrorHandler != nil {
err = p.config.ReplyPublishErrorHandler(replyTopic, notificationMsg, err)
}
}
if err != nil {
return errors.Wrap(err, "cannot publish command executed message")
}

Expand Down
Loading