Skip to content

Commit

Permalink
[ISSUE #621] Support consume message directly (#622)
Browse files Browse the repository at this point in the history
* Support consume message directly
  • Loading branch information
maixiaohai authored Mar 16, 2021
1 parent 378f68a commit c97d492
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 0 deletions.
53 changes: 53 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,59 @@ func (pc *pushConsumer) GetWhere() string {

}

func (pc *pushConsumer) ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *internal.ConsumeMessageDirectlyResult {
var msgs = []*primitive.MessageExt{msg}
var mq = &primitive.MessageQueue{
Topic: msg.Topic,
BrokerName: brokerName,
QueueId: msg.Queue.QueueId,
}

beginTime := time.Now()
pc.resetRetryAndNamespace(msgs)
var result ConsumeResult

var err error
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: msgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
concurrentCtx := primitive.NewConsumeConcurrentlyContext()
concurrentCtx.MQ = *mq
ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

result, err = pc.consumeInner(ctx, msgs)

consumeRT := time.Now().Sub(beginTime)

res := &internal.ConsumeMessageDirectlyResult{
Order: false,
AutoCommit: true,
SpentTimeMills: int64(consumeRT / time.Millisecond),
}

if err != nil {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
res.ConsumeResult = internal.ThrowException
res.Remark = err.Error()
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
res.ConsumeResult = internal.ConsumeSuccess
} else if result == ConsumeRetryLater {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
res.ConsumeResult = internal.ConsumeRetryLater
}

increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))

return res
}

func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
info := internal.NewConsumerRunningInfo()

Expand Down
40 changes: 40 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type InnerConsumer interface {
Rebalance()
IsUnitMode() bool
GetConsumerRunningInfo() *ConsumerRunningInfo
ConsumeMessageDirectly(msg *primitive.MessageExt, brokerName string) *ConsumeMessageDirectlyResult
GetcType() string
GetModel() string
GetWhere() string
Expand Down Expand Up @@ -252,6 +253,36 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
}
return res
})

client.remoteClient.RegisterRequestFunc(ReqConsumeMessageDirectly, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive consume message directly request...", nil)
header := new(ConsumeMessageDirectlyHeader)
header.Decode(req.ExtFields)
val, exist := clientMap.Load(header.clientID)
res := remote.NewRemotingCommand(ResError, nil, nil)
if !exist {
res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
} else {
cli, ok := val.(*rmqClient)
msg := primitive.DecodeMessage(req.Body)[0]
var consumeMessageDirectlyResult *ConsumeMessageDirectlyResult
if ok {
consumeMessageDirectlyResult = cli.consumeMessageDirectly(msg, header.consumerGroup, header.brokerName)
}
if consumeMessageDirectlyResult != nil {
res.Code = ResSuccess
data, err := consumeMessageDirectlyResult.Encode()
if err != nil {
res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
} else {
res.Body = data
}
} else {
res.Remark = "there is unexpected error when consume message directly, please check log"
}
}
return res
})
}
return actual.(*rmqClient)
}
Expand Down Expand Up @@ -744,6 +775,15 @@ func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
return info
}

func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group string, brokerName string) *ConsumeMessageDirectlyResult {
consumer, exist := c.consumerMap.Load(group)
if !exist {
return nil
}
res := consumer.(InnerConsumer).ConsumeMessageDirectly(msg, brokerName)
return res
}

func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
list := make([]*primitive.MessageQueue, 0)
for idx := range data.QueueDataList {
Expand Down
27 changes: 27 additions & 0 deletions internal/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,30 @@ func NewConsumerRunningInfo() *ConsumerRunningInfo {
StatusTable: make(map[string]ConsumeStatus),
}
}

type ConsumeMessageDirectlyResult struct {
Order bool `json:"order"`
AutoCommit bool `json:"autoCommit"`
ConsumeResult ConsumeResult `json:"consumeResult"`
Remark string `json:"remark"`
SpentTimeMills int64 `json:"spentTimeMills"`
}

type ConsumeResult int

const (
ConsumeSuccess ConsumeResult = iota
ConsumeRetryLater
Rollback
Commit
ThrowException
ReturnNull
)

func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
data, err := json.Marshal(result)
if err != nil {
return nil, err
}
return data, nil
}
42 changes: 42 additions & 0 deletions internal/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,45 @@ func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
})
})
}

func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
Convey("test ConsumeMessageDirectlyResult MarshalJson", t, func() {
Convey("test consume success", func() {
consumeMessageDirectlyResult := ConsumeMessageDirectlyResult{
Order: false,
AutoCommit: true,
SpentTimeMills: 2,
}
consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess
data, err := consumeMessageDirectlyResult.Encode()
So(err, ShouldBeNil)
fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
})

Convey("test consume timeout", func() {
consumeResult := ConsumeMessageDirectlyResult{
Order: false,
AutoCommit: true,
SpentTimeMills: 2,
}
consumeResult.ConsumeResult = ReturnNull
data, err := consumeResult.Encode()
So(err, ShouldBeNil)
fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
})

Convey("test consume exception", func() {
consumeResult := ConsumeMessageDirectlyResult{
Order: false,
AutoCommit: true,
SpentTimeMills: 5,
}
consumeResult.ConsumeResult = ThrowException
consumeResult.Remark = "Unknown Exception"
data, err := consumeResult.Encode()
So(err, ShouldBeNil)
fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
})
})

}
38 changes: 38 additions & 0 deletions internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,41 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {

return maps
}

type ConsumeMessageDirectlyHeader struct {
consumerGroup string
clientID string
msgId string
brokerName string
}

func (request *ConsumeMessageDirectlyHeader) Encode() map[string]string {
maps := make(map[string]string)
maps["consumerGroup"] = request.consumerGroup
maps["clientId"] = request.clientID
maps["msgId"] = request.msgId
maps["brokerName"] = request.brokerName
return maps
}

func (request *ConsumeMessageDirectlyHeader) Decode(properties map[string]string) {
if len(properties) == 0 {
return
}

if v, existed := properties["consumerGroup"]; existed {
request.consumerGroup = v
}

if v, existed := properties["clientId"]; existed {
request.clientID = v
}

if v, existed := properties["msgId"]; existed {
request.msgId = v
}

if v, existed := properties["brokerName"]; existed {
request.brokerName = v
}
}

0 comments on commit c97d492

Please sign in to comment.