Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Add the possibility to disable queui…
Browse files Browse the repository at this point in the history
…ng (#6718)

* [prometheusremotewriteexporter] Add the possibility to disable queuing

A new configuration key, `remote_write queue.enabled` is added, which
controls the queue enabling.

* [exporter/awsprometheusremotewriteexporter] Fix config unit tests.

Update the expected prometheusremotewriteexporter default config.
  • Loading branch information
Traian Schiau authored Dec 21, 2021
1 parent aa9b71b commit 270b2c4
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 3 deletions.
1 change: 1 addition & 0 deletions exporter/awsprometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestLoadConfig(t *testing.T) {
MaxElapsedTime: 10 * time.Minute,
},
RemoteWriteQueue: prw.RemoteWriteQueue{
Enabled: true,
QueueSize: 10000,
NumConsumers: 5,
},
Expand Down
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ The following settings can be optionally configured:
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `queue_size`: number of OTLP metrics that can be queued.
- `enabled`: enable the sending queue
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false`
- `num_consumers`: minimum number of workers to use to fan out the outgoing requests.

Example:
Expand Down
11 changes: 10 additions & 1 deletion exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ type Config struct {

// RemoteWriteQueue allows to configure the remote write queue.
type RemoteWriteQueue struct {
// Enabled if false the queue is not enabled, the export requests
// are executed synchronously.
Enabled bool `mapstructure:"enabled"`

// QueueSize is the maximum number of OTLP metric batches allowed
// in the queue at a given time.
// in the queue at a given time. Ignored if Enabled is false.
QueueSize int `mapstructure:"queue_size"`

// NumWorkers configures the number of workers used by
Expand All @@ -69,6 +73,11 @@ func (cfg *Config) Validate() error {
if cfg.RemoteWriteQueue.QueueSize < 0 {
return fmt.Errorf("remote write queue size can't be negative")
}

if cfg.RemoteWriteQueue.Enabled && cfg.RemoteWriteQueue.QueueSize == 0 {
return fmt.Errorf("a 0 size queue will drop all the data")
}

if cfg.RemoteWriteQueue.NumConsumers < 0 {
return fmt.Errorf("remote write consumer number can't be negative")
}
Expand Down
12 changes: 12 additions & 0 deletions exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func Test_loadConfig(t *testing.T) {
MaxElapsedTime: 10 * time.Minute,
},
RemoteWriteQueue: RemoteWriteQueue{
Enabled: true,
QueueSize: 2000,
NumConsumers: 10,
},
Expand Down Expand Up @@ -103,3 +104,14 @@ func TestNegativeNumConsumers(t *testing.T) {
_, err = configtest.LoadConfigAndValidate(path.Join(".", "testdata", "negative_num_consumers.yaml"), factories)
assert.Error(t, err)
}

func TestDisabledQueue(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "disabled_queue.yaml"), factories)
assert.NoError(t, err)
assert.False(t, cfg.Exporters[config.NewComponentID(typeStr)].(*Config).RemoteWriteQueue.Enabled)
}
3 changes: 2 additions & 1 deletion exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func createMetricsExporter(_ context.Context, set component.ExporterCreateSettin
prwe.PushMetrics,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueSettings{
Enabled: true,
Enabled: prwCfg.RemoteWriteQueue.Enabled,
NumConsumers: 1,
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
}),
Expand Down Expand Up @@ -101,6 +101,7 @@ func createDefaultConfig() config.Exporter {
},
// TODO(jbd): Adjust the default queue size.
RemoteWriteQueue: RemoteWriteQueue{
Enabled: true,
QueueSize: 10000,
NumConsumers: 5,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
receivers:
nop:

processors:
nop:

exporters:
prometheusremotewrite:
endpoint: "localhost:8888"
remote_write_queue:
enabled: false
num_consumers: 10

service:
pipelines:
metrics:
receivers: [nop]
processors: [nop]
exporters: [prometheusremotewrite]


0 comments on commit 270b2c4

Please sign in to comment.