Skip to content

Commit

Permalink
feat(internal): support reset consumer offset (#682)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangxu16 <[email protected]>
  • Loading branch information
maixiaohai and zhangxu16 authored Jul 30, 2021
1 parent bebe20b commit 46701f1
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 7 deletions.
1 change: 1 addition & 0 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func (pq *processQueue) MaxOrderlyCache() int64 {

func (pq *processQueue) clear() {
pq.mutex.Lock()
defer pq.mutex.Unlock()
pq.msgCache.Clear()
pq.cachedMsgCount = 0
pq.cachedMsgSize = 0
Expand Down
15 changes: 9 additions & 6 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (pc *pushConsumer) resume() {
rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
}

func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
//topic := cmd.ExtFields["topic"]
//group := cmd.ExtFields["group"]
//if topic == "" || group == "" {
Expand All @@ -857,11 +857,13 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
// rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
// return
//}
pc.suspend()
defer pc.resume()

pc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
if _, ok := table[mq]; !ok {
if _, ok := table[mq]; ok && mq.Topic == topic {
pq.WithDropped(true)
pq.clear()
}
Expand All @@ -872,16 +874,17 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
if !exist {
return
}
queuesOfTopic := v.([]primitive.MessageQueue)
queuesOfTopic := v.([]*primitive.MessageQueue)
for _, k := range queuesOfTopic {
if _, ok := table[k]; ok {
pc.storage.update(&k, table[k], false)
if _, ok := table[*k]; ok {
pc.storage.update(k, table[*k], false)
v, exist := pc.processQueueTable.Load(k)
if !exist {
continue
}
pq := v.(*processQueue)
pc.removeUnnecessaryMessageQueue(&k, pq)
pc.removeUnnecessaryMessageQueue(k, pq)
pc.processQueueTable.Delete(k)
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type InnerConsumer interface {
GetcType() string
GetModel() string
GetWhere() string
ResetOffset(topic string, table map[primitive.MessageQueue]int64)
}

func DefaultClientOptions() ClientOptions {
Expand Down Expand Up @@ -283,6 +284,23 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
}
return res
})

client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive reset consumer offset request...", map[string]interface{}{
rlog.LogKeyBroker: addr.String(),
rlog.LogKeyTopic: req.ExtFields["topic"],
rlog.LogKeyConsumerGroup: req.ExtFields["group"],
rlog.LogKeyTimeStamp: req.ExtFields["timestamp"],
})
header := new(ResetOffsetHeader)
header.Decode(req.ExtFields)

body := new(ResetOffsetBody)
body.Decode(req.Body)

client.resetOffset(header.topic, header.group, body.OffsetTable)
return nil
})
}
return actual.(*rmqClient)
}
Expand Down Expand Up @@ -777,6 +795,15 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
return result
}

func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[primitive.MessageQueue]int64) {
consumer, exist := c.consumerMap.Load(group)
if !exist {
rlog.Warning("group "+group+" do not exists", nil)
return
}
consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
}

func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
consumer, exist := c.consumerMap.Load(group)
if !exist {
Expand Down
47 changes: 47 additions & 0 deletions internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/tidwall/gjson"
"sort"
"strconv"
"strings"

"github.com/apache/rocketmq-client-go/v2/internal/utils"
Expand Down Expand Up @@ -288,3 +290,48 @@ func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
}
return data, nil
}

type ResetOffsetBody struct {
OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
}

func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
result := gjson.ParseBytes(body)
rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)

offsetTable := make(map[primitive.MessageQueue]int64, 0)
offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
for index, v := range offsetTableArray {
kvArray := strings.Split(v, "},")

var kstr, vstr string
if index == len(offsetTableArray)-1 {
vstr = kvArray[1][:len(kvArray[1])-2]
} else {
vstr = kvArray[1]
}
offset, err := strconv.ParseInt(vstr, 10, 64)
if err != nil {
rlog.Error("Unmarshal offset error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
}

if index == 0 {
kstr = kvArray[0][2:len(kvArray[0])] + "}"
} else {
kstr = kvArray[0] + "}"
}
kObj := new(primitive.MessageQueue)
err = jsoniter.Unmarshal([]byte(kstr), &kObj)
if err != nil {
rlog.Error("Unmarshal message queue error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return
}
offsetTable[*kObj] = offset
}
resetOffsetBody.OffsetTable = offsetTable
}
16 changes: 16 additions & 0 deletions internal/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,5 +402,21 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
})
})
}

func TestRestOffsetBody_MarshalJSON(t *testing.T) {
Convey("test ResetOffset Body Decode", t, func() {
body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":1},23373517],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":0},23373350],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":3},23373424],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":2},23373382]]}"
resetOffsetBody := new(ResetOffsetBody)
resetOffsetBody.Decode([]byte(body))
offsetTable := resetOffsetBody.OffsetTable
So(offsetTable, ShouldNotBeNil)
So(len(offsetTable), ShouldEqual, 8)
messageQueue := primitive.MessageQueue{
Topic: "zx_tst",
BrokerName: "tjwqtst-common-rocketmq-raft0",
QueueId: 5,
}
So(offsetTable[messageQueue], ShouldEqual, 23354233)
})
}
35 changes: 34 additions & 1 deletion internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
ReqGetAllTopicListFromNameServer = int16(206)
ReqDeleteTopicInBroker = int16(215)
ReqDeleteTopicInNameSrv = int16(216)
ReqResetConsuemrOffset = int16(220)
ReqResetConsumerOffset = int16(220)
ReqGetConsumerRunningInfo = int16(307)
ReqConsumeMessageDirectly = int16(309)
)
Expand Down Expand Up @@ -408,6 +408,39 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {
return maps
}

type ResetOffsetHeader struct {
topic string
group string
timestamp int64
isForce bool
}

func (request *ResetOffsetHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["topic"] = request.topic
maps["group"] = request.group
maps["timestamp"] = strconv.FormatInt(request.timestamp, 10)
return maps
}

func (request *ResetOffsetHeader) Decode(properties map[string]string) {
if len(properties) == 0 {
return
}

if v, existed := properties["topic"]; existed {
request.topic = v
}

if v, existed := properties["group"]; existed {
request.group = v
}

if v, existed := properties["timestamp"]; existed {
request.timestamp, _ = strconv.ParseInt(v, 10, 0)
}
}

type ConsumeMessageDirectlyHeader struct {
consumerGroup string
clientID string
Expand Down
1 change: 1 addition & 0 deletions rlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
LogKeyValueChangedFrom = "changedFrom"
LogKeyValueChangedTo = "changeTo"
LogKeyPullRequest = "PullRequest"
LogKeyTimeStamp = "timestamp"
)

type Logger interface {
Expand Down

0 comments on commit 46701f1

Please sign in to comment.