From b815a7035ebd48f163ec9d20b5c7f7c609198a4d Mon Sep 17 00:00:00 2001 From: "wangjinlong.1048576" Date: Fri, 13 May 2022 18:11:57 +0800 Subject: [PATCH] 1. support get consumerRunningInfo return goroutine stack --- consumer/push_consumer.go | 16 +++++++++++++++- internal/client.go | 8 ++++---- internal/model.go | 7 ++++++- internal/request.go | 7 +++++++ 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 5a9f64af..801f412e 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -22,6 +22,7 @@ import ( "fmt" errors2 "github.com/apache/rocketmq-client-go/v2/errors" "math" + "runtime/pprof" "strconv" "strings" "sync" @@ -352,7 +353,7 @@ func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, broker return res } -func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo { +func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() pc.subscriptionDataTable.Range(func(key, value interface{}) bool { @@ -379,6 +380,19 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo { return true }) + if stack { + var buffer strings.Builder + + err := pprof.Lookup("goroutine").WriteTo(&buffer, 2) + if err != nil { + rlog.Error("error when get stack ", map[string]interface{}{ + "error": err, + }) + } else { + info.JStack = buffer.String() + } + } + nsAddr := "" for _, value := range pc.client.GetNameSrv().AddrList() { nsAddr += fmt.Sprintf("%s;", value) diff --git a/internal/client.go b/internal/client.go index cc7a3ecc..9965c238 100644 --- a/internal/client.go +++ b/internal/client.go @@ -85,7 +85,7 @@ type InnerConsumer interface { SubscriptionDataList() []*SubscriptionData Rebalance() IsUnitMode() bool - GetConsumerRunningInfo() *ConsumerRunningInfo + GetConsumerRunningInfo(stack bool) *ConsumerRunningInfo ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult GetcType() string GetModel() string @@ -270,7 +270,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R cli, ok := val.(*rmqClient) var runningInfo *ConsumerRunningInfo if ok { - runningInfo = cli.getConsumerRunningInfo(header.consumerGroup) + runningInfo = cli.getConsumerRunningInfo(header.consumerGroup, header.jstackEnable) } if runningInfo != nil { res.Code = ResSuccess @@ -840,12 +840,12 @@ func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[prim consumer.(InnerConsumer).ResetOffset(topic, offsetTable) } -func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo { +func (c *rmqClient) getConsumerRunningInfo(group string, stack bool) *ConsumerRunningInfo { consumer, exist := c.consumerMap.Load(group) if !exist { return nil } - info := consumer.(InnerConsumer).GetConsumerRunningInfo() + info := consumer.(InnerConsumer).GetConsumerRunningInfo(stack) if info != nil { info.Properties[PropClientVersion] = clientVersion } diff --git a/internal/model.go b/internal/model.go index 7a011d77..c248bb36 100644 --- a/internal/model.go +++ b/internal/model.go @@ -149,6 +149,7 @@ type ConsumerRunningInfo struct { SubscriptionData map[*SubscriptionData]bool MQTable map[primitive.MessageQueue]ProcessQueueInfo StatusTable map[string]ConsumeStatus + JStack string // just follow java request param name, but pass golang stack here. } func (info ConsumerRunningInfo) Encode() ([]byte, error) { @@ -251,7 +252,11 @@ func (info ConsumerRunningInfo) Encode() ([]byte, error) { tableJson = fmt.Sprintf("%s,%s:%s", tableJson, string(dataK), string(dataV)) } tableJson = strings.TrimLeft(tableJson, ",") - jsonData = fmt.Sprintf("%s,\"%s\":%s}", jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson)) + + jsonData = fmt.Sprintf("%s,\"%s\":%s, \"%s\":\"%s\" }", + jsonData, "mqTable", fmt.Sprintf("{%s}", tableJson), + "jstack", info.JStack) + return []byte(jsonData), nil } diff --git a/internal/request.go b/internal/request.go index 0e3d8e1c..2a475e1a 100644 --- a/internal/request.go +++ b/internal/request.go @@ -305,12 +305,14 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string { type GetConsumerRunningInfoHeader struct { consumerGroup string clientID string + jstackEnable bool } func (request *GetConsumerRunningInfoHeader) Encode() map[string]string { maps := make(map[string]string) maps["consumerGroup"] = request.consumerGroup maps["clientId"] = request.clientID + maps["jstackEnable"] = strconv.FormatBool(request.jstackEnable) return maps } @@ -325,6 +327,11 @@ func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string if v, existed := properties["clientId"]; existed { request.clientID = v } + + if v, existed := properties["jstackEnable"]; existed { + parseBool, _ := strconv.ParseBool(v) + request.jstackEnable = parseBool + } } type QueryMessageRequestHeader struct {