Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Build Topology to accept the routing key and topic #27

Merged
merged 12 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/ThreeDotsLabs/watermill-amqp/v2
module github.com/ThreeDotsLabs/watermill-amqp/v3

go 1.21

Expand Down
135 changes: 108 additions & 27 deletions pkg/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
Durable: true,
GenerateName: GenerateExchangeNameTopicName,
Type: "fanout",
Durable: true,
},
Queue: QueueConfig{
GenerateName: generateQueueName,
Expand All @@ -61,13 +59,13 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
}
}

// NewNonDurablePubSubConfig creates config for non durable PubSub.
// NewNonDurablePubSubConfig creates config for non-durable PubSub.
// generateQueueName is optional, when passing to the publisher.
// Exchange name is set to the topic name and routing key is empty.
// Exchange name is set to the topic name and the routing key is empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
Expand All @@ -80,10 +78,8 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return topic
},
Type: "fanout",
GenerateName: GenerateExchangeNameTopicName,
Type: "fanout",
},
Queue: QueueConfig{
GenerateName: generateQueueName,
Expand Down Expand Up @@ -126,9 +122,7 @@ func NewDurableQueueConfig(amqpURI string) Config {
Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
GenerateName: GenerateExchangeNameConstant(""),
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
Expand All @@ -153,12 +147,12 @@ func NewDurableQueueConfig(amqpURI string) Config {
}
}

// NewNonDurableQueueConfig creates config for non durable Queue.
// NewNonDurableQueueConfig creates config for non-durable Queue.
// Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate exchange name, routing key and queue name, depending on the context.
// To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
// To check how the topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
Expand All @@ -171,9 +165,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: func(topic string) string {
return ""
},
GenerateName: GenerateExchangeNameConstant(""),
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameTopicName,
Expand All @@ -197,6 +189,79 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
}
}

// NewDurableTopicConfig creates config for topic exchange for durable Queue.
// Queue name and Exchange are set to the parameters.
func NewDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},

Marshaler: DefaultMarshaler{},

Exchange: ExchangeConfig{
GenerateName: GenerateExchangeNameConstant(exchange),
Type: "topic",
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameConstant(queue),
Durable: true,
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

// NewNonDurableTopicConfig creates config for topic exchange for non-durable Queue.
// Queue name and Exchange are set to the parameters.
func NewNonDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
return Config{
Connection: ConnectionConfig{
AmqpURI: amqpURI,
},

Marshaler: DefaultMarshaler{NotPersistentDeliveryMode: true},

Exchange: ExchangeConfig{
GenerateName: GenerateExchangeNameConstant(exchange),
Type: "topic",
},
Queue: QueueConfig{
GenerateName: GenerateQueueNameConstant(queue),
},
QueueBind: QueueBindConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Publish: PublishConfig{
GenerateRoutingKey: func(topic string) string {
return topic
},
},
Consume: ConsumeConfig{
Qos: QosConfig{
PrefetchCount: 1,
},
},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

type Config struct {
Connection ConnectionConfig

Expand Down Expand Up @@ -275,6 +340,21 @@ type ConnectionConfig struct {
Reconnect *ReconnectConfig
}

// QueueNameGenerator generates QueueName based on the topic.
type ExchangeNameGenerator func(topic string) string

// GenerateExchangeNameTopicName generates exchangeName equal to the topic.
func GenerateExchangeNameTopicName(topic string) string {
return topic
}

// GenerateExchangeNameConstant generates exchangeName equal to exchangeName.
func GenerateExchangeNameConstant(exchangeName string) ExchangeNameGenerator {
return func(topic string) string {
return exchangeName
}
}

// Config descriptions are based on descriptions from: https://github.com/streadway/amqp
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// BSD 2-Clause "Simplified" License
Expand All @@ -287,7 +367,7 @@ type ExchangeConfig struct {
// "amq." if the passive option is set, or the exchange already exists. Names can
// consist of a non-empty sequence of letters, digits, hyphen, underscore,
// period, or colon.
GenerateName func(topic string) string
GenerateName ExchangeNameGenerator

// Each exchange belongs to one of a set of exchange kinds/types implemented by
// the server. The exchange types define the functionality of the exchange - i.e.
Expand Down Expand Up @@ -352,7 +432,7 @@ func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator {
}

type QueueConfig struct {
// GenerateRoutingKey is generated based on the topic provided for Subscribe.
// GenerateName generates the queue name based on the topic provided for Publish or Subscribe method.
GenerateName QueueNameGenerator

// Durable and Non-Auto-Deleted queues will survive server restarts and remain
Expand All @@ -377,12 +457,12 @@ type QueueConfig struct {
// delete a queue with the same name.
Exclusive bool

// When noWait is true, the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// When NoWait is true, the queue is assumed to be declared on the server.
// A channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
NoWait bool

// Optional amqpe.Table of arguments that are specific to the server's implementation of
// Optional amqp.Table of arguments that are specific to the server's implementation of
// the queue can be sent for queue types that require extra parameters.
Arguments amqp.Table
}
Expand All @@ -391,6 +471,7 @@ type QueueConfig struct {
// be routed to the queue when the publishing routing key matches the binding
// routing key.
type QueueBindConfig struct {
// GenerateRoutingKey generates the routing key based on the topic provided Subscribe.
GenerateRoutingKey func(topic string) string

// When noWait is false and the queue could not be bound, the channel will be
Expand All @@ -403,7 +484,7 @@ type QueueBindConfig struct {
}

type PublishConfig struct {
// GenerateRoutingKey is generated based on the topic provided for Publish.
// GenerateRoutingKey generates the routing key based on the topic provided for Publish.
GenerateRoutingKey func(topic string) string

// Publishings can be undeliverable when the mandatory flag is true and no queue is
Expand Down Expand Up @@ -464,7 +545,7 @@ type ConsumeConfig struct {
Arguments amqp.Table
}

// Qos controls how many messages or how many bytes the server will try to keep on
// QosConfig controls how many messages or how many bytes the server will try to keep on
// the network for consumers before receiving delivery acks. The intent of Qos is
// to make sure the network buffers stay full between the server and client.
type QosConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/correlatingmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
stdAmqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// - TLS support
// - Publish Transactions support (optional, can be enabled in config)
//
// Nomenclature
// # Nomenclature
//
// Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature.
// Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
stdAmqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp/pubsub_reconnect_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build reconnect
// +build reconnect

package amqp_test
Expand Down
1 change: 1 addition & 0 deletions pkg/amqp/pubsub_stress_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build stress
// +build stress

package amqp_test
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp"
"github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
)
Expand Down
37 changes: 28 additions & 9 deletions pkg/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewSubscriberWithConnection(config Config, logger watermill.LoggerAdapter,

// Subscribe consumes messages from AMQP broker.
//
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped
// Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration, it can be mapped
// to exchange, queue or routing key.
// For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
Expand All @@ -122,7 +122,17 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
exchangeName := s.config.Exchange.GenerateName(topic)
logFields["amqp_exchange_name"] = exchangeName

if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
routingKey := s.config.QueueBind.GenerateRoutingKey(topic)
logFields["amqp_routing_key"] = routingKey

params := BuildTopologyParams{
Topic: topic,
QueueName: queueName,
ExchangeName: exchangeName,
RoutingKey: routingKey,
}

if err := s.prepareConsume(params, logFields); err != nil {
return nil, errors.Wrap(err, "failed to prepare consume")
}

Expand All @@ -137,7 +147,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
s.subscriberWaitGroup.Done()
}()

reconnecting := false
reconnecting := false
ReconnectLoop:
for {
s.logger.Debug("Waiting for s.connected or s.closing in ReconnectLoop", logFields)
Expand All @@ -155,7 +165,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
}

if reconnecting {
if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
if err := s.prepareConsume(params, logFields); err != nil {
s.logger.Error("Failed to prepare consume", err, logFields)
}
}
Expand All @@ -164,7 +174,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
case <-s.connected:
s.logger.Debug("Connection established in ReconnectLoop", logFields)
// runSubscriber blocks until connection fails or Close() is called
s.runSubscriber(ctx, out, queueName, exchangeName, logFields)
s.runSubscriber(ctx, out, queueName, logFields)
case <-s.closing:
s.logger.Debug("Stopping ReconnectLoop (closing)", logFields)
break ReconnectLoop
Expand Down Expand Up @@ -199,17 +209,27 @@ func (s *Subscriber) SubscribeInitialize(topic string) (err error) {
exchangeName := s.config.Exchange.GenerateName(topic)
logFields["amqp_exchange_name"] = exchangeName

routingKey := s.config.QueueBind.GenerateRoutingKey(topic)
logFields["amqp_routing_key"] = routingKey

s.logger.Info("Initializing subscribe", logFields)

return errors.Wrap(s.prepareConsume(queueName, exchangeName, logFields), "failed to prepare consume")
params := BuildTopologyParams{
Topic: topic,
QueueName: queueName,
ExchangeName: exchangeName,
RoutingKey: routingKey,
}

return errors.Wrap(s.prepareConsume(params, logFields), "failed to prepare consume")
}

// Close closes all subscriptions with their output channels.
func (s *Subscriber) Close() error {
return s.closeSubscriber()
}

func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFields watermill.LogFields) (err error) {
func (s *Subscriber) prepareConsume(params BuildTopologyParams, logFields watermill.LogFields) (err error) {
channel, err := s.openSubscribeChannel(logFields)
if err != nil {
return err
Expand All @@ -220,7 +240,7 @@ func (s *Subscriber) prepareConsume(queueName string, exchangeName string, logFi
}
}()

if err = s.config.TopologyBuilder.BuildTopology(channel, queueName, exchangeName, s.config, s.logger); err != nil {
if err = s.config.TopologyBuilder.BuildTopology(channel, params, s.config, s.logger); err != nil {
return err
}

Expand All @@ -233,7 +253,6 @@ func (s *Subscriber) runSubscriber(
ctx context.Context,
out chan *message.Message,
queueName string,
exchangeName string,
logFields watermill.LogFields,
) {
channel, err := s.openSubscribeChannel(logFields)
Expand Down
Loading
Loading