From 3d2cd25f38038a745c988fb41ca070eb2f9dec02 Mon Sep 17 00:00:00 2001 From: cserwen Date: Thu, 12 May 2022 17:20:03 +0800 Subject: [PATCH] fix some params for consumer can not be changed --- consumer/option.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/consumer/option.go b/consumer/option.go index 330f6c4f..1ef184b2 100644 --- a/consumer/option.go +++ b/consumer/option.go @@ -118,6 +118,7 @@ func defaultPushConsumerOptions() consumerOptions { ConsumerModel: Clustering, AutoCommit: true, Resolver: primitive.NewHttpResolver("DEFAULT"), + ConsumeTimestamp: time.Now().Add(time.Minute * (-30)).Format("20060102150405"), } opts.ClientOptions.GroupName = "DEFAULT_CONSUMER" return opts @@ -158,6 +159,24 @@ func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option { } } +func WithConsumeTimestamp(consumeTimestamp string) Option { + return func(options *consumerOptions) { + options.ConsumeTimestamp = consumeTimestamp + } +} + +func WithConsumerPullTimeout(consumerPullTimeout time.Duration) Option { + return func(options *consumerOptions) { + options.ConsumerPullTimeout = consumerPullTimeout + } +} + +func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option { + return func(options *consumerOptions) { + options.ConsumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan + } +} + // WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer. // The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper // around the real call.