Skip to content

Commit

Permalink
Even more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Sep 26, 2023
1 parent 086b4cf commit d145e0a
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions consumer/groupcsm/groupcsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (gc *T) run() {
stopped = true
continue
}
gc.actDesc.Log().Info("got new subscriptions, must rebalance")
rebalanceRequired = true

case err := <-rebalanceResultCh:
Expand All @@ -170,6 +171,7 @@ func (gc *T) run() {
goto done
}
case <-nilOrRetryCh:
gc.actDesc.Log().Info("retry rebalancing")
rebalanceScheduled = false
}

Expand All @@ -182,6 +184,7 @@ func (gc *T) run() {
topicConsumersCopy[topic] = tc
}
subscriptions := subscriptions
gc.actDesc.Log().Info("about to start rebalancing")
actor.Spawn(rebalanceActDesc, nil, func() {
gc.rebalance(rebalanceActDesc, topicConsumersCopy, subscriptions, rebalanceResultCh)
})
Expand Down Expand Up @@ -218,9 +221,14 @@ func (gc *T) rebalance(actDesc *actor.Descriptor, topicConsumers map[string]*top
// consumed already.
gc.multiplexersMu.Lock()
defer gc.multiplexersMu.Unlock()
actDesc.Log().Infof("initial rewiring start")
for topic, mux := range gc.multiplexers {
actDesc.Log().Infof("rewiring topic=%s start", topic)
gc.rewireMuxAsync(topic, &wg, mux, topicConsumers[topic], assignedPartitions[topic])
actDesc.Log().Infof("rewiring topic=%s end", topic)
}
actDesc.Log().Infof("initial rewiring start")
actDesc.Log().Infof("secondary rewiring start")
// Start consuming partitions for topics that has not been consumed before.
for topic, assignedTopicPartitions := range assignedPartitions {
tc := topicConsumers[topic]
Expand All @@ -234,16 +242,22 @@ func (gc *T) rebalance(actDesc *actor.Descriptor, topicConsumers map[string]*top
gc.cfg, gc.subscriber, gc.msgFetcherF, gc.offsetMgrF)
}
mux = multiplexer.New(gc.actDesc, spawnInFn)
actDesc.Log().Infof("secondary rewiring spawn topic=%s", topic)
gc.rewireMuxAsync(topic, &wg, mux, tc, assignedTopicPartitions)
gc.multiplexers[topic] = mux
}
actDesc.Log().Infof("secondary rewiring end")
actDesc.Log().Infof("secondary rewiring start (wg)")
wg.Wait()
actDesc.Log().Infof("secondary rewiring end (wg)")
actDesc.Log().Infof("tertiary rewiring start")
// Clean up gears for topics that do not have assigned partitions anymore.
for topic, mux := range gc.multiplexers {
if !mux.IsRunning() {
delete(gc.multiplexers, topic)
}
}
actDesc.Log().Infof("tertiary rewiring end")
// Notify the caller that rebalancing has completed successfully.
rebalanceResultCh <- nil
actDesc.Log().Infof("finished rebalancing partitions: %s", prettyfmt.Val(assignedPartitions))
Expand Down

0 comments on commit d145e0a

Please sign in to comment.