diff --git a/site/content/includes/m3query/annotated_config.yaml b/site/content/includes/m3query/annotated_config.yaml index 5dc1368821..99e4e9b2d1 100644 --- a/site/content/includes/m3query/annotated_config.yaml +++ b/site/content/includes/m3query/annotated_config.yaml @@ -154,6 +154,7 @@ clusters: connectTimeout: # Configuration for retrying write operations writeRetry: + # Defaults to 5s. initialBackoff: # Factor for exponential backoff backoffFactor: diff --git a/src/msg/producer/writer/options.go b/src/msg/producer/writer/options.go index a6f696d5bc..05912c403a 100644 --- a/src/msg/producer/writer/options.go +++ b/src/msg/producer/writer/options.go @@ -50,6 +50,8 @@ const ( // Using 65k which provides much better performance comparing // to lower values like 1k ~ 8k. defaultConnectionBufferSize = 2 << 15 // ~65kb + + defaultWriterRetryInitialBackoff = time.Second * 5 ) // ConnectionOptions configs the connections. @@ -397,11 +399,13 @@ type writerOptions struct { // NewOptions creates Options. func NewOptions() Options { + messageRetryOpts := retry.NewOptions(). + SetInitialBackoff(defaultWriterRetryInitialBackoff) return &writerOptions{ topicWatchInitTimeout: defaultTopicWatchInitTimeout, placementOpts: placement.NewOptions(), placementWatchInitTimeout: defaultPlacementWatchInitTimeout, - messageRetryOpts: retry.NewOptions(), + messageRetryOpts: messageRetryOpts, messageQueueNewWritesScanInterval: defaultMessageQueueNewWritesScanInterval, messageQueueFullScanInterval: defaultMessageQueueFullScanInterval, messageQueueScanBatchSize: defaultMessageQueueScanBatchSize, diff --git a/src/msg/producer/writer/options_test.go b/src/msg/producer/writer/options_test.go index 4be135fc0f..8bbef5face 100644 --- a/src/msg/producer/writer/options_test.go +++ b/src/msg/producer/writer/options_test.go @@ -54,6 +54,9 @@ func TestOptions(t *testing.T) { require.Equal(t, time.Second, opts.SetCloseCheckInterval(time.Second).CloseCheckInterval()) require.Nil(t, opts.SetInstrumentOptions(nil).InstrumentOptions()) + + require.NotNil(t, opts.MessageRetryOptions()) + require.Equal(t, defaultWriterRetryInitialBackoff, opts.MessageRetryOptions().InitialBackoff()) } func TestConnectionOptions(t *testing.T) {