diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ebf2fcb081..16cff070c06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added `WithOSDescription` resource configuration option to set OS (Operating System) description resource attribute (`os.description`). (#1840) - Added `WithOS` resource configuration option to set all OS (Operating System) resource attributes at once. (#1840) +- Added the `WithRetry` option to the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package. + This option is a replacement for the removed `WithMaxAttempts` and `WithBackoff` options. (#2095) - Added API `LinkFromContext` to return Link which encapsulates SpanContext from provided context and also encapsulates attributes. (#2115) ### Changed - The `SpanModels` function is now exported from the `go.opentelemetry.io/otel/exporters/zipkin` package to convert OpenTelemetry spans into Zipkin model spans. (#2027) +- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc".RetrySettings` to `RetryConfig`. (#2095) +- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp".RetrySettings` to `RetryConfig`. (#2095) ### Deprecated @@ -28,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Removed the deprecated package `go.opentelemetry.io/otel/exporters/trace/zipkin`. (#2020) - Removed the `"go.opentelemetry.io/otel/sdk/resource".WithBuiltinDetectors` function. The explicit `With*` options for every built-in detector should be used instead. (#2026 #2097) +- Removed the `WithMaxAttempts` and `WithBackoff` options from the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package. + The retry logic of the package has been updated to match the `otlptracegrpc` package and accordingly a `WithRetry` option is added that should be used instead. (#2095) - Removed metrics test package `go.opentelemetry.io/otel/sdk/export/metric/metrictest`. 
(#2105) ### Fixed diff --git a/exporters/otlp/otlptrace/internal/connection/connection.go b/exporters/otlp/otlptrace/internal/connection/connection.go index 0c6f89fa47e..b2b94731fd6 100644 --- a/exporters/otlp/otlptrace/internal/connection/connection.go +++ b/exporters/otlp/otlptrace/internal/connection/connection.go @@ -16,14 +16,12 @@ package connection import ( "context" - "fmt" "math/rand" "sync" "sync/atomic" "time" "unsafe" - "github.com/cenkalti/backoff/v4" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -31,6 +29,7 @@ import ( "google.golang.org/grpc/encoding/gzip" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -48,6 +47,7 @@ type Connection struct { // these fields are read-only after constructor is finished cfg otlpconfig.Config SCfg otlpconfig.SignalConfig + requestFunc retry.RequestFunc metadata metadata.MD newConnectionHandler func(cc *grpc.ClientConn) @@ -66,6 +66,7 @@ func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler c := new(Connection) c.newConnectionHandler = handler c.cfg = cfg + c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate) c.SCfg = sCfg if len(c.SCfg.Headers) > 0 { c.metadata = metadata.New(c.SCfg.Headers) @@ -287,88 +288,24 @@ func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, cont } func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error { - expBackoff := newExponentialBackoff(c.cfg.RetrySettings) - - for { + ctx, cancel := c.ContextWithStop(ctx) + defer cancel() + return c.requestFunc(ctx, func(ctx context.Context) error { err := fn(ctx) - if err == nil { - // request succeeded. - return nil - } - - if !c.cfg.RetrySettings.Enabled { - return err - } - - // We have an error, check gRPC status code. - st := status.Convert(err) - if st.Code() == codes.OK { - // Not really an error, still success. - return nil - } - - // Now, this is this a real error. - - if !shouldRetry(st.Code()) { - // It is not a retryable error, we should not retry. - return err - } - - // Need to retry. - - throttle := getThrottleDuration(st) - - backoffDelay := expBackoff.NextBackOff() - if backoffDelay == backoff.Stop { - // throw away the batch - err = fmt.Errorf("max elapsed time expired: %w", err) - return err - } - - var delay time.Duration - - if backoffDelay > throttle { - delay = backoffDelay - } else { - if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime { - err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err) - return err - } - - // Respect server throttling. - delay = throttle - } - - // back-off, but get interrupted when shutting down or request is cancelled or timed out. - err = func() error { - dt := time.NewTimer(delay) - defer dt.Stop() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-c.stopCh: - return fmt.Errorf("interrupted due to shutdown: %w", err) - case <-dt.C: - } - + // nil is converted to OK. + if status.Code(err) == codes.OK { + // Success. return nil - }() - - if err != nil { - return err } - - } + return err + }) } -func shouldRetry(code codes.Code) bool { - switch code { - case codes.OK: - // Success. This function should not be called for this code, the best we - // can do is tell the caller not to retry. 
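The rewritten `DoRequest` above folds the old nil-error and `codes.OK` checks into a single test because the gRPC `status` package maps a nil error to `codes.OK`. A small standalone snippet (not part of the patch) illustrating that behavior:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// status.Code(nil) reports codes.OK, so the single check in the new
	// DoRequest covers both a nil error and a server-sent OK status.
	fmt.Println(status.Code(nil))                                 // OK
	fmt.Println(status.Code(status.Error(codes.Unavailable, ""))) // Unavailable

	// An error that is not a gRPC status converts to codes.Unknown, which
	// the evaluate function below treats as non-retryable.
	fmt.Println(status.Code(fmt.Errorf("plain error"))) // Unknown
}
```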
- return false - +// evaluate returns if err is retry-able and a duration to wait for if an +// explicit throttle time is included in err. +func evaluate(err error) (bool, time.Duration) { + s := status.Convert(err) + switch s.Code() { case codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, @@ -376,54 +313,20 @@ func shouldRetry(code codes.Code) bool { codes.OutOfRange, codes.Unavailable, codes.DataLoss: - // These are retryable errors. - return true - - case codes.Unknown, - codes.InvalidArgument, - codes.Unauthenticated, - codes.PermissionDenied, - codes.NotFound, - codes.AlreadyExists, - codes.FailedPrecondition, - codes.Unimplemented, - codes.Internal: - // These are fatal errors, don't retry. - return false - - default: - // Don't retry on unknown codes. - return false + return true, throttleDelay(s) } + + // Not a retry-able error. + return false, 0 } -func getThrottleDuration(status *status.Status) time.Duration { - // See if throttling information is available. +// throttleDelay returns a duration to wait for if an explicit throttle time +// is included in the response status. +func throttleDelay(status *status.Status) time.Duration { for _, detail := range status.Details() { if t, ok := detail.(*errdetails.RetryInfo); ok { - if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 { - // We are throttled. Wait before retrying as requested by the server. - return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond - } - return 0 + return t.RetryDelay.AsDuration() } } return 0 } - -func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff { - // Do not use NewExponentialBackOff since it calls Reset and the code here must - // call Reset after changing the InitialInterval (this saves an unnecessary call to Now). 
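`throttleDelay` above now leans on `durationpb`'s `AsDuration` instead of the hand-rolled seconds/nanos arithmetic the removed `getThrottleDuration` used. A quick standalone check (not part of the patch) that the two conversions agree for an in-range delay:

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/durationpb"
)

func main() {
	d := durationpb.New(3*time.Second + 15*time.Millisecond)

	// The removed code built the delay by hand from the proto fields.
	manual := time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond

	// AsDuration performs the same conversion (saturating instead of
	// overflowing), which also makes the old "> 0" guard unnecessary.
	fmt.Println(manual == d.AsDuration(), d.AsDuration()) // true 3.015s
}
```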
- expBackoff := &backoff.ExponentialBackOff{ - InitialInterval: rs.InitialInterval, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: rs.MaxInterval, - MaxElapsedTime: rs.MaxElapsedTime, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - expBackoff.Reset() - - return expBackoff -} diff --git a/exporters/otlp/otlptrace/internal/connection/connection_test.go b/exporters/otlp/otlptrace/internal/connection/connection_test.go index 0b4ac2ff23b..3c2a18eb02b 100644 --- a/exporters/otlp/otlptrace/internal/connection/connection_test.go +++ b/exporters/otlp/otlptrace/internal/connection/connection_test.go @@ -15,76 +15,132 @@ package connection import ( + "context" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" + "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" ) -func TestGetThrottleDuration(t *testing.T) { - tts := []struct { - stsFn func() (*status.Status, error) - throttle time.Duration +func TestThrottleDuration(t *testing.T) { + c := codes.ResourceExhausted + testcases := []struct { + status *status.Status + expected time.Duration }{ { - stsFn: func() (*status.Status, error) { - return status.New( - codes.OK, - "status with no retry info", - ), nil - }, - throttle: 0, + status: status.New(c, "no retry info"), + expected: 0, }, { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with retry info") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)}, + status: func() *status.Status { + s, err := status.New(c, "single retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Millisecond), + }, ) - }, - throttle: 15 * time.Millisecond, + require.NoError(t, err) + return s + }(), + expected: 15 * time.Millisecond, }, { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info detail") - return st.WithDetails( + status: func() *status.Status { + s, err := status.New(c, "error info").WithDetails( &errdetails.ErrorInfo{Reason: "no throttle detail"}, ) - }, - throttle: 0, + require.NoError(t, err) + return s + }(), + expected: 0, }, { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info and retry info") - return st.WithDetails( - &errdetails.ErrorInfo{Reason: "no throttle detail"}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, + status: func() *status.Status { + s, err := status.New(c, "error and retry info").WithDetails( + &errdetails.ErrorInfo{Reason: "with throttle detail"}, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, ) - }, - throttle: 13 * time.Minute, + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, }, { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with two retry info should take the first") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)}, + status: func() *status.Status { + s, err := status.New(c, "double retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + 
&errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Minute), + }, ) - }, - throttle: 13 * time.Minute, + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, }, } - for _, tt := range tts { - sts, _ := tt.stsFn() - t.Run(sts.Message(), func(t *testing.T) { - th := getThrottleDuration(sts) - require.Equal(t, tt.throttle, th) + for _, tc := range testcases { + t.Run(tc.status.Message(), func(t *testing.T) { + require.Equal(t, tc.expected, throttleDelay(tc.status)) }) } } + +func TestEvaluate(t *testing.T) { + retryable := map[codes.Code]bool{ + codes.OK: false, + codes.Canceled: true, + codes.Unknown: false, + codes.InvalidArgument: false, + codes.DeadlineExceeded: true, + codes.NotFound: false, + codes.AlreadyExists: false, + codes.PermissionDenied: false, + codes.ResourceExhausted: true, + codes.FailedPrecondition: false, + codes.Aborted: true, + codes.OutOfRange: true, + codes.Unimplemented: false, + codes.Internal: false, + codes.Unavailable: true, + codes.DataLoss: true, + codes.Unauthenticated: false, + } + + for c, want := range retryable { + got, _ := evaluate(status.Error(c, "")) + assert.Equalf(t, want, got, "evaluate(%s)", c) + } +} + +func TestDoRequest(t *testing.T) { + ev := func(error) (bool, time.Duration) { return false, 0 } + + c := new(Connection) + c.requestFunc = retry.Config{}.RequestFunc(ev) + c.stopCh = make(chan struct{}) + + ctx := context.Background() + assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { + return nil + })) + assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { + return status.Error(codes.OK, "") + })) + assert.ErrorIs(t, c.DoRequest(ctx, func(ctx context.Context) error { + return assert.AnError + }), assert.AnError) +} diff --git a/exporters/otlp/otlptrace/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/internal/otlpconfig/options.go index ef6a22beec2..a97c880a422 100644 --- a/exporters/otlp/otlptrace/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/internal/otlpconfig/options.go @@ -21,34 +21,19 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) const ( - // DefaultMaxAttempts describes how many times the driver - // should retry the sending of the payload in case of a - // retryable error. - DefaultMaxAttempts int = 5 // DefaultTracesPath is a default URL path for endpoint that // receives spans. DefaultTracesPath string = "/v1/traces" - // DefaultBackoff is a default base backoff time used in the - // exponential backoff strategy. - DefaultBackoff time.Duration = 300 * time.Millisecond // DefaultTimeout is a default max waiting time for the backend to process // each span batch. DefaultTimeout time.Duration = 10 * time.Second ) -var ( - // defaultRetrySettings is a default settings for the retry policy. 
- defaultRetrySettings = RetrySettings{ - Enabled: true, - InitialInterval: 5 * time.Second, - MaxInterval: 30 * time.Second, - MaxElapsedTime: time.Minute, - } -) - type ( SignalConfig struct { Endpoint string @@ -67,15 +52,12 @@ type ( // Signal specific configurations Traces SignalConfig - // HTTP configurations - MaxAttempts int - Backoff time.Duration + RetryConfig retry.Config // gRPC configurations ReconnectionPeriod time.Duration ServiceConfig string DialOptions []grpc.DialOption - RetrySettings RetrySettings } ) @@ -87,9 +69,7 @@ func NewDefaultConfig() Config { Compression: NoCompression, Timeout: DefaultTimeout, }, - MaxAttempts: DefaultMaxAttempts, - Backoff: DefaultBackoff, - RetrySettings: defaultRetrySettings, + RetryConfig: retry.DefaultConfig, } return c @@ -219,9 +199,9 @@ func WithURLPath(urlPath string) GenericOption { }) } -func WithRetry(settings RetrySettings) GenericOption { +func WithRetry(rc retry.Config) GenericOption { return newGenericOption(func(cfg *Config) { - cfg.RetrySettings = settings + cfg.RetryConfig = rc }) } @@ -256,15 +236,3 @@ func WithTimeout(duration time.Duration) GenericOption { cfg.Traces.Timeout = duration }) } - -func WithMaxAttempts(maxAttempts int) GenericOption { - return newGenericOption(func(cfg *Config) { - cfg.MaxAttempts = maxAttempts - }) -} - -func WithBackoff(duration time.Duration) GenericOption { - return newGenericOption(func(cfg *Config) { - cfg.Backoff = duration - }) -} diff --git a/exporters/otlp/otlptrace/internal/otlpconfig/optiontypes.go b/exporters/otlp/otlptrace/internal/otlpconfig/optiontypes.go index baec669cf80..f69e31095d2 100644 --- a/exporters/otlp/otlptrace/internal/otlpconfig/optiontypes.go +++ b/exporters/otlp/otlptrace/internal/otlpconfig/optiontypes.go @@ -14,8 +14,6 @@ package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" -import "time" - const ( // DefaultCollectorPort is the port the Exporter will attempt connect to // if no collector port is provided. @@ -47,18 +45,3 @@ const ( // MarshalJSON tells the driver to send using json format. MarshalJSON ) - -// RetrySettings defines configuration for retrying batches in case of export failure -// using an exponential backoff. -type RetrySettings struct { - // Enabled indicates whether to not retry sending batches in case of export failure. - Enabled bool - // InitialInterval the time to wait after the first failure before retrying. - InitialInterval time.Duration - // MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between - // consecutive retries will always be `MaxInterval`. - MaxInterval time.Duration - // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. - // Once this value is reached, the data is discarded. - MaxElapsedTime time.Duration -} diff --git a/exporters/otlp/otlptrace/internal/retry/retry.go b/exporters/otlp/otlptrace/internal/retry/retry.go new file mode 100644 index 00000000000..25bef0ba5b6 --- /dev/null +++ b/exporters/otlp/otlptrace/internal/retry/retry.go @@ -0,0 +1,130 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" +) + +// DefaultConfig are the recommended defaults to use. +var DefaultConfig = Config{ + Enabled: true, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: time.Minute, +} + +// Config defines configuration for retrying batches in case of export failure +// using an exponential backoff. +type Config struct { + // Enabled indicates whether to not retry sending batches in case of + // export failure. + Enabled bool + // InitialInterval the time to wait after the first failure before + // retrying. + InitialInterval time.Duration + // MaxInterval is the upper bound on backoff interval. Once this value is + // reached the delay between consecutive retries will always be + // `MaxInterval`. + MaxInterval time.Duration + // MaxElapsedTime is the maximum amount of time (including retries) spent + // trying to send a request/batch. Once this value is reached, the data + // is discarded. + MaxElapsedTime time.Duration +} + +// RequestFunc wraps a request with retry logic. +type RequestFunc func(context.Context, func(context.Context) error) error + +// EvaluateFunc returns if an error is retry-able and if an explicit throttle +// duration should be honored that was included in the error. +type EvaluateFunc func(error) (bool, time.Duration) + +func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc { + if !c.Enabled { + return func(ctx context.Context, fn func(context.Context) error) error { + return fn(ctx) + } + } + + // Do not use NewExponentialBackOff since it calls Reset and the code here + // must call Reset after changing the InitialInterval (this saves an + // unnecessary call to Now). + b := &backoff.ExponentialBackOff{ + InitialInterval: c.InitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: c.MaxInterval, + MaxElapsedTime: c.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + return func(ctx context.Context, fn func(context.Context) error) error { + for { + err := fn(ctx) + if err == nil { + return nil + } + + retryable, throttle := evaluate(err) + if !retryable { + return err + } + + bOff := b.NextBackOff() + if bOff == backoff.Stop { + return fmt.Errorf("max retry time elapsed: %w", err) + } + + // Wait for the greater of the backoff or throttle delay. + var delay time.Duration + if bOff > throttle { + delay = bOff + } else { + elapsed := b.GetElapsedTime() + if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime { + return fmt.Errorf("max retry time would elapse: %w", err) + } + delay = throttle + } + + if err := waitFunc(ctx, delay); err != nil { + return err + } + } + } +} + +// Allow override for testing. 
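`Config.RequestFunc` is the piece both transports now share: given an `EvaluateFunc`, it returns a wrapper that re-invokes the request until it succeeds, is judged non-retryable, or the retry budget runs out, waiting for the larger of the backoff and any server throttle in between. A minimal sketch of how a caller inside these exporters could use it; the failing operation and the `errTransient` sentinel are invented for illustration, and the package is internal, so nothing outside the `otlptrace` exporters can import it:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
)

// errTransient stands in for a retry-able export failure; it is an
// illustrative placeholder, not something defined by the retry package.
var errTransient = errors.New("transient failure")

func main() {
	// Treat only errTransient as retry-able, with no server throttle hint.
	evaluate := func(err error) (bool, time.Duration) {
		return errors.Is(err, errTransient), 0
	}

	send := retry.Config{
		Enabled:         true,
		InitialInterval: 10 * time.Millisecond,
		MaxInterval:     100 * time.Millisecond,
		MaxElapsedTime:  time.Second,
	}.RequestFunc(evaluate)

	attempts := 0
	err := send(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errTransient // retried with exponential backoff
		}
		return nil // succeeds on the third attempt
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```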
+var waitFunc = wait + +func wait(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + } + + return nil +} diff --git a/exporters/otlp/otlptrace/internal/retry/retry_test.go b/exporters/otlp/otlptrace/internal/retry/retry_test.go new file mode 100644 index 00000000000..67242df0070 --- /dev/null +++ b/exporters/otlp/otlptrace/internal/retry/retry_test.go @@ -0,0 +1,195 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWait(t *testing.T) { + tests := []struct { + ctx context.Context + delay time.Duration + expected error + }{ + { + ctx: context.Background(), + delay: time.Duration(0), + expected: nil, + }, + { + ctx: context.Background(), + delay: time.Duration(1), + expected: nil, + }, + { + ctx: context.Background(), + delay: time.Duration(-1), + expected: nil, + }, + { + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }(), + expected: context.Canceled, + }, + } + + for _, test := range tests { + assert.Equal(t, test.expected, wait(test.ctx, test.delay)) + } +} + +func TestNonRetryableError(t *testing.T) { + ev := func(error) (bool, time.Duration) { return false, 0 } + + reqFunc := Config{ + Enabled: true, + InitialInterval: 1 * time.Nanosecond, + MaxInterval: 1 * time.Nanosecond, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} + +func TestThrottledRetry(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + throttleDelay, backoffDelay := time.Second, time.Nanosecond + + ev := func(error) (bool, time.Duration) { + // Retry everything with a throttle delay. + return true, throttleDelay + } + + reqFunc := Config{ + Enabled: true, + InitialInterval: backoffDelay, + MaxInterval: backoffDelay, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, delay time.Duration) error { + assert.Equal(t, throttleDelay, delay, "retry not throttled") + // Try twice to ensure call is attempted again after delay. 
+ if done { + return assert.AnError + } + done = true + return nil + } + defer func() { waitFunc = origWait }() + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestBackoffRetry(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + InitialInterval: delay, + MaxInterval: delay, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, d time.Duration) error { + assert.Equal(t, delay, d, "retry not backoffed") + // Try twice to ensure call is attempted again after delay. + if done { + return assert.AnError + } + done = true + return nil + } + defer func() { waitFunc = origWait }() + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + tDelay, bDelay := time.Hour, time.Nanosecond + ev := func(error) (bool, time.Duration) { return true, tDelay } + reqFunc := Config{ + Enabled: true, + InitialInterval: bDelay, + MaxInterval: bDelay, + MaxElapsedTime: tDelay - (time.Nanosecond), + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time would elapse: ") +} + +func TestMaxElapsedTime(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + // InitialInterval > MaxElapsedTime means immediate return. 
+ InitialInterval: 2 * delay, + MaxElapsedTime: delay, + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time elapsed: ") +} + +func TestRetryNotEnabled(t *testing.T) { + ev := func(error) (bool, time.Duration) { + t.Error("evaluated retry when not enabled") + return false, 0 + } + + reqFunc := Config{}.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index 3c53c1637a9..c9ebdf40683 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -25,10 +25,8 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -171,7 +169,7 @@ func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) { reconnectionPeriod := 20 * time.Millisecond ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false}), + otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) defer func() { require.NoError(t, exp.Shutdown(ctx)) }() @@ -222,280 +220,13 @@ func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) { require.NoError(t, nmc.Stop()) } -func TestExporterExportFailureAndRecoveryModes(t *testing.T) { - tts := []struct { - name string - errors []error - rs otlptracegrpc.RetrySettings - fn func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) - opts []otlptracegrpc.Option - }{ - { - name: "Do not retry if succeeded", - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - - span := mc.getSpans() - - require.Len(t, span, 1) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 success request.") - }, - }, - { - name: "Do not retry if 'error' is ok", - errors: []error{ - status.Error(codes.OK, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error OK request.") - }, - }, - { - name: "Fail three times and succeed", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, - MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - require.NoError(t, exp.ExportSpans(ctx, 
roSpans)) - - span := mc.getSpans() - - require.Len(t, span, 1) - require.Equal(t, 4, mc.traceSvc.requests, "trace service must receive 3 failure requests and 1 success request.") - }, - }, - { - name: "Permanent error should not be retried", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, - MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.InvalidArgument, "invalid arguments"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error requests.") - }, - }, - { - name: "Test all transient errors and succeed", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 500 * time.Millisecond, - InitialInterval: 1 * time.Millisecond, - MaxInterval: 2 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Canceled, ""), - status.Error(codes.DeadlineExceeded, ""), - status.Error(codes.ResourceExhausted, ""), - status.Error(codes.Aborted, ""), - status.Error(codes.OutOfRange, ""), - status.Error(codes.Unavailable, ""), - status.Error(codes.DataLoss, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - - span := mc.getSpans() - - require.Len(t, span, 1) - require.Equal(t, 8, mc.traceSvc.requests, "trace service must receive 9 failure requests and 1 success request.") - }, - }, - { - name: "Retry should honor server throttling", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Minute, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - opts: []otlptracegrpc.Option{ - otlptracegrpc.WithTimeout(time.Millisecond * 100), - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Second*30), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - err := exp.ExportSpans(ctx, roSpans) - require.Error(t, err) - require.Equal(t, "context deadline exceeded", err.Error()) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry should fail if server throttling is higher than the MaxElapsedTime", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Minute), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - err := exp.ExportSpans(ctx, roSpans) - require.Error(t, err) - require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error()) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry stops if takes too long", - rs: otlptracegrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Millisecond * 50, - MaxInterval: time.Millisecond * 50, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - 
status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - err := exp.ExportSpans(ctx, roSpans) - require.Error(t, err) - - require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error()) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.LessOrEqual(t, 1, mc.traceSvc.requests, "trace service must receive at least 1 failure requests.") - }, - }, - { - name: "Disabled retry", - rs: otlptracegrpc.RetrySettings{ - Enabled: false, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) { - err := exp.ExportSpans(ctx, roSpans) - require.Error(t, err) - - require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error()) - - span := mc.getSpans() - - require.Len(t, span, 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests.") - }, - }, - } - - for _, tt := range tts { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: tt.errors, - }) - - opts := []otlptracegrpc.Option{ - otlptracegrpc.WithRetry(tt.rs), - } - - if len(tt.opts) != 0 { - opts = append(opts, tt.opts...) - } - - exp := newGRPCExporter(t, ctx, mc.endpoint, opts...) - - tt.fn(t, ctx, exp, mc) - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } - -} - -func TestPermanentErrorsShouldNotBeRetried(t *testing.T) { - permanentErrors := []*status.Status{ - status.New(codes.Unknown, "Unknown"), - status.New(codes.InvalidArgument, "InvalidArgument"), - status.New(codes.NotFound, "NotFound"), - status.New(codes.AlreadyExists, "AlreadyExists"), - status.New(codes.FailedPrecondition, "FailedPrecondition"), - status.New(codes.Unimplemented, "Unimplemented"), - status.New(codes.Internal, "Internal"), - status.New(codes.PermissionDenied, ""), - status.New(codes.Unauthenticated, ""), - } - - for _, sts := range permanentErrors { - t.Run(sts.Code().String(), func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: []error{sts.Err()}, - }) - - exp := newGRPCExporter(t, ctx, mc.endpoint) - - err := exp.ExportSpans(ctx, roSpans) - require.Error(t, err) - require.Len(t, mc.getSpans(), 0) - require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 permanent error requests.") - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } -} - -func newThrottlingError(code codes.Code, duration time.Duration) error { - s := status.New(code, "") - - s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)}) - - return s.Err() -} - func TestNew_collectorConnectionDiesThenReconnects(t *testing.T) { mc := runMockCollector(t) reconnectionPeriod := 50 * time.Millisecond ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false}), + otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) defer func() { require.NoError(t, exp.Shutdown(ctx)) }() @@ -637,7 +368,7 @@ func TestNew_WithTimeout(t 
*testing.T) { }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, otlptracegrpc.WithTimeout(tt.timeout), otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false})) + exp := newGRPCExporter(t, ctx, mc.endpoint, otlptracegrpc.WithTimeout(tt.timeout), otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false})) defer func() { _ = exp.Shutdown(ctx) }() diff --git a/exporters/otlp/otlptrace/otlptracegrpc/go.mod b/exporters/otlp/otlptrace/otlptracegrpc/go.mod index 3d0bcfa61e3..2cc7eafa9ae 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/go.mod +++ b/exporters/otlp/otlptrace/otlptracegrpc/go.mod @@ -8,9 +8,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.0-RC1 go.opentelemetry.io/otel/sdk v1.0.0-RC1 go.opentelemetry.io/proto/otlp v0.9.0 - google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.39.0 - google.golang.org/protobuf v1.27.1 ) replace go.opentelemetry.io/otel => ../../../.. diff --git a/exporters/otlp/otlptrace/otlptracegrpc/options.go b/exporters/otlp/otlptrace/otlptracegrpc/options.go index 2a483f7d866..d2cf76461cc 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/options.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/options.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -30,9 +31,9 @@ type Option interface { applyGRPCOption(*otlpconfig.Config) } -// RetrySettings defines configuration for retrying batches in case of export failure -// using an exponential backoff. -type RetrySettings otlpconfig.RetrySettings +// RetryConfig defines configuration for retrying batches in case of export +// failure using an exponential backoff. +type RetryConfig retry.Config type wrappedOption struct { otlpconfig.GRPCOption @@ -121,12 +122,11 @@ func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } -// WithRetry configures the retry policy for transient errors that may occurs when -// exporting traces. An exponential back-off algorithm is used to -// ensure endpoints are not overwhelmed with retries. If unset, the default -// ensure endpoints are not overwhelmed with retries. If unset, the default -// retry policy will retry after 5 seconds and increase exponentially after each +// WithRetry configures the retry policy for transient errors that may occurs +// when exporting traces. An exponential back-off algorithm is used to ensure +// endpoints are not overwhelmed with retries. If unset, the default retry +// policy will retry after 5 seconds and increase exponentially after each // error for a total of 1 minute. 
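As the `WithRetry` documentation above describes, the gRPC exporter's retry policy is now a single `RetryConfig` value. A hedged sketch of wiring it into an exporter; the endpoint is a placeholder and the intervals are arbitrary example values, not recommended settings:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
)

func main() {
	ctx := context.Background()

	exp, err := otlptrace.New(ctx, otlptracegrpc.NewClient(
		otlptracegrpc.WithEndpoint("localhost:4317"),
		otlptracegrpc.WithInsecure(),
		// Tighter-than-default retry policy for transient export failures.
		otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
			Enabled:         true,
			InitialInterval: time.Second,
			MaxInterval:     10 * time.Second,
			MaxElapsedTime:  30 * time.Second,
		}),
	))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = exp.Shutdown(ctx) }()
}
```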
-func WithRetry(settings RetrySettings) Option { - return wrappedOption{otlpconfig.WithRetry(otlpconfig.RetrySettings(settings))} +func WithRetry(settings RetryConfig) Option { + return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))} } diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index ca0fa18bb97..4aa8782b38a 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -21,26 +21,34 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net" "net/http" "path" + "strconv" "strings" + "sync" "time" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" "google.golang.org/protobuf/proto" - "go.opentelemetry.io/otel" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" ) const contentTypeProto = "application/x-protobuf" +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(ioutil.Discard) + return w + }, +} + // Keep it in sync with golang's DefaultTransport from net/http! We // have our own copy to avoid handling a situation where the // DefaultTransport is overwritten with some different implementation @@ -59,11 +67,12 @@ var ourTransport = &http.Transport{ } type client struct { - name string - cfg otlpconfig.SignalConfig - generalCfg otlpconfig.Config - client *http.Client - stopCh chan struct{} + name string + cfg otlpconfig.SignalConfig + generalCfg otlpconfig.Config + requestFunc retry.RequestFunc + client *http.Client + stopCh chan struct{} } var _ otlptrace.Client = (*client)(nil) @@ -77,7 +86,7 @@ func NewClient(opts ...Option) otlptrace.Client { } for pathPtr, defaultPath := range map[*string]string{ - &cfg.Traces.URLPath: defaultTracesPath, + &cfg.Traces.URLPath: otlpconfig.DefaultTracesPath, } { tmp := strings.TrimSpace(*pathPtr) if tmp == "" { @@ -90,15 +99,6 @@ func NewClient(opts ...Option) otlptrace.Client { } *pathPtr = tmp } - if cfg.MaxAttempts <= 0 { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.MaxAttempts > defaultMaxAttempts { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.Backoff <= 0 { - cfg.Backoff = defaultBackoff - } httpClient := &http.Client{ Transport: ourTransport, @@ -112,11 +112,12 @@ func NewClient(opts ...Option) otlptrace.Client { stopCh := make(chan struct{}) return &client{ - name: "traces", - cfg: cfg.Traces, - generalCfg: cfg, - stopCh: stopCh, - client: httpClient, + name: "traces", + cfg: cfg.Traces, + generalCfg: cfg, + requestFunc: cfg.RetryConfig.RequestFunc(evaluate), + stopCh: stopCh, + client: httpClient, } } @@ -151,41 +152,150 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc if err != nil { return err } - return d.send(ctx, rawRequest) -} -func (d *client) send(ctx context.Context, rawRequest []byte) error { - address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath) - var cancel context.CancelFunc - ctx, cancel = d.contextWithStop(ctx) + ctx, cancel := d.contextWithStop(ctx) defer cancel() - for i := 0; i < d.generalCfg.MaxAttempts; i++ { - response, err := d.singleSend(ctx, rawRequest, address) + + request, err := d.newRequest(rawRequest) + if err != nil { + return err + } + + return d.requestFunc(ctx, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + request.reset(ctx) + 
resp, err := d.client.Do(request.Request) if err != nil { return err } - // We don't care about the body, so try to read it - // into /dev/null and close it immediately. The - // reading part is to facilitate connection reuse. - _, _ = io.Copy(ioutil.Discard, response.Body) - _ = response.Body.Close() - switch response.StatusCode { + + var rErr error + switch resp.StatusCode { case http.StatusOK: - return nil - case http.StatusTooManyRequests: - fallthrough - case http.StatusServiceUnavailable: - select { - case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)): - continue - case <-ctx.Done(): - return ctx.Err() + // Success, do not retry. + case http.StatusTooManyRequests, + http.StatusServiceUnavailable: + // Retry-able failure. + rErr = newResponseError(resp.Header) + + // Going to retry, drain the body to reuse the connection. + if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil { + _ = resp.Body.Close() + return err } default: - return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status) + rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status) + } + + if err := resp.Body.Close(); err != nil { + return err + } + return rErr + }) +} + +func (d *client) newRequest(body []byte) (request, error) { + address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath) + r, err := http.NewRequest(http.MethodPost, address, nil) + if err != nil { + return request{Request: r}, err + } + + for k, v := range d.cfg.Headers { + r.Header.Set(k, v) + } + r.Header.Set("Content-Type", contentTypeProto) + + req := request{Request: r} + switch Compression(d.cfg.Compression) { + case NoCompression: + r.ContentLength = (int64)(len(body)) + req.bodyReader = bodyReader(body) + case GzipCompression: + // Ensure the content length is not used. + r.ContentLength = -1 + r.Header.Set("Content-Encoding", "gzip") + + gz := gzPool.Get().(*gzip.Writer) + defer gzPool.Put(gz) + + var b bytes.Buffer + gz.Reset(&b) + + if _, err := gz.Write(body); err != nil { + return req, err + } + // Close needs to be called to ensure body if fully written. + if err := gz.Close(); err != nil { + return req, err + } + + req.bodyReader = bodyReader(b.Bytes()) + } + + return req, nil +} + +// bodyReader returns a closure returning a new reader for buf. +func bodyReader(buf []byte) func() io.ReadCloser { + return func() io.ReadCloser { + return ioutil.NopCloser(bytes.NewReader(buf)) + } +} + +// request wraps an http.Request with a resettable body reader. +type request struct { + *http.Request + + // bodyReader allows the same body to be used for multiple requests. + bodyReader func() io.ReadCloser +} + +// reset reinitializes the request Body and uses ctx for the request. +func (r *request) reset(ctx context.Context) { + r.Body = r.bodyReader() + r.Request = r.Request.WithContext(ctx) +} + +// retryableError represents a request failure that can be retried. +type retryableError struct { + throttle int64 +} + +// newResponseError returns a retryableError and will extract any explicit +// throttle delay contained in headers. 
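A numeric `Retry-After` value is defined as delay-seconds (RFC 7231), so when the throttle is surfaced to `evaluate` as a `time.Duration` the parsed integer has to be scaled accordingly. A standalone, hedged sketch of that conversion; `throttleFromHeader` is an illustrative helper, not the unexported `retryableError` plumbing used below:

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// throttleFromHeader returns the delay requested by a numeric Retry-After
// header, or zero if none is present.
func throttleFromHeader(h http.Header) time.Duration {
	v := h.Get("Retry-After")
	if v == "" {
		return 0
	}
	// Retry-After carries whole seconds when it is numeric (RFC 7231).
	if secs, err := strconv.ParseInt(v, 10, 64); err == nil && secs > 0 {
		return time.Duration(secs) * time.Second
	}
	// The HTTP-date form is also allowed; ignored here for brevity.
	return 0
}

func main() {
	h := http.Header{}
	h.Set("Retry-After", "10")
	fmt.Println(throttleFromHeader(h)) // 10s
}
```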
+func newResponseError(header http.Header) error { + var rErr retryableError + if s, ok := header["Retry-After"]; ok { + if t, err := strconv.ParseInt(s[0], 10, 64); err == nil { + rErr.throttle = t } } - return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts) + return rErr +} + +func (e retryableError) Error() string { + return "retry-able request failure" +} + +// evaluate returns if err is retry-able. If it is and it includes an explicit +// throttling delay, that delay is also returned. +func evaluate(err error) (bool, time.Duration) { + if err == nil { + return false, 0 + } + + rErr, ok := err.(retryableError) + if !ok { + return false, 0 + } + + return true, time.Duration(rErr.throttle) } func (d *client) getScheme() string { @@ -195,26 +305,6 @@ func (d *client) getScheme() string { return "https" } -func getWaitDuration(backoff time.Duration, i int) time.Duration { - // Strategy: after nth failed attempt, attempt resending after - // k * initialBackoff + jitter, where k is a random number in - // range [0, 2^n-1), and jitter is a random percentage of - // initialBackoff from [-5%, 5%). - // - // Based on - // https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm - // - // Jitter is our addition. - - // There won't be an overflow, since i is capped to - // defaultMaxAttempts (5). - upperK := (int64)(1) << (i + 1) - jitterPercent := (rand.Float64() - 0.5) / 10. - jitter := jitterPercent * (float64)(backoff) - k := rand.Int63n(upperK) - return (time.Duration)(k)*backoff + (time.Duration)(jitter) -} - func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { // Unify the parent context Done signal with the client's stop // channel. @@ -230,51 +320,3 @@ func (d *client) contextWithStop(ctx context.Context) (context.Context, context. 
}(ctx, cancel) return ctx, cancel } - -func (d *client) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) { - request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil) - if err != nil { - return nil, err - } - bodyReader, contentLength, headers := d.prepareBody(rawRequest) - // Not closing bodyReader through defer, the HTTP Client's - // Transport will do it for us - request.Body = bodyReader - request.ContentLength = contentLength - for key, values := range headers { - for _, value := range values { - request.Header.Add(key, value) - } - } - return d.client.Do(request) -} - -func (d *client) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) { - var bodyReader io.ReadCloser - headers := http.Header{} - for k, v := range d.cfg.Headers { - headers.Set(k, v) - } - contentLength := (int64)(len(rawRequest)) - headers.Set("Content-Type", contentTypeProto) - requestReader := bytes.NewBuffer(rawRequest) - switch Compression(d.cfg.Compression) { - case NoCompression: - bodyReader = ioutil.NopCloser(requestReader) - case GzipCompression: - preader, pwriter := io.Pipe() - go func() { - defer pwriter.Close() - gzipper := gzip.NewWriter(pwriter) - defer gzipper.Close() - _, err := io.Copy(gzipper, requestReader) - if err != nil { - otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err)) - } - }() - headers.Set("Content-Encoding", "gzip") - bodyReader = preader - contentLength = -1 - } - return bodyReader, contentLength, headers -} diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 2b4ae640691..e3f7850e2f9 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -60,6 +60,55 @@ func TestEndToEnd(t *testing.T) { otlptracehttp.WithCompression(otlptracehttp.GzipCompression), }, }, + { + name: "retry", + opts: []otlptracehttp.Option{ + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: time.Nanosecond, + MaxInterval: time.Nanosecond, + // Do not stop trying. + MaxElapsedTime: 0, + }), + }, + mcCfg: mockCollectorConfig{ + InjectHTTPStatus: []int{503, 429}, + }, + }, + { + name: "retry with gzip compression", + opts: []otlptracehttp.Option{ + otlptracehttp.WithCompression(otlptracehttp.GzipCompression), + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: time.Nanosecond, + MaxInterval: time.Nanosecond, + // Do not stop trying. + MaxElapsedTime: 0, + }), + }, + mcCfg: mockCollectorConfig{ + InjectHTTPStatus: []int{503, 503}, + }, + }, + { + name: "retry with throttle", + opts: []otlptracehttp.Option{ + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: time.Nanosecond, + MaxInterval: time.Nanosecond, + // Do not stop trying. 
+ MaxElapsedTime: 0, + }), + }, + mcCfg: mockCollectorConfig{ + InjectHTTPStatus: []int{503}, + InjectResponseHeader: []map[string]string{ + {"Retry-After": "10"}, + }, + }, + }, { name: "with empty paths (forced to defaults)", opts: []otlptracehttp.Option{ @@ -138,32 +187,6 @@ func TestExporterShutdown(t *testing.T) { }) } -func TestRetry(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - client := otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(mc.Endpoint()), - otlptracehttp.WithInsecure(), - otlptracehttp.WithMaxAttempts(len(statuses)+1), - ) - ctx := context.Background() - exporter, err := otlptrace.New(ctx, client) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) - assert.NoError(t, err) - assert.Len(t, mc.GetSpans(), 1) -} - func TestTimeout(t *testing.T) { mcCfg := mockCollectorConfig{ InjectDelay: 100 * time.Millisecond, @@ -185,45 +208,21 @@ func TestTimeout(t *testing.T) { assert.Equal(t, true, os.IsTimeout(err)) } -func TestRetryFailed(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(mc.Endpoint()), - otlptracehttp.WithInsecure(), - otlptracehttp.WithMaxAttempts(1), - ) - ctx := context.Background() - exporter, err := otlptrace.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) - assert.Error(t, err) - assert.Empty(t, mc.GetSpans()) -} - func TestNoRetry(t *testing.T) { - statuses := []int{ - http.StatusBadRequest, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) + mc := runMockCollector(t, mockCollectorConfig{ + InjectHTTPStatus: []int{http.StatusBadRequest}, + }) defer mc.MustStop(t) driver := otlptracehttp.NewClient( otlptracehttp.WithEndpoint(mc.Endpoint()), otlptracehttp.WithInsecure(), - otlptracehttp.WithMaxAttempts(len(statuses)+1), + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: 1 * time.Nanosecond, + MaxInterval: 1 * time.Nanosecond, + // Never stop retry of retry-able status. + MaxElapsedTime: 0, + }), ) ctx := context.Background() exporter, err := otlptrace.New(ctx, driver) @@ -233,7 +232,7 @@ func TestNoRetry(t *testing.T) { }() err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) assert.Error(t, err) - assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces with HTTP status 400 Bad Request", mc.endpoint), err.Error()) + assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces: 400 Bad Request", mc.endpoint), err.Error()) assert.Empty(t, mc.GetSpans()) } @@ -257,88 +256,6 @@ func TestEmptyData(t *testing.T) { assert.Empty(t, mc.GetSpans()) } -func TestUnreasonableMaxAttempts(t *testing.T) { - // Max attempts is 5, we set collector to fail 7 times and try - // to configure max attempts to be either negative or too - // large. Since we set max attempts to 5 in such cases, - // exporting to the collector should fail. 
- type testcase struct { - name string - maxAttempts int - } - for _, tc := range []testcase{ - { - name: "negative max attempts", - maxAttempts: -3, - }, - { - name: "too large max attempts", - maxAttempts: 10, - }, - } { - t.Run(tc.name, func(t *testing.T) { - statuses := make([]int, 0, 7) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(mc.Endpoint()), - otlptracehttp.WithInsecure(), - otlptracehttp.WithMaxAttempts(tc.maxAttempts), - otlptracehttp.WithBackoff(time.Millisecond), - ) - ctx := context.Background() - exporter, err := otlptrace.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) - assert.Error(t, err) - assert.Empty(t, mc.GetSpans()) - }) - } -} - -func TestUnreasonableBackoff(t *testing.T) { - // This sets backoff to negative value, which gets corrected - // to default backoff instead of being used. Default max - // attempts is 5, so we set the collector to fail 4 times, but - // we set the deadline to 3 times of the default backoff, so - // this should show that deadline is not met, meaning that the - // retries weren't immediate (as negative backoff could - // imply). - statuses := make([]int, 0, 4) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlptracehttp.NewClient( - otlptracehttp.WithEndpoint(mc.Endpoint()), - otlptracehttp.WithInsecure(), - otlptracehttp.WithBackoff(-time.Millisecond), - ) - ctx, cancel := context.WithTimeout(context.Background(), 3*(300*time.Millisecond)) - defer cancel() - exporter, err := otlptrace.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(context.Background())) - }() - err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) - assert.Error(t, err) - assert.Empty(t, mc.GetSpans()) -} - func TestCancelledContext(t *testing.T) { mcCfg := mockCollectorConfig{} mc := runMockCollector(t, mcCfg) @@ -372,7 +289,13 @@ func TestDeadlineContext(t *testing.T) { driver := otlptracehttp.NewClient( otlptracehttp.WithEndpoint(mc.Endpoint()), otlptracehttp.WithInsecure(), - otlptracehttp.WithBackoff(time.Minute), + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: 1 * time.Hour, + MaxInterval: 1 * time.Hour, + // Never stop retry of retry-able status. + MaxElapsedTime: 0, + }), ) ctx := context.Background() exporter, err := otlptrace.New(ctx, driver) @@ -400,7 +323,13 @@ func TestStopWhileExporting(t *testing.T) { driver := otlptracehttp.NewClient( otlptracehttp.WithEndpoint(mc.Endpoint()), otlptracehttp.WithInsecure(), - otlptracehttp.WithBackoff(time.Minute), + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{ + Enabled: true, + InitialInterval: 1 * time.Hour, + MaxInterval: 1 * time.Hour, + // Never stop retry of retry-able status. 
+ MaxElapsedTime: 0, + }), ) ctx := context.Background() exporter, err := otlptrace.New(ctx, driver) diff --git a/exporters/otlp/otlptrace/otlptracehttp/go.mod b/exporters/otlp/otlptrace/otlptracehttp/go.mod index bf543bf7c2d..f5402df1f8b 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/go.mod +++ b/exporters/otlp/otlptrace/otlptracehttp/go.mod @@ -4,7 +4,6 @@ go 1.15 require ( github.com/stretchr/testify v1.7.0 - go.opentelemetry.io/otel v1.0.0-RC1 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.0-RC1 go.opentelemetry.io/proto/otlp v0.9.0 google.golang.org/protobuf v1.27.1 diff --git a/exporters/otlp/otlptrace/otlptracehttp/go.sum b/exporters/otlp/otlptrace/otlptracehttp/go.sum index e82a3dd14b1..b366899d900 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/go.sum +++ b/exporters/otlp/otlptrace/otlptracehttp/go.sum @@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= diff --git a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go index 6ca62a05910..af6481f946a 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go @@ -46,9 +46,10 @@ type mockCollector struct { spanLock sync.Mutex spansStorage otlptracetest.SpansStorage - injectHTTPStatus []int - injectContentType string - injectDelay time.Duration + injectHTTPStatus []int + injectResponseHeader []map[string]string + injectContentType string + injectDelay time.Duration clientTLSConfig *tls.Config expectedHeaders map[string]string @@ -97,8 +98,9 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + h := c.getInjectResponseHeader() if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 { - writeReply(w, rawResponse, injectedStatus, c.injectContentType) + writeReply(w, rawResponse, injectedStatus, c.injectContentType, h) return } rawRequest, err := readRequest(r) @@ -112,7 +114,7 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - writeReply(w, rawResponse, 0, c.injectContentType) + writeReply(w, rawResponse, 0, c.injectContentType, h) c.spanLock.Lock() defer c.spanLock.Unlock() c.spansStorage.AddSpans(request) @@ -149,6 +151,17 @@ func (c *mockCollector) getInjectHTTPStatus() int { return status } +func (c *mockCollector) getInjectResponseHeader() (h map[string]string) { + if len(c.injectResponseHeader) == 0 { + return + } + h, c.injectResponseHeader = c.injectResponseHeader[0], c.injectResponseHeader[1:] + if len(c.injectResponseHeader) == 0 { + c.injectResponseHeader = nil + } + return +} + func readRequest(r *http.Request) ([]byte, error) { if r.Header.Get("Content-Encoding") == "gzip" { return readGzipBody(r.Body) @@ 
-170,28 +183,32 @@ func readGzipBody(body io.Reader) ([]byte, error) { return rawRequest.Bytes(), nil } -func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) { +func writeReply(w http.ResponseWriter, rawResponse []byte, s int, ct string, h map[string]string) { status := http.StatusOK - if injectHTTPStatus != 0 { - status = injectHTTPStatus + if s != 0 { + status = s } contentType := "application/x-protobuf" - if injectContentType != "" { - contentType = injectContentType + if ct != "" { + contentType = ct } w.Header().Set("Content-Type", contentType) + for k, v := range h { + w.Header().Add(k, v) + } w.WriteHeader(status) _, _ = w.Write(rawResponse) } type mockCollectorConfig struct { - TracesURLPath string - Port int - InjectHTTPStatus []int - InjectContentType string - InjectDelay time.Duration - WithTLS bool - ExpectedHeaders map[string]string + TracesURLPath string + Port int + InjectHTTPStatus []int + InjectContentType string + InjectResponseHeader []map[string]string + InjectDelay time.Duration + WithTLS bool + ExpectedHeaders map[string]string } func (c *mockCollectorConfig) fillInDefaults() { @@ -207,12 +224,13 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { _, portStr, err := net.SplitHostPort(ln.Addr().String()) require.NoError(t, err) m := &mockCollector{ - endpoint: fmt.Sprintf("localhost:%s", portStr), - spansStorage: otlptracetest.NewSpansStorage(), - injectHTTPStatus: cfg.InjectHTTPStatus, - injectContentType: cfg.InjectContentType, - injectDelay: cfg.InjectDelay, - expectedHeaders: cfg.ExpectedHeaders, + endpoint: fmt.Sprintf("localhost:%s", portStr), + spansStorage: otlptracetest.NewSpansStorage(), + injectHTTPStatus: cfg.InjectHTTPStatus, + injectResponseHeader: cfg.InjectResponseHeader, + injectContentType: cfg.InjectContentType, + injectDelay: cfg.InjectDelay, + expectedHeaders: cfg.ExpectedHeaders, } mux := http.NewServeMux() mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces)) diff --git a/exporters/otlp/otlptrace/otlptracehttp/options.go b/exporters/otlp/otlptrace/otlptracehttp/options.go index 1a3c659d725..46cc6a5e41d 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/options.go +++ b/exporters/otlp/otlptrace/otlptracehttp/options.go @@ -19,19 +19,7 @@ import ( "time" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" -) - -const ( - // defaultMaxAttempts describes how many times the driver - // should retry the sending of the payload in case of a - // retryable error. - defaultMaxAttempts int = 5 - // defaultTracesPath is a default URL path for endpoint that - // receives spans. - defaultTracesPath string = "/v1/traces" - // defaultBackoff is a default base backoff time used in the - // exponential backoff strategy. - defaultBackoff time.Duration = 300 * time.Millisecond + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) // Compression describes the compression used for payloads sent to the @@ -52,9 +40,9 @@ type Option interface { applyHTTPOption(*otlpconfig.Config) } -// RetrySettings defines configuration for retrying batches in case of export failure -// using an exponential backoff. -type RetrySettings otlpconfig.RetrySettings +// RetryConfig defines configuration for retrying batches in case of export +// failure using an exponential backoff. 
+type RetryConfig retry.Config type wrappedOption struct { otlpconfig.HTTPOption @@ -84,21 +72,6 @@ func WithURLPath(urlPath string) Option { return wrappedOption{otlpconfig.WithURLPath(urlPath)} } -// WithMaxAttempts allows one to override how many times the driver -// will try to send the payload in case of retryable errors. -// The max attempts is limited to at most 5 retries. If unset, -// default (5) will be used. -func WithMaxAttempts(maxAttempts int) Option { - return wrappedOption{otlpconfig.WithMaxAttempts(maxAttempts)} -} - -// WithBackoff tells the driver to use the duration as a base of the -// exponential backoff strategy. If unset, default (300ms) will be -// used. -func WithBackoff(duration time.Duration) Option { - return wrappedOption{otlpconfig.WithBackoff(duration)} -} - // WithTLSClientConfig can be used to set up a custom TLS // configuration for the client used to send payloads to the // collector. Use it if you want to use a custom certificate. @@ -124,3 +97,12 @@ func WithHeaders(headers map[string]string) Option { func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } + +// WithRetry configures the retry policy for transient errors that may occur +// when exporting traces. An exponential back-off algorithm is used to ensure +// endpoints are not overwhelmed with retries. If unset, the default retry +// policy will retry after 5 seconds and increase exponentially after each +// error for a total of 1 minute. +func WithRetry(rc RetryConfig) Option { + return wrappedOption{otlpconfig.WithRetry(retry.Config(rc))} +}
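
For call sites migrating to this change, here is a minimal sketch of configuring the exporter with the new WithRetry option in place of the removed WithMaxAttempts and WithBackoff. Only the option names and the RetryConfig fields come from the diff above; the endpoint value and the specific durations are illustrative assumptions, not recommended defaults.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
)

func main() {
	ctx := context.Background()

	client := otlptracehttp.NewClient(
		otlptracehttp.WithEndpoint("localhost:4318"), // assumed collector endpoint
		otlptracehttp.WithInsecure(),
		// Replacement for the removed WithMaxAttempts and WithBackoff options.
		otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
			Enabled:         true,
			InitialInterval: 300 * time.Millisecond, // illustrative values only
			MaxInterval:     5 * time.Second,
			MaxElapsedTime:  30 * time.Second,
		}),
	)

	exporter, err := otlptrace.New(ctx, client)
	if err != nil {
		log.Fatalf("creating OTLP trace exporter: %v", err)
	}
	defer func() {
		if err := exporter.Shutdown(ctx); err != nil {
			log.Printf("shutting down exporter: %v", err)
		}
	}()

	// Register the exporter with a tracer provider as usual; omitted here.
}

Omitting WithRetry leaves the default policy described in the option's documentation in effect.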