Skip to content

Commit

Permalink
exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
Browse files Browse the repository at this point in the history
This change completes the WAL capability for the Prometheus Remote-Write exporter
that allows recovery of data if the prior write requests weren't yet exported.

This wires up a Write-Ahead-Log (WAL) that was implemented in PR open-telemetry#3597, which
was split off from PR open-telemetry#3017.

Note: there is very rare condition for which we can perhaps send the
same data many times and it can happen if we haven't yet truncated the
WAL, the RemoteWrite endpoint received the prior data but the process
received a Ctrl+C, kill signal. However, this will be very rare and
would have to be timed so fast and precisely.

Replaces PR open-telemetry#3017
Updates PR open-telemetry#3597
Fixes open-telemetry/prometheus-interoperability-spec#9
  • Loading branch information
odeke-em committed Aug 16, 2021
1 parent cce6f70 commit f801d66
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 36 deletions.
4 changes: 4 additions & 0 deletions exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ Example:
exporters:
prometheusremotewrite:
endpoint: "https://my-cortex:7900/api/v1/push"
wal: # Enabling the Write-Ahead-Log for the exporter.
directory: ./prom_rw # The directory to store the WAL in
buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300
truncate_frequency: 45s # Optional frequency for how often the WAL should be truncated. It is a time.ParseDuration; default of 1m
```
## Advanced Configuration
Expand Down
2 changes: 2 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Config struct {
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"`

WAL *walConfig `mapstructure:"wal"`
}

// RemoteWriteQueue allows to configure the remote write queue.
Expand Down
91 changes: 77 additions & 14 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PRWExporter struct {
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings

wal *prweWAL
}

// NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly.
Expand All @@ -66,7 +68,7 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(buildInfo.Description), " ", "-"), buildInfo.Version)

return &PRWExporter{
prwe := &PRWExporter{
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
Expand All @@ -75,19 +77,44 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
}, nil
}
if cfg.WAL == nil {
return prwe, nil
}

prweWAL, err := newWAL(cfg.WAL, prwe.export)
if err != nil {
return nil, err
}
prwe.wal = prweWAL
return prwe, nil
}

// Start creates the prometheus client
func (prwe *PRWExporter) Start(_ context.Context, host component.Host) (err error) {
func (prwe *PRWExporter) Start(ctx context.Context, host component.Host) (err error) {
prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions())
return err
if err != nil {
return err
}
return prwe.turnOnWALIfEnabled(ctx)
}

func (prwe *PRWExporter) shutdownWALIfEnabled() error {
if !prwe.walEnabled() {
return nil
}
return prwe.wal.stop()
}

// Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *PRWExporter) Shutdown(context.Context) error {
close(prwe.closeChan)
select {
case <-prwe.closeChan:
default:
close(prwe.closeChan)
}
prwe.shutdownWALIfEnabled()
prwe.wg.Wait()
return nil
}
Expand Down Expand Up @@ -167,7 +194,7 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
}
}

if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 {
if exportErrors := prwe.handleExport(ctx, tsMap); len(exportErrors) != 0 {
dropped = md.MetricCount()
errs = append(errs, exportErrors...)
}
Expand Down Expand Up @@ -209,16 +236,29 @@ func (prwe *PRWExporter) addNumberDataPointSlice(dataPoints pdata.NumberDataPoin
return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
func (prwe *PRWExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
var errs []error
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
if err != nil {
errs = append(errs, consumererror.Permanent(err))
return errs
}
if !prwe.walEnabled() {
// Perform a direct export otherwise.
return prwe.export(ctx, requests)
}

// Otherwise the WAL is enabled, and just persist the requests to the WAL
// and they'll be exported in another goroutine to the RemoteWrite endpoint.
if err := prwe.wal.persistToWAL(requests); err != nil {
errs = append(errs, consumererror.Permanent(err))
}
return errs
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PRWExporter) export(ctx context.Context, requests []*prompb.WriteRequest) (errs []error) {
input := make(chan *prompb.WriteRequest, len(requests))
for _, request := range requests {
input <- request
Expand All @@ -237,12 +277,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
go func() {
defer wg.Done()

for request := range input {
err := prwe.execute(ctx, request)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
for {
select {
case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
return

case request, ok := <-input:
if !ok {
return
}

if err := prwe.execute(ctx, request); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}
}
}()
Expand Down Expand Up @@ -294,3 +343,17 @@ func (prwe *PRWExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
}
return consumererror.Permanent(rerr)
}

func (prwe *PRWExporter) walEnabled() bool { return prwe.wal != nil }

func (prwe *PRWExporter) turnOnWALIfEnabled(ctx context.Context) error {
if !prwe.walEnabled() {
return nil
}
cancelCtx, cancel := context.WithCancel(ctx)
go func() {
<-prwe.closeChan
cancel()
}()
return prwe.wal.run(cancelCtx)
}
157 changes: 156 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package prometheusremotewriteexporter

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand All @@ -33,6 +36,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -345,7 +349,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
return errs
}

errs = append(errs, prwe.export(context.Background(), testmap)...)
errs = append(errs, prwe.handleExport(context.Background(), testmap)...)
return errs
}

Expand Down Expand Up @@ -601,3 +605,154 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) {
})
}
}

// Ensures that when we attach the Write-Ahead-Log(WAL) to the exporter,
// that it successfully writes the serialized prompb.WriteRequests to the WAL,
// and that we can retrieve those exact requests back from the WAL, when the
// exporter starts up once again, that it picks up where it left off.
func TestWALOnExporterRoundTrip(t *testing.T) {
if testing.Short() {
t.Skip("This test could run for long")
}

// 1. Create a mock Prometheus Remote Write Exporter that'll just
// receive the bytes uploaded to it by our exporter.
uploadedBytesCh := make(chan []byte, 1)
exiting := make(chan bool)
prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
uploaded, err2 := ioutil.ReadAll(req.Body)
assert.Nil(t, err2, "Error while reading from HTTP upload")
select {
case uploadedBytesCh <- uploaded:
case <-exiting:
return
}
}))
defer prweServer.Close()

// 2. Create the WAL configuration, create the
// exporter and export some time series!
tempDir := t.TempDir()
yamlConfig := fmt.Sprintf(`
namespace: "test_ns"
endpoint: %q
remote_write_queue:
num_consumers: 1
wal:
directory: %s
truncate_frequency: 60us
buffer_size: 1
`, prweServer.URL, tempDir)

parser, err := configparser.NewParserFromBuffer(strings.NewReader(yamlConfig))
require.Nil(t, err)

fullConfig := new(Config)
if err = parser.UnmarshalExact(fullConfig); err != nil {
t.Fatalf("Failed to parse config from YAML: %v", err)
}
walConfig := fullConfig.WAL
require.NotNil(t, walConfig)
require.Equal(t, walConfig.TruncateFrequency, 60*time.Microsecond)
require.Equal(t, walConfig.Directory, tempDir)

var defaultBuildInfo = component.BuildInfo{
Description: "OpenTelemetry Collector",
Version: "1.0",
}
prwe, perr := NewPRWExporter(fullConfig, defaultBuildInfo)
assert.Nil(t, perr)

nopHost := componenttest.NewNopHost()
ctx := context.Background()
require.Nil(t, prwe.Start(ctx, nopHost))
defer prwe.Shutdown(ctx)
defer close(exiting)
require.NotNil(t, prwe.wal)

ts1 := &prompb.TimeSeries{
Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
}
ts2 := &prompb.TimeSeries{
Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
}
tsMap := map[string]*prompb.TimeSeries{
"timeseries1": ts1,
"timeseries2": ts2,
}
errs := prwe.handleExport(ctx, tsMap)
assert.Nil(t, errs)
// Shutdown after we've written to the WAL. This ensures that our
// exported data in-flight will flushed flushed to the WAL before exiting.
prwe.Shutdown(ctx)

// 3. Let's now read back all of the WAL records and ensure
// that all the prompb.WriteRequest values exist as we sent them.
wal, _, werr := walConfig.createWAL()
assert.Nil(t, werr)
assert.NotNil(t, wal)
defer wal.Close()

// Read all the indices.
firstIndex, ierr := wal.FirstIndex()
assert.Nil(t, ierr)
lastIndex, ierr := wal.LastIndex()
assert.Nil(t, ierr)

var reqs []*prompb.WriteRequest
for i := firstIndex; i <= lastIndex; i++ {
protoBlob, perr := wal.Read(i)
assert.Nil(t, perr)
assert.NotNil(t, protoBlob)
req := new(prompb.WriteRequest)
err = proto.Unmarshal(protoBlob, req)
assert.Nil(t, err)
reqs = append(reqs, req)
}
assert.Equal(t, 1, len(reqs))
// We MUST have 2 time series as were passed into tsMap.
gotFromWAL := reqs[0]
assert.Equal(t, 2, len(gotFromWAL.Timeseries))
want := &prompb.WriteRequest{
Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{
*ts1, *ts2,
}),
}

// Even after sorting timeseries, we need to sort them
// also by Label to ensure deterministic ordering.
orderByLabelValue(gotFromWAL)
gotFromWAL.Timeseries = orderBySampleTimestamp(gotFromWAL.Timeseries)
orderByLabelValue(want)

assert.Equal(t, want, gotFromWAL)

// 4. Finally, ensure that the bytes that were uploaded to the
// Prometheus Remote Write endpoint are exactly as were saved in the WAL.
// Read from that same WAL, export to the RWExporter server.
prwe2, err := NewPRWExporter(fullConfig, defaultBuildInfo)
assert.Nil(t, err)
require.Nil(t, prwe2.Start(ctx, nopHost))
defer prwe2.Shutdown(ctx)
require.NotNil(t, prwe2.wal)

snappyEncodedBytes := <-uploadedBytesCh
decodeBuffer := make([]byte, len(snappyEncodedBytes))
uploadedBytes, derr := snappy.Decode(decodeBuffer, snappyEncodedBytes)
require.Nil(t, derr)
gotFromUpload := new(prompb.WriteRequest)
uerr := proto.Unmarshal(uploadedBytes, gotFromUpload)
assert.Nil(t, uerr)
gotFromUpload.Timeseries = orderBySampleTimestamp(gotFromUpload.Timeseries)
// Even after sorting timeseries, we need to sort them
// also by Label to ensure deterministic ordering.
orderByLabelValue(gotFromUpload)

// 4.1. Ensure that all the various combinations match up.
// To ensure a deterministic ordering, sort the TimeSeries by Label Name.
assert.Equal(t, want, gotFromUpload)
assert.Equal(t, gotFromWAL, gotFromUpload)
}
Loading

0 comments on commit f801d66

Please sign in to comment.