Skip to content

Commit

Permalink
push consumer enable getting offset diff map (#964)
Browse files Browse the repository at this point in the history
Co-authored-by: 筱瑜 <[email protected]>
  • Loading branch information
NeonToo and 筱瑜 authored Dec 1, 2022
1 parent 6bbfc3f commit 0742aac
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
3 changes: 3 additions & 0 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type processQueue struct {
order bool
closeChanOnce *sync.Once
closeChan chan struct{}
maxOffsetInQueue int64
}

func newProcessQueue(order bool) *processQueue {
Expand Down Expand Up @@ -88,6 +89,7 @@ func newProcessQueue(order bool) *processQueue {
closeChan: make(chan struct{}),
locked: atomic.NewBool(false),
dropped: atomic.NewBool(false),
maxOffsetInQueue: -1,
}
return pq
}
Expand Down Expand Up @@ -372,6 +374,7 @@ func (pq *processQueue) clear() {
pq.cachedMsgCount.Store(0)
pq.cachedMsgSize.Store(0)
pq.queueOffsetMax = 0
pq.maxOffsetInQueue = -1
}

func (pq *processQueue) commit() int64 {
Expand Down
24 changes: 24 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,27 @@ func (pc *pushConsumer) Start() error {
return err
}

func (pc *pushConsumer) GetOffsetDiffMap() map[string]int64 {
offsetDiffMap := make(map[string]int64)
pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
topic := mq.Topic
consumerOffset, _ := pc.storage.readWithException(&mq, _ReadFromMemory)
maxOffset := pq.maxOffsetInQueue
if consumerOffset < 0 || maxOffset < 0 || consumerOffset > maxOffset {
return true
}
if _, ok := offsetDiffMap[topic]; !ok {
offsetDiffMap[topic] = 0
}
offsetDiff := offsetDiffMap[topic]
offsetDiffMap[topic] = offsetDiff + (maxOffset - consumerOffset)
return true
})
return offsetDiffMap
}

func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
Expand Down Expand Up @@ -823,6 +844,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}

pc.processPullResult(request.mq, result, sd)
if result.MaxOffset > pq.maxOffsetInQueue {
pq.maxOffsetInQueue = result.MaxOffset
}

switch result.Status {
case primitive.PullFound:
Expand Down

0 comments on commit 0742aac

Please sign in to comment.