Skip to content

Commit

Permalink
fix bug:seek offset won't work due to wrong map key type (#1184)
Browse files Browse the repository at this point in the history
Co-authored-by: muyun.cyt <[email protected]>
  • Loading branch information
cytnju and muyun.cyt authored Dec 20, 2024
1 parent abae4a0 commit bd8861c
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin
if pc.SubType != Assign {
return originOffset
}
value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
value, exist := pc.mq2seekOffset.LoadAndDelete(*mq)
if !exist {
return originOffset
} else {
nextOffset := value.(int64)
_ = pc.updateOffset(mq, nextOffset)
rlog.Info("pull consumer assign new offset", map[string]interface{}{
"group": pc.GroupName,
"mq": mq,
"offset": nextOffset,
})
return nextOffset
}
}
Expand Down Expand Up @@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
}

func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
pc.mq2seekOffset.Store(mq, offset)
pc.mq2seekOffset.Store(*mq, offset)
rlog.Info("pull consumer seek offset", map[string]interface{}{
"mq": mq,
"offset": offset,
Expand Down Expand Up @@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}

rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil)

result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{
Expand Down

0 comments on commit bd8861c

Please sign in to comment.