Skip to content

Commit

Permalink
fix: [AUEML-2412] remove fail on startup if rabbitmq is offline (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucas1993 authored Oct 9, 2024
1 parent 24a487a commit 22a33f8
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 25 deletions.
18 changes: 18 additions & 0 deletions internal/events/rabbitmq/outboxPublisher/failedPublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package outboxpublisher

import (
"github.com/TheRafaBonin/roxy"
"github.com/ThreeDotsLabs/watermill/message"
)

// Dummy publisher that always returns error
type FailedPublisher struct {}

func (f FailedPublisher) Publish(topic string, messages ...*message.Message) error {
return roxy.New("failed to connect to broker on startup, can't publish messages")
}


func (f FailedPublisher) Close() error {
return roxy.New("failed to connect to broker on startup, can't close publisher")
}
13 changes: 12 additions & 1 deletion internal/events/rabbitmq/outboxPublisher/outPublisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package outboxpublisher

import (
"strings"

"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/alexdrl/zerowater"
Expand Down Expand Up @@ -69,6 +71,15 @@ func newWatermillConfig(logger *zerolog.Logger) amqp.Config {
// newRabbitMQOutPublisher creates a new rabbitmq publisher that publishes messages to the rabbitmq broker
// It uses the watermill library to publish messages
// It is used by the forwarder to publish messages from the outbox table to the rabbitmq broker
// If we can't connect to the broker, returns a dummy publisher that always returns error
func newRabbitMQOutPublisher(logger *zerolog.Logger) (message.Publisher, error) {
return amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger()))
publisher, err := amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger()))
if err != nil {
if strings.Contains(err.Error(), "create new connection") {
logger.Error().Msg("failed to connect to publisher! Using dummy publisher that always fails")
return FailedPublisher{}, nil
}
return nil, err
}
return publisher, nil
}
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (

// Graceful shutdown of the publisher.
func (r *rabbitmqPublisher) Close(ctx context.Context) error {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
r.logger.Info().Msg("closing publisher")

// Wait till all events are published.
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
)

func (r *rabbitmqPublisher) healthCheckLoop() {
if r.chManager == nil {
return
}
logger := r.logger.With().Str("component", "publisher_health_check").Logger()

ticker := time.NewTicker(timeCheckTicker)
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type message struct {
// The message is published asynchronously
// The message will be republished if the connection is lost
func (r *rabbitmqPublisher) Publish(ctx context.Context, topic string, payload interface{}) error {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
ctx, span := otel.Tracer(scope).Start(ctx, "rabbitmqPublisher.Publish",
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/startPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
go r.healthCheckLoop()

for {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
err := r.chManager.Channel.Confirm(false)
if err != nil {
return eris.Wrap(err, "failed to enable publisher confirms")
Expand Down
18 changes: 2 additions & 16 deletions pkg/events/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ func registerNamedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Log
consumer, err := NewRabbitMQConsumer(logger, WithQueueNamePosfix(namedHandler.QueuePosfix()))
if err != nil {
logger.Error().Err(err).Msg("failed to create consumer")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
return
}

registerProvidedConsumer(lc, s, logger, namedHandler, consumer)
Expand All @@ -39,10 +36,7 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger,
consumer, err := NewRabbitMQConsumer(logger)
if err != nil {
logger.Error().Err(err).Msg("failed to create consumer")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
return
}

lc.Append(
Expand All @@ -52,10 +46,6 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger,
err := consumer.Subscribe(ctx, handler)
if err != nil {
logger.Error().Err(err).Msg("failed to subscribe to topics")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down Expand Up @@ -85,10 +75,6 @@ func registerProvidedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.
err := consumer.Subscribe(ctx, handler)
if err != nil {
logger.Error().Err(err).Msg("failed to subscribe to topics")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down
8 changes: 0 additions & 8 deletions pkg/events/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ func provideRabbitMQPublisher(logger *zerolog.Logger, s fx.Shutdowner) events.Ev
publisher, err := NewRabbitMQPublisher(logger)
if err != nil {
logger.Error().Err(err).Msg("failed to create publisher")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}

return publisher
Expand Down Expand Up @@ -55,10 +51,6 @@ func startPublisher(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, pu
err := publisher.StartPublisher(context.Background())
if err != nil {
logger.Error().Err(err).Msg("failed to start publisher")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down

0 comments on commit 22a33f8

Please sign in to comment.