Skip to content

Commit

Permalink
Recreate topology after reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Revenko committed Dec 2, 2022
1 parent 71ed740 commit 1d7cbd8
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions pkg/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
exchangeName := s.config.Exchange.GenerateName(topic)
logFields["amqp_exchange_name"] = exchangeName

if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
return nil, errors.Wrap(err, "failed to prepare consume")
}

s.subscriberWaitGroup.Add(1)
s.connectionWaitGroup.Add(1)

Expand Down Expand Up @@ -192,7 +188,7 @@ func (s *Subscriber) SubscribeInitialize(topic string) (err error) {

s.logger.Info("Initializing subscribe", logFields)

return errors.Wrap(s.prepareConsume(queueName, exchangeName, logFields), "failed to prepare consume")
return nil
}

// Close closes all subscriptions with their output channels.
Expand Down Expand Up @@ -238,6 +234,11 @@ func (s *Subscriber) runSubscriber(
}
}()

if err := s.prepareConsume(queueName, exchangeName, logFields); err != nil {
s.logger.Error("Failed to prepare consume", err, logFields)
return
}

notifyCloseChannel := channel.NotifyClose(make(chan *amqp.Error, 1))

sub := subscription{
Expand Down

0 comments on commit 1d7cbd8

Please sign in to comment.