From 8b5d3680691fe409e493795b1fb292c9e1c72533 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 24 Aug 2021 22:15:24 -0700 Subject: [PATCH] exporter/prometheusremotewrite: glue up and use Write-Ahead-Log This change completes the WAL capability for the Prometheus Remote-Write exporter that allows recovery of data if the prior write requests weren't yet exported. This wires up a Write-Ahead-Log (WAL) that was implemented in PR open-telemetry/opentelemetry-collector#3597, which was split off from PR open-telemetry/opentelemetry-collector#3017. Note: there is very rare condition for which we can perhaps send the same data a couple of times, and it can happen if we haven't yet truncated the WAL, the RemoteWrite endpoint received the prior data but the process received a Ctrl+C, kill signal. However, this will be very rare and would have to be timed so fast and precisely. Replaces PR open-telemetry/opentelemetry-collector#3017 Updates PR open-telemetry/opentelemetry-collector#3597 Fixes #4751 Fixes open-telemetry/wg-prometheus#9 --- cmd/configschema/go.mod | 4 +- cmd/configschema/go.sum | 22 +- .../prometheusremotewriteexporter/README.md | 4 + .../prometheusremotewriteexporter/config.go | 1 + .../prometheusremotewriteexporter/exporter.go | 92 +++++- .../exporter_test.go | 158 +++++++++- exporter/prometheusremotewriteexporter/go.mod | 16 +- exporter/prometheusremotewriteexporter/go.sum | 26 +- exporter/prometheusremotewriteexporter/wal.go | 269 ++++++++++++++++-- .../prometheusremotewriteexporter/wal_test.go | 9 +- go.mod | 6 +- go.sum | 10 +- 12 files changed, 548 insertions(+), 69 deletions(-) diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index da0abda344d1..245f862dbb12 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -394,10 +394,10 @@ require ( go.opentelemetry.io/otel/sdk v1.0.0-RC3 // indirect go.opentelemetry.io/otel/trace v1.0.0-RC3 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.19.0 // indirect golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect - golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect + golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index 31a3a8ebda4d..7df0af5c7de5 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -690,7 +690,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +<<<<<<< HEAD github.com/fsouza/go-dockerclient v1.7.3/go.mod h1:8xfZB8o9SptLNJ13VoV5pMiRbZGWkU/Omu5VOu/KC9Y= +======= +>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E= github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= @@ -1117,8 +1120,9 @@ github.com/hashicorp/consul/sdk v0.4.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPA github.com/hashicorp/consul/sdk v0.7.0 h1:H6R9d008jDcHPQPAqPNuydAshJ4v5/8URdFnUvK/+sc= github.com/hashicorp/consul/sdk v0.7.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= @@ -2277,8 +2281,9 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -2458,8 +2463,9 @@ golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds= golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2629,10 +2635,15 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +<<<<<<< HEAD golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +======= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= @@ -2913,9 +2924,14 @@ google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f/go.mod h1:ob2IJxKr google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w= +<<<<<<< HEAD google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71 h1:z+ErRPu0+KS02Td3fOAgdX+lnPDh/VyaABEJPD4JRQs= google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +======= +google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda h1:iT5uhT54PtbqUsWddv/nnEWdE5e/MTr+Nv3vjxlBP1A= +google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= +>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= diff --git a/exporter/prometheusremotewriteexporter/README.md b/exporter/prometheusremotewriteexporter/README.md index 2cac307c5fd0..999164c15006 100644 --- a/exporter/prometheusremotewriteexporter/README.md +++ b/exporter/prometheusremotewriteexporter/README.md @@ -48,6 +48,10 @@ Example: exporters: prometheusremotewrite: endpoint: "https://my-cortex:7900/api/v1/push" + wal: # Enabling the Write-Ahead-Log for the exporter. + directory: ./prom_rw # The directory to store the WAL in + buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300 + truncate_frequency: 45s # Optional frequency for how often the WAL should be truncated. It is a time.ParseDuration; default of 1m ``` ## Advanced Configuration diff --git a/exporter/prometheusremotewriteexporter/config.go b/exporter/prometheusremotewriteexporter/config.go index a5b5ec258e24..1a93e0318f83 100644 --- a/exporter/prometheusremotewriteexporter/config.go +++ b/exporter/prometheusremotewriteexporter/config.go @@ -47,6 +47,7 @@ type Config struct { // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"` + WAL *walConfig `mapstructure:"wal"` } // RemoteWriteQueue allows to configure the remote write queue. diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index c046b1956e47..0874a5b8c7a6 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -30,6 +30,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" @@ -49,6 +50,8 @@ type PRWExporter struct { concurrency int userAgentHeader string clientSettings *confighttp.HTTPClientSettings + + wal *prweWAL } // NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly. @@ -65,7 +68,7 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(buildInfo.Description), " ", "-"), buildInfo.Version) - return &PRWExporter{ + prwe := &PRWExporter{ namespace: cfg.Namespace, externalLabels: sanitizedLabels, endpointURL: endpointURL, @@ -74,19 +77,44 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e userAgentHeader: userAgentHeader, concurrency: cfg.RemoteWriteQueue.NumConsumers, clientSettings: &cfg.HTTPClientSettings, - }, nil + } + if cfg.WAL == nil { + return prwe, nil + } + + prweWAL, err := newWAL(cfg.WAL, prwe.export) + if err != nil { + return nil, err + } + prwe.wal = prweWAL + return prwe, nil } // Start creates the prometheus client -func (prwe *PRWExporter) Start(_ context.Context, host component.Host) (err error) { +func (prwe *PRWExporter) Start(ctx context.Context, host component.Host) (err error) { prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions()) - return err + if err != nil { + return err + } + return prwe.turnOnWALIfEnabled(ctx) +} + +func (prwe *PRWExporter) shutdownWALIfEnabled() error { + if !prwe.walEnabled() { + return nil + } + return prwe.wal.stop() } // Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations // to finish before returning func (prwe *PRWExporter) Shutdown(context.Context) error { - close(prwe.closeChan) + select { + case <-prwe.closeChan: + default: + close(prwe.closeChan) + } + prwe.shutdownWALIfEnabled() prwe.wg.Wait() return nil } @@ -166,7 +194,7 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro } } - if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 { + if exportErrors := prwe.handleExport(ctx, tsMap); len(exportErrors) != 0 { dropped = md.MetricCount() errs = append(errs, exportErrors...) } @@ -208,8 +236,7 @@ func (prwe *PRWExporter) addNumberDataPointSlice(dataPoints pdata.NumberDataPoin return nil } -// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order -func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { +func (prwe *PRWExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error { var errs []error // Calls the helper function to convert and batch the TsMap to the desired format requests, err := batchTimeSeries(tsMap, maxBatchByteSize) @@ -217,7 +244,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti errs = append(errs, consumererror.Permanent(err)) return errs } + if !prwe.walEnabled() { + // Perform a direct export otherwise. + return prwe.export(ctx, requests) + } + // Otherwise the WAL is enabled, and just persist the requests to the WAL + // and they'll be exported in another goroutine to the RemoteWrite endpoint. + if err := prwe.wal.persistToWAL(requests); err != nil { + errs = append(errs, consumererror.Permanent(err)) + } + return errs +} + +// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order +func (prwe *PRWExporter) export(ctx context.Context, requests []*prompb.WriteRequest) (errs []error) { input := make(chan *prompb.WriteRequest, len(requests)) for _, request := range requests { input <- request @@ -236,12 +277,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti go func() { defer wg.Done() - for request := range input { - err := prwe.execute(ctx, request) - if err != nil { - mu.Lock() - errs = append(errs, err) - mu.Unlock() + for { + select { + case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled. + return + + case request, ok := <-input: + if !ok { + return + } + + if err := prwe.execute(ctx, request); err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } } } }() @@ -293,3 +343,17 @@ func (prwe *PRWExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ } return consumererror.Permanent(rerr) } + +func (prwe *PRWExporter) walEnabled() bool { return prwe.wal != nil } + +func (prwe *PRWExporter) turnOnWALIfEnabled(ctx context.Context) error { + if !prwe.walEnabled() { + return nil + } + cancelCtx, cancel := context.WithCancel(ctx) + go func() { + <-prwe.closeChan + cancel() + }() + return prwe.wal.run(cancelCtx) +} diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index d5e771d5428d..72c35a1c8018 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -16,22 +16,27 @@ package prometheusremotewriteexporter import ( "context" + "fmt" "io/ioutil" "net/http" "net/http/httptest" "net/url" + "strings" "sync" "testing" + "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configparser" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/model/pdata" @@ -345,7 +350,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error { return errs } - errs = append(errs, prwe.export(context.Background(), testmap)...) + errs = append(errs, prwe.handleExport(context.Background(), testmap)...) return errs } @@ -601,3 +606,154 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) { }) } } + +// Ensures that when we attach the Write-Ahead-Log(WAL) to the exporter, +// that it successfully writes the serialized prompb.WriteRequests to the WAL, +// and that we can retrieve those exact requests back from the WAL, when the +// exporter starts up once again, that it picks up where it left off. +func TestWALOnExporterRoundTrip(t *testing.T) { + if testing.Short() { + t.Skip("This test could run for long") + } + + // 1. Create a mock Prometheus Remote Write Exporter that'll just + // receive the bytes uploaded to it by our exporter. + uploadedBytesCh := make(chan []byte, 1) + exiting := make(chan bool) + prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + uploaded, err2 := ioutil.ReadAll(req.Body) + assert.Nil(t, err2, "Error while reading from HTTP upload") + select { + case uploadedBytesCh <- uploaded: + case <-exiting: + return + } + })) + defer prweServer.Close() + + // 2. Create the WAL configuration, create the + // exporter and export some time series! + tempDir := t.TempDir() + yamlConfig := fmt.Sprintf(` +namespace: "test_ns" +endpoint: %q +remote_write_queue: + num_consumers: 1 + +wal: + directory: %s + truncate_frequency: 60us + buffer_size: 1 + `, prweServer.URL, tempDir) + + parser, err := configparser.NewParserFromBuffer(strings.NewReader(yamlConfig)) + require.Nil(t, err) + + fullConfig := new(Config) + if err = parser.UnmarshalExact(fullConfig); err != nil { + t.Fatalf("Failed to parse config from YAML: %v", err) + } + walConfig := fullConfig.WAL + require.NotNil(t, walConfig) + require.Equal(t, walConfig.TruncateFrequency, 60*time.Microsecond) + require.Equal(t, walConfig.Directory, tempDir) + + var defaultBuildInfo = component.BuildInfo{ + Description: "OpenTelemetry Collector", + Version: "1.0", + } + prwe, perr := NewPRWExporter(fullConfig, defaultBuildInfo) + assert.Nil(t, perr) + + nopHost := componenttest.NewNopHost() + ctx := context.Background() + require.Nil(t, prwe.Start(ctx, nopHost)) + defer prwe.Shutdown(ctx) + defer close(exiting) + require.NotNil(t, prwe.wal) + + ts1 := &prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: 100}}, + } + ts2 := &prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}}, + Samples: []prompb.Sample{{Value: 2, Timestamp: 200}}, + } + tsMap := map[string]*prompb.TimeSeries{ + "timeseries1": ts1, + "timeseries2": ts2, + } + errs := prwe.handleExport(ctx, tsMap) + assert.Nil(t, errs) + // Shutdown after we've written to the WAL. This ensures that our + // exported data in-flight will flushed flushed to the WAL before exiting. + prwe.Shutdown(ctx) + + // 3. Let's now read back all of the WAL records and ensure + // that all the prompb.WriteRequest values exist as we sent them. + wal, _, werr := walConfig.createWAL() + assert.Nil(t, werr) + assert.NotNil(t, wal) + defer wal.Close() + + // Read all the indices. + firstIndex, ierr := wal.FirstIndex() + assert.Nil(t, ierr) + lastIndex, ierr := wal.LastIndex() + assert.Nil(t, ierr) + + var reqs []*prompb.WriteRequest + for i := firstIndex; i <= lastIndex; i++ { + protoBlob, perr := wal.Read(i) + assert.Nil(t, perr) + assert.NotNil(t, protoBlob) + req := new(prompb.WriteRequest) + err = proto.Unmarshal(protoBlob, req) + assert.Nil(t, err) + reqs = append(reqs, req) + } + assert.Equal(t, 1, len(reqs)) + // We MUST have 2 time series as were passed into tsMap. + gotFromWAL := reqs[0] + assert.Equal(t, 2, len(gotFromWAL.Timeseries)) + want := &prompb.WriteRequest{ + Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{ + *ts1, *ts2, + }), + } + + // Even after sorting timeseries, we need to sort them + // also by Label to ensure deterministic ordering. + orderByLabelValue(gotFromWAL) + gotFromWAL.Timeseries = orderBySampleTimestamp(gotFromWAL.Timeseries) + orderByLabelValue(want) + + assert.Equal(t, want, gotFromWAL) + + // 4. Finally, ensure that the bytes that were uploaded to the + // Prometheus Remote Write endpoint are exactly as were saved in the WAL. + // Read from that same WAL, export to the RWExporter server. + prwe2, err := NewPRWExporter(fullConfig, defaultBuildInfo) + assert.Nil(t, err) + require.Nil(t, prwe2.Start(ctx, nopHost)) + defer prwe2.Shutdown(ctx) + require.NotNil(t, prwe2.wal) + + snappyEncodedBytes := <-uploadedBytesCh + decodeBuffer := make([]byte, len(snappyEncodedBytes)) + uploadedBytes, derr := snappy.Decode(decodeBuffer, snappyEncodedBytes) + require.Nil(t, derr) + gotFromUpload := new(prompb.WriteRequest) + uerr := proto.Unmarshal(uploadedBytes, gotFromUpload) + assert.Nil(t, uerr) + gotFromUpload.Timeseries = orderBySampleTimestamp(gotFromUpload.Timeseries) + // Even after sorting timeseries, we need to sort them + // also by Label to ensure deterministic ordering. + orderByLabelValue(gotFromUpload) + + // 4.1. Ensure that all the various combinations match up. + // To ensure a deterministic ordering, sort the TimeSeries by Label Name. + assert.Equal(t, want, gotFromUpload) + assert.Equal(t, gotFromWAL, gotFromUpload) +} diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index 95cca8aeb1af..e462d8d06d31 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -3,23 +3,28 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/promet go 1.17 require ( + github.com/fsnotify/fsnotify v1.5.1 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.35.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.35.0 + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 github.com/prometheus/common v0.30.0 github.com/prometheus/prometheus v1.8.2-0.20210621150501-ff58416a0b02 github.com/stretchr/testify v1.7.0 + github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/wal v0.1.5 go.opentelemetry.io/collector v0.35.0 go.opentelemetry.io/collector/model v0.35.0 + go.uber.org/multierr v1.7.0 // indirect + golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect + golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect + google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda // indirect ) require ( github.com/cenkalti/backoff/v4 v4.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/knadh/koanf v1.2.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -30,7 +35,6 @@ require ( github.com/spf13/cast v1.4.1 // indirect github.com/tidwall/gjson v1.8.1 // indirect github.com/tidwall/match v1.0.3 // indirect - github.com/tidwall/pretty v1.1.0 // indirect github.com/tidwall/tinylru v1.0.2 // indirect go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/contrib v0.23.0 // indirect @@ -40,12 +44,8 @@ require ( go.opentelemetry.io/otel/metric v0.23.0 // indirect go.opentelemetry.io/otel/trace v1.0.0-RC3 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.0 // indirect - golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect - golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 // indirect google.golang.org/grpc v1.40.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/exporter/prometheusremotewriteexporter/go.sum b/exporter/prometheusremotewriteexporter/go.sum index 1c21dde6e609..ee73da81b219 100644 --- a/exporter/prometheusremotewriteexporter/go.sum +++ b/exporter/prometheusremotewriteexporter/go.sum @@ -242,8 +242,9 @@ github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= github.com/gdamore/tcell v1.3.0/go.mod h1:Hjvr+Ofd+gLglo7RYKxxnzCBmev3BzsS67MebKS4zMM= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -521,6 +522,8 @@ github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyN github.com/hashicorp/consul/sdk v0.4.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM= github.com/hashicorp/consul/sdk v0.7.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI= @@ -532,6 +535,8 @@ github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= @@ -916,8 +921,9 @@ github.com/tidwall/gjson v1.8.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE= github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tidwall/pretty v1.1.0 h1:K3hMW5epkdAVwibsQEfR/7Zj0Qgt4DxtNumTq/VloO8= github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/tinylru v1.0.2 h1:W4mp7iUz4cnVMqAvWy2zbzC35ASv5sqdyyEjoQKKBFg= github.com/tidwall/tinylru v1.0.2/go.mod h1:HDVL7TsWeezQ4g44Um84TOVBMFcq7Xa9giqNc805KJ8= github.com/tidwall/wal v0.1.5 h1:RUG4al7k6tJLGteZnUDjSLMZd5n1LLFHvUL7pwfKVmI= @@ -1027,8 +1033,9 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -1164,8 +1171,9 @@ golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw= +golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1278,8 +1286,10 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210611083646-a4fc73990273/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1380,8 +1390,9 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.3 h1:L69ShwSZEyCsLKoAxDKeMvLDZkumEe8gXUZAjab0tX8= golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1474,8 +1485,9 @@ google.golang.org/genproto v0.0.0-20210319143718-93e7006c17a6/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08 h1:pc16UedxnxXXtGxHCSUhafAoVHQZ0yXl8ZelMH4EETc= google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= +google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda h1:iT5uhT54PtbqUsWddv/nnEWdE5e/MTr+Nv3vjxlBP1A= +google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/exporter/prometheusremotewriteexporter/wal.go b/exporter/prometheusremotewriteexporter/wal.go index 5b52b3f12504..1ea54cab0b33 100644 --- a/exporter/prometheusremotewriteexporter/wal.go +++ b/exporter/prometheusremotewriteexporter/wal.go @@ -20,9 +20,12 @@ import ( "fmt" "path/filepath" "sync" + "sync/atomic" "time" + "github.com/fsnotify/fsnotify" "github.com/gogo/protobuf/proto" + multierror "github.com/hashicorp/go-multierror" "github.com/prometheus/prometheus/prompb" "github.com/tidwall/wal" ) @@ -41,6 +44,34 @@ type prweWAL struct { wWALIndex uint64 } +const ( + defaultWALBufferSize = 300 + defaultWALTruncateFrequency = 1 * time.Minute +) + +type walConfig struct { + // Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per + // https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov + // but also we are using underscores "_" instead of dashes "-". + Directory string `mapstructure:"directory"` + BufferSize int `mapstructure:"buffer_size"` + TruncateFrequency time.Duration `mapstructure:"truncate_frequency"` +} + +func (wc *walConfig) bufferSize() int { + if wc.BufferSize > 0 { + return wc.BufferSize + } + return defaultWALBufferSize +} + +func (wc *walConfig) truncateFrequency() time.Duration { + if wc.TruncateFrequency > 0 { + return wc.TruncateFrequency + } + return defaultWALTruncateFrequency +} + func newWAL(walConfig *walConfig, exportSink func(context.Context, []*prompb.WriteRequest) []error) (*prweWAL, error) { if walConfig == nil { // There are cases for which the WAL can be disabled. @@ -58,7 +89,7 @@ func newWAL(walConfig *walConfig, exportSink func(context.Context, []*prompb.Wri func (wc *walConfig) createWAL() (*wal.Log, string, error) { walPath := filepath.Join(wc.Directory, "prom_remotewrite") wal, err := wal.Open(walPath, &wal.Options{ - SegmentCacheSize: wc.nBeforeTruncation(), + SegmentCacheSize: wc.bufferSize(), NoCopy: true, }) if err != nil { @@ -67,24 +98,9 @@ func (wc *walConfig) createWAL() (*wal.Log, string, error) { return wal, walPath, nil } -type walConfig struct { - // Note: These variable names are meant to closely mirror what Prometheus' WAL uses for field names per - // https://docs.google.com/document/d/1cCcoFgjDFwU2n823tKuMvrIhzHty4UDyn0IcfUHiyyI/edit#heading=h.mlf37ibqjgov - // but also we are using underscores "_" instead of dashes "-". - Directory string `mapstructure:"directory"` - NBeforeTruncation int `mapstructure:"n_before_truncation"` - TruncateFrequency time.Duration `mapstructure:"truncate_frequency"` -} - -func (wc *walConfig) nBeforeTruncation() int { - if wc.NBeforeTruncation <= 0 { - return 300 - } - return wc.NBeforeTruncation -} - var ( errAlreadyClosed = errors.New("already closed") + errNilWAL = errors.New("wal is nil") errNilConfig = errors.New("expecting a non-nil configuration") ) @@ -98,6 +114,7 @@ func (prwe *prweWAL) retrieveWALIndices(context.Context) (err error) { if err != nil { return err } + prwe.wal = wal prwe.walPath = walPath @@ -116,6 +133,9 @@ func (prwe *prweWAL) retrieveWALIndices(context.Context) (err error) { func (prwe *prweWAL) stop() error { err := errAlreadyClosed prwe.stopOnce.Do(func() { + prwe.mu.Lock() + defer prwe.mu.Unlock() + close(prwe.stopChan) prwe.closeWAL() err = nil @@ -139,6 +159,92 @@ func (prwe *prweWAL) start(ctx context.Context) error { return nil } +func (prwe *prweWAL) run(ctx context.Context) (err error) { + if err := prwe.start(ctx); err != nil { + return err + } + + // Start the process of exporting but wait until the exporting has started. + waitUntilStartedCh := make(chan bool) + go func() { + prwe.continuallyPopWALThenExport(ctx, func() { close(waitUntilStartedCh) }) + }() + <-waitUntilStartedCh + return nil +} + +// continuallyPopWALThenExport reads a prompb.WriteRequest proto encoded blob from the WAL, and moves +// the WAL's front index forward until either the read buffer period expires or the maximum +// buffer size is exceeded. When either of the two conditions are matched, it then exports +// the requests to the Remote-Write endpoint, and then truncates the head of the WAL to where +// it last read from. +func (prwe *prweWAL) continuallyPopWALThenExport(ctx context.Context, signalStart func()) (err error) { + var reqL []*prompb.WriteRequest + defer func() { + // Keeping it within a closure to ensure that the later + // updated value of reqL is always flushed to disk. + if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 && err == nil { + err = multierror.Append(nil, errL...) + } + }() + + freshTimer := func() *time.Timer { + return time.NewTimer(prwe.walConfig.truncateFrequency()) + } + + timer := freshTimer() + defer func() { + // Added in a closure to ensure we capture the later + // updated value of timer when changed in the loop below. + timer.Stop() + }() + + signalStart() + + maxCountPerUpload := prwe.walConfig.bufferSize() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-prwe.stopChan: + return nil + default: + } + + req, err := prwe.readPrompbFromWAL(ctx, atomic.LoadUint64(&prwe.rWALIndex)) + if err != nil { + return err + } + reqL = append(reqL, req) + + // Now increment the WAL's read index. + atomic.AddUint64(&prwe.rWALIndex, 1) + + shouldExport := false + select { + case <-timer.C: + shouldExport = true + timer.Stop() + timer = freshTimer() + default: + shouldExport = len(reqL) >= maxCountPerUpload + } + + if !shouldExport { + continue + } + + // Otherwise, it is time to export, flush and then truncate the WAL, but also to kill the timer! + timer.Stop() + if err := prwe.exportThenFrontTruncateWAL(ctx, reqL); err != nil { + return err + } + // Reset but reuse the write requests slice. + reqL = reqL[:0] + timer = freshTimer() + } +} + func (prwe *prweWAL) closeWAL() { if prwe.wal != nil { prwe.wal.Close() @@ -146,6 +252,44 @@ func (prwe *prweWAL) closeWAL() { } } +func (prwe *prweWAL) syncAndTruncateFront() error { + prwe.mu.Lock() + defer prwe.mu.Unlock() + + if prwe.wal == nil { + return errNilWAL + } + + // Save all the entries that aren't yet committed, to the tail of the WAL. + if err := prwe.wal.Sync(); err != nil { + return err + } + // Truncate the WAL from the front for the entries that we already + // read from the WAL and had already exported. + if err := prwe.wal.TruncateFront(atomic.LoadUint64(&prwe.rWALIndex)); err != nil && err != wal.ErrOutOfRange { + return err + } + return nil +} + +func (prwe *prweWAL) exportThenFrontTruncateWAL(ctx context.Context, reqL []*prompb.WriteRequest) error { + if len(reqL) == 0 { + return nil + } + if cErr := ctx.Err(); cErr != nil { + return nil + } + + if errL := prwe.exportSink(ctx, reqL); len(errL) != 0 { + return multierror.Append(nil, errL...) + } + if err := prwe.syncAndTruncateFront(); err != nil { + return err + } + // Reset by retrieving the respective read and write WAL indices. + return prwe.retrieveWALIndices(ctx) +} + // persistToWAL is the routine that'll be hooked into the exporter's receiving side and it'll // write them to the Write-Ahead-Log so that shutdowns won't lose data, and that the routine that // reads from the WAL can then process the previously serialized requests. @@ -157,8 +301,8 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { if err != nil { return err } - prwe.wWALIndex++ - batch.Write(prwe.wWALIndex, protoBlob) + wIndex := atomic.AddUint64(&prwe.wWALIndex, 1) + batch.Write(wIndex, protoBlob) } prwe.mu.Lock() @@ -167,26 +311,97 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error { return prwe.wal.WriteBatch(batch) } -func (prwe *prweWAL) readPrompbFromWAL(_ context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { +func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) { + prwe.mu.Lock() + defer prwe.mu.Unlock() + var protoBlob []byte for i := 0; i < 12; i++ { + // Firstly check if we've been terminated, then exit if so. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if index <= 0 { index = 1 } + protoBlob, err = prwe.wal.Read(index) - if errors.Is(err, wal.ErrNotFound) { + if err == nil { // The read succeeded. + req := new(prompb.WriteRequest) + if err = proto.Unmarshal(protoBlob, req); err != nil { + return nil, err + } + return req, nil + } + + if !errors.Is(err, wal.ErrNotFound) { + return nil, err + } + + if index <= 1 { + // This could be the very first attempted read, so try again, after a small sleep. time.Sleep(time.Duration(1<