From b47fd6bb36f9978d8e76fe2c327cc46bd9971d30 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Sun, 8 May 2022 11:47:56 -0400 Subject: [PATCH] Delivery confirmation of published messages This commit adds a flag to the publish config that turns on delivery confirmation. If set to true then the Publish function waits until a confirmation is received from the AMQP server to guarantee that the message is delivered. By default, delivery of published messages is not confirmed. Signed-off-by: Bob Stasyszyn --- pkg/amqp/channelprovider.go | 94 ++++++++++++++++++++++++++++++++----- pkg/amqp/config.go | 5 ++ pkg/amqp/publisher.go | 24 +++++++--- pkg/amqp/pubsub_test.go | 53 +++++++++++++++++++++ 4 files changed, 158 insertions(+), 18 deletions(-) diff --git a/pkg/amqp/channelprovider.go b/pkg/amqp/channelprovider.go index da10af3..cedcefa 100644 --- a/pkg/amqp/channelprovider.go +++ b/pkg/amqp/channelprovider.go @@ -10,7 +10,14 @@ import ( ) type channel interface { + // AMQPChannel returns the underlying AMQP channel. AMQPChannel() *amqp.Channel + // DeliveryConfirmationEnabled returns true if delivery confirmation of published messages is enabled. + DeliveryConfirmationEnabled() bool + // Delivered waits until confirmation of delivery has been received from the AMQP server and returns true if delivery + // was successful, otherwise false is returned. If delivery confirmation is not enabled then true is immediately returned. + Delivered() bool + // Close closes the channel. Close() error } @@ -20,27 +27,32 @@ type channelProvider interface { Close() } -func newChannelProvider(conn *ConnectionWrapper, poolSize int, logger watermill.LoggerAdapter) (channelProvider, error) { +func newChannelProvider(conn *ConnectionWrapper, poolSize int, confirmDelivery bool, + logger watermill.LoggerAdapter) (channelProvider, error) { if poolSize == 0 { - return &defaultChannelProvider{conn}, nil + return newDefaultChannelProvider(conn, confirmDelivery), nil } - return newPooledChannelProvider(conn, poolSize, logger) + return newPooledChannelProvider(conn, poolSize, confirmDelivery, logger) } type pooledChannel struct { - logger watermill.LoggerAdapter - conn *ConnectionWrapper - amqpChan *amqp.Channel - closedChan chan *amqp.Error + logger watermill.LoggerAdapter + conn *ConnectionWrapper + amqpChan *amqp.Channel + closedChan chan *amqp.Error + confirmDelivery bool + confirmChan chan amqp.Confirmation } -func newPooledChannel(conn *ConnectionWrapper, logger watermill.LoggerAdapter) (*pooledChannel, error) { +func newPooledChannel(conn *ConnectionWrapper, logger watermill.LoggerAdapter, confirmDelivery bool) (*pooledChannel, error) { c := &pooledChannel{ logger, conn, nil, nil, + confirmDelivery, + nil, } if err := c.openAMQPChannel(); err != nil { @@ -54,6 +66,22 @@ func (c *pooledChannel) AMQPChannel() *amqp.Channel { return c.amqpChan } +func (c *pooledChannel) Delivered() bool { + if c.confirmChan == nil { + // Delivery confirmation is not enabled. Simply return true. + return true + } + + confirmed := <-c.confirmChan + + return confirmed.Ack +} + +// DeliveryConfirmationEnabled returns true if delivery confirmation of published messages is enabled. +func (c *pooledChannel) DeliveryConfirmationEnabled() bool { + return c.confirmChan != nil +} + func (c *pooledChannel) openAMQPChannel() error { var err error @@ -66,6 +94,15 @@ func (c *pooledChannel) openAMQPChannel() error { c.amqpChan.NotifyClose(c.closedChan) + if c.confirmDelivery { + err = c.amqpChan.Confirm(false) + if err != nil { + return fmt.Errorf("confirm AMQP channel: %w", err) + } + + c.confirmChan = c.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1)) + } + return nil } @@ -86,16 +123,37 @@ func (c *pooledChannel) Close() error { type channelWrapper struct { *amqp.Channel + confirmChan chan amqp.Confirmation } func (c *channelWrapper) AMQPChannel() *amqp.Channel { return c.Channel } +func (c *channelWrapper) DeliveryConfirmationEnabled() bool { + return c.confirmChan != nil +} + +func (c *channelWrapper) Delivered() bool { + if c.confirmChan == nil { + // Delivery confirmation is not enabled. Simply return true. + return true + } + + confirmed := <-c.confirmChan + + return confirmed.Ack +} + // defaultChannelProvider simply opens a new channel when Channel() is called and closes the channel // when CloseChannel is called. type defaultChannelProvider struct { - conn *ConnectionWrapper + conn *ConnectionWrapper + confirmDelivery bool +} + +func newDefaultChannelProvider(conn *ConnectionWrapper, confirmDelivery bool) *defaultChannelProvider { + return &defaultChannelProvider{conn, confirmDelivery} } func (p *defaultChannelProvider) Channel() (channel, error) { @@ -104,7 +162,18 @@ func (p *defaultChannelProvider) Channel() (channel, error) { return nil, fmt.Errorf("create AMQP channel: %w", err) } - return &channelWrapper{Channel: amqpChan}, nil + var confirmChan chan amqp.Confirmation + + if p.confirmDelivery { + err = amqpChan.Confirm(false) + if err != nil { + return nil, fmt.Errorf("confirm AMQP channel: %w", err) + } + + confirmChan = amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1)) + } + + return &channelWrapper{amqpChan, confirmChan}, nil } func (p *defaultChannelProvider) CloseChannel(c channel) error { @@ -130,7 +199,8 @@ type pooledChannelProvider struct { closedChan chan struct{} } -func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, logger watermill.LoggerAdapter) (channelProvider, error) { +func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, confirmDelivery bool, + logger watermill.LoggerAdapter) (channelProvider, error) { logger.Info("Creating pooled channel provider", watermill.LogFields{"pool-size": poolSize}) channels := make([]*pooledChannel, poolSize) @@ -140,7 +210,7 @@ func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, logger wate // Create the channels and add them to the pool. for i := 0; i < poolSize; i++ { - c, err := newPooledChannel(conn, logger) + c, err := newPooledChannel(conn, logger, confirmDelivery) if err != nil { return nil, err } diff --git a/pkg/amqp/config.go b/pkg/amqp/config.go index eebb534..4bc9e45 100644 --- a/pkg/amqp/config.go +++ b/pkg/amqp/config.go @@ -407,6 +407,11 @@ type PublishConfig struct { // If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every // Publish operation. ChannelPoolSize int + + // ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from + // the AMQP server in order to guarantee that the message is delivered. Setting this value to true may + // negatively impact performance but will increase reliability. + ConfirmDelivery bool } type ConsumeConfig struct { diff --git a/pkg/amqp/publisher.go b/pkg/amqp/publisher.go index 679b7c3..de318ca 100644 --- a/pkg/amqp/publisher.go +++ b/pkg/amqp/publisher.go @@ -33,7 +33,7 @@ func NewPublisher(config Config, logger watermill.LoggerAdapter) (*Publisher, er return nil, fmt.Errorf("create new connection: %w", err) } - chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, logger) + chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, config.Publish.ConfirmDelivery, logger) if err != nil { return nil, fmt.Errorf("create new channel pool: %w", err) } @@ -62,7 +62,7 @@ func NewPublisherWithConnection(config Config, logger watermill.LoggerAdapter, c return nil, err } - chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, logger) + chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, config.Publish.ConfirmDelivery, logger) if err != nil { return nil, fmt.Errorf("create new channel pool: %w", err) } @@ -140,7 +140,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) (err err logFields["amqp_routing_key"] = routingKey for _, msg := range messages { - if err := p.publishMessage(exchangeName, routingKey, msg, channel, logFields); err != nil { + if err := p.publishMessage(exchangeName, routingKey, msg, c, logFields); err != nil { return err } } @@ -175,7 +175,7 @@ func (p *Publisher) commitTransaction(channel *amqp.Channel, err error) error { func (p *Publisher) publishMessage( exchangeName, routingKey string, msg *message.Message, - channel *amqp.Channel, + channel channel, logFields watermill.LogFields, ) error { logFields = logFields.Add(watermill.LogFields{"message_uuid": msg.UUID}) @@ -187,7 +187,7 @@ func (p *Publisher) publishMessage( return errors.Wrap(err, "cannot marshal message") } - if err = channel.Publish( + if err = channel.AMQPChannel().Publish( exchangeName, routingKey, p.config.Publish.Mandatory, @@ -197,7 +197,19 @@ func (p *Publisher) publishMessage( return errors.Wrap(err, "cannot publish msg") } - p.logger.Trace("Message published", logFields) + if !channel.DeliveryConfirmationEnabled() { + p.logger.Trace("Message published", logFields) + + return nil + } + + p.logger.Trace("Message published. Waiting for delivery confirmation.", logFields) + + if !channel.Delivered() { + return fmt.Errorf("delivery not confirmed for message [%s]", msg.UUID) + } + + p.logger.Trace("Delivery confirmed for message", logFields) return nil } diff --git a/pkg/amqp/pubsub_test.go b/pkg/amqp/pubsub_test.go index b5516b6..05b9f94 100644 --- a/pkg/amqp/pubsub_test.go +++ b/pkg/amqp/pubsub_test.go @@ -32,6 +32,17 @@ func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) { return createPubSubWithConfig(t, publisherCfg) } +func createPubSubWithDeliveryConfirmation(t *testing.T) (message.Publisher, message.Subscriber) { + publisherCfg := amqp.NewDurablePubSubConfig( + amqpURI(), + nil, + ) + + publisherCfg.Publish.ConfirmDelivery = true + + return createPubSubWithConfig(t, publisherCfg) +} + func createPubSubWithPublisherChannelPool(t *testing.T) (message.Publisher, message.Subscriber) { publisherCfg := amqp.NewDurablePubSubConfig( amqpURI(), @@ -43,6 +54,18 @@ func createPubSubWithPublisherChannelPool(t *testing.T) (message.Publisher, mess return createPubSubWithConfig(t, publisherCfg) } +func createPubSubWithPublisherChannelPoolAndDeliveryConfirmation(t *testing.T) (message.Publisher, message.Subscriber) { + publisherCfg := amqp.NewDurablePubSubConfig( + amqpURI(), + nil, + ) + + publisherCfg.Publish.ChannelPoolSize = 50 + publisherCfg.Publish.ConfirmDelivery = true + + return createPubSubWithConfig(t, publisherCfg) +} + func createPubSubWithConfig(t *testing.T, publisherCfg amqp.Config) (message.Publisher, message.Subscriber) { publisher, err := amqp.NewPublisher( publisherCfg, @@ -121,6 +144,21 @@ func TestPublishSubscribe_pubsub(t *testing.T) { ) } +func TestPublishSubscribe_pubsub_delivery_confirmation(t *testing.T) { + tests.TestPubSub( + t, + tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: true, + GuaranteedOrderWithSingleSubscriber: true, + Persistent: true, + }, + createPubSubWithDeliveryConfirmation, + createPubSubWithConsumerGroup, + ) +} + func TestPublishSubscribe_pubsub_with_channel_pool(t *testing.T) { tests.TestPubSub( t, @@ -136,6 +174,21 @@ func TestPublishSubscribe_pubsub_with_channel_pool(t *testing.T) { ) } +func TestPublishSubscribe_pubsub_with_channel_pool_and_delivery_confirmation(t *testing.T) { + tests.TestPubSub( + t, + tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: true, + GuaranteedOrderWithSingleSubscriber: true, + Persistent: true, + }, + createPubSubWithPublisherChannelPoolAndDeliveryConfirmation, + createPubSubWithConsumerGroup, + ) +} + func createQueuePubSub(t *testing.T) (message.Publisher, message.Subscriber) { config := amqp.NewDurableQueueConfig( amqpURI(),