diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index d1f60260704..66241a0955c 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -48,6 +48,10 @@ Example: exporters: prometheusremotewrite: endpoint: "https://my-cortex:7900/api/v1/push" + wal: # Enabling the Write-Ahead-Log for the exporter. + directory: ./prom_rw # The directory to store the WAL in + buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300 + truncate_frequency: 45s # Optional frequency for how often the WAL should be truncated. It is a time.ParseDuration; default of 1m ``` ## Advanced Configuration diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index c1078d361c7..3513d1b31ab 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -45,6 +45,8 @@ type Config struct { // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. exporterhelper.ResourceToTelemetrySettings `mapstructure:"resource_to_telemetry_conversion"` + + WAL *walConfig `mapstructure:"wal"` } // RemoteWriteQueue allows to configure the remote write queue. diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index ab3f1ded693..0874a5b8c7a 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -50,6 +50,8 @@ type PRWExporter struct { concurrency int userAgentHeader string clientSettings *confighttp.HTTPClientSettings + + wal *prweWAL } // NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly. @@ -66,7 +68,7 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(buildInfo.Description), " ", "-"), buildInfo.Version) - return &PRWExporter{ + prwe := &PRWExporter{ namespace: cfg.Namespace, externalLabels: sanitizedLabels, endpointURL: endpointURL, @@ -75,19 +77,44 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e userAgentHeader: userAgentHeader, concurrency: cfg.RemoteWriteQueue.NumConsumers, clientSettings: &cfg.HTTPClientSettings, - }, nil + } + if cfg.WAL == nil { + return prwe, nil + } + + prweWAL, err := newWAL(cfg.WAL, prwe.export) + if err != nil { + return nil, err + } + prwe.wal = prweWAL + return prwe, nil } // Start creates the prometheus client -func (prwe *PRWExporter) Start(_ context.Context, host component.Host) (err error) { +func (prwe *PRWExporter) Start(ctx context.Context, host component.Host) (err error) { prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions()) - return err + if err != nil { + return err + } + return prwe.turnOnWALIfEnabled(ctx) +} + +func (prwe *PRWExporter) shutdownWALIfEnabled() error { + if !prwe.walEnabled() { + return nil + } + return prwe.wal.stop() } // Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations // to finish before returning func (prwe *PRWExporter) Shutdown(context.Context) error { - close(prwe.closeChan) + select { + case <-prwe.closeChan: + default: + close(prwe.closeChan) + } + prwe.shutdownWALIfEnabled() prwe.wg.Wait() return nil } @@ -167,7 +194,7 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro } } - if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 { + if exportErrors := prwe.handleExport(ctx, tsMap); len(exportErrors) != 0 { dropped = md.MetricCount() errs = append(errs, exportErrors...) } @@ -209,8 +236,7 @@ func (prwe *PRWExporter) addNumberDataPointSlice(dataPoints pdata.NumberDataPoin return nil } -// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order -func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { +func (prwe *PRWExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { var errs []error // Calls the helper function to convert and batch the TsMap to the desired format requests, err := batchTimeSeries(tsMap, maxBatchByteSize) @@ -218,7 +244,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti errs = append(errs, consumererror.Permanent(err)) return errs } + if !prwe.walEnabled() { + // Perform a direct export otherwise. + return prwe.export(ctx, requests) + } + // Otherwise the WAL is enabled, and just persist the requests to the WAL + // and they'll be exported in another goroutine to the RemoteWrite endpoint. + if err := prwe.wal.persistToWAL(requests); err != nil { + errs = append(errs, consumererror.Permanent(err)) + } + return errs +} + +// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order +func (prwe *PRWExporter) export(ctx context.Context, requests []*prompb.WriteRequest) (errs []error) { input := make(chan *prompb.WriteRequest, len(requests)) for _, request := range requests { input <- request @@ -237,12 +277,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti go func() { defer wg.Done() - for request := range input { - err := prwe.execute(ctx, request) - if err != nil { - mu.Lock() - errs = append(errs, err) - mu.Unlock() + for { + select { + case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled. + return + + case request, ok := <-input: + if !ok { + return + } + + if err := prwe.execute(ctx, request); err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } } } }() @@ -294,3 +343,17 @@ func (prwe *PRWExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ } return consumererror.Permanent(rerr) } + +func (prwe *PRWExporter) walEnabled() bool { return prwe.wal != nil } + +func (prwe *PRWExporter) turnOnWALIfEnabled(ctx context.Context) error { + if !prwe.walEnabled() { + return nil + } + cancelCtx, cancel := context.WithCancel(ctx) + go func() { + <-prwe.closeChan + cancel() + }() + return prwe.wal.run(cancelCtx) +} diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 032f10744f5..4b2bd3b224e 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -16,12 +16,15 @@ package prometheusremotewriteexporter import ( "context" + "fmt" "io/ioutil" "net/http" "net/http/httptest" "net/url" + "strings" "sync" "testing" + "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -33,6 +36,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configparser" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/internal/testdata" @@ -345,7 +349,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error { return errs } - errs = append(errs, prwe.export(context.Background(), testmap)...) + errs = append(errs, prwe.handleExport(context.Background(), testmap)...) return errs } @@ -601,3 +605,154 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) { }) } } + +// Ensures that when we attach the Write-Ahead-Log(WAL) to the exporter, +// that it successfully writes the serialized prompb.WriteRequests to the WAL, +// and that we can retrieve those exact requests back from the WAL, when the +// exporter starts up once again, that it picks up where it left off. +func TestWALOnExporterRoundTrip(t *testing.T) { + if testing.Short() { + t.Skip("This test could run for long") + } + + // 1. Create a mock Prometheus Remote Write Exporter that'll just + // receive the bytes uploaded to it by our exporter. + uploadedBytesCh := make(chan []byte, 1) + exiting := make(chan bool) + prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + uploaded, err2 := ioutil.ReadAll(req.Body) + assert.Nil(t, err2, "Error while reading from HTTP upload") + select { + case uploadedBytesCh <- uploaded: + case <-exiting: + return + } + })) + defer prweServer.Close() + + // 2. Create the WAL configuration, create the + // exporter and export some time series! + tempDir := t.TempDir() + yamlConfig := fmt.Sprintf(` +namespace: "test_ns" +endpoint: %q +remote_write_queue: + num_consumers: 1 + +wal: + directory: %s + truncate_frequency: 60us + buffer_size: 1 + `, prweServer.URL, tempDir) + + parser, err := configparser.NewParserFromBuffer(strings.NewReader(yamlConfig)) + require.Nil(t, err) + + fullConfig := new(Config) + if err = parser.UnmarshalExact(fullConfig); err != nil { + t.Fatalf("Failed to parse config from YAML: %v", err) + } + walConfig := fullConfig.WAL + require.NotNil(t, walConfig) + require.Equal(t, walConfig.TruncateFrequency, 60*time.Microsecond) + require.Equal(t, walConfig.Directory, tempDir) + + var defaultBuildInfo = component.BuildInfo{ + Description: "OpenTelemetry Collector", + Version: "1.0", + } + prwe, perr := NewPRWExporter(fullConfig, defaultBuildInfo) + assert.Nil(t, perr) + + nopHost := componenttest.NewNopHost() + ctx := context.Background() + require.Nil(t, prwe.Start(ctx, nopHost)) + defer prwe.Shutdown(ctx) + defer close(exiting) + require.NotNil(t, prwe.wal) + + ts1 := &prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 100}}, + } + ts2 := &prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}}, + Samples: []prompb.Sample{{Value: 2, Timestamp: 200}}, + } + tsMap := map[string]*prompb.TimeSeries{ + "timeseries1": ts1, + "timeseries2": ts2, + } + errs := prwe.handleExport(ctx, tsMap) + assert.Nil(t, errs) + // Shutdown after we've written to the WAL. This ensures that our + // exported data in-flight will flushed flushed to the WAL before exiting. + prwe.Shutdown(ctx) + + // 3. Let's now read back all of the WAL records and ensure + // that all the prompb.WriteRequest values exist as we sent them. + wal, _, werr := walConfig.createWAL() + assert.Nil(t, werr) + assert.NotNil(t, wal) + defer wal.Close() + + // Read all the indices. + firstIndex, ierr := wal.FirstIndex() + assert.Nil(t, ierr) + lastIndex, ierr := wal.LastIndex() + assert.Nil(t, ierr) + + var reqs []*prompb.WriteRequest + for i := firstIndex; i <= lastIndex; i++ { + protoBlob, perr := wal.Read(i) + assert.Nil(t, perr) + assert.NotNil(t, protoBlob) + req := new(prompb.WriteRequest) + err = proto.Unmarshal(protoBlob, req) + assert.Nil(t, err) + reqs = append(reqs, req) + } + assert.Equal(t, 1, len(reqs)) + // We MUST have 2 time series as were passed into tsMap. + gotFromWAL := reqs[0] + assert.Equal(t, 2, len(gotFromWAL.Timeseries)) + want := &prompb.WriteRequest{ + Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{ + *ts1, *ts2, + }), + } + + // Even after sorting timeseries, we need to sort them + // also by Label to ensure deterministic ordering. + orderByLabelValue(gotFromWAL) + gotFromWAL.Timeseries = orderBySampleTimestamp(gotFromWAL.Timeseries) + orderByLabelValue(want) + + assert.Equal(t, want, gotFromWAL) + + // 4. Finally, ensure that the bytes that were uploaded to the + // Prometheus Remote Write endpoint are exactly as were saved in the WAL. + // Read from that same WAL, export to the RWExporter server. + prwe2, err := NewPRWExporter(fullConfig, defaultBuildInfo) + assert.Nil(t, err) + require.Nil(t, prwe2.Start(ctx, nopHost)) + defer prwe2.Shutdown(ctx) + require.NotNil(t, prwe2.wal) + + snappyEncodedBytes := <-uploadedBytesCh + decodeBuffer := make([]byte, len(snappyEncodedBytes)) + uploadedBytes, derr := snappy.Decode(decodeBuffer, snappyEncodedBytes) + require.Nil(t, derr) + gotFromUpload := new(prompb.WriteRequest) + uerr := proto.Unmarshal(uploadedBytes, gotFromUpload) + assert.Nil(t, uerr) + gotFromUpload.Timeseries = orderBySampleTimestamp(gotFromUpload.Timeseries) + // Even after sorting timeseries, we need to sort them + // also by Label to ensure deterministic ordering. + orderByLabelValue(gotFromUpload) + + // 4.1. Ensure that all the various combinations match up. + // To ensure a deterministic ordering, sort the TimeSeries by Label Name. + assert.Equal(t, want, gotFromUpload) + assert.Equal(t, gotFromWAL, gotFromUpload) +} diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index 5b52b3f1250..d61b368b938 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -20,9 +20,11 @@ import ( "fmt" "path/filepath" "sync" + "sync/atomic" "time" "github.com/gogo/protobuf/proto" + multierror "github.com/hashicorp/go-multierror" "github.com/prometheus/prometheus/prompb" "github.com/tidwall/wal" ) @@ -41,6 +43,34 @@ type prweWAL struct { wWALIndex uint64 } +const ( + defaultWALBufferSize = 300 + defaultWALTruncateFrequency = 1 * time.Minute +) + +type walConfig struct { + // Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per + // https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov + // but also we are using underscores "_" instead of dashes "-". + Directory string `mapstructure:"directory"` + BufferSize int `mapstructure:"buffer_size"` + TruncateFrequency time.Duration `mapstructure:"truncate_frequency"` +} + +func (wc *walConfig) bufferSize() int { + if wc.BufferSize > 0 { + return wc.BufferSize + } + return defaultWALBufferSize +} + +func (wc *walConfig) truncateFrequency() time.Duration { + if wc.TruncateFrequency > 0 { + return wc.TruncateFrequency + } + return defaultWALTruncateFrequency +} + func newWAL(walConfig *walConfig, exportSink func(context.Context, []*prompb.WriteRequest) []error) (*prweWAL, error) { if walConfig == nil { // There are cases for which the WAL can be disabled. @@ -58,7 +88,7 @@ func newWAL(walConfig *walConfig, exportSink func(context.Context, []*prompb.Wri func (wc *walConfig) createWAL() (*wal.Log, string, error) { walPath := filepath.Join(wc.Directory, "prom_remotewrite") wal, err := wal.Open(walPath, &wal.Options{ - SegmentCacheSize: wc.nBeforeTruncation(), + SegmentCacheSize: wc.bufferSize(), NoCopy: true, }) if err != nil { @@ -67,24 +97,9 @@ func (wc *walConfig) createWAL() (*wal.Log, string, error) { return wal, walPath, nil } -type walConfig struct { - // Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per - // https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov - // but also we are using underscores "_" instead of dashes "-". - Directory string `mapstructure:"directory"` - NBeforeTruncation int `mapstructure:"n_before_truncation"` - TruncateFrequency time.Duration `mapstructure:"truncate_frequency"` -} - -func (wc *walConfig) nBeforeTruncation() int { - if wc.NBeforeTruncation <= 0 { - return 300 - } - return wc.NBeforeTruncation -} - var ( errAlreadyClosed = errors.New("already closed") + errNilWAL = errors.New("wal is nil") errNilConfig = errors.New("expecting a non-nil configuration") ) @@ -98,6 +113,7 @@ func (prwe *prweWAL) retrieveWALIndices(context.Context) (err error) { if err != nil { return err } + prwe.wal = wal prwe.walPath = walPath @@ -116,6 +132,9 @@ func (prwe *prweWAL) retrieveWALIndices(context.Context) (err error) { func (prwe *prweWAL) stop() error { err := errAlreadyClosed prwe.stopOnce.Do(func() { + prwe.mu.Lock() + defer prwe.mu.Unlock() + close(prwe.stopChan) prwe.closeWAL() err = nil @@ -139,6 +158,92 @@ func (prwe *prweWAL) start(ctx context.Context) error { return nil } +func (prwe *prweWAL) run(ctx context.Context) (err error) { + if err := prwe.start(ctx); err != nil { + return err + } + + // Start the process of exporting but wait until the exporting has started. + waitUntilStartedCh := make(chan bool) + go func() { + prwe.continuallyPopWALThenExport(ctx, func() { close(waitUntilStartedCh) }) + }() + <-waitUntilStartedCh + return nil +} + +// continuallyPopWALThenExport reads a prompb.WriteRequest proto encoded blob from the WAL, and moves +// the WAL's front index forward until either the read buffer period expires or the maximum +// buffer size is exceeded. When either of the two conditions are matched, it then exports +// the requests to the Remote-Write endpoint, and then truncates the head of the WAL to where +// it last read from. +func (prwe *prweWAL) continuallyPopWALThenExport(ctx context.Context, signalStart func()) (err error) { + var reqL []*prompb.WriteRequest + defer func() { + // Keeping it within a closure to ensure that the later + // updated value of reqL is always flushed to disk. + if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 && err == nil { + err = multierror.Append(nil, errL...) + } + }() + + freshTimer := func() *time.Timer { + return time.NewTimer(prwe.walConfig.truncateFrequency()) + } + + timer := freshTimer() + defer func() { + // Added in a closure to ensure we capture the later + // updated value of timer when changed in the loop below. + timer.Stop() + }() + + signalStart() + + maxCountPerUpload := prwe.walConfig.bufferSize() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-prwe.stopChan: + return nil + default: + } + + req, err := prwe.readPrompbFromWAL(ctx, atomic.LoadUint64(&prwe.rWALIndex)) + if err != nil { + return err + } + reqL = append(reqL, req) + + // Now increment the WAL's read index. + atomic.AddUint64(&prwe.rWALIndex, 1) + + shouldExport := false + select { + case <-timer.C: + shouldExport = true + timer.Stop() + timer = freshTimer() + default: + shouldExport = len(reqL) >= maxCountPerUpload + } + + if !shouldExport { + continue + } + + // Otherwise, it is time to export, flush and then truncate the WAL, but also to kill the timer! + timer.Stop() + if err := prwe.exportThenFrontTruncateWAL(ctx, reqL); err != nil { + return err + } + // Reset but reuse the write requests slice. + reqL = reqL[:0] + timer = freshTimer() + } +} + func (prwe *prweWAL) closeWAL() { if prwe.wal != nil { prwe.wal.Close() @@ -146,6 +251,44 @@ func (prwe *prweWAL) closeWAL() { } } +func (prwe *prweWAL) syncAndTruncateFront() error { + prwe.mu.Lock() + defer prwe.mu.Unlock() + + if prwe.wal == nil { + return errNilWAL + } + + // Save all the entries that aren't yet committed, to the tail of the WAL. + if err := prwe.wal.Sync(); err != nil { + return err + } + // Truncate the WAL from the front for the entries that we already + // read from the WAL and had already exported. + if err := prwe.wal.TruncateFront(atomic.LoadUint64(&prwe.rWALIndex)); err != nil && err != wal.ErrOutOfRange { + return err + } + return nil +} + +func (prwe *prweWAL) exportThenFrontTruncateWAL(ctx context.Context, reqL []*prompb.WriteRequest) error { + if len(reqL) == 0 { + return nil + } + if cErr := ctx.Err(); cErr != nil { + return nil + } + + if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 { + return multierror.Append(nil, errL...) + } + if err := prwe.syncAndTruncateFront(); err != nil { + return err + } + // Reset by retrieving the respective read and write WAL indices. + return prwe.retrieveWALIndices(ctx) +} + // persistToWAL is the routine that'll be hooked into the exporter's receiving side and it'll // write them to the Write-Ahead-Log so that shutdowns won't lose data, and that the routine that // reads from the WAL can then process the previously serialized requests. @@ -157,8 +300,8 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { if err != nil { return err } - prwe.wWALIndex++ - batch.Write(prwe.wWALIndex, protoBlob) + wIndex := atomic.AddUint64(&prwe.wWALIndex, 1) + batch.Write(wIndex, protoBlob) } prwe.mu.Lock() @@ -167,12 +310,23 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { return prwe.wal.WriteBatch(batch) } -func (prwe *prweWAL) readPrompbFromWAL(_ context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { +func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { + prwe.mu.Lock() + defer prwe.mu.Unlock() + var protoBlob []byte for i := 0; i < 12; i++ { + // Firstly check if we've been terminated, then exit if so. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if index <= 0 { index = 1 } + protoBlob, err = prwe.wal.Read(index) if errors.Is(err, wal.ErrNotFound) { time.Sleep(time.Duration(1<