Skip to content

Commit

Permalink
exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
Browse files Browse the repository at this point in the history
This change completes the WAL capability for the Prometheus Remote-Write exporter
that allows recovery of data if the prior write requests weren't yet exported.

This wires up a Write-Ahead-Log (WAL) that was implemented in
PR open-telemetry/opentelemetry-collector#3597, which
was split off from PR open-telemetry/opentelemetry-collector#3017.

Note: there is very rare condition for which we can perhaps send the
same data a couple of times, and it can happen if we haven't yet truncated
the WAL, the RemoteWrite endpoint received the prior data but the process
received a Ctrl+C, kill signal. However, this will be very rare and
would have to be timed so fast and precisely.

Replaces PR open-telemetry/opentelemetry-collector#3017
Updates PR open-telemetry/opentelemetry-collector#3597
Fixes #4751
Fixes open-telemetry/prometheus-interoperability-spec#9
  • Loading branch information
odeke-em committed Sep 11, 2021
1 parent f9b04ce commit 8b5d368
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ require (
go.opentelemetry.io/otel/sdk v1.0.0-RC3 // indirect
go.opentelemetry.io/otel/trace v1.0.0-RC3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect
Expand Down
22 changes: 19 additions & 3 deletions cmd/configschema/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
<<<<<<< HEAD
github.com/fsouza/go-dockerclient v1.7.3/go.mod h1:8xfZB8o9SptLNJ13VoV5pMiRbZGWkU/Omu5VOu/KC9Y=
=======
>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA=
github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E=
github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
Expand Down Expand Up @@ -1117,8 +1120,9 @@ github.com/hashicorp/consul/sdk v0.4.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPA
github.com/hashicorp/consul/sdk v0.7.0 h1:H6R9d008jDcHPQPAqPNuydAshJ4v5/8URdFnUvK/+sc=
github.com/hashicorp/consul/sdk v0.7.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
Expand Down Expand Up @@ -2277,8 +2281,9 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down Expand Up @@ -2458,8 +2463,9 @@ golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a h1:bRuuGXV8wwSdGTB+CtJf+FjgO1APK1CoO39T4BN/XBw=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -2629,10 +2635,15 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
<<<<<<< HEAD
golang.org/x/sys v0.0.0-20210820121016-41cdb8703e55/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201113234701-d7a72108b828/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
=======
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
Expand Down Expand Up @@ -2913,9 +2924,14 @@ google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f/go.mod h1:ob2IJxKr
google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w=
<<<<<<< HEAD
google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71 h1:z+ErRPu0+KS02Td3fOAgdX+lnPDh/VyaABEJPD4JRQs=
google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
=======
google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda h1:iT5uhT54PtbqUsWddv/nnEWdE5e/MTr+Nv3vjxlBP1A=
google.golang.org/genproto v0.0.0-20210825212027-de86158e7fda/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
>>>>>>> 874674ba0... exporter/prometheusremotewrite: glue up and use Write-Ahead-Log
google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand Down
4 changes: 4 additions & 0 deletions exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ Example:
exporters:
prometheusremotewrite:
endpoint: "https://my-cortex:7900/api/v1/push"
wal: # Enabling the Write-Ahead-Log for the exporter.
directory: ./prom_rw # The directory to store the WAL in
buffer_size: 100 # Optional count of elements to be read from the WAL before truncating; default of 300
truncate_frequency: 45s # Optional frequency for how often the WAL should be truncated. It is a time.ParseDuration; default of 1m
```
## Advanced Configuration
Expand Down
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Config struct {
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`
WAL *walConfig `mapstructure:"wal"`
}

// RemoteWriteQueue allows to configure the remote write queue.
Expand Down
92 changes: 78 additions & 14 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand All @@ -49,6 +50,8 @@ type PRWExporter struct {
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings

wal *prweWAL
}

// NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly.
Expand All @@ -65,7 +68,7 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(buildInfo.Description), " ", "-"), buildInfo.Version)

return &PRWExporter{
prwe := &PRWExporter{
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
Expand All @@ -74,19 +77,44 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
}, nil
}
if cfg.WAL == nil {
return prwe, nil
}

prweWAL, err := newWAL(cfg.WAL, prwe.export)
if err != nil {
return nil, err
}
prwe.wal = prweWAL
return prwe, nil
}

// Start creates the prometheus client
func (prwe *PRWExporter) Start(_ context.Context, host component.Host) (err error) {
func (prwe *PRWExporter) Start(ctx context.Context, host component.Host) (err error) {
prwe.client, err = prwe.clientSettings.ToClient(host.GetExtensions())
return err
if err != nil {
return err
}
return prwe.turnOnWALIfEnabled(ctx)
}

func (prwe *PRWExporter) shutdownWALIfEnabled() error {
if !prwe.walEnabled() {
return nil
}
return prwe.wal.stop()
}

// Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *PRWExporter) Shutdown(context.Context) error {
close(prwe.closeChan)
select {
case <-prwe.closeChan:
default:
close(prwe.closeChan)
}
prwe.shutdownWALIfEnabled()
prwe.wg.Wait()
return nil
}
Expand Down Expand Up @@ -166,7 +194,7 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro
}
}

if exportErrors := prwe.export(ctx, tsMap); len(exportErrors) != 0 {
if exportErrors := prwe.handleExport(ctx, tsMap); len(exportErrors) != 0 {
dropped = md.MetricCount()
errs = append(errs, exportErrors...)
}
Expand Down Expand Up @@ -208,16 +236,29 @@ func (prwe *PRWExporter) addNumberDataPointSlice(dataPoints pdata.NumberDataPoin
return nil
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
func (prwe *PRWExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) []error {
var errs []error
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
if err != nil {
errs = append(errs, consumererror.Permanent(err))
return errs
}
if !prwe.walEnabled() {
// Perform a direct export otherwise.
return prwe.export(ctx, requests)
}

// Otherwise the WAL is enabled, and just persist the requests to the WAL
// and they'll be exported in another goroutine to the RemoteWrite endpoint.
if err := prwe.wal.persistToWAL(requests); err != nil {
errs = append(errs, consumererror.Permanent(err))
}
return errs
}

// export sends a Snappy-compressed WriteRequest containing TimeSeries to a remote write endpoint in order
func (prwe *PRWExporter) export(ctx context.Context, requests []*prompb.WriteRequest) (errs []error) {
input := make(chan *prompb.WriteRequest, len(requests))
for _, request := range requests {
input <- request
Expand All @@ -236,12 +277,21 @@ func (prwe *PRWExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
go func() {
defer wg.Done()

for request := range input {
err := prwe.execute(ctx, request)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
for {
select {
case <-ctx.Done(): // Check firstly to ensure that the context wasn't cancelled.
return

case request, ok := <-input:
if !ok {
return
}

if err := prwe.execute(ctx, request); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}
}
}()
Expand Down Expand Up @@ -293,3 +343,17 @@ func (prwe *PRWExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
}
return consumererror.Permanent(rerr)
}

func (prwe *PRWExporter) walEnabled() bool { return prwe.wal != nil }

func (prwe *PRWExporter) turnOnWALIfEnabled(ctx context.Context) error {
if !prwe.walEnabled() {
return nil
}
cancelCtx, cancel := context.WithCancel(ctx)
go func() {
<-prwe.closeChan
cancel()
}()
return prwe.wal.run(cancelCtx)
}
Loading

0 comments on commit 8b5d368

Please sign in to comment.