Skip to content

Commit

Permalink
Change Build Topology to accept the routing key and topic (#27)
Browse files Browse the repository at this point in the history
Original changes by @tobiasjaster in #19
  • Loading branch information
m110 authored Oct 18, 2024
1 parent 5eb52b9 commit 1e388d6
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 54 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/ThreeDotsLabs/watermill-amqp/v2
module github.com/ThreeDotsLabs/watermill-amqp/v3

go 1.21

Expand Down
135 changes: 108 additions & 27 deletions pkg/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
Durable: true,
GenerateName: GenerateExchangeNameTopicName,
Type: "fanout",
Durable: true,
},
Queue: QueueConfig{
GenerateName: generateQueueName,
Expand All @@ -61,13 +59,13 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
}
}

// NewNonDurablePubSubConfig creates config for non durable PubSub.
// NewNonDurablePubSubConfig creates config for non-durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
// Exchange name is set to the topic name and the routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
Expand All @@ -80,10 +78,8 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
GenerateName: GenerateExchangeNameTopicName,
Type: "fanout",
},
Queue: QueueConfig{
GenerateName: generateQueueName,
Expand Down Expand Up @@ -126,9 +122,7 @@ func NewDurableQueueConfig(amqpURI string) Config {
Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
GenerateName: GenerateExchangeNameConstant(""),
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
Expand All @@ -153,12 +147,12 @@ func NewDurableQueueConfig(amqpURI string) Config {
}
}

// NewNonDurableQueueConfig creates config for non durable Queue.
// NewNonDurableQueueConfig creates config for non-durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
Expand All @@ -171,9 +165,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
GenerateName: GenerateExchangeNameConstant(""),
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
Expand All @@ -197,6 +189,79 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
}
}

// NewDurableTopicConfig creates config for topic exchange for durable Queue.
// Queue name and Exchange are set to the parameters.
func NewDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},

Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: GenerateExchangeNameConstant(exchange),
Type: "topic",
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameConstant(queue),
Durable: true,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

// NewNonDurableTopicConfig creates config for topic exchange for non-durable Queue.
// Queue name and Exchange are set to the parameters.
func NewNonDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},

Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: GenerateExchangeNameConstant(exchange),
Type: "topic",
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameConstant(queue),
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

type Config struct {
Connection ConnectionConfig

Expand Down Expand Up @@ -275,6 +340,21 @@ type ConnectionConfig struct {
Reconnect *ReconnectConfig
}

// QueueNameGenerator generates QueueName based on the topic.
type ExchangeNameGenerator func(topic string) string

// GenerateExchangeNameTopicName generates exchangeName equal to the topic.
func GenerateExchangeNameTopicName(topic string) string {
return topic
}

// GenerateExchangeNameConstant generates exchangeName equal to exchangeName.
func GenerateExchangeNameConstant(exchangeName string) ExchangeNameGenerator {
return func(topic string) string {
return exchangeName
}
}

// Config descriptions are based on descriptions from: https://github.com/streadway/amqp
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// BSD 2-Clause "Simplified" License
Expand All @@ -287,7 +367,7 @@ type ExchangeConfig struct {
// "amq." if the passive option is set, or the exchange already exists. Names can
// consist of a non-empty sequence of letters, digits, hyphen, underscore,
// period, or colon.
GenerateName func(topic string) string
GenerateName ExchangeNameGenerator

// Each exchange belongs to one of a set of exchange kinds/types implemented by
// the server. The exchange types define the functionality of the exchange - i.e.
Expand Down Expand Up @@ -352,7 +432,7 @@ func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator {
}

type QueueConfig struct {
// GenerateRoutingKey is generated based on the topic provided for Subscribe.
// GenerateName generates the queue name based on the topic provided for Publish or Subscribe method.
GenerateName QueueNameGenerator

// Durable and Non-Auto-Deleted queues will survive server restarts and remain
Expand All @@ -377,12 +457,12 @@ type QueueConfig struct {
// delete a queue with the same name.
Exclusive bool

// When noWait is true, the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// When NoWait is true, the queue is assumed to be declared on the server.
// A channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
NoWait bool

// Optional amqpe.Table of arguments that are specific to the server's implementation of
// Optional amqp.Table of arguments that are specific to the server's implementation of
// the queue can be sent for queue types that require extra parameters.
Arguments amqp.Table
}
Expand All @@ -391,6 +471,7 @@ type QueueConfig struct {
// be routed to the queue when the publishing routing key matches the binding
// routing key.
type QueueBindConfig struct {
// GenerateRoutingKey generates the routing key based on the topic provided Subscribe.
GenerateRoutingKey func(topic string) string

// When noWait is false and the queue could not be bound, the channel will be
Expand All @@ -403,7 +484,7 @@ type QueueBindConfig struct {
}

type PublishConfig struct {
// GenerateRoutingKey is generated based on the topic provided for Publish.
// GenerateRoutingKey generates the routing key based on the topic provided for Publish.
GenerateRoutingKey func(topic string) string

// Publishings can be undeliverable when the mandatory flag is true and no queue is
Expand Down Expand Up @@ -464,7 +545,7 @@ type ConsumeConfig struct {
Arguments amqp.Table
}

// Qos controls how many messages or how many bytes the server will try to keep on
// QosConfig controls how many messages or how many bytes the server will try to keep on
// the network for consumers before receiving delivery acks. The intent of Qos is
// to make sure the network buffers stay full between the server and client.
type QosConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/correlatingmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
stdAmqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// - TLS support
// - Publish Transactions support (optional, can be enabled in config)
//
// Nomenclature
// # Nomenclature
//
// Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature.
// Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
stdAmqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp/pubsub_reconnect_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build reconnect
// +build reconnect

package amqp_test
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp/pubsub_stress_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build stress
// +build stress

package amqp_test
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
)
Expand Down
37 changes: 28 additions & 9 deletions pkg/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewSubscriberWithConnection(config Config, logger watermill.LoggerAdapter,

// Subscribe consumes messages from AMQP broker.
//
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration, it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
Expand All @@ -122,7 +122,17 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
exchangeName := s.config.Exchange.GenerateName(topic)
logFields["amqp_exchange_name"] = exchangeName

if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
routingKey := s.config.QueueBind.GenerateRoutingKey(topic)
logFields["amqp_routing_key"] = routingKey

params := BuildTopologyParams{
Topic: topic,
QueueName: queueName,
ExchangeName: exchangeName,
RoutingKey: routingKey,
}

if err := s.prepareConsume(params, logFields); err != nil {
return nil, errors.Wrap(err, "failed to prepare consume")
}

Expand All @@ -137,7 +147,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
s.subscriberWaitGroup.Done()
}()

reconnecting := false
reconnecting := false
ReconnectLoop:
for {
s.logger.Debug("Waiting for s.connected or s.closing in ReconnectLoop", logFields)
Expand All @@ -155,7 +165,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
}

if reconnecting {
if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
if err := s.prepareConsume(params, logFields); err != nil {
s.logger.Error("Failed to prepare consume", err, logFields)
}
}
Expand All @@ -164,7 +174,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
case <-s.connected:
s.logger.Debug("Connection established in ReconnectLoop", logFields)
// runSubscriber blocks until connection fails or Close() is called
s.runSubscriber(ctx, out, queueName, exchangeName, logFields)
s.runSubscriber(ctx, out, queueName, logFields)
case <-s.closing:
s.logger.Debug("Stopping ReconnectLoop (closing)", logFields)
break ReconnectLoop
Expand Down Expand Up @@ -199,17 +209,27 @@ func (s *Subscriber) SubscribeInitialize(topic string) (err error) {
exchangeName := s.config.Exchange.GenerateName(topic)
logFields["amqp_exchange_name"] = exchangeName

routingKey := s.config.QueueBind.GenerateRoutingKey(topic)
logFields["amqp_routing_key"] = routingKey

s.logger.Info("Initializing subscribe", logFields)

return errors.Wrap(s.prepareConsume(queueName, exchangeName, logFields), "failed to prepare consume")
params := BuildTopologyParams{
Topic: topic,
QueueName: queueName,
ExchangeName: exchangeName,
RoutingKey: routingKey,
}

return errors.Wrap(s.prepareConsume(params, logFields), "failed to prepare consume")
}

// Close closes all subscriptions with their output channels.
func (s *Subscriber) Close() error {
return s.closeSubscriber()
}

func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFields watermill.LogFields) (err error) {
func (s *Subscriber) prepareConsume(params BuildTopologyParams, logFields watermill.LogFields) (err error) {
channel, err := s.openSubscribeChannel(logFields)
if err != nil {
return err
Expand All @@ -220,7 +240,7 @@ func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFi
}
}()

if err = s.config.TopologyBuilder.BuildTopology(channel, queueName, exchangeName, s.config, s.logger); err != nil {
if err = s.config.TopologyBuilder.BuildTopology(channel, params, s.config, s.logger); err != nil {
return err
}

Expand All @@ -233,7 +253,6 @@ func (s *Subscriber) runSubscriber(
ctx context.Context,
out chan *message.Message,
queueName string,
exchangeName string,
logFields watermill.LogFields,
) {
channel, err := s.openSubscribeChannel(logFields)
Expand Down
Loading

0 comments on commit 1e388d6

Please sign in to comment.