From 4046234701ae1a17cbfa428a86cf5b239f68cd74 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 17 Jul 2020 14:07:03 -0700 Subject: [PATCH] Add support for queued retry in the exporter helper. (#1386) * Add support for queued retry in the exporter helper. Changed only the OTLP exporter for the moment to use the new settings. Timeout is enabled for all the exporters. Fixes #1193 There are some missing features that will be added in a followup PR: 1. Enforcing errors. For the moment added the Throttle error as a hack to keep backwards compatibility with OTLP. 2. Enable queued and retry for all exporters. 3. Fix observability metrics for the case when requests are dropped because the queue is full. * First round of comments addressed --- exporter/exporterhelper/common.go | 193 ++++++- exporter/exporterhelper/common_test.go | 10 +- exporter/exporterhelper/logshelper.go | 71 ++- exporter/exporterhelper/logshelper_test.go | 22 +- exporter/exporterhelper/metricshelper.go | 105 ++-- exporter/exporterhelper/metricshelper_test.go | 9 + exporter/exporterhelper/queued_retry.go | 204 +++++++ exporter/exporterhelper/queued_retry_test.go | 500 ++++++++++++++++++ exporter/exporterhelper/tracehelper.go | 117 ++-- exporter/exporterhelper/tracehelper_test.go | 8 + exporter/opencensusexporter/factory_test.go | 2 +- exporter/otlpexporter/README.md | 13 + exporter/otlpexporter/config.go | 6 +- exporter/otlpexporter/config_test.go | 15 + exporter/otlpexporter/factory.go | 18 + exporter/otlpexporter/factory_test.go | 2 +- exporter/otlpexporter/otlp.go | 110 ++-- exporter/otlpexporter/testdata/config.yaml | 10 + processor/queuedprocessor/config.go | 2 +- 19 files changed, 1206 insertions(+), 211 deletions(-) create mode 100644 exporter/exporterhelper/queued_retry.go create mode 100644 exporter/exporterhelper/queued_retry_test.go diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index bf5f27c403b..2c287faabbb 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -16,72 +16,217 @@ package exporterhelper import ( "context" + "sync" + "time" "go.opencensus.io/trace" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" ) var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) +// Settings for timeout. The timeout applies to individual attempts to send data to the backend. +type TimeoutSettings struct { + // Timeout is the timeout for every attempt to send data to the backend. + Timeout time.Duration `mapstructure:"timeout"` +} + +// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings. +func CreateDefaultTimeoutSettings() TimeoutSettings { + return TimeoutSettings{ + Timeout: 5 * time.Second, + } +} + +// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs). +type request interface { + // context returns the Context of the requests. + context() context.Context + // setContext updates the Context of the requests. + setContext(context.Context) + export(ctx context.Context) (int, error) + // Returns a new request that contains the items left to be sent. + onPartialError(consumererror.PartialError) request + // Returns the count of spans/metric points or log records. 
+ count() int +} + +// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). +type requestSender interface { + send(req request) (int, error) +} + +// baseRequest is a base implementation for the request. +type baseRequest struct { + ctx context.Context +} + +func (req *baseRequest) context() context.Context { + return req.ctx +} + +func (req *baseRequest) setContext(ctx context.Context) { + req.ctx = ctx +} + // Start specifies the function invoked when the exporter is being started. type Start func(context.Context, component.Host) error // Shutdown specifies the function invoked when the exporter is being shutdown. type Shutdown func(context.Context) error +// internalOptions represents all the options that users can configure. +type internalOptions struct { + TimeoutSettings + QueueSettings + RetrySettings + Start + Shutdown +} + +// fromConfiguredOptions returns the internal options starting from the default and applying all configured options. +func fromConfiguredOptions(options ...ExporterOption) *internalOptions { + // Start from the default options: + opts := &internalOptions{ + TimeoutSettings: CreateDefaultTimeoutSettings(), + // TODO: Enable queuing by default (call CreateDefaultQueueSettings) + QueueSettings: QueueSettings{Disabled: true}, + // TODO: Enable retry by default (call CreateDefaultRetrySettings) + RetrySettings: RetrySettings{Disabled: true}, + Start: func(ctx context.Context, host component.Host) error { return nil }, + Shutdown: func(ctx context.Context) error { return nil }, + } + + for _, op := range options { + op(opts) + } + + return opts +} + // ExporterOption apply changes to internalOptions. -type ExporterOption func(*baseExporter) +type ExporterOption func(*internalOptions) // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown Shutdown) ExporterOption { - return func(o *baseExporter) { - o.shutdown = shutdown + return func(o *internalOptions) { + o.Shutdown = shutdown } } // WithStart overrides the default Start function for an exporter. // The default shutdown function does nothing and always returns nil. func WithStart(start Start) ExporterOption { - return func(o *baseExporter) { - o.start = start + return func(o *internalOptions) { + o.Start = start } } -// internalOptions contains internalOptions concerning how an Exporter is configured. -type baseExporter struct { - exporterFullName string - start Start - shutdown Shutdown +// WithTimeout overrides the default TimeoutSettings for an exporter. +// The default TimeoutSettings is 5 seconds. +func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption { + return func(o *internalOptions) { + o.TimeoutSettings = timeoutSettings + } } -// Construct the internalOptions from multiple ExporterOption. -func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter { - be := baseExporter{ - exporterFullName: exporterFullName, +// WithRetry overrides the default RetrySettings for an exporter. +// The default RetrySettings is to disable retries. +func WithRetry(retrySettings RetrySettings) ExporterOption { + return func(o *internalOptions) { + o.RetrySettings = retrySettings } +} - for _, op := range options { - op(&be) +// WithQueue overrides the default QueueSettings for an exporter. +// The default QueueSettings is to disable queueing. 
+func WithQueue(queueSettings QueueSettings) ExporterOption { + return func(o *internalOptions) { + o.QueueSettings = queueSettings } +} + +// baseExporter contains common fields between different exporter types. +type baseExporter struct { + cfg configmodels.Exporter + sender requestSender + qrSender *queuedRetrySender + start Start + shutdown Shutdown + startOnce sync.Once + shutdownOnce sync.Once +} + +func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter { + opts := fromConfiguredOptions(options...) + be := &baseExporter{ + cfg: cfg, + start: opts.Start, + shutdown: opts.Shutdown, + } + + be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}) + be.sender = be.qrSender return be } +// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. +// This can be used to wrap with observability (create spans, record metrics) the consumer sender. +func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { + be.qrSender.consumerSender = f(be.qrSender.consumerSender) +} + +// Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { - if be.start != nil { - return be.start(ctx, host) - } - return nil + err := componenterror.ErrAlreadyStarted + be.startOnce.Do(func() { + // First start the wrapped exporter. + err = be.start(ctx, host) + if err != nil { + // TODO: Log errors, or check if it is recorded by the caller. + return + } + + // If no error then start the queuedRetrySender. + be.qrSender.start() + }) + return err } -// Shutdown stops the exporter and is invoked during shutdown. +// Shutdown all senders and exporter and is invoked during service shutdown. func (be *baseExporter) Shutdown(ctx context.Context) error { - if be.shutdown != nil { - return be.shutdown(ctx) + err := componenterror.ErrAlreadyStopped + be.shutdownOnce.Do(func() { + // First shutdown the queued retry sender + be.qrSender.shutdown() + // Last shutdown the wrapped exporter itself. + err = be.shutdown(ctx) + }) + return err +} + +// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender. +type timeoutSender struct { + cfg TimeoutSettings +} + +// send implements the requestSender interface +func (ts *timeoutSender) send(req request) (int, error) { + // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be + // updated because this deadline most likely is before the next one. 
+ ctx := req.context() + if ts.cfg.Timeout > 0 { + var cancelFunc func() + ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout) + defer cancelFunc() } - return nil + return req.export(ctx) } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 1fc661a5d62..45bf9def720 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -23,22 +23,28 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" ) +var defaultExporterCfg = &configmodels.ExporterSettings{ + TypeVal: "test", + NameVal: "test", +} + func TestErrorToStatus(t *testing.T) { require.Equal(t, okStatus, errToStatus(nil)) require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error"))) } func TestBaseExporter(t *testing.T) { - be := newBaseExporter("test") + be := newBaseExporter(defaultExporterCfg) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) } func TestBaseExporterWithOptions(t *testing.T) { be := newBaseExporter( - "test", + defaultExporterCfg, WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }), WithShutdown(func(ctx context.Context) error { return errors.New("my error") })) require.Error(t, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index d6e1e7f345c..71525f6d3de 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/obsreport" ) @@ -27,21 +28,47 @@ import ( // the number of dropped logs. type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error) +type logsRequest struct { + baseRequest + ld data.Logs + pusher PushLogsData +} + +func newLogsRequest(ctx context.Context, ld data.Logs, pusher PushLogsData) request { + return &logsRequest{ + baseRequest: baseRequest{ctx: ctx}, + ld: ld, + pusher: pusher, + } +} + +func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request { + // TODO: Implement this + return req +} + +func (req *logsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.ld) +} + +func (req *logsRequest) count() int { + return req.ld.LogRecordCount() +} + type logsExporter struct { - baseExporter + *baseExporter pushLogsData PushLogsData } -func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushLogsData(exporterCtx, md) +func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error { + exporterCtx := obsreport.ExporterContext(ctx, lexp.cfg.Name()) + _, err := lexp.sender.send(newLogsRequest(exporterCtx, ld, lexp.pushLogsData)) return err } -// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span. -// TODO: Add support for retries. 
-func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { - if config == nil { +// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. +func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -49,22 +76,28 @@ func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, op return nil, errNilPushLogsData } - pushLogsData = pushLogsWithObservability(pushLogsData, config.Name()) + be := newBaseExporter(cfg, options...) + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &logsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: be, pushLogsData: pushLogsData, }, nil } -func pushLogsWithObservability(next PushLogsData, exporterName string) PushLogsData { - return func(ctx context.Context, ld data.Logs) (int, error) { - ctx = obsreport.StartLogsExportOp(ctx, exporterName) - numDroppedLogs, err := next(ctx, ld) - - numLogs := ld.LogRecordCount() +type logsExporterWithObservability struct { + exporterName string + nextSender requestSender +} - obsreport.EndLogsExportOp(ctx, numLogs, numDroppedLogs, err) - return numLogs, err - } +func (lewo *logsExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName)) + numDroppedLogs, err := lewo.nextSender.send(req) + obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err) + return numDroppedLogs, err } diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logshelper_test.go index 0d20a7d158c..bd08d1d209e 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logshelper_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -43,6 +44,13 @@ var ( } ) +func TestLogsRequest(t *testing.T) { + mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) +} + func TestLogsExporter_InvalidName(t *testing.T) { me, err := NewLogsExporter(nil, newPushLogsData(0, nil)) require.Nil(t, me) @@ -178,7 +186,7 @@ func generateLogsTraffic(t *testing.T, me component.LogExporter, numRequests int } } -func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numMetricPoints int64) { +func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numLogRecords int64) { ocSpansSaver := new(testOCTraceExporter) trace.RegisterExporter(ocSpansSaver) defer trace.UnregisterExporter(ocSpansSaver) @@ -201,13 +209,13 @@ func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantEr require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd) require.Equalf(t, errToStatus(wantError), 
sd.Status, "SpanData %v", sd) - sentMetricPoints := numMetricPoints - var failedToSendMetricPoints int64 + sentLogRecords := numLogRecords + var failedToSendLogRecords int64 if wantError != nil { - sentMetricPoints = 0 - failedToSendMetricPoints = numMetricPoints + sentLogRecords = 0 + failedToSendLogRecords = numLogRecords } - require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) - require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) } } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 385aeaaab7b..0b1c8717bde 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/obsreport" @@ -30,21 +31,20 @@ import ( type PushMetricsDataOld func(ctx context.Context, td consumerdata.MetricsData) (droppedTimeSeries int, err error) type metricsExporterOld struct { - baseExporter + *baseExporter pushMetricsData PushMetricsDataOld } -func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + _, err := mexp.pushMetricsData(exporterCtx, md) return err } -// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. +// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span. // TODO: Add support for retries. -func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { - if config == nil { +func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { + if cfg == nil { return nil, errNilConfig } @@ -52,10 +52,10 @@ func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMet return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, config.Name()) + pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, cfg.Name()) return &metricsExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), pushMetricsData: pushMetricsData, }, nil } @@ -88,22 +88,49 @@ func NumTimeSeries(md consumerdata.MetricsData) int { // the number of dropped metrics. 
type PushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) +type metricsRequest struct { + baseRequest + md pdata.Metrics + pusher PushMetricsData +} + +func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher PushMetricsData) request { + return &metricsRequest{ + baseRequest: baseRequest{ctx: ctx}, + md: md, + pusher: pusher, + } +} + +func (req *metricsRequest) onPartialError(consumererror.PartialError) request { + // TODO: implement this. + return req +} + +func (req *metricsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.md) +} + +func (req *metricsRequest) count() int { + _, numPoints := pdatautil.MetricAndDataPointCount(req.md) + return numPoints +} + type metricsExporter struct { - baseExporter - pushMetricsData PushMetricsData + *baseExporter + pusher PushMetricsData } -func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + req := newMetricsRequest(exporterCtx, md, mexp.pusher) + _, err := mexp.sender.send(req) return err } -// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. -// TODO: Add support for retries. -func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { - if config == nil { +// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. +func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -111,25 +138,35 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name()) + be := newBaseExporter(cfg, options...) + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &metricsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - pushMetricsData: pushMetricsData, + baseExporter: be, + pusher: pushMetricsData, }, nil } -func pushMetricsWithObservability(next PushMetricsData, exporterName string) PushMetricsData { - return func(ctx context.Context, md pdata.Metrics) (int, error) { - ctx = obsreport.StartMetricsExportOp(ctx, exporterName) - numDroppedMetrics, err := next(ctx, md) +type metricsSenderWithObservability struct { + exporterName string + nextSender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. 
- numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(md) +func (mewo *metricsSenderWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName)) + numDroppedMetrics, err := mewo.nextSender.send(req) - obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedMetrics, numDroppedMetrics, err) - return numReceivedMetrics, err - } + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done + // to new metrics will remove this. + mReq := req.(*metricsRequest) + numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md) + + obsreport.EndMetricsExportOp(req.context(), numPoints, numReceivedMetrics, numDroppedMetrics, err) + return numReceivedMetrics, err } diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metricshelper_test.go index 3f96826b115..c78f7565f77 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metricshelper_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data/testdata" @@ -46,6 +47,14 @@ var ( } ) +func TestMetricsRequest(t *testing.T) { + mr := newMetricsRequest(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) + assert.Equal(t, 0, mr.count()) +} + func TestMetricsExporter_InvalidName(t *testing.T) { me, err := NewMetricsExporter(nil, newPushMetricsData(0, nil)) require.Nil(t, me) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go new file mode 100644 index 00000000000..aa4c4b01e34 --- /dev/null +++ b/exporter/exporterhelper/queued_retry.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "errors" + "fmt" + "time" + + "github.com/cenkalti/backoff" + "github.com/jaegertracing/jaeger/pkg/queue" + + "go.opentelemetry.io/collector/consumer/consumererror" +) + +// QueueSettings defines configuration for queueing batches before sending to the consumerSender. +type QueueSettings struct { + // Disabled indicates whether to not enqueue batches before sending to the consumerSender. + Disabled bool `mapstructure:"disabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSize is the maximum number of batches allowed in queue at a given time. 
+ QueueSize int `mapstructure:"queue_size"` +} + +// CreateDefaultQueueSettings returns the default settings for QueueSettings. +func CreateDefaultQueueSettings() QueueSettings { + return QueueSettings{ + Disabled: false, + NumConsumers: 10, + // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. + // This is a pretty decent value for production. + // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, + // multiply that by the number of requests per seconds. + QueueSize: 5000, + } +} + +// RetrySettings defines configuration for retrying batches in case of export failure. +// The current supported strategy is exponential backoff. +type RetrySettings struct { + // Disabled indicates whether to not retry sending batches in case of export failure. + Disabled bool `mapstructure:"disabled"` + // InitialInterval the time to wait after the first failure before retrying. + InitialInterval time.Duration `mapstructure:"initial_interval"` + // MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between + // consecutive retries will always be `MaxInterval`. + MaxInterval time.Duration `mapstructure:"max_interval"` + // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. + // Once this value is reached, the data is discarded. + MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"` +} + +// CreateDefaultRetrySettings returns the default settings for RetrySettings. +func CreateDefaultRetrySettings() RetrySettings { + return RetrySettings{ + Disabled: false, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} + +type queuedRetrySender struct { + cfg QueueSettings + consumerSender requestSender + queue *queue.BoundedQueue + retryStopCh chan struct{} +} + +var errorRefused = errors.New("failed to add to the queue") + +func newQueuedRetrySender(qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender) *queuedRetrySender { + retryStopCh := make(chan struct{}) + return &queuedRetrySender{ + cfg: qCfg, + consumerSender: &retrySender{ + cfg: rCfg, + nextSender: nextSender, + stopCh: retryStopCh, + }, + queue: queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}), + retryStopCh: retryStopCh, + } +} + +// start is invoked during service startup. +func (qrs *queuedRetrySender) start() { + qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { + value := item.(request) + _, _ = qrs.consumerSender.send(value) + }) +} + +// send implements the requestSender interface +func (qrs *queuedRetrySender) send(req request) (int, error) { + if qrs.cfg.Disabled { + return qrs.consumerSender.send(req) + } + + if !qrs.queue.Produce(req) { + return req.count(), errorRefused + } + + return 0, nil +} + +// shutdown is invoked during service shutdown. +func (qrs *queuedRetrySender) shutdown() { + // First stop the retry goroutines, so that unblocks the queue workers. + close(qrs.retryStopCh) + + // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only + // try once every request. + qrs.queue.Stop() +} + +// TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries. 
+type throttleRetry struct { + error + delay time.Duration +} + +func NewThrottleRetry(err error, delay time.Duration) error { + return &throttleRetry{ + error: err, + delay: delay, + } +} + +type retrySender struct { + cfg RetrySettings + nextSender requestSender + stopCh chan struct{} +} + +// send implements the requestSender interface +func (rs *retrySender) send(req request) (int, error) { + if rs.cfg.Disabled { + return rs.nextSender.send(req) + } + + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = rs.cfg.InitialInterval + expBackoff.MaxInterval = rs.cfg.MaxInterval + expBackoff.MaxElapsedTime = rs.cfg.MaxElapsedTime + for { + droppedItems, err := rs.nextSender.send(req) + + if err == nil { + return droppedItems, nil + } + + // Immediately drop data on permanent errors. + if consumererror.IsPermanent(err) { + return droppedItems, err + } + + // If partial error, update data and stats with non exported data. + if partialErr, isPartial := err.(consumererror.PartialError); isPartial { + req = req.onPartialError(partialErr) + } + + backoffDelay := expBackoff.NextBackOff() + + if backoffDelay == backoff.Stop { + // throw away the batch + return req.count(), fmt.Errorf("max elapsed time expired %w", err) + } + + if throttleErr, isThrottle := err.(*throttleRetry); isThrottle { + backoffDelay = max(backoffDelay, throttleErr.delay) + } + + // back-off, but get interrupted when shutting down or request is cancelled or timed out. + select { + case <-req.context().Done(): + return req.count(), fmt.Errorf("request is cancelled or timed out %w", err) + case <-rs.stopCh: + return req.count(), fmt.Errorf("interrupted due to shutdown %w", err) + case <-time.After(backoffDelay): + } + } +} + +// max returns the larger of x or y. +func max(x, y time.Duration) time.Duration { + if x < y { + return y + } + return x +} diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go new file mode 100644 index 00000000000..995aeaa30ea --- /dev/null +++ b/exporter/exporterhelper/queued_retry_test.go @@ -0,0 +1,500 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package exporterhelper + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestQueuedRetry_DropOnPermanentError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(consumererror.Permanent(errors.New("bad data"))) + + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_DropOnNoRetry(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = true + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_PartialError(t *testing.T) { + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + mockP := newMockConcurrentExporter() + mockP.updateError(partialErr) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 1 second. 
+ mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_StopWhileWaiting(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Minute + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // Enqueue another request to ensure when calling shutdown we drain the queue. + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 3, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + mockP.checkNumRequests(t, 1) + // TODO: Ensure that queue is drained, and uncomment the next 3 lines. + // https://github.com/jaegertracing/jaeger/pull/2349 + // ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 2) + // require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_PreserveCancellation(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ctx, cancelFunc := context.WithCancel(context.Background()) + start := time.Now() + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(ctx, 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + cancelFunc() + + mockP.checkNumRequests(t, 1) + require.Zero(t, be.qrSender.queue.Size()) + + // Stop should succeed and not retry. + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + // We should ensure that we actually did not wait for the initial backoff (30 sec). + assert.True(t, 5*time.Second > time.Since(start)) + + // In the newMockConcurrentExporter we count requests and items even for failed requests. 
+ mockP.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_MaxElapsedTime(t *testing.T) { + mockP := newMockConcurrentExporter() + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 100 * time.Millisecond + rCfg.MaxElapsedTime = 1 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Add an item that will always fail. + droppedItems, err := be.sender.send(newErrorRequest(context.Background())) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests. + mockP.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_ThrottleError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(NewThrottleRetry(errors.New("throttle error"), 1*time.Second)) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 100 * time.Millisecond + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + start := time.Now() + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 2 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // The initial backoff is 100ms, but because of the throttle this should wait at least 1 seconds. 
+ assert.True(t, 1*time.Second < time.Since(start)) + + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_RetryOnError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialInterval = 2 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 0) + + // There is a small race condition in this test, but expect to execute this in less than 2 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) +} + +func TestQueuedRetry_DropOnFull(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueueSettings() + qCfg.QueueSize = 0 + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.Error(t, err) + assert.Equal(t, 2, droppedItems) +} + +func TestQueuedRetryHappyPath(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + mockP := newMockConcurrentExporter() + qCfg := CreateDefaultQueueSettings() + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + wantRequests := 10 + for i := 0; i < wantRequests; i++ { + mockP.run(func() { + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + } + + // Wait until all batches received + mockP.awaitAsyncProcessing() + + mockP.checkNumRequests(t, wantRequests) + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) +} + +type mockErrorRequest struct { + baseRequest +} + 
+func (mer *mockErrorRequest) export(_ context.Context) (int, error) { + return 0, errors.New("transient error") +} + +func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request { + return mer +} + +func (mer *mockErrorRequest) count() int { + return 7 +} + +func newErrorRequest(ctx context.Context) request { + return &mockErrorRequest{ + baseRequest: baseRequest{ctx: ctx}, + } +} + +type mockRequest struct { + baseRequest + cnt int + mce *mockConcurrentExporter +} + +func (m *mockRequest) export(_ context.Context) (int, error) { + err := m.mce.export() + if err != nil { + return m.cnt, err + } + return 0, nil +} + +func (m *mockRequest) onPartialError(consumererror.PartialError) request { + return &mockRequest{ + baseRequest: m.baseRequest, + cnt: 1, + mce: m.mce, + } +} + +func (m *mockRequest) count() int { + return m.cnt +} + +func newMockRequest(ctx context.Context, cnt int, mce *mockConcurrentExporter) request { + return &mockRequest{ + baseRequest: baseRequest{ctx: ctx}, + cnt: cnt, + mce: mce, + } +} + +type mockConcurrentExporter struct { + waitGroup *sync.WaitGroup + mu sync.Mutex + consumeError error + requestCount int64 + stopped int32 +} + +func newMockConcurrentExporter() *mockConcurrentExporter { + return &mockConcurrentExporter{waitGroup: new(sync.WaitGroup)} +} + +func (p *mockConcurrentExporter) export() error { + if atomic.LoadInt32(&p.stopped) == 1 { + return nil + } + atomic.AddInt64(&p.requestCount, 1) + p.mu.Lock() + defer p.mu.Unlock() + defer p.waitGroup.Done() + return p.consumeError +} + +func (p *mockConcurrentExporter) updateError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + p.consumeError = err +} + +func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) +} + +func (p *mockConcurrentExporter) run(fn func()) { + p.waitGroup.Add(1) + fn() +} + +func (p *mockConcurrentExporter) awaitAsyncProcessing() { + p.waitGroup.Wait() +} + +func (p *mockConcurrentExporter) stop() { + atomic.StoreInt32(&p.stopped, 1) +} + +type observabilityConsumerSender struct { + sentItemsCount int64 + droppedItemsCount int64 + nextSender requestSender +} + +func (ocs *observabilityConsumerSender) send(req request) (int, error) { + dic, err := ocs.nextSender.send(req) + atomic.AddInt64(&ocs.sentItemsCount, int64(req.count()-dic)) + atomic.AddInt64(&ocs.droppedItemsCount, int64(dic)) + return dic, err +} + +func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.sentItemsCount)) +} + +func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount)) +} diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 463924fd2b2..119d57c7ddf 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" ) @@ -28,32 +29,25 @@ import ( // returns the number of dropped spans. 
type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) -// traceDataPusher is a helper function that is similar to ConsumeTraceData but also -// returns the number of dropped spans. -type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) - -// traceExporterOld implements the exporter with additional helper internalOptions. type traceExporterOld struct { - baseExporter + *baseExporter dataPusher traceDataPusherOld } -func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + _, err := texp.dataPusher(exporterCtx, td) return err } -// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every -// request with a Span. If no internalOptions are passed it just adds the exporter format as a -// tag in the Context. +// NewTraceExporterOld creates an TraceExporterOld that records observability metrics and wraps every request with a Span. func NewTraceExporterOld( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusherOld, options ...ExporterOption, ) (component.TraceExporterOld, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -61,10 +55,10 @@ func NewTraceExporterOld( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + dataPusher = dataPusher.withObservability(cfg.Name()) return &traceExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), dataPusher: dataPusher, }, nil } @@ -86,29 +80,56 @@ func (p traceDataPusherOld) withObservability(exporterName string) traceDataPush } } +// traceDataPusher is a helper function that is similar to ConsumeTraceData but also +// returns the number of dropped spans. 
+type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) + +type tracesRequest struct { + baseRequest + td pdata.Traces + pusher traceDataPusher +} + +func newTracesRequest(ctx context.Context, td pdata.Traces, pusher traceDataPusher) request { + return &tracesRequest{ + baseRequest: baseRequest{ctx: ctx}, + td: td, + pusher: pusher, + } +} + +func (req *tracesRequest) onPartialError(partialErr consumererror.PartialError) request { + return newTracesRequest(req.ctx, partialErr.GetTraces(), req.pusher) +} + +func (req *tracesRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.td) +} + +func (req *tracesRequest) count() int { + return req.td.SpanCount() +} + type traceExporter struct { - baseExporter - dataPusher traceDataPusher + *baseExporter + pusher traceDataPusher } -func (te *traceExporter) ConsumeTraces( - ctx context.Context, - td pdata.Traces, -) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + req := newTracesRequest(exporterCtx, td, texp.pusher) + _, err := texp.sender.send(req) return err } -// NewTraceExporter creates a TraceExporter that can record metrics and can wrap -// every request with a Span. +// NewTraceExporter creates a TraceExporter that records observability metrics and wraps every request with a Span. func NewTraceExporter( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusher, options ...ExporterOption, ) (component.TraceExporter, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -116,27 +137,33 @@ func NewTraceExporter( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + be := newBaseExporter(cfg, options...) + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &traceExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - dataPusher: dataPusher, + baseExporter: be, + pusher: dataPusher, }, nil } -// withObservability wraps the current pusher into a function that records -// the observability signals during the pusher execution. -func (p traceDataPusher) withObservability(exporterName string) traceDataPusher { - return func(ctx context.Context, td pdata.Traces) (int, error) { - ctx = obsreport.StartTraceDataExportOp(ctx, exporterName) - // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := p(ctx, td) +type tracesExporterWithObservability struct { + exporterName string + nextSender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. - numSpans := td.SpanCount() - obsreport.EndTraceDataExportOp(ctx, numSpans, droppedSpans, err) - return droppedSpans, err - } +func (tewo *tracesExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartTraceDataExportOp(req.context(), tewo.exporterName)) + // Forward the data to the next consumer (this pusher is the next). + droppedSpans, err := tewo.nextSender.send(req) + + // TODO: this is not ideal: it should come from the next function itself. 
+ // temporarily loading it from internal format. Once full switch is done + // to new metrics will remove this. + obsreport.EndTraceDataExportOp(req.context(), req.count(), droppedSpans, err) + return droppedSpans, err } diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/tracehelper_test.go index eef81757b3d..9c18d6d7321 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/tracehelper_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -46,6 +47,13 @@ var ( } ) +func TestTracesRequest(t *testing.T) { + mr := newTracesRequest(context.Background(), testdata.GenerateTraceDataOneSpan(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataEmpty()) + assert.EqualValues(t, newTracesRequest(context.Background(), testdata.GenerateTraceDataEmpty(), nil), mr.onPartialError(partialErr.(consumererror.PartialError))) +} + // TODO https://go.opentelemetry.io/collector/issues/266 // Migrate tests to use testify/assert instead of t.Fatal pattern. func TestTraceExporterOld_InvalidName(t *testing.T) { diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index 7d1854e6f6d..b7eee6bbb9f 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -138,7 +138,7 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "NumWorkers", + name: "NumConsumers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: rcvCfg.NetAddr.Endpoint, diff --git a/exporter/otlpexporter/README.md b/exporter/otlpexporter/README.md index 820659e7acf..107773dc4e1 100644 --- a/exporter/otlpexporter/README.md +++ b/exporter/otlpexporter/README.md @@ -28,6 +28,19 @@ The following settings can be optionally configured: [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#WithInsecure). - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). +- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. +- `retry_on_failure` + - `disabled` (default = false) + - `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true` + - `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true` + - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true` +- `sending_queue` + - `disabled` (default = true) + - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `disabled` is `true` + - `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `disabled` is `true`; + User should calculate this as `num_seconds * requests_per_second` where: + - `num_seconds` is the number of seconds to buffer in case of a backend outage + - `requests_per_second` is the average number of requests per seconds. 
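As a rough worked example of that sizing rule (an illustration, not part of the patch): at an average of 100 requests per second, the default `queue_size` of 5000 buffers roughly 50 seconds of a backend outage, so riding out a 2-minute outage at the same rate would call for a `queue_size` of about 120 × 100 = 12000.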
Example: diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index be4dfa65521..fe9c3f0ded8 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -17,11 +17,15 @@ package otlpexporter import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Config defines configuration for OpenCensus exporter. type Config struct { - configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 5433e59ea05..821e295494d 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestLoadConfig(t *testing.T) { @@ -49,6 +50,20 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/2", TypeVal: "otlp", }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Disabled: false, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Disabled: false, + NumConsumers: 2, + QueueSize: 10, + }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 6134d066297..4b9f28e9c2f 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -39,11 +39,17 @@ func NewFactory() component.ExporterFactory { } func createDefaultConfig() configmodels.Exporter { + // TODO: Enable the queued settings. + qs := exporterhelper.CreateDefaultQueueSettings() + qs.Disabled = true return &Config{ ExporterSettings: configmodels.ExporterSettings{ TypeVal: typeStr, NameVal: typeStr, }, + TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), + RetrySettings: exporterhelper.CreateDefaultRetrySettings(), + QueueSettings: qs, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. 
@@ -61,9 +67,13 @@ func createTraceExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewTraceExporter( cfg, oce.pushTraceData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown)) if err != nil { return nil, err @@ -81,9 +91,13 @@ func createMetricsExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewMetricsExporter( cfg, oce.pushMetricsData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { @@ -102,9 +116,13 @@ func createLogExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewLogsExporter( cfg, oce.pushLogData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index c554592748f..6cf00d55959 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -111,7 +111,7 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "NumWorkers", + name: "NumConsumers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: endpoint, diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index f0c33704fcb..25586ff0d8d 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -18,20 +18,19 @@ import ( "context" "errors" "fmt" - "sync" "time" - "github.com/cenkalti/backoff" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/internal/data" otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" @@ -41,9 +40,7 @@ import ( type exporterImp struct { // Input configuration. config *Config - - stopOnce sync.Once - w sender + w sender } type sender interface { @@ -54,8 +51,7 @@ type sender interface { } var ( - errTimeout = errors.New("timeout") - errFatalError = errors.New("fatal error sending to server") + errPermanentError = consumererror.Permanent(errors.New("fatal error sending to server")) ) // Crete new exporter and start it. The exporter will begin connecting but @@ -78,12 +74,7 @@ func newExporter(cfg configmodels.Exporter) (*exporterImp, error) { } func (e *exporterImp) shutdown(context.Context) error { - err := componenterror.ErrAlreadyStopped - e.stopOnce.Do(func() { - // Close the connection. 
- err = e.w.stop() - }) - return err + return e.w.stop() } func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { @@ -160,24 +151,18 @@ func (gs *grpcSender) stop() error { } func (gs *grpcSender) exportTrace(ctx context.Context, request *otlptrace.ExportTraceServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.traceExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.traceExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) exportMetrics(ctx context.Context, request *otlpmetrics.ExportMetricsServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.metricExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.metricExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) exportLogs(ctx context.Context, request *otlplogs.ExportLogsServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.logExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.logExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) enhanceContext(ctx context.Context) context.Context { @@ -190,63 +175,36 @@ func (gs *grpcSender) enhanceContext(ctx context.Context) context.Context { // Send a trace or metrics request to the server. "perform" function is expected to make // the actual gRPC unary call that sends the request. This function implements the // common OTLP logic around request handling such as retries and throttling. -func exportRequest(ctx context.Context, perform func(ctx context.Context) error) error { - - expBackoff := backoff.NewExponentialBackOff() - - // Spend max 15 mins on this operation. This is just a reasonable number that - // gives plenty of time for typical quick transient errors to resolve. - expBackoff.MaxElapsedTime = time.Minute * 15 - - for { - // Send to server. - err := perform(ctx) - - if err == nil { - // Request is successful, we are done. - return nil - } - - // We have an error, check gRPC status code. - - status := status.Convert(err) - - statusCode := status.Code() - if statusCode == codes.OK { - // Not really an error, still success. - return nil - } +func processError(err error) error { + if err == nil { + // Request is successful, we are done. + return nil + } - // Now, this is this a real error. + // We have an error, check gRPC status code. - if !shouldRetry(statusCode) { - // It is not a retryable error, we should not retry. - return errFatalError - } + st := status.Convert(err) + if st.Code() == codes.OK { + // Not really an error, still success. + return nil + } - // Need to retry. + // Now, this is this a real error. - // Check if server returned throttling information. - waitDuration := getThrottleDuration(status) - if waitDuration == 0 { - // No explicit throttle duration. Use exponential backoff strategy. - waitDuration = expBackoff.NextBackOff() - if waitDuration == backoff.Stop { - // We run out of max time allocated to this operation. - return errTimeout - } - } + if !shouldRetry(st.Code()) { + // It is not a retryable error, we should not retry. 
+ return errPermanentError + } - // Wait until one of the conditions below triggers. - select { - case <-ctx.Done(): - // This request is cancelled or timed out. - return errTimeout + // Need to retry. - case <-time.After(waitDuration): - // Time to try again. - } + // Check if server returned throttling information. + throttleDuration := getThrottleDuration(st) + if throttleDuration != 0 { + return exporterhelper.NewThrottleRetry(err, throttleDuration) } + + return err } func shouldRetry(code codes.Code) bool { diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index 737fabdefaf..38787c3b0a4 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -10,6 +10,16 @@ exporters: endpoint: "1.2.3.4:1234" compression: "on" ca_file: /var/lib/mycert.pem + timeout: 10s + sending_queue: + disabled: false + num_consumers: 2 + queue_size: 10 + retry_on_failure: + disabled: false + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m per_rpc_auth: type: bearer bearer_token: some-token diff --git a/processor/queuedprocessor/config.go b/processor/queuedprocessor/config.go index 9b71ed78d59..c7d1c6335a0 100644 --- a/processor/queuedprocessor/config.go +++ b/processor/queuedprocessor/config.go @@ -24,7 +24,7 @@ import ( type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` - // NumWorkers is the number of queue workers that dequeue batches and send them out. + // NumConsumers is the number of queue workers that dequeue batches and send them out. NumWorkers int `mapstructure:"num_workers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"`
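A minimal sketch of how another exporter could adopt the options introduced here (the exampleexporter package, the no-op pushTraces pusher and the createTraceExporter function below are hypothetical names, for illustration only; the exporterhelper calls are the ones added or used by this patch):

package exampleexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configmodels"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// pushTraces is a placeholder pusher: a real exporter would send td to its
// backend and return the number of dropped spans together with an error.
func pushTraces(ctx context.Context, td pdata.Traces) (int, error) {
	return 0, nil
}

// createTraceExporter wires the per-attempt timeout, the retry settings and
// the sending queue into the helper, similar to the OTLP factory in this patch.
func createTraceExporter(cfg configmodels.Exporter) (component.TraceExporter, error) {
	return exporterhelper.NewTraceExporter(
		cfg,
		pushTraces,
		exporterhelper.WithTimeout(exporterhelper.CreateDefaultTimeoutSettings()),
		exporterhelper.WithRetry(exporterhelper.CreateDefaultRetrySettings()),
		exporterhelper.WithQueue(exporterhelper.CreateDefaultQueueSettings()),
	)
}

As with the OTLP factory above, such an exporter could still ship with the queue disabled by default and let users opt in through the `sending_queue` section of their configuration.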