diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index bf5f27c403b..2c287faabbb 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -16,72 +16,217 @@ package exporterhelper import ( "context" + "sync" + "time" "go.opencensus.io/trace" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" ) var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) +// Settings for timeout. The timeout applies to individual attempts to send data to the backend. +type TimeoutSettings struct { + // Timeout is the timeout for every attempt to send data to the backend. + Timeout time.Duration `mapstructure:"timeout"` +} + +// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings. +func CreateDefaultTimeoutSettings() TimeoutSettings { + return TimeoutSettings{ + Timeout: 5 * time.Second, + } +} + +// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs). +type request interface { + // context returns the Context of the requests. + context() context.Context + // setContext updates the Context of the requests. + setContext(context.Context) + export(ctx context.Context) (int, error) + // Returns a new request that contains the items left to be sent. + onPartialError(consumererror.PartialError) request + // Returns the count of spans/metric points or log records. + count() int +} + +// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). +type requestSender interface { + send(req request) (int, error) +} + +// baseRequest is a base implementation for the request. +type baseRequest struct { + ctx context.Context +} + +func (req *baseRequest) context() context.Context { + return req.ctx +} + +func (req *baseRequest) setContext(ctx context.Context) { + req.ctx = ctx +} + // Start specifies the function invoked when the exporter is being started. type Start func(context.Context, component.Host) error // Shutdown specifies the function invoked when the exporter is being shutdown. type Shutdown func(context.Context) error +// internalOptions represents all the options that users can configure. +type internalOptions struct { + TimeoutSettings + QueueSettings + RetrySettings + Start + Shutdown +} + +// fromConfiguredOptions returns the internal options starting from the default and applying all configured options. +func fromConfiguredOptions(options ...ExporterOption) *internalOptions { + // Start from the default options: + opts := &internalOptions{ + TimeoutSettings: CreateDefaultTimeoutSettings(), + // TODO: Enable queuing by default (call CreateDefaultQueueSettings) + QueueSettings: QueueSettings{Disabled: true}, + // TODO: Enable retry by default (call CreateDefaultRetrySettings) + RetrySettings: RetrySettings{Disabled: true}, + Start: func(ctx context.Context, host component.Host) error { return nil }, + Shutdown: func(ctx context.Context) error { return nil }, + } + + for _, op := range options { + op(opts) + } + + return opts +} + // ExporterOption apply changes to internalOptions. -type ExporterOption func(*baseExporter) +type ExporterOption func(*internalOptions) // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. 
func WithShutdown(shutdown Shutdown) ExporterOption { - return func(o *baseExporter) { - o.shutdown = shutdown + return func(o *internalOptions) { + o.Shutdown = shutdown } } // WithStart overrides the default Start function for an exporter. // The default shutdown function does nothing and always returns nil. func WithStart(start Start) ExporterOption { - return func(o *baseExporter) { - o.start = start + return func(o *internalOptions) { + o.Start = start } } -// internalOptions contains internalOptions concerning how an Exporter is configured. -type baseExporter struct { - exporterFullName string - start Start - shutdown Shutdown +// WithTimeout overrides the default TimeoutSettings for an exporter. +// The default TimeoutSettings is 5 seconds. +func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption { + return func(o *internalOptions) { + o.TimeoutSettings = timeoutSettings + } } -// Construct the internalOptions from multiple ExporterOption. -func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter { - be := baseExporter{ - exporterFullName: exporterFullName, +// WithRetry overrides the default RetrySettings for an exporter. +// The default RetrySettings is to disable retries. +func WithRetry(retrySettings RetrySettings) ExporterOption { + return func(o *internalOptions) { + o.RetrySettings = retrySettings } +} - for _, op := range options { - op(&be) +// WithQueue overrides the default QueueSettings for an exporter. +// The default QueueSettings is to disable queueing. +func WithQueue(queueSettings QueueSettings) ExporterOption { + return func(o *internalOptions) { + o.QueueSettings = queueSettings } +} + +// baseExporter contains common fields between different exporter types. +type baseExporter struct { + cfg configmodels.Exporter + sender requestSender + qrSender *queuedRetrySender + start Start + shutdown Shutdown + startOnce sync.Once + shutdownOnce sync.Once +} + +func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter { + opts := fromConfiguredOptions(options...) + be := &baseExporter{ + cfg: cfg, + start: opts.Start, + shutdown: opts.Shutdown, + } + + be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}) + be.sender = be.qrSender return be } +// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. +// This can be used to wrap with observability (create spans, record metrics) the consumer sender. +func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { + be.qrSender.consumerSender = f(be.qrSender.consumerSender) +} + +// Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { - if be.start != nil { - return be.start(ctx, host) - } - return nil + err := componenterror.ErrAlreadyStarted + be.startOnce.Do(func() { + // First start the wrapped exporter. + err = be.start(ctx, host) + if err != nil { + // TODO: Log errors, or check if it is recorded by the caller. + return + } + + // If no error then start the queuedRetrySender. + be.qrSender.start() + }) + return err } -// Shutdown stops the exporter and is invoked during shutdown. +// Shutdown all senders and exporter and is invoked during service shutdown. 
func (be *baseExporter) Shutdown(ctx context.Context) error { - if be.shutdown != nil { - return be.shutdown(ctx) + err := componenterror.ErrAlreadyStopped + be.shutdownOnce.Do(func() { + // First shutdown the queued retry sender + be.qrSender.shutdown() + // Last shutdown the wrapped exporter itself. + err = be.shutdown(ctx) + }) + return err +} + +// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender. +type timeoutSender struct { + cfg TimeoutSettings +} + +// send implements the requestSender interface +func (ts *timeoutSender) send(req request) (int, error) { + // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be + // updated because this deadline most likely is before the next one. + ctx := req.context() + if ts.cfg.Timeout > 0 { + var cancelFunc func() + ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout) + defer cancelFunc() } - return nil + return req.export(ctx) } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 1fc661a5d62..45bf9def720 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -23,22 +23,28 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" ) +var defaultExporterCfg = &configmodels.ExporterSettings{ + TypeVal: "test", + NameVal: "test", +} + func TestErrorToStatus(t *testing.T) { require.Equal(t, okStatus, errToStatus(nil)) require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error"))) } func TestBaseExporter(t *testing.T) { - be := newBaseExporter("test") + be := newBaseExporter(defaultExporterCfg) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) } func TestBaseExporterWithOptions(t *testing.T) { be := newBaseExporter( - "test", + defaultExporterCfg, WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }), WithShutdown(func(ctx context.Context) error { return errors.New("my error") })) require.Error(t, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index d6e1e7f345c..71525f6d3de 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/obsreport" ) @@ -27,21 +28,47 @@ import ( // the number of dropped logs. 
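Taken together, the pieces above give every exporter built on these helpers a fixed chain of requestSenders: queuedRetrySender enqueues each batch (or passes it straight through when the queue is disabled), the per-signal observability sender installed later via wrapConsumerSender records spans and sent/dropped counts, retrySender applies exponential backoff, and timeoutSender puts a per-attempt deadline on the context before calling request.export. To illustrate just the requestSender abstraction, a minimal middleware sketch (not code from this change; it assumes it lives in package exporterhelper with "sync/atomic" imported):

```go
// countingSender is a hypothetical requestSender middleware with the same
// shape as timeoutSender, retrySender and the per-signal observability senders.
type countingSender struct {
	sentItems  int64
	nextSender requestSender
}

func (cs *countingSender) send(req request) (int, error) {
	// Forward the batch to the next sender in the chain.
	dropped, err := cs.nextSender.send(req)
	// count() reports how many spans/metric points/log records were in the batch.
	atomic.AddInt64(&cs.sentItems, int64(req.count()-dropped))
	return dropped, err
}
```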
type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error) +type logsRequest struct { + baseRequest + ld data.Logs + pusher PushLogsData +} + +func newLogsRequest(ctx context.Context, ld data.Logs, pusher PushLogsData) request { + return &logsRequest{ + baseRequest: baseRequest{ctx: ctx}, + ld: ld, + pusher: pusher, + } +} + +func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request { + // TODO: Implement this + return req +} + +func (req *logsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.ld) +} + +func (req *logsRequest) count() int { + return req.ld.LogRecordCount() +} + type logsExporter struct { - baseExporter + *baseExporter pushLogsData PushLogsData } -func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushLogsData(exporterCtx, md) +func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error { + exporterCtx := obsreport.ExporterContext(ctx, lexp.cfg.Name()) + _, err := lexp.sender.send(newLogsRequest(exporterCtx, ld, lexp.pushLogsData)) return err } -// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span. -// TODO: Add support for retries. -func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { - if config == nil { +// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. +func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -49,22 +76,28 @@ func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, op return nil, errNilPushLogsData } - pushLogsData = pushLogsWithObservability(pushLogsData, config.Name()) + be := newBaseExporter(cfg, options...) 
+ be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &logsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: be, pushLogsData: pushLogsData, }, nil } -func pushLogsWithObservability(next PushLogsData, exporterName string) PushLogsData { - return func(ctx context.Context, ld data.Logs) (int, error) { - ctx = obsreport.StartLogsExportOp(ctx, exporterName) - numDroppedLogs, err := next(ctx, ld) - - numLogs := ld.LogRecordCount() +type logsExporterWithObservability struct { + exporterName string + nextSender requestSender +} - obsreport.EndLogsExportOp(ctx, numLogs, numDroppedLogs, err) - return numLogs, err - } +func (lewo *logsExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName)) + numDroppedLogs, err := lewo.nextSender.send(req) + obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err) + return numDroppedLogs, err } diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logshelper_test.go index 0d20a7d158c..bd08d1d209e 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logshelper_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -43,6 +44,13 @@ var ( } ) +func TestLogsRequest(t *testing.T) { + mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) +} + func TestLogsExporter_InvalidName(t *testing.T) { me, err := NewLogsExporter(nil, newPushLogsData(0, nil)) require.Nil(t, me) @@ -178,7 +186,7 @@ func generateLogsTraffic(t *testing.T, me component.LogExporter, numRequests int } } -func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numMetricPoints int64) { +func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numLogRecords int64) { ocSpansSaver := new(testOCTraceExporter) trace.RegisterExporter(ocSpansSaver) defer trace.UnregisterExporter(ocSpansSaver) @@ -201,13 +209,13 @@ func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantEr require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd) require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd) - sentMetricPoints := numMetricPoints - var failedToSendMetricPoints int64 + sentLogRecords := numLogRecords + var failedToSendLogRecords int64 if wantError != nil { - sentMetricPoints = 0 - failedToSendMetricPoints = numMetricPoints + sentLogRecords = 0 + failedToSendLogRecords = numLogRecords } - require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) - require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, failedToSendLogRecords, 
sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) } } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 385aeaaab7b..0b1c8717bde 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/obsreport" @@ -30,21 +31,20 @@ import ( type PushMetricsDataOld func(ctx context.Context, td consumerdata.MetricsData) (droppedTimeSeries int, err error) type metricsExporterOld struct { - baseExporter + *baseExporter pushMetricsData PushMetricsDataOld } -func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + _, err := mexp.pushMetricsData(exporterCtx, md) return err } -// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. +// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span. // TODO: Add support for retries. -func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { - if config == nil { +func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { + if cfg == nil { return nil, errNilConfig } @@ -52,10 +52,10 @@ func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMet return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, config.Name()) + pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, cfg.Name()) return &metricsExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), pushMetricsData: pushMetricsData, }, nil } @@ -88,22 +88,49 @@ func NumTimeSeries(md consumerdata.MetricsData) int { // the number of dropped metrics. type PushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) +type metricsRequest struct { + baseRequest + md pdata.Metrics + pusher PushMetricsData +} + +func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher PushMetricsData) request { + return &metricsRequest{ + baseRequest: baseRequest{ctx: ctx}, + md: md, + pusher: pusher, + } +} + +func (req *metricsRequest) onPartialError(consumererror.PartialError) request { + // TODO: implement this. 
+ return req +} + +func (req *metricsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.md) +} + +func (req *metricsRequest) count() int { + _, numPoints := pdatautil.MetricAndDataPointCount(req.md) + return numPoints +} + type metricsExporter struct { - baseExporter - pushMetricsData PushMetricsData + *baseExporter + pusher PushMetricsData } -func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + req := newMetricsRequest(exporterCtx, md, mexp.pusher) + _, err := mexp.sender.send(req) return err } -// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. -// TODO: Add support for retries. -func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { - if config == nil { +// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. +func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -111,25 +138,35 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name()) + be := newBaseExporter(cfg, options...) + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &metricsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - pushMetricsData: pushMetricsData, + baseExporter: be, + pusher: pushMetricsData, }, nil } -func pushMetricsWithObservability(next PushMetricsData, exporterName string) PushMetricsData { - return func(ctx context.Context, md pdata.Metrics) (int, error) { - ctx = obsreport.StartMetricsExportOp(ctx, exporterName) - numDroppedMetrics, err := next(ctx, md) +type metricsSenderWithObservability struct { + exporterName string + nextSender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. - numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(md) +func (mewo *metricsSenderWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName)) + numDroppedMetrics, err := mewo.nextSender.send(req) - obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedMetrics, numDroppedMetrics, err) - return numReceivedMetrics, err - } + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done + // to new metrics will remove this. 
+ mReq := req.(*metricsRequest) + numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md) + + obsreport.EndMetricsExportOp(req.context(), numPoints, numReceivedMetrics, numDroppedMetrics, err) + return numReceivedMetrics, err } diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metricshelper_test.go index 3f96826b115..c78f7565f77 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metricshelper_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data/testdata" @@ -46,6 +47,14 @@ var ( } ) +func TestMetricsRequest(t *testing.T) { + mr := newMetricsRequest(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) + assert.Equal(t, 0, mr.count()) +} + func TestMetricsExporter_InvalidName(t *testing.T) { me, err := NewMetricsExporter(nil, newPushMetricsData(0, nil)) require.Nil(t, me) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go new file mode 100644 index 00000000000..aa4c4b01e34 --- /dev/null +++ b/exporter/exporterhelper/queued_retry.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "errors" + "fmt" + "time" + + "github.com/cenkalti/backoff" + "github.com/jaegertracing/jaeger/pkg/queue" + + "go.opentelemetry.io/collector/consumer/consumererror" +) + +// QueueSettings defines configuration for queueing batches before sending to the consumerSender. +type QueueSettings struct { + // Disabled indicates whether to not enqueue batches before sending to the consumerSender. + Disabled bool `mapstructure:"disabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in queue at a given time. + QueueSize int `mapstructure:"queue_size"` +} + +// CreateDefaultQueueSettings returns the default settings for QueueSettings. +func CreateDefaultQueueSettings() QueueSettings { + return QueueSettings{ + Disabled: false, + NumConsumers: 10, + // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. + // This is a pretty decent value for production. + // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, + // multiply that by the number of requests per seconds. 
+ QueueSize: 5000, + } +} + +// RetrySettings defines configuration for retrying batches in case of export failure. +// The current supported strategy is exponential backoff. +type RetrySettings struct { + // Disabled indicates whether to not retry sending batches in case of export failure. + Disabled bool `mapstructure:"disabled"` + // InitialInterval the time to wait after the first failure before retrying. + InitialInterval time.Duration `mapstructure:"initial_interval"` + // MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between + // consecutive retries will always be `MaxInterval`. + MaxInterval time.Duration `mapstructure:"max_interval"` + // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. + // Once this value is reached, the data is discarded. + MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"` +} + +// CreateDefaultRetrySettings returns the default settings for RetrySettings. +func CreateDefaultRetrySettings() RetrySettings { + return RetrySettings{ + Disabled: false, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} + +type queuedRetrySender struct { + cfg QueueSettings + consumerSender requestSender + queue *queue.BoundedQueue + retryStopCh chan struct{} +} + +var errorRefused = errors.New("failed to add to the queue") + +func newQueuedRetrySender(qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender) *queuedRetrySender { + retryStopCh := make(chan struct{}) + return &queuedRetrySender{ + cfg: qCfg, + consumerSender: &retrySender{ + cfg: rCfg, + nextSender: nextSender, + stopCh: retryStopCh, + }, + queue: queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}), + retryStopCh: retryStopCh, + } +} + +// start is invoked during service startup. +func (qrs *queuedRetrySender) start() { + qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { + value := item.(request) + _, _ = qrs.consumerSender.send(value) + }) +} + +// send implements the requestSender interface +func (qrs *queuedRetrySender) send(req request) (int, error) { + if qrs.cfg.Disabled { + return qrs.consumerSender.send(req) + } + + if !qrs.queue.Produce(req) { + return req.count(), errorRefused + } + + return 0, nil +} + +// shutdown is invoked during service shutdown. +func (qrs *queuedRetrySender) shutdown() { + // First stop the retry goroutines, so that unblocks the queue workers. + close(qrs.retryStopCh) + + // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only + // try once every request. + qrs.queue.Stop() +} + +// TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries. 
+type throttleRetry struct { + error + delay time.Duration +} + +func NewThrottleRetry(err error, delay time.Duration) error { + return &throttleRetry{ + error: err, + delay: delay, + } +} + +type retrySender struct { + cfg RetrySettings + nextSender requestSender + stopCh chan struct{} +} + +// send implements the requestSender interface +func (rs *retrySender) send(req request) (int, error) { + if rs.cfg.Disabled { + return rs.nextSender.send(req) + } + + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = rs.cfg.InitialInterval + expBackoff.MaxInterval = rs.cfg.MaxInterval + expBackoff.MaxElapsedTime = rs.cfg.MaxElapsedTime + for { + droppedItems, err := rs.nextSender.send(req) + + if err == nil { + return droppedItems, nil + } + + // Immediately drop data on permanent errors. + if consumererror.IsPermanent(err) { + return droppedItems, err + } + + // If partial error, update data and stats with non exported data. + if partialErr, isPartial := err.(consumererror.PartialError); isPartial { + req = req.onPartialError(partialErr) + } + + backoffDelay := expBackoff.NextBackOff() + + if backoffDelay == backoff.Stop { + // throw away the batch + return req.count(), fmt.Errorf("max elapsed time expired %w", err) + } + + if throttleErr, isThrottle := err.(*throttleRetry); isThrottle { + backoffDelay = max(backoffDelay, throttleErr.delay) + } + + // back-off, but get interrupted when shutting down or request is cancelled or timed out. + select { + case <-req.context().Done(): + return req.count(), fmt.Errorf("request is cancelled or timed out %w", err) + case <-rs.stopCh: + return req.count(), fmt.Errorf("interrupted due to shutdown %w", err) + case <-time.After(backoffDelay): + } + } +} + +// max returns the larger of x or y. +func max(x, y time.Duration) time.Duration { + if x < y { + return y + } + return x +} diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go new file mode 100644 index 00000000000..995aeaa30ea --- /dev/null +++ b/exporter/exporterhelper/queued_retry_test.go @@ -0,0 +1,500 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
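The retrySender above treats permanent, throttled and ordinary errors differently. A sketch of how a concrete push function can surface those hints (backendSend, isBadRequest and retryAfterHint are hypothetical stand-ins for a real backend client; only consumererror.Permanent and NewThrottleRetry come from the collector and this change):

```go
// pushWithRetryHints is a hypothetical traceDataPusher showing the error kinds
// retrySender reacts to. Assumes the usual collector imports (context, pdata,
// consumererror, exporterhelper).
func pushWithRetryHints(ctx context.Context, td pdata.Traces) (int, error) {
	err := backendSend(ctx, td) // hypothetical backend client call
	if err == nil {
		return 0, nil
	}
	if isBadRequest(err) { // hypothetical check for a non-recoverable rejection
		// Permanent errors are dropped immediately, without retries.
		return td.SpanCount(), consumererror.Permanent(err)
	}
	if delay, ok := retryAfterHint(err); ok { // hypothetical server-provided delay
		// retrySender backs off for at least `delay` before the next attempt.
		return td.SpanCount(), exporterhelper.NewThrottleRetry(err, delay)
	}
	// Anything else is retried with exponential backoff until MaxElapsedTime.
	return td.SpanCount(), err
}
```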
+ +package exporterhelper + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestQueuedRetry_DropOnPermanentError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(consumererror.Permanent(errors.New("bad data"))) + + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_DropOnNoRetry(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = true + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_PartialError(t *testing.T) { + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + mockP := newMockConcurrentExporter() + mockP.updateError(partialErr) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 1 second. 
+ mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_StopWhileWaiting(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Minute + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // Enqueue another request to ensure when calling shutdown we drain the queue. + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 3, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + mockP.checkNumRequests(t, 1) + // TODO: Ensure that queue is drained, and uncomment the next 3 lines. + // https://github.com/jaegertracing/jaeger/pull/2349 + // ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 2) + // require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_PreserveCancellation(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ctx, cancelFunc := context.WithCancel(context.Background()) + start := time.Now() + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(ctx, 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + cancelFunc() + + mockP.checkNumRequests(t, 1) + require.Zero(t, be.qrSender.queue.Size()) + + // Stop should succeed and not retry. + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + // We should ensure that we actually did not wait for the initial backoff (30 sec). + assert.True(t, 5*time.Second > time.Since(start)) + + // In the newMockConcurrentExporter we count requests and items even for failed requests. 
+ mockP.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_MaxElapsedTime(t *testing.T) { + mockP := newMockConcurrentExporter() + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 100 * time.Millisecond + rCfg.MaxElapsedTime = 1 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Add an item that will always fail. + droppedItems, err := be.sender.send(newErrorRequest(context.Background())) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests. + mockP.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_ThrottleError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(NewThrottleRetry(errors.New("throttle error"), 1*time.Second)) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 100 * time.Millisecond + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + start := time.Now() + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 2 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // The initial backoff is 100ms, but because of the throttle this should wait at least 1 seconds. 
+ assert.True(t, 1*time.Second < time.Since(start)) + + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_RetryOnError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 2 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 0) + + // There is a small race condition in this test, but expect to execute this in less than 2 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_DropOnFull(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.QueueSize = 0 + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.Error(t, err) + assert.Equal(t, 2, droppedItems) +} + +func TestQueuedRetryHappyPath(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + mockP := newMockConcurrentExporter() + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + wantRequests := 10 + for i := 0; i < wantRequests; i++ { + mockP.run(func() { + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + } + + // Wait until all batches received + mockP.awaitAsyncProcessing() + + mockP.checkNumRequests(t, wantRequests) + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) +} + +type mockErrorRequest struct { + baseRequest +} + 
+func (mer *mockErrorRequest) export(_ context.Context) (int, error) { + return 0, errors.New("transient error") +} + +func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request { + return mer +} + +func (mer *mockErrorRequest) count() int { + return 7 +} + +func newErrorRequest(ctx context.Context) request { + return &mockErrorRequest{ + baseRequest: baseRequest{ctx: ctx}, + } +} + +type mockRequest struct { + baseRequest + cnt int + mce *mockConcurrentExporter +} + +func (m *mockRequest) export(_ context.Context) (int, error) { + err := m.mce.export() + if err != nil { + return m.cnt, err + } + return 0, nil +} + +func (m *mockRequest) onPartialError(consumererror.PartialError) request { + return &mockRequest{ + baseRequest: m.baseRequest, + cnt: 1, + mce: m.mce, + } +} + +func (m *mockRequest) count() int { + return m.cnt +} + +func newMockRequest(ctx context.Context, cnt int, mce *mockConcurrentExporter) request { + return &mockRequest{ + baseRequest: baseRequest{ctx: ctx}, + cnt: cnt, + mce: mce, + } +} + +type mockConcurrentExporter struct { + waitGroup *sync.WaitGroup + mu sync.Mutex + consumeError error + requestCount int64 + stopped int32 +} + +func newMockConcurrentExporter() *mockConcurrentExporter { + return &mockConcurrentExporter{waitGroup: new(sync.WaitGroup)} +} + +func (p *mockConcurrentExporter) export() error { + if atomic.LoadInt32(&p.stopped) == 1 { + return nil + } + atomic.AddInt64(&p.requestCount, 1) + p.mu.Lock() + defer p.mu.Unlock() + defer p.waitGroup.Done() + return p.consumeError +} + +func (p *mockConcurrentExporter) updateError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + p.consumeError = err +} + +func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) +} + +func (p *mockConcurrentExporter) run(fn func()) { + p.waitGroup.Add(1) + fn() +} + +func (p *mockConcurrentExporter) awaitAsyncProcessing() { + p.waitGroup.Wait() +} + +func (p *mockConcurrentExporter) stop() { + atomic.StoreInt32(&p.stopped, 1) +} + +type observabilityConsumerSender struct { + sentItemsCount int64 + droppedItemsCount int64 + nextSender requestSender +} + +func (ocs *observabilityConsumerSender) send(req request) (int, error) { + dic, err := ocs.nextSender.send(req) + atomic.AddInt64(&ocs.sentItemsCount, int64(req.count()-dic)) + atomic.AddInt64(&ocs.droppedItemsCount, int64(dic)) + return dic, err +} + +func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.sentItemsCount)) +} + +func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount)) +} diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 463924fd2b2..119d57c7ddf 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" ) @@ -28,32 +29,25 @@ import ( // returns the number of dropped spans. 
type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) -// traceDataPusher is a helper function that is similar to ConsumeTraceData but also -// returns the number of dropped spans. -type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) - -// traceExporterOld implements the exporter with additional helper internalOptions. type traceExporterOld struct { - baseExporter + *baseExporter dataPusher traceDataPusherOld } -func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + _, err := texp.dataPusher(exporterCtx, td) return err } -// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every -// request with a Span. If no internalOptions are passed it just adds the exporter format as a -// tag in the Context. +// NewTraceExporterOld creates an TraceExporterOld that records observability metrics and wraps every request with a Span. func NewTraceExporterOld( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusherOld, options ...ExporterOption, ) (component.TraceExporterOld, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -61,10 +55,10 @@ func NewTraceExporterOld( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + dataPusher = dataPusher.withObservability(cfg.Name()) return &traceExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), dataPusher: dataPusher, }, nil } @@ -86,29 +80,56 @@ func (p traceDataPusherOld) withObservability(exporterName string) traceDataPush } } +// traceDataPusher is a helper function that is similar to ConsumeTraceData but also +// returns the number of dropped spans. 
+type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) + +type tracesRequest struct { + baseRequest + td pdata.Traces + pusher traceDataPusher +} + +func newTracesRequest(ctx context.Context, td pdata.Traces, pusher traceDataPusher) request { + return &tracesRequest{ + baseRequest: baseRequest{ctx: ctx}, + td: td, + pusher: pusher, + } +} + +func (req *tracesRequest) onPartialError(partialErr consumererror.PartialError) request { + return newTracesRequest(req.ctx, partialErr.GetTraces(), req.pusher) +} + +func (req *tracesRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.td) +} + +func (req *tracesRequest) count() int { + return req.td.SpanCount() +} + type traceExporter struct { - baseExporter - dataPusher traceDataPusher + *baseExporter + pusher traceDataPusher } -func (te *traceExporter) ConsumeTraces( - ctx context.Context, - td pdata.Traces, -) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + req := newTracesRequest(exporterCtx, td, texp.pusher) + _, err := texp.sender.send(req) return err } -// NewTraceExporter creates a TraceExporter that can record metrics and can wrap -// every request with a Span. +// NewTraceExporter creates a TraceExporter that records observability metrics and wraps every request with a Span. func NewTraceExporter( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusher, options ...ExporterOption, ) (component.TraceExporter, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -116,27 +137,33 @@ func NewTraceExporter( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + be := newBaseExporter(cfg, options...) + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &traceExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - dataPusher: dataPusher, + baseExporter: be, + pusher: dataPusher, }, nil } -// withObservability wraps the current pusher into a function that records -// the observability signals during the pusher execution. -func (p traceDataPusher) withObservability(exporterName string) traceDataPusher { - return func(ctx context.Context, td pdata.Traces) (int, error) { - ctx = obsreport.StartTraceDataExportOp(ctx, exporterName) - // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := p(ctx, td) +type tracesExporterWithObservability struct { + exporterName string + nextSender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. - numSpans := td.SpanCount() - obsreport.EndTraceDataExportOp(ctx, numSpans, droppedSpans, err) - return droppedSpans, err - } +func (tewo *tracesExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartTraceDataExportOp(req.context(), tewo.exporterName)) + // Forward the data to the next consumer (this pusher is the next). + droppedSpans, err := tewo.nextSender.send(req) + + // TODO: this is not ideal: it should come from the next function itself. 
+ // temporarily loading it from internal format. Once full switch is done + // to new metrics will remove this. + obsreport.EndTraceDataExportOp(req.context(), req.count(), droppedSpans, err) + return droppedSpans, err } diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/tracehelper_test.go index eef81757b3d..9c18d6d7321 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/tracehelper_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -46,6 +47,13 @@ var ( } ) +func TestTracesRequest(t *testing.T) { + mr := newTracesRequest(context.Background(), testdata.GenerateTraceDataOneSpan(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataEmpty()) + assert.EqualValues(t, newTracesRequest(context.Background(), testdata.GenerateTraceDataEmpty(), nil), mr.onPartialError(partialErr.(consumererror.PartialError))) +} + // TODO https://go.opentelemetry.io/collector/issues/266 // Migrate tests to use testify/assert instead of t.Fatal pattern. func TestTraceExporterOld_InvalidName(t *testing.T) { diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index 7d1854e6f6d..b7eee6bbb9f 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -138,7 +138,7 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "NumWorkers", + name: "NumConsumers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: rcvCfg.NetAddr.Endpoint, diff --git a/exporter/otlpexporter/README.md b/exporter/otlpexporter/README.md index 820659e7acf..107773dc4e1 100644 --- a/exporter/otlpexporter/README.md +++ b/exporter/otlpexporter/README.md @@ -28,6 +28,19 @@ The following settings can be optionally configured: [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#WithInsecure). - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). +- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. +- `retry_on_failure` + - `disabled` (default = false) + - `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true` + - `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true` + - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true` +- `sending_queue` + - `disabled` (default = true) + - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `disabled` is `true` + - `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `disabled` is `true`; + User should calculate this as `num_seconds * requests_per_second` where: + - `num_seconds` is the number of seconds to buffer in case of a backend outage + - `requests_per_second` is the average number of requests per seconds. 
Example: diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index be4dfa65521..fe9c3f0ded8 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -17,11 +17,15 @@ package otlpexporter import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Config defines configuration for OpenCensus exporter. type Config struct { - configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 5433e59ea05..821e295494d 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestLoadConfig(t *testing.T) { @@ -49,6 +50,20 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/2", TypeVal: "otlp", }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Disabled: false, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Disabled: false, + NumConsumers: 2, + QueueSize: 10, + }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 6134d066297..4b9f28e9c2f 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -39,11 +39,17 @@ func NewFactory() component.ExporterFactory { } func createDefaultConfig() configmodels.Exporter { + // TODO: Enable the queued settings. + qs := exporterhelper.CreateDefaultQueueSettings() + qs.Disabled = true return &Config{ ExporterSettings: configmodels.ExporterSettings{ TypeVal: typeStr, NameVal: typeStr, }, + TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), + RetrySettings: exporterhelper.CreateDefaultRetrySettings(), + QueueSettings: qs, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. 
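For reference, the `timeout`, `retry_on_failure` and `sending_queue` keys documented in the README above decode into the exporterhelper settings embedded in Config. A sketch of the equivalent in-code configuration, mirroring the values exercised by config_test.go (the endpoint is illustrative, not from this change):

```go
cfg := &Config{
	ExporterSettings:   configmodels.ExporterSettings{TypeVal: "otlp", NameVal: "otlp/2"},
	GRPCClientSettings: configgrpc.GRPCClientSettings{Endpoint: "backend.example.com:55680"}, // illustrative endpoint
	// `timeout:` (TimeoutSettings is squashed into the top level)
	TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 10 * time.Second},
	// `retry_on_failure:`
	RetrySettings: exporterhelper.RetrySettings{
		Disabled:        false,
		InitialInterval: 10 * time.Second,
		MaxInterval:     1 * time.Minute,
		MaxElapsedTime:  10 * time.Minute,
	},
	// `sending_queue:`
	QueueSettings: exporterhelper.QueueSettings{
		Disabled:     false,
		NumConsumers: 2,
		QueueSize:    10,
	},
}
```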
@@ -61,9 +67,13 @@ func createTraceExporter(
 	if err != nil {
 		return nil, err
 	}
+	oCfg := cfg.(*Config)
 	oexp, err := exporterhelper.NewTraceExporter(
 		cfg,
 		oce.pushTraceData,
+		exporterhelper.WithTimeout(oCfg.TimeoutSettings),
+		exporterhelper.WithRetry(oCfg.RetrySettings),
+		exporterhelper.WithQueue(oCfg.QueueSettings),
 		exporterhelper.WithShutdown(oce.shutdown))
 	if err != nil {
 		return nil, err
@@ -81,9 +91,13 @@ func createMetricsExporter(
 	if err != nil {
 		return nil, err
 	}
+	oCfg := cfg.(*Config)
 	oexp, err := exporterhelper.NewMetricsExporter(
 		cfg,
 		oce.pushMetricsData,
+		exporterhelper.WithTimeout(oCfg.TimeoutSettings),
+		exporterhelper.WithRetry(oCfg.RetrySettings),
+		exporterhelper.WithQueue(oCfg.QueueSettings),
 		exporterhelper.WithShutdown(oce.shutdown),
 	)
 	if err != nil {
@@ -102,9 +116,13 @@ func createLogExporter(
 	if err != nil {
 		return nil, err
 	}
+	oCfg := cfg.(*Config)
 	oexp, err := exporterhelper.NewLogsExporter(
 		cfg,
 		oce.pushLogData,
+		exporterhelper.WithTimeout(oCfg.TimeoutSettings),
+		exporterhelper.WithRetry(oCfg.RetrySettings),
+		exporterhelper.WithQueue(oCfg.QueueSettings),
 		exporterhelper.WithShutdown(oce.shutdown),
 	)
 	if err != nil {
diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go
index c554592748f..6cf00d55959 100644
--- a/exporter/otlpexporter/factory_test.go
+++ b/exporter/otlpexporter/factory_test.go
@@ -111,7 +111,7 @@ func TestCreateTraceExporter(t *testing.T) {
 			},
 		},
 		{
-			name: "NumWorkers",
+			name: "NumConsumers",
 			config: Config{
 				GRPCClientSettings: configgrpc.GRPCClientSettings{
 					Endpoint: endpoint,
diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go
index f0c33704fcb..25586ff0d8d 100644
--- a/exporter/otlpexporter/otlp.go
+++ b/exporter/otlpexporter/otlp.go
@@ -18,20 +18,19 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
-	"github.com/cenkalti/backoff"
 	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
 
-	"go.opentelemetry.io/collector/component/componenterror"
 	"go.opentelemetry.io/collector/config/configmodels"
+	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/consumer/pdata"
 	"go.opentelemetry.io/collector/consumer/pdatautil"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/internal/data"
 	otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
 	otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1"
@@ -41,9 +40,7 @@ import (
 type exporterImp struct {
 	// Input configuration.
 	config *Config
-
-	stopOnce sync.Once
-	w        sender
+	w      sender
 }
 
 type sender interface {
@@ -54,8 +51,7 @@ type sender interface {
 }
 
 var (
-	errTimeout    = errors.New("timeout")
-	errFatalError = errors.New("fatal error sending to server")
+	errPermanentError = consumererror.Permanent(errors.New("fatal error sending to server"))
 )
 
 // Crete new exporter and start it. The exporter will begin connecting but
@@ -78,12 +74,7 @@ func newExporter(cfg configmodels.Exporter) (*exporterImp, error) {
 }
 
 func (e *exporterImp) shutdown(context.Context) error {
-	err := componenterror.ErrAlreadyStopped
-	e.stopOnce.Do(func() {
-		// Close the connection.
-		err = e.w.stop()
-	})
-	return err
+	return e.w.stop()
 }
 
 func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
@@ -160,24 +151,18 @@ func (gs *grpcSender) stop() error {
 }
 
 func (gs *grpcSender) exportTrace(ctx context.Context, request *otlptrace.ExportTraceServiceRequest) error {
-	return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error {
-		_, err := gs.traceExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady))
-		return err
-	})
+	_, err := gs.traceExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady))
+	return processError(err)
 }
 
 func (gs *grpcSender) exportMetrics(ctx context.Context, request *otlpmetrics.ExportMetricsServiceRequest) error {
-	return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error {
-		_, err := gs.metricExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady))
-		return err
-	})
+	_, err := gs.metricExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady))
+	return processError(err)
 }
 
 func (gs *grpcSender) exportLogs(ctx context.Context, request *otlplogs.ExportLogsServiceRequest) error {
-	return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error {
-		_, err := gs.logExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady))
-		return err
-	})
+	_, err := gs.logExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady))
+	return processError(err)
 }
 
 func (gs *grpcSender) enhanceContext(ctx context.Context) context.Context {
@@ -190,63 +175,36 @@
 // Send a trace or metrics request to the server. "perform" function is expected to make
 // the actual gRPC unary call that sends the request. This function implements the
 // common OTLP logic around request handling such as retries and throttling.
-func exportRequest(ctx context.Context, perform func(ctx context.Context) error) error {
-
-	expBackoff := backoff.NewExponentialBackOff()
-
-	// Spend max 15 mins on this operation. This is just a reasonable number that
-	// gives plenty of time for typical quick transient errors to resolve.
-	expBackoff.MaxElapsedTime = time.Minute * 15
-
-	for {
-		// Send to server.
-		err := perform(ctx)
-
-		if err == nil {
-			// Request is successful, we are done.
-			return nil
-		}
-
-		// We have an error, check gRPC status code.
-
-		status := status.Convert(err)
-
-		statusCode := status.Code()
-		if statusCode == codes.OK {
-			// Not really an error, still success.
-			return nil
-		}
+func processError(err error) error {
+	if err == nil {
+		// Request is successful, we are done.
+		return nil
+	}
 
-		// Now, this is this a real error.
+	// We have an error, check gRPC status code.
 
-		if !shouldRetry(statusCode) {
-			// It is not a retryable error, we should not retry.
-			return errFatalError
-		}
+	st := status.Convert(err)
+	if st.Code() == codes.OK {
+		// Not really an error, still success.
+		return nil
+	}
 
-		// Need to retry.
+	// Now, this is a real error.
 
-		// Check if server returned throttling information.
-		waitDuration := getThrottleDuration(status)
-		if waitDuration == 0 {
-			// No explicit throttle duration. Use exponential backoff strategy.
-			waitDuration = expBackoff.NextBackOff()
-			if waitDuration == backoff.Stop {
-				// We run out of max time allocated to this operation.
-				return errTimeout
-			}
-		}
+	if !shouldRetry(st.Code()) {
+		// It is not a retryable error, we should not retry.
+		return errPermanentError
+	}
 
-		// Wait until one of the conditions below triggers.
-		select {
-		case <-ctx.Done():
-			// This request is cancelled or timed out.
-			return errTimeout
+	// Need to retry.
 
-		case <-time.After(waitDuration):
-			// Time to try again.
-		}
+	// Check if server returned throttling information.
+	throttleDuration := getThrottleDuration(st)
+	if throttleDuration != 0 {
+		return exporterhelper.NewThrottleRetry(err, throttleDuration)
 	}
+
+	return err
 }
 
 func shouldRetry(code codes.Code) bool {
diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml
index 737fabdefaf..38787c3b0a4 100644
--- a/exporter/otlpexporter/testdata/config.yaml
+++ b/exporter/otlpexporter/testdata/config.yaml
@@ -10,6 +10,16 @@ exporters:
     endpoint: "1.2.3.4:1234"
     compression: "on"
     ca_file: /var/lib/mycert.pem
+    timeout: 10s
+    sending_queue:
+      disabled: false
+      num_consumers: 2
+      queue_size: 10
+    retry_on_failure:
+      disabled: false
+      initial_interval: 10s
+      max_interval: 60s
+      max_elapsed_time: 10m
     per_rpc_auth:
       type: bearer
      bearer_token: some-token
diff --git a/processor/queuedprocessor/config.go b/processor/queuedprocessor/config.go
index 9b71ed78d59..c7d1c6335a0 100644
--- a/processor/queuedprocessor/config.go
+++ b/processor/queuedprocessor/config.go
@@ -24,7 +24,7 @@ import (
 type Config struct {
 	configmodels.ProcessorSettings `mapstructure:",squash"`
 
-	// NumWorkers is the number of queue workers that dequeue batches and send them out.
+	// NumConsumers is the number of queue workers that dequeue batches and send them out.
 	NumWorkers int `mapstructure:"num_workers"`
 	// QueueSize is the maximum number of batches allowed in queue at a given time.
 	QueueSize int `mapstructure:"queue_size"`