From 250b996ee553376caeb4a75b838c3f4df6b077c6 Mon Sep 17 00:00:00 2001 From: Sergey Revenko Date: Fri, 2 Dec 2022 17:00:43 +0300 Subject: [PATCH] Recreate topology after reconnection --- pkg/amqp/subscriber.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/amqp/subscriber.go b/pkg/amqp/subscriber.go index 6795ee0..326699a 100644 --- a/pkg/amqp/subscriber.go +++ b/pkg/amqp/subscriber.go @@ -122,10 +122,6 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa exchangeName := s.config.Exchange.GenerateName(topic) logFields["amqp_exchange_name"] = exchangeName - if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil { - return nil, errors.Wrap(err, "failed to prepare consume") - } - s.subscriberWaitGroup.Add(1) s.connectionWaitGroup.Add(1) @@ -192,7 +188,7 @@ func (s *Subscriber) SubscribeInitialize(topic string) (err error) { s.logger.Info("Initializing subscribe", logFields) - return errors.Wrap(s.prepareConsume(queueName, exchangeName, logFields), "failed to prepare consume") + return nil } // Close closes all subscriptions with their output channels. @@ -238,6 +234,11 @@ func (s *Subscriber) runSubscriber( } }() + if err = s.prepareConsume(queueName, exchangeName, logFields); err != nil { + s.logger.Error("Failed to prepare consume", err, logFields) + return + } + notifyCloseChannel := channel.NotifyClose(make(chan *amqp.Error, 1)) sub := subscription{