Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] hookup WAL (open-telemetry#7304)
Browse files Browse the repository at this point in the history
* exporter/prometheusremotewrite: glue up and use Write-Ahead-Log

This change completes the WAL capability for the Prometheus Remote-Write exporter
that allows recovery of data if the prior write requests weren't yet exported.

This wires up a Write-Ahead-Log (WAL) that was implemented in
PR open-telemetry/opentelemetry-collector#3597, which
was split off from PR open-telemetry/opentelemetry-collector#3017.

Note: there is a very rare condition under which we might send the
same data more than once. It can happen if we haven't yet truncated
the WAL, the RemoteWrite endpoint has already received the prior data,
and the process then receives a Ctrl+C or kill signal. However, this would
have to be timed very precisely, so it should be extremely rare.

Replaces PR open-telemetry/opentelemetry-collector#3017
Updates PR open-telemetry/opentelemetry-collector#3597
Fixes open-telemetry#4751
Fixes open-telemetry/prometheus-interoperability-spec#9

* fix go.mod

Signed-off-by: Anthony J Mirabella <[email protected]>

* exporter/prw: WALConfig should be exported to allow programmatic manipulation

Signed-off-by: Anthony J Mirabella <[email protected]>

* Add CHANGELOG entry

Signed-off-by: Anthony J Mirabella <[email protected]>

* fix lint error

Signed-off-by: Anthony J Mirabella <[email protected]>

* mod tidy

Signed-off-by: Anthony J Mirabella <[email protected]>

* tidy up WAL logic and comments

Signed-off-by: Anthony J Mirabella <[email protected]>

* Handle error from closing WAL

Signed-off-by: Anthony J Mirabella <[email protected]>

* Ensure WAL processor keeps running after error

Signed-off-by: Anthony J Mirabella <[email protected]>

* prwe/WAL: refactor WAL run loop

Signed-off-by: Anthony J Mirabella <[email protected]>

* make gotidy

Signed-off-by: Anthony J Mirabella <[email protected]>

* lint

Signed-off-by: Anthony J Mirabella <[email protected]>

* fix data races

Signed-off-by: Anthony J Mirabella <[email protected]>

* Ensure locking around entire WAL persistence routine

Signed-off-by: Anthony J Mirabella <[email protected]>

* Address PR feedback

Signed-off-by: Anthony J Mirabella <[email protected]>

* fix lint issues

Signed-off-by: Anthony J Mirabella <[email protected]>

* update read index inside WAL reader routine

Signed-off-by: Anthony J Mirabella <[email protected]>

* Update CHANGELOG.md

* Undo unrelated go.mod changes

Signed-off-by: Anthony J Mirabella <[email protected]>

Co-authored-by: Emmanuel T Odeke <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
Co-authored-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
4 people authored and David Pequegnot committed May 16, 2022
1 parent 5ce1b0c commit 54422e7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 37 deletions.
1 change: 1 addition & 0 deletions exporter/awsprometheusremotewriteexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 11 additions & 33 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type prwExporter struct {
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
multiTenancy MultiTenancy
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
settings component.TelemetrySettings
Expand Down Expand Up @@ -80,7 +79,6 @@ func newPRWExporter(cfg *Config, set component.ExporterCreateSettings) (*prwExpo
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
multiTenancy: cfg.MultiTenancy,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
}
Expand Down Expand Up @@ -173,12 +171,8 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
var tenantKey = noTenant
if prwe.multiTenancy.Enabled {
tenantKey = prwe.multiTenancy.FromLabel
}
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize, tenantKey)
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
if err != nil {
return err
}
Expand All @@ -201,17 +195,14 @@ func (prwe *prwExporter) export(ctx context.Context, requests map[string][]*prom
for _, tenantRequests := range requests {
chanLength += len(tenantRequests)
}
input := make(chan struct {
string
*prompb.WriteRequest
}, chanLength)
for tenant, tenantRequests := range requests {
for _, request := range tenantRequests {
input <- struct {
string
*prompb.WriteRequest
}{tenant, request}
}
return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
input := make(chan *prompb.WriteRequest, len(requests))
for _, request := range requests {
input <- request
}
close(input)

Expand Down Expand Up @@ -250,7 +241,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests map[string][]*prom
return errs
}

func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *prompb.WriteRequest) error {
func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
// Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(writeReq)
if err != nil {
Expand All @@ -259,18 +250,8 @@ func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *p
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)

if prwe.multiTenancy.Enabled && tenant == noTenant {
tenant = prwe.multiTenancy.DefaultTenant
}

// Enable multitenancy at query level (eg. Thanos)
endpointQueryUrl := prwe.endpointURL
if prwe.multiTenancy.Enabled && prwe.multiTenancy.QueryParam != "" {
endpointQueryUrl.Query().Set(prwe.multiTenancy.QueryParam, tenant)
}

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", endpointQueryUrl.String(), bytes.NewReader(compressedData))
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand All @@ -281,9 +262,6 @@ func (prwe *prwExporter) execute(ctx context.Context, tenant string, writeReq *p
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)
if prwe.multiTenancy.Enabled && prwe.multiTenancy.Header != "" {
req.Header.Set(prwe.multiTenancy.Header, tenant)
}

resp, err := prwe.client.Do(req)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,11 @@ func Test_PushMetrics(t *testing.T) {
prwe, nErr := newPRWExporter(cfg, set)
require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Cleanup(cancel)
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
t.Cleanup(func() {
require.NoError(t, prwe.Shutdown(ctx))
}()
})
err := prwe.PushMetrics(ctx, *tt.metrics)
if tt.returnErr {
assert.Error(t, err)
Expand Down Expand Up @@ -676,7 +676,7 @@ func Test_exportWithMultiTenancy(t *testing.T) {

tests := []struct {
name string
md *pdata.Metrics
md *pmetric.Metrics
multiTenancy bool
expectedTenants []string
}{
Expand Down
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 54422e7

Please sign in to comment.