Skip to content

Commit

Permalink
[ISSUE #901]Close the traceDispatcher when producer or consumer is sh…
Browse files Browse the repository at this point in the history
…utdown (#902)

Co-authored-by: zhangjidi <[email protected]>
  • Loading branch information
zhangjidi2016 and zhangjidi authored Aug 24, 2022
1 parent 39619b9 commit 2630383
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 8 deletions.
8 changes: 4 additions & 4 deletions consumer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import (
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
return func(options *consumerOptions) {

dispatcher := internal.NewTraceDispatcher(traceCfg)
options.TraceDispatcher = dispatcher
ori := options.Interceptors
options.Interceptors = make([]primitive.Interceptor, 0)
options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
options.Interceptors = append(options.Interceptors, newTraceInterceptor(dispatcher))
options.Interceptors = append(options.Interceptors, ori...)
}
}

func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
dispatcher := internal.NewTraceDispatcher(traceCfg)
func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
if dispatcher != nil {
dispatcher.Start()
}
Expand Down
2 changes: 2 additions & 0 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type consumerOptions struct {
filterMessageHooks []hooks.FilterMessageHook

Limiter Limiter

TraceDispatcher internal.TraceDispatcher
}

func defaultPushConsumerOptions() consumerOptions {
Expand Down
3 changes: 3 additions & 0 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ func (pc *pushConsumer) Start() error {
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
if pc.option.TraceDispatcher != nil {
pc.option.TraceDispatcher.Close()
}
close(pc.done)

pc.client.UnregisterConsumer(pc.consumerGroup)
Expand Down
8 changes: 4 additions & 4 deletions producer/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ import (
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
func WithTrace(traceCfg *primitive.TraceConfig) Option {
return func(options *producerOptions) {

dispatcher := internal.NewTraceDispatcher(traceCfg)
options.TraceDispatcher = dispatcher
ori := options.Interceptors
options.Interceptors = make([]primitive.Interceptor, 0)
options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg))
options.Interceptors = append(options.Interceptors, newTraceInterceptor(dispatcher))
options.Interceptors = append(options.Interceptors, ori...)
}
}

func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
dispatcher := internal.NewTraceDispatcher(traceCfg)
func newTraceInterceptor(dispatcher internal.TraceDispatcher) primitive.Interceptor {
if dispatcher != nil {
dispatcher.Start()
}
Expand Down
1 change: 1 addition & 0 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type producerOptions struct {
Resolver primitive.NsResolver
CompressMsgBodyOverHowmuch int
CompressLevel int
TraceDispatcher internal.TraceDispatcher
}

type Option func(*producerOptions)
Expand Down
3 changes: 3 additions & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func (p *defaultProducer) Start() error {

func (p *defaultProducer) Shutdown() error {
p.ShutdownOnce.Do(func() {
if p.options.TraceDispatcher != nil {
p.options.TraceDispatcher.Close()
}
atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
p.client.UnregisterProducer(p.group)
p.client.Shutdown()
Expand Down

0 comments on commit 2630383

Please sign in to comment.