From f801d66f51c7051a0d2215f1690abde0ab28d182 Mon Sep 17 00:00:00 2001
From: Emmanuel T Odeke
Date: Fri, 6 Aug 2021 20:24:17 -0700
Subject: [PATCH] exporter/prometheusremotewrite: glue up and use Write-Ahead-Log

This change completes the WAL capability for the Prometheus Remote-Write
exporter, allowing recovery of data if the prior write requests weren't yet
exported. It wires up the Write-Ahead-Log (WAL) implemented in PR #3597,
which was split off from PR #3017.

Note: there is a very rare condition under which the same data could be sent
more than once: the RemoteWrite endpoint has already received the prior data,
but the process is interrupted (Ctrl+C or a kill signal) before the WAL has
been truncated. The timing required makes this scenario very unlikely in
practice.

Replaces PR #3017
Updates PR #3597
Fixes open-telemetry/wg-prometheus#9
---
 .../prometheusremotewriteexporter/README.md   |   4 +
 .../prometheusremotewriteexporter/config.go   |   2 +
 .../prometheusremotewriteexporter/exporter.go |  91 ++++++--
 .../exporter_test.go                          | 157 +++++++++++++-
 exporter/prometheusremotewriteexporter/wal.go | 194 ++++++++++++++++--
 .../prometheusremotewriteexporter/wal_test.go |   9 +-
 go.mod                                        |   1 +
 go.sum                                        |   2 +
 8 files changed, 424 insertions(+), 36 deletions(-)

diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md
index d1f60260704..66241a0955c 100644
--- a/exporter/prometheusremotewriteexporter/README.md
+++ b/exporter/prometheusremotewriteexporter/README.md
@@ -48,6 +48,10 @@ Example:
 exporters:
   prometheusremotewrite:
     endpoint: "https://my-cortex:7900/api/v1/push"
+    wal: # Enables the Write-Ahead-Log for the exporter.
+      directory: ./prom_rw # The directory in which to store the WAL
+      buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300
+      truncate_frequency: 45s # Optional frequency at which the WAL should be truncated. Accepts a time.ParseDuration value; default of 1m
 ```

 ## Advanced Configuration
diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go
index c1078d361c7..3513d1b31ab 100644
--- a/exporter/prometheusremotewriteexporter/config.go
+++ b/exporter/prometheusremotewriteexporter/config.go
@@ -45,6 +45,8 @@ type Config struct {
 	// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
 	// If enabled, all the resource attributes will be converted to metric labels by default.
 	exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"`
+
+	WAL *walConfig `mapstructure:"wal"`
 }

 // RemoteWriteQueue allows to configure the remote write queue.
diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
index ab3f1ded693..0874a5b8c7a 100644
--- a/exporter/prometheusremotewriteexporter/exporter.go
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -50,6 +50,8 @@ type PRWExporter struct {
 	concurrency     int
 	userAgentHeader string
 	clientSettings  *confighttp.HTTPClientSettings
+
+	wal *prweWAL
 }

 // NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly.
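For context, here is roughly how the new `wal` block from the README example above slots into a complete collector configuration. This is an illustrative sketch rather than part of the patch: the OTLP receiver and the endpoint/directory values are placeholders.

```yaml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  prometheusremotewrite:
    endpoint: "https://my-cortex:7900/api/v1/push"
    wal:
      directory: ./prom_rw       # WAL storage directory
      buffer_size: 100           # entries read from the WAL before truncating (default 300)
      truncate_frequency: 45s    # how often the WAL is truncated (default 1m)

service:
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [prometheusremotewrite]
```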
@@ -66,7 +68,7 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
 	userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(buildInfo.Description), " ", "-"), buildInfo.Version)

-	return &PRWExporter{
+	prwe := &PRWExporter{
 		namespace:       cfg.Namespace,
 		externalLabels:  sanitizedLabels,
 		endpointURL:     endpointURL,
@@ -75,19 +77,44 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
 		userAgentHeader: userAgentHeader,
 		concurrency:     cfg.RemoteWriteQueue.NumConsumers,
 		clientSettings:  &cfg.HTTPClientSettings,
-	}, nil
+	}
+	if cfg.WAL == nil {
+		return prwe, nil
+	}
+
+	prweWAL, err := newWAL(cfg.WAL, prwe.export)
+	if err != nil {
+		return nil, err
+	}
+	prwe.wal = prweWAL
+	return prwe, nil
 }

 // Start creates the prometheus client
-func (prwe *PRWExporter) Start(_ context.Context, host component.Host) (err error) {
+func (prwe *PRWExporter) Start(ctx context.Context, host component.Host) (err error) {
 	prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions())
-	return err
+	if err != nil {
+		return err
+	}
+	return prwe.turnOnWALIfEnabled(ctx)
+}
+
+func (prwe *PRWExporter) shutdownWALIfEnabled() error {
+	if !prwe.walEnabled() {
+		return nil
+	}
+	return prwe.wal.stop()
 }

 // Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
 // to finish before returning
 func (prwe *PRWExporter) Shutdown(context.Context) error {
-	close(prwe.closeChan)
+	select {
+	case <-prwe.closeChan:
+	default:
+		close(prwe.closeChan)
+	}
+	prwe.shutdownWALIfEnabled()
 	prwe.wg.Wait()
 	return nil
 }
@@ -167,7 +194,7 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
 		}
 	}

-	if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 {
+	if exportErrors := prwe.handleExport(ctx, tsMap); len(exportErrors) != 0 {
 		dropped = md.MetricCount()
 		errs = append(errs, exportErrors...)
 	}
@@ -209,8 +236,7 @@ func (prwe *PRWExporter) addNumberDataPointSlice(dataPoints pdata.NumberDataPoin
 	return nil
 }

-// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
-func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
+func (prwe *PRWExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
 	var errs []error
 	// Calls the helper function to convert and batch the TsMap to the desired format
 	requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
@@ -218,7 +244,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
 		errs = append(errs, consumererror.Permanent(err))
 		return errs
 	}
+	if !prwe.walEnabled() {
+		// The WAL is not enabled; perform a direct export.
+		return prwe.export(ctx, requests)
+	}
+
+	// Otherwise the WAL is enabled: persist the requests to the WAL, and they'll be
+	// exported to the RemoteWrite endpoint by a separate goroutine.
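+	// (The replaying goroutine is started from turnOnWALIfEnabled during Start: it reads
+	// the persisted requests back through readPrompbFromWAL, hands them to prwe.export,
+	// and front-truncates the WAL once they have been uploaded.)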
+	if err := prwe.wal.persistToWAL(requests); err != nil {
+		errs = append(errs, consumererror.Permanent(err))
+	}
+	return errs
+}
+
+// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
+func (prwe *PRWExporter) export(ctx context.Context, requests []*prompb.WriteRequest) (errs []error) {
 	input := make(chan *prompb.WriteRequest, len(requests))
 	for _, request := range requests {
 		input <- request
@@ -237,12 +277,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
 		go func() {
 			defer wg.Done()
-			for request := range input {
-				err := prwe.execute(ctx, request)
-				if err != nil {
-					mu.Lock()
-					errs = append(errs, err)
-					mu.Unlock()
+			for {
+				select {
+				case <-ctx.Done(): // Check first to ensure the context wasn't cancelled.
+					return
+
+				case request, ok := <-input:
+					if !ok {
+						return
+					}
+
+					if err := prwe.execute(ctx, request); err != nil {
+						mu.Lock()
+						errs = append(errs, err)
+						mu.Unlock()
+					}
 				}
 			}
 		}()
@@ -294,3 +343,17 @@ func (prwe *PRWExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
 	}
 	return consumererror.Permanent(rerr)
 }
+
+func (prwe *PRWExporter) walEnabled() bool { return prwe.wal != nil }
+
+func (prwe *PRWExporter) turnOnWALIfEnabled(ctx context.Context) error {
+	if !prwe.walEnabled() {
+		return nil
+	}
+	cancelCtx, cancel := context.WithCancel(ctx)
+	go func() {
+		<-prwe.closeChan
+		cancel()
+	}()
+	return prwe.wal.run(cancelCtx)
+}
diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go
index 032f10744f5..4b2bd3b224e 100644
--- a/exporter/prometheusremotewriteexporter/exporter_test.go
+++ b/exporter/prometheusremotewriteexporter/exporter_test.go
@@ -16,12 +16,15 @@ package prometheusremotewriteexporter

 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strings"
 	"sync"
 	"testing"
+	"time"

 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
@@ -33,6 +36,7 @@ import (
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/config"
 	"go.opentelemetry.io/collector/config/confighttp"
+	"go.opentelemetry.io/collector/config/configparser"
 	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/internal/testdata"
@@ -345,7 +349,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
 		return errs
 	}

-	errs = append(errs, prwe.export(context.Background(), testmap)...)
+	errs = append(errs, prwe.handleExport(context.Background(), testmap)...)
 	return errs
 }

@@ -601,3 +605,154 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) {
 		})
 	}
 }
+
+// Ensures that when we attach the Write-Ahead-Log (WAL) to the exporter, it
+// successfully writes the serialized prompb.WriteRequests to the WAL, that we
+// can retrieve those exact requests back from the WAL, and that when the
+// exporter starts up once again it picks up where it left off.
+func TestWALOnExporterRoundTrip(t *testing.T) {
+	if testing.Short() {
+		t.Skip("This test could run for long")
+	}
+
+	// 1. Create a mock Prometheus Remote Write Exporter that'll just
+	// receive the bytes uploaded to it by our exporter.
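+	// Each upload the handler receives is forwarded on uploadedBytesCh so that
+	// step 4 below can compare the uploaded bytes against what was stored in the WAL.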
+	uploadedBytesCh := make(chan []byte, 1)
+	exiting := make(chan bool)
+	prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		uploaded, err2 := ioutil.ReadAll(req.Body)
+		assert.Nil(t, err2, "Error while reading from HTTP upload")
+		select {
+		case uploadedBytesCh <- uploaded:
+		case <-exiting:
+			return
+		}
+	}))
+	defer prweServer.Close()
+
+	// 2. Create the WAL configuration, create the
+	// exporter and export some time series!
+	tempDir := t.TempDir()
+	yamlConfig := fmt.Sprintf(`
+namespace: "test_ns"
+endpoint: %q
+remote_write_queue:
+  num_consumers: 1
+
+wal:
+  directory: %s
+  truncate_frequency: 60us
+  buffer_size: 1
+	`, prweServer.URL, tempDir)
+
+	parser, err := configparser.NewParserFromBuffer(strings.NewReader(yamlConfig))
+	require.Nil(t, err)
+
+	fullConfig := new(Config)
+	if err = parser.UnmarshalExact(fullConfig); err != nil {
+		t.Fatalf("Failed to parse config from YAML: %v", err)
+	}
+	walConfig := fullConfig.WAL
+	require.NotNil(t, walConfig)
+	require.Equal(t, walConfig.TruncateFrequency, 60*time.Microsecond)
+	require.Equal(t, walConfig.Directory, tempDir)
+
+	var defaultBuildInfo = component.BuildInfo{
+		Description: "OpenTelemetry Collector",
+		Version:     "1.0",
+	}
+	prwe, perr := NewPRWExporter(fullConfig, defaultBuildInfo)
+	assert.Nil(t, perr)
+
+	nopHost := componenttest.NewNopHost()
+	ctx := context.Background()
+	require.Nil(t, prwe.Start(ctx, nopHost))
+	defer prwe.Shutdown(ctx)
+	defer close(exiting)
+	require.NotNil(t, prwe.wal)
+
+	ts1 := &prompb.TimeSeries{
+		Labels:  []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}},
+		Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
+	}
+	ts2 := &prompb.TimeSeries{
+		Labels:  []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}},
+		Samples: []prompb.Sample{{Value: 2, Timestamp: 200}},
+	}
+	tsMap := map[string]*prompb.TimeSeries{
+		"timeseries1": ts1,
+		"timeseries2": ts2,
+	}
+	errs := prwe.handleExport(ctx, tsMap)
+	assert.Nil(t, errs)
+	// Shutdown after we've written to the WAL. This ensures that our
+	// in-flight exported data will be flushed to the WAL before exiting.
+	prwe.Shutdown(ctx)
+
+	// 3. Let's now read back all of the WAL records and ensure
+	// that all the prompb.WriteRequest values exist as we sent them.
+	wal, _, werr := walConfig.createWAL()
+	assert.Nil(t, werr)
+	assert.NotNil(t, wal)
+	defer wal.Close()
+
+	// Read all the indices.
+	firstIndex, ierr := wal.FirstIndex()
+	assert.Nil(t, ierr)
+	lastIndex, ierr := wal.LastIndex()
+	assert.Nil(t, ierr)
+
+	var reqs []*prompb.WriteRequest
+	for i := firstIndex; i <= lastIndex; i++ {
+		protoBlob, perr := wal.Read(i)
+		assert.Nil(t, perr)
+		assert.NotNil(t, protoBlob)
+		req := new(prompb.WriteRequest)
+		err = proto.Unmarshal(protoBlob, req)
+		assert.Nil(t, err)
+		reqs = append(reqs, req)
+	}
+	assert.Equal(t, 1, len(reqs))
+	// We MUST have the 2 time series that were passed into tsMap.
+	gotFromWAL := reqs[0]
+	assert.Equal(t, 2, len(gotFromWAL.Timeseries))
+	want := &prompb.WriteRequest{
+		Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{
+			*ts1, *ts2,
+		}),
+	}
+
+	// Even after sorting timeseries, we need to sort them
+	// also by Label to ensure deterministic ordering.
+	orderByLabelValue(gotFromWAL)
+	gotFromWAL.Timeseries = orderBySampleTimestamp(gotFromWAL.Timeseries)
+	orderByLabelValue(want)
+
+	assert.Equal(t, want, gotFromWAL)
+
+	// 4. Finally, ensure that the bytes that were uploaded to the
+	// Prometheus Remote Write endpoint are exactly as were saved in the WAL.
+	// Read from that same WAL, export to the RWExporter server.
+	prwe2, err := NewPRWExporter(fullConfig, defaultBuildInfo)
+	assert.Nil(t, err)
+	require.Nil(t, prwe2.Start(ctx, nopHost))
+	defer prwe2.Shutdown(ctx)
+	require.NotNil(t, prwe2.wal)
+
+	snappyEncodedBytes := <-uploadedBytesCh
+	decodeBuffer := make([]byte, len(snappyEncodedBytes))
+	uploadedBytes, derr := snappy.Decode(decodeBuffer, snappyEncodedBytes)
+	require.Nil(t, derr)
+	gotFromUpload := new(prompb.WriteRequest)
+	uerr := proto.Unmarshal(uploadedBytes, gotFromUpload)
+	assert.Nil(t, uerr)
+	gotFromUpload.Timeseries = orderBySampleTimestamp(gotFromUpload.Timeseries)
+	// Even after sorting timeseries, we need to sort them
+	// also by Label to ensure deterministic ordering.
+	orderByLabelValue(gotFromUpload)
+
+	// 4.1. Ensure that all the various combinations match up.
+	// To ensure a deterministic ordering, sort the TimeSeries by Label Name.
+	assert.Equal(t, want, gotFromUpload)
+	assert.Equal(t, gotFromWAL, gotFromUpload)
+}
diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go
index 5b52b3f1250..d61b368b938 100644
--- a/exporter/prometheusremotewriteexporter/wal.go
+++ b/exporter/prometheusremotewriteexporter/wal.go
@@ -20,9 +20,11 @@ import (
 	"fmt"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/gogo/protobuf/proto"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/tidwall/wal"
 )
@@ -41,6 +43,34 @@ type prweWAL struct {
 	wWALIndex uint64
 }

+const (
+	defaultWALBufferSize        = 300
+	defaultWALTruncateFrequency = 1 * time.Minute
+)
+
+type walConfig struct {
+	// Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per
+	// https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov
+	// but also we are using underscores "_" instead of dashes "-".
+	Directory         string        `mapstructure:"directory"`
+	BufferSize        int           `mapstructure:"buffer_size"`
+	TruncateFrequency time.Duration `mapstructure:"truncate_frequency"`
+}
+
+func (wc *walConfig) bufferSize() int {
+	if wc.BufferSize > 0 {
+		return wc.BufferSize
+	}
+	return defaultWALBufferSize
+}
+
+func (wc *walConfig) truncateFrequency() time.Duration {
+	if wc.TruncateFrequency > 0 {
+		return wc.TruncateFrequency
+	}
+	return defaultWALTruncateFrequency
+}
+
 func newWAL(walConfig *walConfig, exportSink func(context.Context, []*prompb.WriteRequest) []error) (*prweWAL, error) {
 	if walConfig == nil {
 		// There are cases for which the WAL can be disabled.
@@ -58,7 +88,7 @@ func (wc *walConfig) createWAL() (*wal.Log, string, error) {
 	walPath := filepath.Join(wc.Directory, "prom_remotewrite")
 	wal, err := wal.Open(walPath, &wal.Options{
-		SegmentCacheSize: wc.nBeforeTruncation(),
+		SegmentCacheSize: wc.bufferSize(),
 		NoCopy:           true,
 	})
 	if err != nil {
@@ -67,24 +97,9 @@ func (wc *walConfig) createWAL() (*wal.Log, string, error) {
 	return wal, walPath, nil
 }

-type walConfig struct {
-	// Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per
-	// https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov
-	// but also we are using underscores "_" instead of dashes "-".
-	Directory         string        `mapstructure:"directory"`
-	NBeforeTruncation int           `mapstructure:"n_before_truncation"`
-	TruncateFrequency time.Duration `mapstructure:"truncate_frequency"`
-}
-
-func (wc *walConfig) nBeforeTruncation() int {
-	if wc.NBeforeTruncation <= 0 {
-		return 300
-	}
-	return wc.NBeforeTruncation
-}
-
 var (
 	errAlreadyClosed = errors.New("already closed")
+	errNilWAL        = errors.New("wal is nil")
 	errNilConfig     = errors.New("expecting a non-nil configuration")
 )

@@ -98,6 +113,7 @@ func (prwe *prweWAL) retrieveWALIndices(context.Context) (err error) {
 	if err != nil {
 		return err
 	}
+
 	prwe.wal = wal
 	prwe.walPath = walPath

@@ -116,6 +132,9 @@ func (prwe *prweWAL) stop() error {
 	err := errAlreadyClosed
 	prwe.stopOnce.Do(func() {
+		prwe.mu.Lock()
+		defer prwe.mu.Unlock()
+
 		close(prwe.stopChan)
 		prwe.closeWAL()
 		err = nil
@@ -139,6 +158,92 @@ func (prwe *prweWAL) start(ctx context.Context) error {
 	return nil
 }

+func (prwe *prweWAL) run(ctx context.Context) (err error) {
+	if err := prwe.start(ctx); err != nil {
+		return err
+	}
+
+	// Start the process of exporting but wait until the exporting has started.
+	waitUntilStartedCh := make(chan bool)
+	go func() {
+		prwe.continuallyPopWALThenExport(ctx, func() { close(waitUntilStartedCh) })
+	}()
+	<-waitUntilStartedCh
+	return nil
+}
+
+// continuallyPopWALThenExport reads a prompb.WriteRequest proto encoded blob from the WAL, and moves
+// the WAL's front index forward until either the read buffer period expires or the maximum
+// buffer size is exceeded. When either of the two conditions are matched, it then exports
+// the requests to the Remote-Write endpoint, and then truncates the head of the WAL to where
+// it last read from.
+func (prwe *prweWAL) continuallyPopWALThenExport(ctx context.Context, signalStart func()) (err error) {
+	var reqL []*prompb.WriteRequest
+	defer func() {
+		// Keeping it within a closure to ensure that the later
+		// updated value of reqL is always flushed to the export sink.
+		if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 && err == nil {
+			err = multierror.Append(nil, errL...)
+		}
+	}()
+
+	freshTimer := func() *time.Timer {
+		return time.NewTimer(prwe.walConfig.truncateFrequency())
+	}
+
+	timer := freshTimer()
+	defer func() {
+		// Added in a closure to ensure we capture the later
+		// updated value of timer when changed in the loop below.
+		timer.Stop()
+	}()
+
+	signalStart()
+
+	maxCountPerUpload := prwe.walConfig.bufferSize()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-prwe.stopChan:
+			return nil
+		default:
+		}
+
+		req, err := prwe.readPrompbFromWAL(ctx, atomic.LoadUint64(&prwe.rWALIndex))
+		if err != nil {
+			return err
+		}
+		reqL = append(reqL, req)
+
+		// Now increment the WAL's read index.
+		atomic.AddUint64(&prwe.rWALIndex, 1)
+
+		shouldExport := false
+		select {
+		case <-timer.C:
+			shouldExport = true
+			timer.Stop()
+			timer = freshTimer()
+		default:
+			shouldExport = len(reqL) >= maxCountPerUpload
+		}
+
+		if !shouldExport {
+			continue
+		}
+
+		// Otherwise, it is time to export, flush and then truncate the WAL, but also to kill the timer!
+		timer.Stop()
+		if err := prwe.exportThenFrontTruncateWAL(ctx, reqL); err != nil {
+			return err
+		}
+		// Reset but reuse the write requests slice.
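+		// (Re-slicing to zero length keeps the backing array allocated for the
+		// next batch of requests.)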
+		reqL = reqL[:0]
+		timer = freshTimer()
+	}
+}
+
 func (prwe *prweWAL) closeWAL() {
 	if prwe.wal != nil {
 		prwe.wal.Close()
@@ -146,6 +251,44 @@ func (prwe *prweWAL) closeWAL() {
 	}
 }

+func (prwe *prweWAL) syncAndTruncateFront() error {
+	prwe.mu.Lock()
+	defer prwe.mu.Unlock()
+
+	if prwe.wal == nil {
+		return errNilWAL
+	}
+
+	// Save all the entries that aren't yet committed, to the tail of the WAL.
+	if err := prwe.wal.Sync(); err != nil {
+		return err
+	}
+	// Truncate the WAL from the front for the entries that we already
+	// read from the WAL and had already exported.
+	if err := prwe.wal.TruncateFront(atomic.LoadUint64(&prwe.rWALIndex)); err != nil && err != wal.ErrOutOfRange {
+		return err
+	}
+	return nil
+}
+
+func (prwe *prweWAL) exportThenFrontTruncateWAL(ctx context.Context, reqL []*prompb.WriteRequest) error {
+	if len(reqL) == 0 {
+		return nil
+	}
+	if cErr := ctx.Err(); cErr != nil {
+		return nil
+	}
+
+	if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 {
+		return multierror.Append(nil, errL...)
+	}
+	if err := prwe.syncAndTruncateFront(); err != nil {
+		return err
+	}
+	// Reset by retrieving the respective read and write WAL indices.
+	return prwe.retrieveWALIndices(ctx)
+}
+
 // persistToWAL is the routine that'll be hooked into the exporter's receiving side and it'll
 // write them to the Write-Ahead-Log so that shutdowns won't lose data, and that the routine that
 // reads from the WAL can then process the previously serialized requests.
@@ -157,8 +300,8 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
 		if err != nil {
 			return err
 		}
-		prwe.wWALIndex++
-		batch.Write(prwe.wWALIndex, protoBlob)
+		wIndex := atomic.AddUint64(&prwe.wWALIndex, 1)
+		batch.Write(wIndex, protoBlob)
 	}

 	prwe.mu.Lock()
@@ -167,12 +310,23 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
 	return prwe.wal.WriteBatch(batch)
 }

-func (prwe *prweWAL) readPrompbFromWAL(_ context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
+func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
+	prwe.mu.Lock()
+	defer prwe.mu.Unlock()
+
 	var protoBlob []byte
 	for i := 0; i < 12; i++ {
+		// First check whether we've been terminated, and exit if so.
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+		}
+
 		if index <= 0 {
 			index = 1
 		}
+
 		protoBlob, err = prwe.wal.Read(index)
 		if errors.Is(err, wal.ErrNotFound) {
 			time.Sleep(time.Duration(1<
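To make the WAL life-cycle that the exporter relies on concrete, here is a minimal, self-contained sketch built on the same github.com/tidwall/wal calls used in this patch (Open, WriteBatch, Read, TruncateFront, Sync). The directory path and the payloads are illustrative only, error handling is reduced to log.Fatal for brevity, and this program is not part of the patch.

```go
package main

import (
	"fmt"
	"log"

	"github.com/tidwall/wal"
)

func main() {
	// Open (or create) the log, mirroring walConfig.createWAL.
	w, err := wal.Open("./prom_rw/prom_remotewrite", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Persist: append entries at monotonically increasing indices in one batch,
	// as persistToWAL does with the serialized prompb.WriteRequests.
	last, err := w.LastIndex()
	if err != nil {
		log.Fatal(err)
	}
	batch := new(wal.Batch)
	for i := uint64(1); i <= 3; i++ {
		batch.Write(last+i, []byte(fmt.Sprintf("serialized WriteRequest %d", i)))
	}
	if err := w.WriteBatch(batch); err != nil {
		log.Fatal(err)
	}

	// Replay: walk the log from the front, as continuallyPopWALThenExport does,
	// and hand each entry to the remote-write uploader (omitted here).
	first, _ := w.FirstIndex()
	last, _ = w.LastIndex()
	for i := first; i <= last; i++ {
		blob, rerr := w.Read(i)
		if rerr != nil {
			log.Fatal(rerr)
		}
		fmt.Printf("index %d: %s\n", i, blob)
	}

	// Truncate: once entries have been exported, sync and drop them from the
	// front so a restart does not re-upload them (see syncAndTruncateFront).
	if err := w.Sync(); err != nil {
		log.Fatal(err)
	}
	if err := w.TruncateFront(last); err != nil {
		log.Fatal(err)
	}
}
```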