Skip to content

Commit

Permalink
[ISSUE #939]Pull or poll message support message trace (#940)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangjidi <[email protected]>
  • Loading branch information
zhangjidi2016 and zhangjidi authored Oct 14, 2022
1 parent a093bac commit f3349bd
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions consumer/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type defaultPullConsumer struct {
closeOnce sync.Once
consumeRequestCache chan *ConsumeRequest
submitToConsume func(*processQueue, *primitive.MessageQueue)
interceptor primitive.Interceptor
}

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
Expand Down Expand Up @@ -115,6 +116,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
}
dc.mqChanged = c.messageQueueChanged
c.submitToConsume = c.consumeMessageCurrently
c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...)
return c, nil
}

Expand Down Expand Up @@ -251,8 +253,16 @@ RETRY:

if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
msgCtx.Success = true
} else {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
msgCtx.Success = false
}

if pc.interceptor != nil {
pc.interceptor(ctx, msgList, nil, func(ctx context.Context, req, reply interface{}) error {
return nil
})
}

if !pq.IsDroppd() {
Expand Down Expand Up @@ -334,6 +344,22 @@ func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitiv
}

pc.processPullResult(mq, result, data)

if pc.interceptor != nil {
msgCtx := &primitive.ConsumeMessageContext{
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
Msgs: result.GetMessageExts(),
Success: true,
}
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
ctx = primitive.WithMethod(ctx, primitive.ConsumerPull)
pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx context.Context, req, reply interface{}) error {
return nil
})
}

return result, nil
}

Expand Down Expand Up @@ -454,6 +480,9 @@ func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int
func (pc *defaultPullConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
if pc.option.TraceDispatcher != nil {
pc.option.TraceDispatcher.Close()
}
close(pc.done)

pc.client.UnregisterConsumer(pc.consumerGroup)
Expand Down

0 comments on commit f3349bd

Please sign in to comment.