[exporter/prometheusremotewrite] hookup WAL #7304

Merged
Changes from 36 commits
39 commits
8b5d368
exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
odeke-em Aug 25, 2021
b113a59
fix go.mod
Aneurysm9 Jan 20, 2022
209c650
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 20, 2022
800f9cd
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 21, 2022
06a1c48
exporter/prw: WALConfig should be exported to allow programmatic mani…
Aneurysm9 Jan 21, 2022
875897b
Add CHANGELOG entry
Aneurysm9 Jan 21, 2022
d76771b
fix lint error
Aneurysm9 Jan 21, 2022
c055725
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 21, 2022
31a0941
mod tidy
Aneurysm9 Jan 21, 2022
2b11f30
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 24, 2022
702584a
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 27, 2022
26d9b75
tidy up WAL logic and comments
Aneurysm9 Jan 27, 2022
6b043de
Merge branch 'main' into exporter-prometheusremotewrite-hookup-WAL
Jan 27, 2022
0044c37
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Jan 27, 2022
22b3f60
Merge remote-tracking branch 'a9/exporter-prometheusremotewrite-hooku…
Aneurysm9 Jan 27, 2022
a7853ed
Merge branch 'main' into exporter-prometheusremotewrite-hookup-WAL
jpkrohling Jan 28, 2022
a687a57
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Feb 7, 2022
6a749e5
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Feb 7, 2022
4a2dc2f
Handle error from closing WAL
Aneurysm9 Feb 7, 2022
9bbef4f
Ensure WAL processor keeps running after error
Aneurysm9 Feb 8, 2022
c1f39ee
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Feb 25, 2022
65dfa99
prwe/WAL: refactor WAL run loop
Aneurysm9 Feb 26, 2022
5f4188d
Merge remote-tracking branch 'a9/exporter-prometheusremotewrite-hooku…
Aneurysm9 Feb 26, 2022
20b7e57
make gotidy
Aneurysm9 Feb 26, 2022
d5d26f5
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Feb 26, 2022
8a2ca9b
Merge remote-tracking branch 'a9/exporter-prometheusremotewrite-hooku…
Aneurysm9 Feb 26, 2022
2913547
lint
Aneurysm9 Feb 26, 2022
a9d50b9
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Feb 28, 2022
be040f0
fix data races
Aneurysm9 Feb 28, 2022
5d611fd
Ensure locking around entire WAL persistence routine
Aneurysm9 Feb 28, 2022
5a6d11c
Address PR feedback
Aneurysm9 Feb 28, 2022
2b3f568
fix lint issues
Aneurysm9 Feb 28, 2022
f861153
update read index inside WAL reader routine
Aneurysm9 Feb 28, 2022
410f934
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Mar 1, 2022
008f6ff
Update CHANGELOG.md
Aneurysm9 Mar 3, 2022
a90b53b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Mar 3, 2022
1839a2d
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
Aneurysm9 Mar 7, 2022
5ec4301
Undo unrelated go.mod changes
Aneurysm9 Mar 7, 2022
2e123ab
Merge branch 'main' into exporter-prometheusremotewrite-hookup-WAL
jpkrohling Mar 7, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## Unreleased

### 💡 Enhancements 💡

- `prometheusremotewriteexporter`: Write-Ahead Log support enabled (#7304)

## v0.46.0

### 💡 Enhancements 💡
4 changes: 2 additions & 2 deletions cmd/configschema/go.mod
@@ -134,7 +134,7 @@ require (
github.com/grobie/gomemcache v0.0.0-20180201122607-1f779c573665 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/consul/api v1.12.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
@@ -377,7 +377,7 @@ require (
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.7.0.20210223165440-c65ae3540d44 // indirect
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921 // indirect
github.com/shirou/gopsutil v2.20.9+incompatible // indirect
github.com/shirou/gopsutil v3.21.10+incompatible // indirect
github.com/shirou/gopsutil/v3 v3.22.2 // indirect
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3 // indirect
github.com/signalfx/gohistogram v0.0.0-20160107210732-1ccfd2ff5083 // indirect
8 changes: 4 additions & 4 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions exporter/awsprometheusremotewriteexporter/go.mod
@@ -13,11 +13,14 @@ require (
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/knadh/koanf v1.4.0 // indirect
@@ -47,6 +50,7 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
google.golang.org/grpc v1.44.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
3 changes: 3 additions & 0 deletions exporter/awsprometheusremotewriteexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions exporter/prometheusremotewriteexporter/README.md
@@ -49,6 +49,10 @@ Example:
exporters:
prometheusremotewrite:
endpoint: "https://my-cortex:7900/api/v1/push"
wal: # Enabling the Write-Ahead-Log for the exporter.
directory: ./prom_rw # The directory to store the WAL in
buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300
truncate_frequency: 45s # Optional interval between WAL truncations, written as a duration string that time.ParseDuration accepts; default of 1m
```

Example:
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/config.go
@@ -49,6 +49,7 @@ type Config struct {
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`
WAL *WALConfig `mapstructure:"wal"`
}

// RemoteWriteQueue allows to configure the remote write queue.
46 changes: 46 additions & 0 deletions exporter/prometheusremotewriteexporter/context.go
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheusremotewriteexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter"

import (
"context"
"fmt"

"go.uber.org/zap"
)

type ctxKey int

const (
loggerCtxKey ctxKey = iota
)

func contextWithLogger(ctx context.Context, log *zap.Logger) context.Context {
return context.WithValue(ctx, loggerCtxKey, log)
}

func loggerFromContext(ctx context.Context) (*zap.Logger, error) {
v := ctx.Value(loggerCtxKey)
if v == nil {
return nil, fmt.Errorf("no logger found in context")
}

l, ok := v.(*zap.Logger)
if !ok {
return nil, fmt.Errorf("invalid logger found in context")
}

return l, nil
}
90 changes: 76 additions & 14 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -53,6 +53,8 @@ type prwExporter struct {
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
settings component.TelemetrySettings

wal *prweWAL
}

// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
@@ -69,7 +71,7 @@ func newPRWExporter(cfg *Config, set component.ExporterCreateSettings) (*prwExpo

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

return &prwExporter{
prwe := &prwExporter{
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
@@ -79,21 +81,45 @@ func newPRWExporter(cfg *Config, set component.ExporterCreateSettings) (*prwExpo
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
settings: set.TelemetrySettings,
}, nil
}
if cfg.WAL == nil {
return prwe, nil
}

prwe.wal, err = newWAL(cfg.WAL, prwe.export)
if err != nil {
return nil, err
}
return prwe, nil
}

// Start creates the Prometheus client and, when the WAL is enabled, starts the WAL
func (prwe *prwExporter) Start(_ context.Context, host component.Host) (err error) {
func (prwe *prwExporter) Start(ctx context.Context, host component.Host) (err error) {
prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions(), prwe.settings)
return err
if err != nil {
return err
}
return prwe.turnOnWALIfEnabled(contextWithLogger(ctx, prwe.settings.Logger.Named("prw.wal")))
}

func (prwe *prwExporter) shutdownWALIfEnabled() error {
if !prwe.walEnabled() {
return nil
}
return prwe.wal.stop()
}

// Shutdown stops the exporter from accepting incoming calls (which then return an error) and waits for current export
// operations to finish before returning
func (prwe *prwExporter) Shutdown(context.Context) error {
close(prwe.closeChan)
select {
case <-prwe.closeChan:
default:
close(prwe.closeChan)
}
err := prwe.shutdownWALIfEnabled()
prwe.wg.Wait()
return nil
return err
}

// PushMetrics converts metrics to Prometheus remote write TimeSeries and sends them to the remote endpoint. It maintains a map of
@@ -112,7 +138,7 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
err = consumererror.NewPermanent(err)
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.export(ctx, tsMap))
return multierr.Combine(err, prwe.handleExport(ctx, tsMap))
}
}

@@ -144,14 +170,27 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
if err != nil {
return err
}
if !prwe.walEnabled() {
// The WAL is disabled, so export directly.
return prwe.export(ctx, requests)
}

// The WAL is enabled, so only persist the requests to the WAL here;
// another goroutine exports them to the RemoteWrite endpoint.
if err = prwe.wal.persistToWAL(requests); err != nil {
return consumererror.NewPermanent(err)
}
return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteRequest) error {
input := make(chan *prompb.WriteRequest, len(requests))
for _, request := range requests {
input <- request
@@ -170,11 +209,20 @@ func (prwe *prwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
for i := 0; i < concurrencyLimit; i++ {
go func() {
defer wg.Done()
for request := range input {
if errExecute := prwe.execute(ctx, request); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
for {
select {
case <-ctx.Done(): // Stop once the context is cancelled; select does not prioritize this case over a ready request.
return

case request, ok := <-input:
if !ok {
return
}
if errExecute := prwe.execute(ctx, request); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
}
}
}
}()
@@ -226,3 +274,17 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
}
return consumererror.NewPermanent(rerr)
}

func (prwe *prwExporter) walEnabled() bool { return prwe.wal != nil }

func (prwe *prwExporter) turnOnWALIfEnabled(ctx context.Context) error {
if !prwe.walEnabled() {
return nil
}
cancelCtx, cancel := context.WithCancel(ctx)
go func() {
<-prwe.closeChan
cancel()
}()
return prwe.wal.run(cancelCtx)
}