From f3349bdbb4dbcf9a7481563cb0fc3dd3d3ce3985 Mon Sep 17 00:00:00 2001 From: zhangjidi2016 Date: Fri, 14 Oct 2022 15:09:57 +0800 Subject: [PATCH] [ISSUE #939]Pull or poll message support message trace (#940) Co-authored-by: zhangjidi --- consumer/pull_consumer.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 6c24c631..a7e0ec58 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -79,6 +79,7 @@ type defaultPullConsumer struct { closeOnce sync.Once consumeRequestCache chan *ConsumeRequest submitToConsume func(*processQueue, *primitive.MessageQueue) + interceptor primitive.Interceptor } func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { @@ -115,6 +116,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { } dc.mqChanged = c.messageQueueChanged c.submitToConsume = c.consumeMessageCurrently + c.interceptor = primitive.ChainInterceptors(c.option.Interceptors...) return c, nil } @@ -251,8 +253,16 @@ RETRY: if result == ConsumeSuccess { msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn) + msgCtx.Success = true } else { msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn) + msgCtx.Success = false + } + + if pc.interceptor != nil { + pc.interceptor(ctx, msgList, nil, func(ctx context.Context, req, reply interface{}) error { + return nil + }) } if !pq.IsDroppd() { @@ -334,6 +344,22 @@ func (pc *defaultPullConsumer) Pull(ctx context.Context, numbers int) (*primitiv } pc.processPullResult(mq, result, data) + + if pc.interceptor != nil { + msgCtx := &primitive.ConsumeMessageContext{ + Properties: make(map[string]string), + ConsumerGroup: pc.consumerGroup, + MQ: mq, + Msgs: result.GetMessageExts(), + Success: true, + } + ctx = primitive.WithConsumerCtx(ctx, msgCtx) + ctx = primitive.WithMethod(ctx, primitive.ConsumerPull) + pc.interceptor(ctx, result.GetMessageExts(), nil, func(ctx context.Context, req, reply interface{}) error { + return nil + }) + } + return result, nil } @@ -454,6 +480,9 @@ func (pc *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int func (pc *defaultPullConsumer) Shutdown() error { var err error pc.closeOnce.Do(func() { + if pc.option.TraceDispatcher != nil { + pc.option.TraceDispatcher.Close() + } close(pc.done) pc.client.UnregisterConsumer(pc.consumerGroup)