From f5e7c0b887c26776daec4f98dad9ff43d90c7e31 Mon Sep 17 00:00:00 2001
From: Daniil Gentili
Date: Wed, 27 Sep 2023 12:11:19 +0200
Subject: [PATCH] More logs

---
Review notes (text in this section is not part of the commit message):

* multiplexer.WireUp: the per-input Stop() calls are started through
  actor.Spawn with child descriptors named "stop_all.p<N>" and
  "stop_unassigned.p<N>" instead of bare goroutines with manual
  wg.Add/wg.Done.
* The "stop_unassigned" loop now shadows the range variable (`in := in`)
  exactly like the "stop_all" loop does. The closure no longer receives
  `in` as an argument, so without the shadow copy every spawned closure
  would share one loop variable under pre-Go 1.22 semantics and could stop
  the wrong input. The second multiplexer hunk header and the diffstat
  were adjusted for the extra line; the post-image blob hash on the
  `index` line was not recomputed.
* partitioncsm.run: returns early when ClaimPartition yields a nil release
  function, and logs each stage of the shutdown path.
* subscriber.claim: on cancel it logs the failure and returns nil instead
  of a no-op func.
  NOTE(review): any other caller that invokes the returned func
  unconditionally would now call a nil func - verify all call sites.
* Line breaks and indentation of this patch were reconstructed from a
  whitespace-collapsed copy; blank context lines were inferred from the
  hunk line counts - confirm against the target tree before applying.

 consumer/multiplexer/multiplexer.go   | 15 +++++++--------
 consumer/partitioncsm/partitioncsm.go | 14 +++++++++++++-
 consumer/subscriber/subscriber.go     |  4 +++-
 3 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/consumer/multiplexer/multiplexer.go b/consumer/multiplexer/multiplexer.go
index 2d365aa2..bb98f0e8 100644
--- a/consumer/multiplexer/multiplexer.go
+++ b/consumer/multiplexer/multiplexer.go
@@ -1,6 +1,7 @@
 package multiplexer
 
 import (
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
@@ -111,11 +112,10 @@ func (m *T) WireUp(output Out, assigned []int32) {
 	// If output is not provided, then stop all inputs and return.
 	if output == nil {
 		for p, in := range m.inputs {
-			wg.Add(1)
-			go func(in *input) {
-				defer wg.Done()
+			in := in
+			actor.Spawn(m.actDesc.NewChild(fmt.Sprintf("stop_all.p%d", p)), &wg, func() {
 				in.Stop()
-			}(in)
+			})
 			delete(m.inputs, p)
 		}
 		wg.Wait()
@@ -125,11 +125,10 @@ func (m *T) WireUp(output Out, assigned []int32) {
 	// Stop inputs that are not assigned anymore.
 	for p, in := range m.inputs {
 		if !slices.Contains(assigned, p) {
-			wg.Add(1)
-			go func(in *input) {
-				defer wg.Done()
+			in := in
+			actor.Spawn(m.actDesc.NewChild(fmt.Sprintf("stop_unassigned.p%d", p)), &wg, func() {
 				in.Stop()
-			}(in)
+			})
 			delete(m.inputs, p)
 		}
 	}
diff --git a/consumer/partitioncsm/partitioncsm.go b/consumer/partitioncsm/partitioncsm.go
index ffd75d4b..1ff0f48d 100644
--- a/consumer/partitioncsm/partitioncsm.go
+++ b/consumer/partitioncsm/partitioncsm.go
@@ -15,6 +15,7 @@ import (
 	"github.com/mailgun/kafka-pixy/consumer/subscriber"
 	"github.com/mailgun/kafka-pixy/none"
 	"github.com/mailgun/kafka-pixy/offsetmgr"
+	"github.com/mailgun/kafka-pixy/prettyfmt"
 	"github.com/pkg/errors"
 )
 
@@ -107,7 +108,11 @@ func (pc *T) Stop() {
 
 func (pc *T) run() {
 	defer close(pc.messagesCh)
-	defer pc.groupMember.ClaimPartition(pc.actDesc, pc.topic, pc.partition, pc.stopCh)()
+	release := pc.groupMember.ClaimPartition(pc.actDesc, pc.topic, pc.partition, pc.stopCh)
+	if release == nil {
+		return
+	}
+	defer release()
 
 	var err error
 	if pc.offsetMgr, err = pc.offsetMgrF.Spawn(pc.actDesc, pc.group, pc.topic, pc.partition); err != nil {
@@ -119,6 +124,7 @@ func (pc *T) run() {
 	select {
 	case pc.committedOffset = <-pc.offsetMgr.CommittedOffsets():
 	case <-pc.stopCh:
+		pc.actDesc.Log().Info("Closing after close signal (1)")
 		return
 	}
 	pc.actDesc.Log().Infof("Initial offset: %s", offsetRepr(pc.committedOffset))
@@ -131,10 +137,13 @@ func (pc *T) run() {
 
 	for pc.runFetchLoop() {
 	}
+	pc.actDesc.Log().Info("Closing after close signal (2)")
 	// Wait for clients to acknowledge pending offers.
 	for timeout := pc.offsetTrk.ShouldWait4Ack(); timeout > 0; timeout = pc.offsetTrk.ShouldWait4Ack() {
+		pc.actDesc.Log().Info("Closing after close signal (waiting for events)")
 		select {
 		case event := <-pc.eventsCh:
+			pc.actDesc.Log().Infof("Closing after close signal (event=%s)", prettyfmt.Val(event))
 			if event.T == consumer.EvAcked {
 				var offerCount int
 				pc.submittedOffset, offerCount = pc.offsetTrk.OnAcked(event.Offset)
@@ -142,9 +151,12 @@ func (pc *T) run() {
 				pc.offsetMgr.SubmitOffset(pc.submittedOffset)
 			}
 		case <-time.After(timeout):
+			pc.actDesc.Log().Info("Closing after close signal (TIMEOUT)")
 			continue
 		}
 	}
+
+	pc.actDesc.Log().Info("Closing after close signal (3)")
 }
 
 func (pc *T) runFetchLoop() bool {
diff --git a/consumer/subscriber/subscriber.go b/consumer/subscriber/subscriber.go
index 0b078cab..2f2d8a35 100644
--- a/consumer/subscriber/subscriber.go
+++ b/consumer/subscriber/subscriber.go
@@ -242,7 +242,9 @@ func (pc *partitionClaimer) claim() func() {
 		select {
 		case <-time.After(pc.subscriber.cfg.Consumer.RetryBackoff):
 		case <-pc.cancelCh:
-			return func() {}
+			logFailureFn("Failed to claim partition (CANCEL): via=%s, retries=%d, took=%s",
+				pc.subscriber.actDesc, retries, time.Since(beginAt))
+			return nil
 		}
 	}
 	pc.actDesc.Log().Infof("Partition claimed: via=%s, retries=%d, took=%s",