diff --git a/internal/events/rabbitmq/outboxPublisher/failedPublisher.go b/internal/events/rabbitmq/outboxPublisher/failedPublisher.go new file mode 100644 index 0000000..e2ee0b1 --- /dev/null +++ b/internal/events/rabbitmq/outboxPublisher/failedPublisher.go @@ -0,0 +1,18 @@ +package outboxpublisher + +import ( + "github.com/TheRafaBonin/roxy" + "github.com/ThreeDotsLabs/watermill/message" +) + +// Dummy publisher that always returns error +type FailedPublisher struct {} + +func (f FailedPublisher) Publish(topic string, messages ...*message.Message) error { + return roxy.New("failed to connect to broker on startup, can't publish messages") +} + + +func (f FailedPublisher) Close() error { + return roxy.New("failed to connect to broker on startup, can't close publisher") +} diff --git a/internal/events/rabbitmq/outboxPublisher/outPublisher.go b/internal/events/rabbitmq/outboxPublisher/outPublisher.go index 96f73f2..04d91fe 100644 --- a/internal/events/rabbitmq/outboxPublisher/outPublisher.go +++ b/internal/events/rabbitmq/outboxPublisher/outPublisher.go @@ -1,6 +1,8 @@ package outboxpublisher import ( + "strings" + "github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp" "github.com/ThreeDotsLabs/watermill/message" "github.com/alexdrl/zerowater" @@ -69,6 +71,15 @@ func newWatermillConfig(logger *zerolog.Logger) amqp.Config { // newRabbitMQOutPublisher creates a new rabbitmq publisher that publishes messages to the rabbitmq broker // It uses the watermill library to publish messages // It is used by the forwarder to publish messages from the outbox table to the rabbitmq broker +// If we can't connect to the broker, returns a dummy publisher that always returns error func newRabbitMQOutPublisher(logger *zerolog.Logger) (message.Publisher, error) { - return amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger())) + publisher, err := amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger())) + if err != nil { + if strings.Contains(err.Error(), "create new connection") { + logger.Error().Msg("failed to connect to publisher! Using dummy publisher that always fails") + return FailedPublisher{}, nil + } + return nil, err + } + return publisher, nil } diff --git a/internal/events/rabbitmq/publisher/close.go b/internal/events/rabbitmq/publisher/close.go index 9e06cce..de367c4 100644 --- a/internal/events/rabbitmq/publisher/close.go +++ b/internal/events/rabbitmq/publisher/close.go @@ -8,6 +8,9 @@ import ( // Graceful shutdown of the publisher. func (r *rabbitmqPublisher) Close(ctx context.Context) error { + if r.chManager == nil { + return eris.New("r.chManager is nil! Invalid publisher") + } r.logger.Info().Msg("closing publisher") // Wait till all events are published. diff --git a/internal/events/rabbitmq/publisher/health.go b/internal/events/rabbitmq/publisher/health.go index 55fee51..a342d69 100644 --- a/internal/events/rabbitmq/publisher/health.go +++ b/internal/events/rabbitmq/publisher/health.go @@ -39,6 +39,9 @@ const ( ) func (r *rabbitmqPublisher) healthCheckLoop() { + if r.chManager == nil { + return + } logger := r.logger.With().Str("component", "publisher_health_check").Logger() ticker := time.NewTicker(timeCheckTicker) diff --git a/internal/events/rabbitmq/publisher/publish.go b/internal/events/rabbitmq/publisher/publish.go index 5d28233..6a1b525 100644 --- a/internal/events/rabbitmq/publisher/publish.go +++ b/internal/events/rabbitmq/publisher/publish.go @@ -29,6 +29,9 @@ type message struct { // The message is published asynchronously // The message will be republished if the connection is lost func (r *rabbitmqPublisher) Publish(ctx context.Context, topic string, payload interface{}) error { + if r.chManager == nil { + return eris.New("r.chManager is nil! Invalid publisher") + } ctx, span := otel.Tracer(scope).Start(ctx, "rabbitmqPublisher.Publish", trace.WithSpanKind(trace.SpanKindInternal), trace.WithAttributes( diff --git a/internal/events/rabbitmq/publisher/startPublisher.go b/internal/events/rabbitmq/publisher/startPublisher.go index 0829611..3000a7e 100644 --- a/internal/events/rabbitmq/publisher/startPublisher.go +++ b/internal/events/rabbitmq/publisher/startPublisher.go @@ -11,6 +11,9 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error { go r.healthCheckLoop() for { + if r.chManager == nil { + return eris.New("r.chManager is nil! Invalid publisher") + } err := r.chManager.Channel.Confirm(false) if err != nil { return eris.Wrap(err, "failed to enable publisher confirms") diff --git a/pkg/events/rabbitmq/consumer.go b/pkg/events/rabbitmq/consumer.go index 95059ab..4654e06 100644 --- a/pkg/events/rabbitmq/consumer.go +++ b/pkg/events/rabbitmq/consumer.go @@ -26,10 +26,7 @@ func registerNamedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Log consumer, err := NewRabbitMQConsumer(logger, WithQueueNamePosfix(namedHandler.QueuePosfix())) if err != nil { logger.Error().Err(err).Msg("failed to create consumer") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } + return } registerProvidedConsumer(lc, s, logger, namedHandler, consumer) @@ -39,10 +36,7 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, consumer, err := NewRabbitMQConsumer(logger) if err != nil { logger.Error().Err(err).Msg("failed to create consumer") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } + return } lc.Append( @@ -52,10 +46,6 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, err := consumer.Subscribe(ctx, handler) if err != nil { logger.Error().Err(err).Msg("failed to subscribe to topics") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } } }() @@ -85,10 +75,6 @@ func registerProvidedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog. err := consumer.Subscribe(ctx, handler) if err != nil { logger.Error().Err(err).Msg("failed to subscribe to topics") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } } }() diff --git a/pkg/events/rabbitmq/publisher.go b/pkg/events/rabbitmq/publisher.go index 2c9e35b..070d94c 100644 --- a/pkg/events/rabbitmq/publisher.go +++ b/pkg/events/rabbitmq/publisher.go @@ -20,10 +20,6 @@ func provideRabbitMQPublisher(logger *zerolog.Logger, s fx.Shutdowner) events.Ev publisher, err := NewRabbitMQPublisher(logger) if err != nil { logger.Error().Err(err).Msg("failed to create publisher") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } } return publisher @@ -55,10 +51,6 @@ func startPublisher(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, pu err := publisher.StartPublisher(context.Background()) if err != nil { logger.Error().Err(err).Msg("failed to start publisher") - err = s.Shutdown() - if err != nil { - logger.Error().Err(err).Msg("failed to shutdown") - } } }()