Skip to content

Commit

Permalink
Delivery confirmation of published messages
Browse files Browse the repository at this point in the history
This commit adds a flag to the publish config that turns on delivery confirmation. If set to true then the Publish function waits until a confirmation is received from the AMQP server to guarantee that the message is delivered. By default, delivery of published messages is not confirmed.

Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed May 8, 2022
1 parent dce6c03 commit b47fd6b
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 18 deletions.
94 changes: 82 additions & 12 deletions pkg/amqp/channelprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ import (
)

type channel interface {
// AMQPChannel returns the underlying AMQP channel.
AMQPChannel() *amqp.Channel
// DeliveryConfirmationEnabled returns true if delivery confirmation of published messages is enabled.
DeliveryConfirmationEnabled() bool
// Delivered waits until confirmation of delivery has been received from the AMQP server and returns true if delivery
// was successful, otherwise false is returned. If delivery confirmation is not enabled then true is immediately returned.
Delivered() bool
// Close closes the channel.
Close() error
}

Expand All @@ -20,27 +27,32 @@ type channelProvider interface {
Close()
}

func newChannelProvider(conn *ConnectionWrapper, poolSize int, logger watermill.LoggerAdapter) (channelProvider, error) {
func newChannelProvider(conn *ConnectionWrapper, poolSize int, confirmDelivery bool,
logger watermill.LoggerAdapter) (channelProvider, error) {
if poolSize == 0 {
return &defaultChannelProvider{conn}, nil
return newDefaultChannelProvider(conn, confirmDelivery), nil
}

return newPooledChannelProvider(conn, poolSize, logger)
return newPooledChannelProvider(conn, poolSize, confirmDelivery, logger)
}

type pooledChannel struct {
logger watermill.LoggerAdapter
conn *ConnectionWrapper
amqpChan *amqp.Channel
closedChan chan *amqp.Error
logger watermill.LoggerAdapter
conn *ConnectionWrapper
amqpChan *amqp.Channel
closedChan chan *amqp.Error
confirmDelivery bool
confirmChan chan amqp.Confirmation
}

func newPooledChannel(conn *ConnectionWrapper, logger watermill.LoggerAdapter) (*pooledChannel, error) {
func newPooledChannel(conn *ConnectionWrapper, logger watermill.LoggerAdapter, confirmDelivery bool) (*pooledChannel, error) {
c := &pooledChannel{
logger,
conn,
nil,
nil,
confirmDelivery,
nil,
}

if err := c.openAMQPChannel(); err != nil {
Expand All @@ -54,6 +66,22 @@ func (c *pooledChannel) AMQPChannel() *amqp.Channel {
return c.amqpChan
}

func (c *pooledChannel) Delivered() bool {
if c.confirmChan == nil {
// Delivery confirmation is not enabled. Simply return true.
return true
}

confirmed := <-c.confirmChan

return confirmed.Ack
}

// DeliveryConfirmationEnabled returns true if delivery confirmation of published messages is enabled.
func (c *pooledChannel) DeliveryConfirmationEnabled() bool {
return c.confirmChan != nil
}

func (c *pooledChannel) openAMQPChannel() error {
var err error

Expand All @@ -66,6 +94,15 @@ func (c *pooledChannel) openAMQPChannel() error {

c.amqpChan.NotifyClose(c.closedChan)

if c.confirmDelivery {
err = c.amqpChan.Confirm(false)
if err != nil {
return fmt.Errorf("confirm AMQP channel: %w", err)
}

c.confirmChan = c.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1))
}

return nil
}

Expand All @@ -86,16 +123,37 @@ func (c *pooledChannel) Close() error {

type channelWrapper struct {
*amqp.Channel
confirmChan chan amqp.Confirmation
}

func (c *channelWrapper) AMQPChannel() *amqp.Channel {
return c.Channel
}

func (c *channelWrapper) DeliveryConfirmationEnabled() bool {
return c.confirmChan != nil
}

func (c *channelWrapper) Delivered() bool {
if c.confirmChan == nil {
// Delivery confirmation is not enabled. Simply return true.
return true
}

confirmed := <-c.confirmChan

return confirmed.Ack
}

// defaultChannelProvider simply opens a new channel when Channel() is called and closes the channel
// when CloseChannel is called.
type defaultChannelProvider struct {
conn *ConnectionWrapper
conn *ConnectionWrapper
confirmDelivery bool
}

func newDefaultChannelProvider(conn *ConnectionWrapper, confirmDelivery bool) *defaultChannelProvider {
return &defaultChannelProvider{conn, confirmDelivery}
}

func (p *defaultChannelProvider) Channel() (channel, error) {
Expand All @@ -104,7 +162,18 @@ func (p *defaultChannelProvider) Channel() (channel, error) {
return nil, fmt.Errorf("create AMQP channel: %w", err)
}

return &channelWrapper{Channel: amqpChan}, nil
var confirmChan chan amqp.Confirmation

if p.confirmDelivery {
err = amqpChan.Confirm(false)
if err != nil {
return nil, fmt.Errorf("confirm AMQP channel: %w", err)
}

confirmChan = amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1))
}

return &channelWrapper{amqpChan, confirmChan}, nil
}

func (p *defaultChannelProvider) CloseChannel(c channel) error {
Expand All @@ -130,7 +199,8 @@ type pooledChannelProvider struct {
closedChan chan struct{}
}

func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, logger watermill.LoggerAdapter) (channelProvider, error) {
func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, confirmDelivery bool,
logger watermill.LoggerAdapter) (channelProvider, error) {
logger.Info("Creating pooled channel provider", watermill.LogFields{"pool-size": poolSize})

channels := make([]*pooledChannel, poolSize)
Expand All @@ -140,7 +210,7 @@ func newPooledChannelProvider(conn *ConnectionWrapper, poolSize int, logger wate
// Create the channels and add them to the pool.

for i := 0; i < poolSize; i++ {
c, err := newPooledChannel(conn, logger)
c, err := newPooledChannel(conn, logger, confirmDelivery)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ type PublishConfig struct {
// If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every
// Publish operation.
ChannelPoolSize int

// ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from
// the AMQP server in order to guarantee that the message is delivered. Setting this value to true may
// negatively impact performance but will increase reliability.
ConfirmDelivery bool
}

type ConsumeConfig struct {
Expand Down
24 changes: 18 additions & 6 deletions pkg/amqp/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewPublisher(config Config, logger watermill.LoggerAdapter) (*Publisher, er
return nil, fmt.Errorf("create new connection: %w", err)
}

chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, logger)
chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, config.Publish.ConfirmDelivery, logger)
if err != nil {
return nil, fmt.Errorf("create new channel pool: %w", err)
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func NewPublisherWithConnection(config Config, logger watermill.LoggerAdapter, c
return nil, err
}

chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, logger)
chanProvider, err := newChannelProvider(conn, config.Publish.ChannelPoolSize, config.Publish.ConfirmDelivery, logger)
if err != nil {
return nil, fmt.Errorf("create new channel pool: %w", err)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) (err err
logFields["amqp_routing_key"] = routingKey

for _, msg := range messages {
if err := p.publishMessage(exchangeName, routingKey, msg, channel, logFields); err != nil {
if err := p.publishMessage(exchangeName, routingKey, msg, c, logFields); err != nil {
return err
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func (p *Publisher) commitTransaction(channel *amqp.Channel, err error) error {
func (p *Publisher) publishMessage(
exchangeName, routingKey string,
msg *message.Message,
channel *amqp.Channel,
channel channel,
logFields watermill.LogFields,
) error {
logFields = logFields.Add(watermill.LogFields{"message_uuid": msg.UUID})
Expand All @@ -187,7 +187,7 @@ func (p *Publisher) publishMessage(
return errors.Wrap(err, "cannot marshal message")
}

if err = channel.Publish(
if err = channel.AMQPChannel().Publish(
exchangeName,
routingKey,
p.config.Publish.Mandatory,
Expand All @@ -197,7 +197,19 @@ func (p *Publisher) publishMessage(
return errors.Wrap(err, "cannot publish msg")
}

p.logger.Trace("Message published", logFields)
if !channel.DeliveryConfirmationEnabled() {
p.logger.Trace("Message published", logFields)

return nil
}

p.logger.Trace("Message published. Waiting for delivery confirmation.", logFields)

if !channel.Delivered() {
return fmt.Errorf("delivery not confirmed for message [%s]", msg.UUID)
}

p.logger.Trace("Delivery confirmed for message", logFields)

return nil
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/amqp/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
return createPubSubWithConfig(t, publisherCfg)
}

func createPubSubWithDeliveryConfirmation(t *testing.T) (message.Publisher, message.Subscriber) {
publisherCfg := amqp.NewDurablePubSubConfig(
amqpURI(),
nil,
)

publisherCfg.Publish.ConfirmDelivery = true

return createPubSubWithConfig(t, publisherCfg)
}

func createPubSubWithPublisherChannelPool(t *testing.T) (message.Publisher, message.Subscriber) {
publisherCfg := amqp.NewDurablePubSubConfig(
amqpURI(),
Expand All @@ -43,6 +54,18 @@ func createPubSubWithPublisherChannelPool(t *testing.T) (message.Publisher, mess
return createPubSubWithConfig(t, publisherCfg)
}

func createPubSubWithPublisherChannelPoolAndDeliveryConfirmation(t *testing.T) (message.Publisher, message.Subscriber) {
publisherCfg := amqp.NewDurablePubSubConfig(
amqpURI(),
nil,
)

publisherCfg.Publish.ChannelPoolSize = 50
publisherCfg.Publish.ConfirmDelivery = true

return createPubSubWithConfig(t, publisherCfg)
}

func createPubSubWithConfig(t *testing.T, publisherCfg amqp.Config) (message.Publisher, message.Subscriber) {
publisher, err := amqp.NewPublisher(
publisherCfg,
Expand Down Expand Up @@ -121,6 +144,21 @@ func TestPublishSubscribe_pubsub(t *testing.T) {
)
}

func TestPublishSubscribe_pubsub_delivery_confirmation(t *testing.T) {
tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
GuaranteedOrderWithSingleSubscriber: true,
Persistent: true,
},
createPubSubWithDeliveryConfirmation,
createPubSubWithConsumerGroup,
)
}

func TestPublishSubscribe_pubsub_with_channel_pool(t *testing.T) {
tests.TestPubSub(
t,
Expand All @@ -136,6 +174,21 @@ func TestPublishSubscribe_pubsub_with_channel_pool(t *testing.T) {
)
}

func TestPublishSubscribe_pubsub_with_channel_pool_and_delivery_confirmation(t *testing.T) {
tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
GuaranteedOrderWithSingleSubscriber: true,
Persistent: true,
},
createPubSubWithPublisherChannelPoolAndDeliveryConfirmation,
createPubSubWithConsumerGroup,
)
}

func createQueuePubSub(t *testing.T) (message.Publisher, message.Subscriber) {
config := amqp.NewDurableQueueConfig(
amqpURI(),
Expand Down

0 comments on commit b47fd6b

Please sign in to comment.