From 270b2c4cc10fb5188c0c839838a13781d9bd4ae4 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 21 Dec 2021 14:30:16 +0200 Subject: [PATCH] [exporter/prometheusremotewrite] Add the possibility to disable queuing (#6718) * [prometheusremotewriteexporter] Add the possibility to disable queuing A new configuration key, `remote_write queue.enabled` is added, which controls the queue enabling. * [exporter/awsprometheusremotewriteexporter] Fix config unit tests. Update the expected prometheusremotewriteexporter default config. --- .../config_test.go | 1 + .../prometheusremotewriteexporter/README.md | 3 ++- .../prometheusremotewriteexporter/config.go | 11 +++++++++- .../config_test.go | 12 +++++++++++ .../prometheusremotewriteexporter/factory.go | 3 ++- .../testdata/disabled_queue.yaml | 21 +++++++++++++++++++ 6 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 exporter/prometheusremotewriteexporter/testdata/disabled_queue.yaml diff --git a/exporter/awsprometheusremotewriteexporter/config_test.go b/exporter/awsprometheusremotewriteexporter/config_test.go index 7b5171f93a33..83162844531b 100644 --- a/exporter/awsprometheusremotewriteexporter/config_test.go +++ b/exporter/awsprometheusremotewriteexporter/config_test.go @@ -64,6 +64,7 @@ func TestLoadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, RemoteWriteQueue: prw.RemoteWriteQueue{ + Enabled: true, QueueSize: 10000, NumConsumers: 5, }, diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index b87ac142b739..27d71e87d726 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -39,7 +39,8 @@ The following settings can be optionally configured: - *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.* - `namespace`: prefix attached to each exported metric name. - `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes. - - `queue_size`: number of OTLP metrics that can be queued. + - `enabled`: enable the sending queue + - `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` - `num_consumers`: minimum number of workers to use to fan out the outgoing requests. Example: diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index 39a9aebd4e79..fd6c3274ec99 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -51,8 +51,12 @@ type Config struct { // RemoteWriteQueue allows to configure the remote write queue. type RemoteWriteQueue struct { + // Enabled if false the queue is not enabled, the export requests + // are executed synchronously. + Enabled bool `mapstructure:"enabled"` + // QueueSize is the maximum number of OTLP metric batches allowed - // in the queue at a given time. + // in the queue at a given time. Ignored if Enabled is false. QueueSize int `mapstructure:"queue_size"` // NumWorkers configures the number of workers used by @@ -69,6 +73,11 @@ func (cfg *Config) Validate() error { if cfg.RemoteWriteQueue.QueueSize < 0 { return fmt.Errorf("remote write queue size can't be negative") } + + if cfg.RemoteWriteQueue.Enabled && cfg.RemoteWriteQueue.QueueSize == 0 { + return fmt.Errorf("a 0 size queue will drop all the data") + } + if cfg.RemoteWriteQueue.NumConsumers < 0 { return fmt.Errorf("remote write consumer number can't be negative") } diff --git a/exporter/prometheusremotewriteexporter/config_test.go b/exporter/prometheusremotewriteexporter/config_test.go index a6f286cab54b..d369cc8f1784 100644 --- a/exporter/prometheusremotewriteexporter/config_test.go +++ b/exporter/prometheusremotewriteexporter/config_test.go @@ -60,6 +60,7 @@ func Test_loadConfig(t *testing.T) { MaxElapsedTime: 10 * time.Minute, }, RemoteWriteQueue: RemoteWriteQueue{ + Enabled: true, QueueSize: 2000, NumConsumers: 10, }, @@ -103,3 +104,14 @@ func TestNegativeNumConsumers(t *testing.T) { _, err = configtest.LoadConfigAndValidate(path.Join(".", "testdata", "negative_num_consumers.yaml"), factories) assert.Error(t, err) } + +func TestDisabledQueue(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Exporters[typeStr] = factory + cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "disabled_queue.yaml"), factories) + assert.NoError(t, err) + assert.False(t, cfg.Exporters[config.NewComponentID(typeStr)].(*Config).RemoteWriteQueue.Enabled) +} diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 248df3c98717..491bdb665306 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -65,7 +65,7 @@ func createMetricsExporter(_ context.Context, set component.ExporterCreateSettin prwe.PushMetrics, exporterhelper.WithTimeout(prwCfg.TimeoutSettings), exporterhelper.WithQueue(exporterhelper.QueueSettings{ - Enabled: true, + Enabled: prwCfg.RemoteWriteQueue.Enabled, NumConsumers: 1, QueueSize: prwCfg.RemoteWriteQueue.QueueSize, }), @@ -101,6 +101,7 @@ func createDefaultConfig() config.Exporter { }, // TODO(jbd): Adjust the default queue size. RemoteWriteQueue: RemoteWriteQueue{ + Enabled: true, QueueSize: 10000, NumConsumers: 5, }, diff --git a/exporter/prometheusremotewriteexporter/testdata/disabled_queue.yaml b/exporter/prometheusremotewriteexporter/testdata/disabled_queue.yaml new file mode 100644 index 000000000000..88df651c6316 --- /dev/null +++ b/exporter/prometheusremotewriteexporter/testdata/disabled_queue.yaml @@ -0,0 +1,21 @@ +receivers: + nop: + +processors: + nop: + +exporters: + prometheusremotewrite: + endpoint: "localhost:8888" + remote_write_queue: + enabled: false + num_consumers: 10 + +service: + pipelines: + metrics: + receivers: [nop] + processors: [nop] + exporters: [prometheusremotewrite] + +