Skip to content

Commit

Permalink
More logs
Browse files Browse the repository at this point in the history
  • Loading branch information
danog committed Sep 27, 2023
1 parent 2db9dbe commit f5e7c0b
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
14 changes: 6 additions & 8 deletions consumer/multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package multiplexer

import (
"fmt"
"reflect"
"sort"
"sync"
Expand Down Expand Up @@ -111,11 +112,10 @@ func (m *T) WireUp(output Out, assigned []int32) {
// If output is not provided, then stop all inputs and return.
if output == nil {
for p, in := range m.inputs {
wg.Add(1)
go func(in *input) {
defer wg.Done()
in := in
actor.Spawn(m.actDesc.NewChild(fmt.Sprintf("stop_all.p%d", p)), &wg, func() {
in.Stop()
}(in)
})
delete(m.inputs, p)
}
wg.Wait()
Expand All @@ -125,11 +125,9 @@ func (m *T) WireUp(output Out, assigned []int32) {
// Stop inputs that are not assigned anymore.
for p, in := range m.inputs {
if !slices.Contains(assigned, p) {
wg.Add(1)
go func(in *input) {
defer wg.Done()
actor.Spawn(m.actDesc.NewChild(fmt.Sprintf("stop_unassigned.p%d", p)), &wg, func() {
in.Stop()
}(in)
})
delete(m.inputs, p)
}
}
Expand Down
14 changes: 13 additions & 1 deletion consumer/partitioncsm/partitioncsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/mailgun/kafka-pixy/consumer/subscriber"
"github.com/mailgun/kafka-pixy/none"
"github.com/mailgun/kafka-pixy/offsetmgr"
"github.com/mailgun/kafka-pixy/prettyfmt"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -107,7 +108,11 @@ func (pc *T) Stop() {

func (pc *T) run() {
defer close(pc.messagesCh)
defer pc.groupMember.ClaimPartition(pc.actDesc, pc.topic, pc.partition, pc.stopCh)()
release := pc.groupMember.ClaimPartition(pc.actDesc, pc.topic, pc.partition, pc.stopCh)
if release == nil {
return
}
defer release()

var err error
if pc.offsetMgr, err = pc.offsetMgrF.Spawn(pc.actDesc, pc.group, pc.topic, pc.partition); err != nil {
Expand All @@ -119,6 +124,7 @@ func (pc *T) run() {
select {
case pc.committedOffset = <-pc.offsetMgr.CommittedOffsets():
case <-pc.stopCh:
pc.actDesc.Log().Info("Closing after close signal (1)")
return
}
pc.actDesc.Log().Infof("Initial offset: %s", offsetRepr(pc.committedOffset))
Expand All @@ -131,20 +137,26 @@ func (pc *T) run() {
for pc.runFetchLoop() {
}

pc.actDesc.Log().Info("Closing after close signal (2)")
// Wait for clients to acknowledge pending offers.
for timeout := pc.offsetTrk.ShouldWait4Ack(); timeout > 0; timeout = pc.offsetTrk.ShouldWait4Ack() {
pc.actDesc.Log().Info("Closing after close signal (waiting for events)")
select {
case event := <-pc.eventsCh:
pc.actDesc.Log().Infof("Closing after close signal (event=%s)", prettyfmt.Val(event))
if event.T == consumer.EvAcked {
var offerCount int
pc.submittedOffset, offerCount = pc.offsetTrk.OnAcked(event.Offset)
atomic.StoreInt32(&pc.offerCount, int32(offerCount))
pc.offsetMgr.SubmitOffset(pc.submittedOffset)
}
case <-time.After(timeout):
pc.actDesc.Log().Info("Closing after close signal (TIMEOUT)")
continue
}
}

pc.actDesc.Log().Info("Closing after close signal (3)")
}

func (pc *T) runFetchLoop() bool {
Expand Down
4 changes: 3 additions & 1 deletion consumer/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ func (pc *partitionClaimer) claim() func() {
select {
case <-time.After(pc.subscriber.cfg.Consumer.RetryBackoff):
case <-pc.cancelCh:
return func() {}
logFailureFn("Failed to claim partition (CANCEL): via=%s, retries=%d, took=%s",
pc.subscriber.actDesc, retries, time.Since(beginAt))
return nil
}
}
pc.actDesc.Log().Infof("Partition claimed: via=%s, retries=%d, took=%s",
Expand Down

0 comments on commit f5e7c0b

Please sign in to comment.