From d145e0a8ff2eb480197af3625fd4db0483ad6625 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 26 Sep 2023 14:47:54 +0200 Subject: [PATCH] Even more logs --- consumer/groupcsm/groupcsm.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/consumer/groupcsm/groupcsm.go b/consumer/groupcsm/groupcsm.go index 6a6f0326..d873f8c1 100644 --- a/consumer/groupcsm/groupcsm.go +++ b/consumer/groupcsm/groupcsm.go @@ -152,6 +152,7 @@ func (gc *T) run() { stopped = true continue } + gc.actDesc.Log().Info("got new subscriptions, must rebalance") rebalanceRequired = true case err := <-rebalanceResultCh: @@ -170,6 +171,7 @@ func (gc *T) run() { goto done } case <-nilOrRetryCh: + gc.actDesc.Log().Info("retry rebalancing") rebalanceScheduled = false } @@ -182,6 +184,7 @@ func (gc *T) run() { topicConsumersCopy[topic] = tc } subscriptions := subscriptions + gc.actDesc.Log().Info("about to start rebalancing") actor.Spawn(rebalanceActDesc, nil, func() { gc.rebalance(rebalanceActDesc, topicConsumersCopy, subscriptions, rebalanceResultCh) }) @@ -218,9 +221,14 @@ func (gc *T) rebalance(actDesc *actor.Descriptor, topicConsumers map[string]*top // consumed already. gc.multiplexersMu.Lock() defer gc.multiplexersMu.Unlock() + actDesc.Log().Infof("initial rewiring start") for topic, mux := range gc.multiplexers { + actDesc.Log().Infof("rewiring topic=%s start", topic) gc.rewireMuxAsync(topic, &wg, mux, topicConsumers[topic], assignedPartitions[topic]) + actDesc.Log().Infof("rewiring topic=%s end", topic) } + actDesc.Log().Infof("initial rewiring start") + actDesc.Log().Infof("secondary rewiring start") // Start consuming partitions for topics that has not been consumed before. for topic, assignedTopicPartitions := range assignedPartitions { tc := topicConsumers[topic] @@ -234,16 +242,22 @@ func (gc *T) rebalance(actDesc *actor.Descriptor, topicConsumers map[string]*top gc.cfg, gc.subscriber, gc.msgFetcherF, gc.offsetMgrF) } mux = multiplexer.New(gc.actDesc, spawnInFn) + actDesc.Log().Infof("secondary rewiring spawn topic=%s", topic) gc.rewireMuxAsync(topic, &wg, mux, tc, assignedTopicPartitions) gc.multiplexers[topic] = mux } + actDesc.Log().Infof("secondary rewiring end") + actDesc.Log().Infof("secondary rewiring start (wg)") wg.Wait() + actDesc.Log().Infof("secondary rewiring end (wg)") + actDesc.Log().Infof("tertiary rewiring start") // Clean up gears for topics that do not have assigned partitions anymore. for topic, mux := range gc.multiplexers { if !mux.IsRunning() { delete(gc.multiplexers, topic) } } + actDesc.Log().Infof("tertiary rewiring end") // Notify the caller that rebalancing has completed successfully. rebalanceResultCh <- nil actDesc.Log().Infof("finished rebalancing partitions: %s", prettyfmt.Val(assignedPartitions))