Skip to content

Commit

Permalink
Merge pull request #826 from WJL3333/support_goroutine_stack
Browse files Browse the repository at this point in the history
[ISSUE #827] Support get consumerRunningInfo return goroutine stack
  • Loading branch information
duhenglucky authored May 26, 2022
2 parents 2d7d781 + b815a70 commit 12ce1b9
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
16 changes: 15 additions & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
"runtime/pprof"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -352,7 +353,7 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker
return res
}

func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()

pc.subscriptionDataTable.Range(func(key, value interface{}) bool {
Expand All @@ -379,6 +380,19 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
return true
})

if stack {
var buffer strings.Builder

err := pprof.Lookup("goroutine").WriteTo(&buffer, 2)
if err != nil {
rlog.Error("error when get stack ", map[string]interface{}{
"error": err,
})
} else {
info.JStack = buffer.String()
}
}

nsAddr := ""
for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
Expand Down
8 changes: 4 additions & 4 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type InnerConsumer interface {
SubscriptionDataList() []*SubscriptionData
Rebalance()
IsUnitMode() bool
GetConsumerRunningInfo() *ConsumerRunningInfo
GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo
ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
GetcType() string
GetModel() string
Expand Down Expand Up @@ -270,7 +270,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
cli, ok := val.(*rmqClient)
var runningInfo *ConsumerRunningInfo
if ok {
runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable)
}
if runningInfo != nil {
res.Code = ResSuccess
Expand Down Expand Up @@ -840,12 +840,12 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim
consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
}

func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo {
consumer, exist := c.consumerMap.Load(group)
if !exist {
return nil
}
info := consumer.(InnerConsumer).GetConsumerRunningInfo()
info := consumer.(InnerConsumer).GetConsumerRunningInfo(stack)
if info != nil {
info.Properties[PropClientVersion] = clientVersion
}
Expand Down
7 changes: 6 additions & 1 deletion internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type ConsumerRunningInfo struct {
SubscriptionData map[*SubscriptionData]bool
MQTable map[primitive.MessageQueue]ProcessQueueInfo
StatusTable map[string]ConsumeStatus
JStack string // just follow java request param name, but pass golang stack here.
}

func (info ConsumerRunningInfo) Encode() ([]byte, error) {
Expand Down Expand Up @@ -251,7 +252,11 @@ func (info ConsumerRunningInfo) Encode() ([]byte, error) {
tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), string(dataV))
}
tableJson = strings.TrimLeft(tableJson, ",")
jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson))

jsonData = fmt.Sprintf("%s,\"%s\":%s, \"%s\":\"%s\" }",
jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson),
"jstack", info.JStack)

return []byte(jsonData), nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,14 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
type GetConsumerRunningInfoHeader struct {
consumerGroup string
clientID string
jstackEnable bool
}

func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.consumerGroup
maps["clientId"] = request.clientID
maps["jstackEnable"] = strconv.FormatBool(request.jstackEnable)
return maps
}

Expand All @@ -325,6 +327,11 @@ func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string
if v, existed := properties["clientId"]; existed {
request.clientID = v
}

if v, existed := properties["jstackEnable"]; existed {
parseBool, _ := strconv.ParseBool(v)
request.jstackEnable = parseBool
}
}

type QueryMessageRequestHeader struct {
Expand Down

0 comments on commit 12ce1b9

Please sign in to comment.