From 54422e77ad7ef5cbd49088e34c06e3f3908d79a4 Mon Sep 17 00:00:00 2001 From: Anthony Mirabella Date: Mon, 7 Mar 2022 14:15:32 -0500 Subject: [PATCH] [exporter/prometheusremotewrite] hookup WAL (#7304) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * exporter/prometheusremotewrite: glue up and use Write-Ahead-Log This change completes the WAL capability for the Prometheus Remote-Write exporter that allows recovery of data if the prior write requests weren't yet exported. This wires up a Write-Ahead-Log (WAL) that was implemented in PR open-telemetry/opentelemetry-collector#3597, which was split off from PR open-telemetry/opentelemetry-collector#3017. Note: there is a very rare condition in which we may send the same data a couple of times. It can happen if we haven't yet truncated the WAL and the RemoteWrite endpoint has received the prior data, but the process then receives a Ctrl+C or kill signal. However, this will be very rare, as it would have to be timed extremely fast and precisely. 
Replaces PR open-telemetry/opentelemetry-collector#3017 Updates PR open-telemetry/opentelemetry-collector#3597 Fixes #4751 Fixes open-telemetry/wg-prometheus#9 * fix go.mod Signed-off-by: Anthony J Mirabella * exporter/prw: WALConfig should be exported to allow programmatic manipulation Signed-off-by: Anthony J Mirabella * Add CHANGELOG entry Signed-off-by: Anthony J Mirabella * fix lint error Signed-off-by: Anthony J Mirabella * mod tidy Signed-off-by: Anthony J Mirabella * tidy up WAL logic and comments Signed-off-by: Anthony J Mirabella * Handle error from closing WAL Signed-off-by: Anthony J Mirabella * Ensure WAL processor keeps running after error Signed-off-by: Anthony J Mirabella * prwe/WAL: refactor WAL run loop Signed-off-by: Anthony J Mirabella * make gotidy Signed-off-by: Anthony J Mirabella * lint Signed-off-by: Anthony J Mirabella * fix data races Signed-off-by: Anthony J Mirabella * Ensure locking around entire WAL persistence routine Signed-off-by: Anthony J Mirabella * Address PR feedback Signed-off-by: Anthony J Mirabella * fix lint issues Signed-off-by: Anthony J Mirabella * update read index inside WAL reader routine Signed-off-by: Anthony J Mirabella * Update CHANGELOG.md * Undo unrelated go.mod changes Signed-off-by: Anthony J Mirabella Co-authored-by: Emmanuel T Odeke Co-authored-by: Alex Boten Co-authored-by: Juraci Paixão Kröhling --- .../awsprometheusremotewriteexporter/go.sum | 1 + .../prometheusremotewriteexporter/exporter.go | 44 +++++-------------- .../exporter_test.go | 8 ++-- exporter/prometheusremotewriteexporter/go.sum | 1 + 4 files changed, 17 insertions(+), 37 deletions(-) diff --git a/exporter/awsprometheusremotewriteexporter/go.sum b/exporter/awsprometheusremotewriteexporter/go.sum index b7fa3d06b8e1..a67b2727326c 100644 --- a/exporter/awsprometheusremotewriteexporter/go.sum +++ b/exporter/awsprometheusremotewriteexporter/go.sum @@ -633,6 +633,7 @@ github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh 
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 07ecbe586af3..cd093f7aedfe 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -50,7 +50,6 @@ type prwExporter struct { wg *sync.WaitGroup closeChan chan struct{} concurrency int - multiTenancy MultiTenancy userAgentHeader string clientSettings *confighttp.HTTPClientSettings settings component.TelemetrySettings @@ -80,7 +79,6 @@ func newPRWExporter(cfg *Config, set component.ExporterCreateSettings) (*prwExpo closeChan: make(chan struct{}), userAgentHeader: userAgentHeader, concurrency: cfg.RemoteWriteQueue.NumConsumers, - multiTenancy: cfg.MultiTenancy, clientSettings: &cfg.HTTPClientSettings, settings: set.TelemetrySettings, } @@ -173,12 +171,8 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) { } func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { - var tenantKey = noTenant - if prwe.multiTenancy.Enabled { - tenantKey = prwe.multiTenancy.FromLabel - } // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, maxBatchByteSize, tenantKey) + 
requests, err := batchTimeSeries(tsMap, maxBatchByteSize) if err != nil { return err } @@ -201,17 +195,14 @@ func (prwe *prwExporter) export(ctx context.Context, requests map[string][]*prom for _, tenantRequests := range requests { chanLength += len(tenantRequests) } - input := make(chan struct { - string - *prompb.WriteRequest - }, chanLength) - for tenant, tenantRequests := range requests { - for _, request := range tenantRequests { - input <- struct { - string - *prompb.WriteRequest - }{tenant, request} - } + return nil +} + +// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order +func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error { + input := make(chan *prompb.WriteRequest, len(requests)) + for _, request := range requests { + input <- request } close(input) @@ -250,7 +241,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests map[string][]*prom return errs } -func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *prompb.WriteRequest) error { +func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error { // Uses proto.Marshal to convert the WriteRequest into bytes array data, err := proto.Marshal(writeReq) if err != nil { @@ -259,18 +250,8 @@ func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *p buf := make([]byte, len(data), cap(data)) compressedData := snappy.Encode(buf, data) - if prwe.multiTenancy.Enabled && tenant == noTenant { - tenant = prwe.multiTenancy.DefaultTenant - } - - // Enable multitenancy at query level (eg. 
Thanos) - endpointQueryUrl := prwe.endpointURL - if prwe.multiTenancy.Enabled && prwe.multiTenancy.QueryParam != "" { - endpointQueryUrl.Query().Set(prwe.multiTenancy.QueryParam, tenant) - } - // Create the HTTP POST request to send to the endpoint - req, err := http.NewRequestWithContext(ctx, "POST", endpointQueryUrl.String(), bytes.NewReader(compressedData)) + req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) if err != nil { return consumererror.NewPermanent(err) } @@ -281,9 +262,6 @@ func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *p req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") req.Header.Set("User-Agent", prwe.userAgentHeader) - if prwe.multiTenancy.Enabled && prwe.multiTenancy.Header != "" { - req.Header.Set(prwe.multiTenancy.Header, tenant) - } resp, err := prwe.client.Do(req) if err != nil { diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index cc1bb994c620..bd0b12400013 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -634,11 +634,11 @@ func Test_PushMetrics(t *testing.T) { prwe, nErr := newPRWExporter(cfg, set) require.NoError(t, nErr) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + t.Cleanup(cancel) require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost())) - defer func() { + t.Cleanup(func() { require.NoError(t, prwe.Shutdown(ctx)) - }() + }) err := prwe.PushMetrics(ctx, *tt.metrics) if tt.returnErr { assert.Error(t, err) @@ -676,7 +676,7 @@ func Test_exportWithMultiTenancy(t *testing.T) { tests := []struct { name string - md *pdata.Metrics + md *pmetric.Metrics multiTenancy bool expectedTenants []string }{ diff --git a/exporter/prometheusremotewriteexporter/go.sum 
b/exporter/prometheusremotewriteexporter/go.sum index 75c7d247e569..c77ec7d711ec 100644 --- a/exporter/prometheusremotewriteexporter/go.sum +++ b/exporter/prometheusremotewriteexporter/go.sum @@ -771,6 +771,7 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=