diff --git a/components/requestreply/backend_pubsub.go b/components/requestreply/backend_pubsub.go index d6fb90a5d..7d9a42c5e 100644 --- a/components/requestreply/backend_pubsub.go +++ b/components/requestreply/backend_pubsub.go @@ -69,6 +69,8 @@ type PubSubBackendModifyNotificationMessageFn func(msg *message.Message, params type PubSubBackendOnListenForReplyFinishedFn func(ctx context.Context, params PubSubBackendSubscribeParams) +type ReplyPublishErrorHandler func(replyTopic string, notificationMsg *message.Message, err error) error + type PubSubBackendConfig struct { Publisher message.Publisher SubscriberConstructor PubSubBackendSubscriberConstructorFn @@ -85,10 +87,15 @@ type PubSubBackendConfig struct { OnListenForReplyFinished PubSubBackendOnListenForReplyFinishedFn // AckCommandErrors determines if the command should be acked or nacked when handler returns an error. - // Command will be always nacked, when sending reply fails. + // Command will be nacked by default when sending reply fails, you can control this behaviour with the + // ReplyPublishErrorHandler config option. // You should use this option instead of cqrs.CommandProcessorConfig.AckCommandHandlingErrors, as it's aware // if error was returned by handler or sending reply failed. AckCommandErrors bool + + // ReplyPublishErrorHandler if not nil will be invoked when sending the reply fails. If it returns an error + // the command will ba nacked. + ReplyPublishErrorHandler ReplyPublishErrorHandler } func (p *PubSubBackendConfig) setDefaults() { @@ -245,7 +252,13 @@ func (p PubSubBackend[Result]) OnCommandProcessed(ctx context.Context, params Ba return errors.Wrap(err, "cannot generate request/reply notify topic") } - if err := p.config.Publisher.Publish(replyTopic, notificationMsg); err != nil { + err = p.config.Publisher.Publish(replyTopic, notificationMsg) + if err != nil { + if p.config.ReplyPublishErrorHandler != nil { + err = p.config.ReplyPublishErrorHandler(replyTopic, notificationMsg, err) + } + } + if err != nil { return errors.Wrap(err, "cannot publish command executed message") }