From eb115536d67ca0ebc0059074b2fb683d8eec224e Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 16 Jul 2020 14:42:58 -0700 Subject: [PATCH 1/2] Add support for queued retry in the exporter helper. Changed only the OTLP exporter for the moment to use the new settings. Timeout is enabled for all the exporters. Fixes #1193 There are some missing features that will be added in a followup PR: 1. Enforcing errors. For the moment added the Throttle error as a hack to keep backwards compatibility with OTLP. 2. Enable queued and retry for all exporters. 3. Fix observability metrics for the case when requests are dropped because the queue is full. --- exporter/exporterhelper/common.go | 163 ++++++- exporter/exporterhelper/common_test.go | 10 +- exporter/exporterhelper/logshelper.go | 69 ++- exporter/exporterhelper/logshelper_test.go | 22 +- exporter/exporterhelper/metricshelper.go | 103 ++-- exporter/exporterhelper/metricshelper_test.go | 9 + exporter/exporterhelper/queued_retry.go | 196 ++++++++ exporter/exporterhelper/queued_retry_test.go | 451 ++++++++++++++++++ exporter/exporterhelper/tracehelper.go | 113 +++-- exporter/exporterhelper/tracehelper_test.go | 8 + exporter/otlpexporter/README.md | 10 + exporter/otlpexporter/config.go | 6 +- exporter/otlpexporter/config_test.go | 15 + exporter/otlpexporter/factory.go | 18 + exporter/otlpexporter/otlp.go | 110 ++--- exporter/otlpexporter/testdata/config.yaml | 10 + 16 files changed, 1121 insertions(+), 192 deletions(-) create mode 100644 exporter/exporterhelper/queued_retry.go create mode 100644 exporter/exporterhelper/queued_retry_test.go diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index bf5f27c403b..ef27874685d 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -16,16 +16,65 @@ package exporterhelper import ( "context" + "sync" + "time" "go.opencensus.io/trace" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" ) var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) +type TimeoutSettings struct { + // Timeout is the timeout for each operation. + Timeout time.Duration `mapstructure:"timeout"` +} + +func CreateDefaultTimeoutSettings() TimeoutSettings { + return TimeoutSettings{ + Timeout: 5 * time.Second, + } +} + +type settings struct { + configmodels.Exporter + TimeoutSettings + QueuedSettings + RetrySettings +} + +type request interface { + context() context.Context + setContext(context.Context) + export(ctx context.Context) (int, error) + // Returns a new queue request that contains the items left to be exported. + onPartialError(consumererror.PartialError) request + // Returns the cnt of spans/metric points or log records. + count() int +} + +type requestSender interface { + send(req request) (int, error) +} + +type baseRequest struct { + ctx context.Context +} + +func (req *baseRequest) context() context.Context { + return req.ctx +} + +func (req *baseRequest) setContext(ctx context.Context) { + req.ctx = ctx +} + // Start specifies the function invoked when the exporter is being started. type Start func(context.Context, component.Host) error @@ -51,37 +100,121 @@ func WithStart(start Start) ExporterOption { } } +// WithShutdown overrides the default TimeoutSettings for an exporter. +// The default TimeoutSettings is 5 seconds. 
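The request and requestSender abstractions above are the extension point the rest of this patch composes: newBaseExporter (below) chains a queuedSender, a retrySender and a timeoutSender, and each layer only implements send and delegates to the next one. As a purely illustrative sketch (not part of this change; countingSender is a hypothetical name and sync/atomic is assumed to be imported), an additional layer would look like this:

// countingSender is a hypothetical pass-through requestSender, shown only to
// illustrate how the senders in this patch compose around a nextSender.
type countingSender struct {
	nextSender requestSender
	attempts   int64
}

// send records the attempt and delegates to the next sender in the chain.
func (cs *countingSender) send(req request) (int, error) {
	atomic.AddInt64(&cs.attempts, 1)
	return cs.nextSender.send(req)
}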
+func WithTimeout(timeout TimeoutSettings) ExporterOption { + return func(o *baseExporter) { + o.cfg.TimeoutSettings = timeout + } +} + +// WithRetry overrides the default RetrySettings for an exporter. +// The default RetrySettings is to disable retries. +func WithRetry(retry RetrySettings) ExporterOption { + return func(o *baseExporter) { + o.cfg.RetrySettings = retry + } +} + +// WithQueued overrides the default QueuedSettings for an exporter. +// The default QueuedSettings is to disable queueing. +func WithQueued(queued QueuedSettings) ExporterOption { + return func(o *baseExporter) { + o.cfg.QueuedSettings = queued + } +} + // internalOptions contains internalOptions concerning how an Exporter is configured. type baseExporter struct { - exporterFullName string - start Start - shutdown Shutdown + cfg *settings + sender requestSender + rSender *retrySender + qSender *queuedSender + start Start + shutdown Shutdown + startOnce sync.Once + shutdownOnce sync.Once } // Construct the internalOptions from multiple ExporterOption. -func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter { - be := baseExporter{ - exporterFullName: exporterFullName, +func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter { + be := &baseExporter{ + cfg: &settings{ + Exporter: cfg, + TimeoutSettings: CreateDefaultTimeoutSettings(), + // TODO: Enable queuing by default (call CreateDefaultQueuedSettings + QueuedSettings: QueuedSettings{Disabled: true}, + // TODO: Enable retry by default (call CreateDefaultRetrySettings) + RetrySettings: RetrySettings{Disabled: true}, + }, } for _, op := range options { - op(&be) + op(be) + } + + if be.start == nil { + be.start = func(ctx context.Context, host component.Host) error { return nil } } + if be.shutdown == nil { + be.shutdown = func(ctx context.Context) error { return nil } + } + + be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings} + + be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender) + be.sender = be.rSender + + be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender) + be.sender = be.qSender + return be } func (be *baseExporter) Start(ctx context.Context, host component.Host) error { - if be.start != nil { - return be.start(ctx, host) - } - return nil + err := componenterror.ErrAlreadyStarted + be.startOnce.Do(func() { + // First start the nextSender + err = be.start(ctx, host) + if err != nil { + return + } + + // If no error then start the queuedSender + be.qSender.start() + }) + return err } -// Shutdown stops the exporter and is invoked during shutdown. +// Shutdown stops the nextSender and is invoked during shutdown. func (be *baseExporter) Shutdown(ctx context.Context) error { - if be.shutdown != nil { - return be.shutdown(ctx) + err := componenterror.ErrAlreadyStopped + be.shutdownOnce.Do(func() { + // First stop the retry goroutines + be.rSender.shutdown() + + // All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped. + be.qSender.shutdown() + + // Last shutdown the nextSender itself. + err = be.shutdown(ctx) + }) + return err +} + +type timeoutSender struct { + cfg *TimeoutSettings +} + +func (te *timeoutSender) send(req request) (int, error) { + // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be + // updated because this deadline most likely is before the next one. 
+ ctx := req.context() + if te.cfg.Timeout > 0 { + var cancelFunc func() + ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout) + defer cancelFunc() } - return nil + return req.export(ctx) } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 1fc661a5d62..45bf9def720 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -23,22 +23,28 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" ) +var defaultExporterCfg = &configmodels.ExporterSettings{ + TypeVal: "test", + NameVal: "test", +} + func TestErrorToStatus(t *testing.T) { require.Equal(t, okStatus, errToStatus(nil)) require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error"))) } func TestBaseExporter(t *testing.T) { - be := newBaseExporter("test") + be := newBaseExporter(defaultExporterCfg) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) } func TestBaseExporterWithOptions(t *testing.T) { be := newBaseExporter( - "test", + defaultExporterCfg, WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }), WithShutdown(func(ctx context.Context) error { return errors.New("my error") })) require.Error(t, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index d6e1e7f345c..6d9d81c8922 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/obsreport" ) @@ -27,21 +28,47 @@ import ( // the number of dropped logs. type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error) +type logsRequest struct { + baseRequest + ld data.Logs + pusher PushLogsData +} + +func newLogsRequest(ctx context.Context, ld data.Logs, pusher PushLogsData) request { + return &logsRequest{ + baseRequest: baseRequest{ctx: ctx}, + ld: ld, + pusher: pusher, + } +} + +func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request { + // TODO: Implement this + return req +} + +func (req *logsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.ld) +} + +func (req *logsRequest) count() int { + return req.ld.LogRecordCount() +} + type logsExporter struct { - baseExporter + *baseExporter pushLogsData PushLogsData } -func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushLogsData(exporterCtx, md) +func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error { + exporterCtx := obsreport.ExporterContext(ctx, lexp.cfg.Name()) + _, err := lexp.sender.send(newLogsRequest(exporterCtx, ld, lexp.pushLogsData)) return err } // NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span. -// TODO: Add support for retries. 
-func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { - if config == nil { +func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -49,22 +76,28 @@ func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, op return nil, errNilPushLogsData } - pushLogsData = pushLogsWithObservability(pushLogsData, config.Name()) + be := newBaseExporter(cfg, options...) + + // Record metrics on the consumer. + be.qSender.nextSender = &logsExporterWithObservability{ + exporterName: cfg.Name(), + sender: be.qSender.nextSender, + } return &logsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: be, pushLogsData: pushLogsData, }, nil } -func pushLogsWithObservability(next PushLogsData, exporterName string) PushLogsData { - return func(ctx context.Context, ld data.Logs) (int, error) { - ctx = obsreport.StartLogsExportOp(ctx, exporterName) - numDroppedLogs, err := next(ctx, ld) - - numLogs := ld.LogRecordCount() +type logsExporterWithObservability struct { + exporterName string + sender requestSender +} - obsreport.EndLogsExportOp(ctx, numLogs, numDroppedLogs, err) - return numLogs, err - } +func (lewo *logsExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName)) + numDroppedLogs, err := lewo.sender.send(req) + obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err) + return numDroppedLogs, err } diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logshelper_test.go index 0d20a7d158c..bd08d1d209e 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logshelper_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/data" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -43,6 +44,13 @@ var ( } ) +func TestLogsRequest(t *testing.T) { + mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) +} + func TestLogsExporter_InvalidName(t *testing.T) { me, err := NewLogsExporter(nil, newPushLogsData(0, nil)) require.Nil(t, me) @@ -178,7 +186,7 @@ func generateLogsTraffic(t *testing.T, me component.LogExporter, numRequests int } } -func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numMetricPoints int64) { +func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numLogRecords int64) { ocSpansSaver := new(testOCTraceExporter) trace.RegisterExporter(ocSpansSaver) defer trace.UnregisterExporter(ocSpansSaver) @@ -201,13 +209,13 @@ func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantEr require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd) require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd) - sentMetricPoints := numMetricPoints - var failedToSendMetricPoints int64 + sentLogRecords := numLogRecords + 
var failedToSendLogRecords int64 if wantError != nil { - sentMetricPoints = 0 - failedToSendMetricPoints = numMetricPoints + sentLogRecords = 0 + failedToSendLogRecords = numLogRecords } - require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) - require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) } } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 385aeaaab7b..5e2b60330c4 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/obsreport" @@ -30,21 +31,21 @@ import ( type PushMetricsDataOld func(ctx context.Context, td consumerdata.MetricsData) (droppedTimeSeries int, err error) type metricsExporterOld struct { - baseExporter + *baseExporter pushMetricsData PushMetricsDataOld } -func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + _, err := mexp.pushMetricsData(exporterCtx, md) return err } // NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. +// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. // TODO: Add support for retries. -func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { - if config == nil { +func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { + if cfg == nil { return nil, errNilConfig } @@ -52,10 +53,10 @@ func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMet return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, config.Name()) + pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, cfg.Name()) return &metricsExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), pushMetricsData: pushMetricsData, }, nil } @@ -88,22 +89,50 @@ func NumTimeSeries(md consumerdata.MetricsData) int { // the number of dropped metrics. 
type PushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) +type metricsRequest struct { + baseRequest + md pdata.Metrics + pusher PushMetricsData +} + +func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher PushMetricsData) request { + return &metricsRequest{ + baseRequest: baseRequest{ctx: ctx}, + md: md, + pusher: pusher, + } +} + +func (req *metricsRequest) onPartialError(consumererror.PartialError) request { + // TODO: implement this. + return req +} + +func (req *metricsRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.md) +} + +func (req *metricsRequest) count() int { + _, numPoints := pdatautil.MetricAndDataPointCount(req.md) + return numPoints +} + type metricsExporter struct { - baseExporter - pushMetricsData PushMetricsData + *baseExporter + pusher PushMetricsData } -func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName) - _, err := me.pushMetricsData(exporterCtx, md) +func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name()) + req := newMetricsRequest(exporterCtx, md, mexp.pusher) + _, err := mexp.sender.send(req) return err } // NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the exporter format as a tag in the Context. -// TODO: Add support for retries. -func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { - if config == nil { +// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. +func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { + if cfg == nil { return nil, errNilConfig } @@ -111,25 +140,35 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric return nil, errNilPushMetricsData } - pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name()) + be := newBaseExporter(cfg, options...) + + // Record metrics on the consumer. + be.qSender.nextSender = &metricsSenderWithObservability{ + exporterName: cfg.Name(), + sender: be.qSender.nextSender, + } return &metricsExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - pushMetricsData: pushMetricsData, + baseExporter: be, + pusher: pushMetricsData, }, nil } -func pushMetricsWithObservability(next PushMetricsData, exporterName string) PushMetricsData { - return func(ctx context.Context, md pdata.Metrics) (int, error) { - ctx = obsreport.StartMetricsExportOp(ctx, exporterName) - numDroppedMetrics, err := next(ctx, md) +type metricsSenderWithObservability struct { + exporterName string + sender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. 
- numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(md) +func (mewo *metricsSenderWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName)) + numDroppedMetrics, err := mewo.sender.send(req) - obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedMetrics, numDroppedMetrics, err) - return numReceivedMetrics, err - } + // TODO: this is not ideal: req should come from the next function itself. + // temporarily loading req from internal format. Once full switch is done + // to new metrics will remove this. + mReq := req.(*metricsRequest) + numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md) + + obsreport.EndMetricsExportOp(req.context(), numPoints, numReceivedMetrics, numDroppedMetrics, err) + return numReceivedMetrics, err } diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metricshelper_test.go index 3f96826b115..c78f7565f77 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metricshelper_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data/testdata" @@ -46,6 +47,14 @@ var ( } ) +func TestMetricsRequest(t *testing.T) { + mr := newMetricsRequest(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError))) + assert.Equal(t, 0, mr.count()) +} + func TestMetricsExporter_InvalidName(t *testing.T) { me, err := NewMetricsExporter(nil, newPushMetricsData(0, nil)) require.Nil(t, me) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go new file mode 100644 index 00000000000..ba3bfae2ed4 --- /dev/null +++ b/exporter/exporterhelper/queued_retry.go @@ -0,0 +1,196 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "errors" + "fmt" + "time" + + "github.com/cenkalti/backoff" + "github.com/jaegertracing/jaeger/pkg/queue" + + "go.opentelemetry.io/collector/consumer/consumererror" +) + +// QueuedSettings defines configuration for queueing batches before sending to the nextSender. +type QueuedSettings struct { + // Disabled indicates whether to not enqueue batches before sending to the nextSender. + Disabled bool `mapstructure:"disabled"` + // NumWorkers is the number of consumers from the queue. + NumWorkers int `mapstructure:"num_workers"` + // QueueSize is the maximum number of batches allowed in queue at a given time. 
+ QueueSize int `mapstructure:"queue_size"` +} + +func CreateDefaultQueuedSettings() QueuedSettings { + return QueuedSettings{ + Disabled: false, + NumWorkers: 10, + QueueSize: 5000, + } +} + +// RetrySettings defines configuration for retrying batches in case of export failure. +// The current supported strategy is exponential backoff. +type RetrySettings struct { + // Disabled indicates whether to not retry sending batches in case of export failure. + Disabled bool `mapstructure:"disabled"` + // InitialBackoff the time to wait after the first failure before retrying + InitialBackoff time.Duration `mapstructure:"initial_backoff"` + // MaxBackoff is the upper bound on backoff. + MaxBackoff time.Duration `mapstructure:"max_backoff"` + // MaxElapsedTime is the maximum amount of time spent trying to send a batch. + MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"` +} + +func CreateDefaultRetrySettings() RetrySettings { + return RetrySettings{ + Disabled: false, + InitialBackoff: 5 * time.Second, + MaxBackoff: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} + +type queuedSender struct { + cfg *QueuedSettings + nextSender requestSender + queue *queue.BoundedQueue +} + +var errorRefused = errors.New("failed to add to the queue") + +func newQueuedSender(cfg *QueuedSettings, nextSender requestSender) *queuedSender { + return &queuedSender{ + cfg: cfg, + nextSender: nextSender, + queue: queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}), + } +} + +// start is invoked during service startup. +func (sp *queuedSender) start() { + sp.queue.StartConsumers(sp.cfg.NumWorkers, func(item interface{}) { + value := item.(request) + _, _ = sp.nextSender.send(value) + }) +} + +// ExportTraces implements the TExporter interface +func (sp *queuedSender) send(req request) (int, error) { + if sp.cfg.Disabled { + return sp.nextSender.send(req) + } + + if !sp.queue.Produce(req) { + return req.count(), errorRefused + } + + return 0, nil +} + +// shutdown is invoked during service shutdown. +func (sp *queuedSender) shutdown() { + sp.queue.Stop() +} + +// TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries. +type throttleRetry struct { + error + delay time.Duration +} + +func NewThrottleRetry(err error, delay time.Duration) error { + return &throttleRetry{ + error: err, + delay: delay, + } +} + +type retrySender struct { + cfg *RetrySettings + nextSender requestSender + stopCh chan struct{} +} + +func newRetrySender(cfg *RetrySettings, nextSender requestSender) *retrySender { + return &retrySender{ + cfg: cfg, + nextSender: nextSender, + stopCh: make(chan struct{}), + } +} + +func (re *retrySender) send(req request) (int, error) { + if re.cfg.Disabled { + return re.nextSender.send(req) + } + + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = re.cfg.InitialBackoff + expBackoff.MaxInterval = re.cfg.MaxBackoff + expBackoff.MaxElapsedTime = re.cfg.MaxElapsedTime + for { + droppedItems, err := re.nextSender.send(req) + + if err == nil { + return droppedItems, nil + } + + // Immediately drop data on permanent errors. + if consumererror.IsPermanent(err) { + return droppedItems, err + } + + // If partial error, update data and stats with non exported data. 
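+		// For example, the push function steers this loop through the error it returns:
+		// a consumererror.Permanent error was already dropped above, a PartialError
+		// re-queues only the items that were not exported, a throttleRetry created via
+		// NewThrottleRetry enforces the server-requested minimum delay, and any other
+		// error is retried with plain exponential backoff until MaxElapsedTime expires.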
+ if partialErr, isPartial := err.(consumererror.PartialError); isPartial { + req = req.onPartialError(partialErr) + } + + backoffDelay := expBackoff.NextBackOff() + + if backoffDelay == backoff.Stop { + // throw away the batch + return req.count(), fmt.Errorf("max elapsed time expired %w", err) + } + + if throttleErr, isThrottle := err.(*throttleRetry); isThrottle { + backoffDelay = max(backoffDelay, throttleErr.delay) + } + + // back-off, but get interrupted when shutting down or request is cancelled or timed out. + select { + case <-req.context().Done(): + return req.count(), fmt.Errorf("request is cancelled or timed out %w", err) + case <-re.stopCh: + return req.count(), fmt.Errorf("interrupted due to shutdown %w", err) + case <-time.After(backoffDelay): + } + } +} + +// shutdown is invoked during service shutdown. +func (re *retrySender) shutdown() { + close(re.stopCh) +} + +// max returns the larger of x or y. +func max(x, y time.Duration) time.Duration { + if x < y { + return y + } + return x +} diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go new file mode 100644 index 00000000000..6f1e8c6a73e --- /dev/null +++ b/exporter/exporterhelper/queued_retry_test.go @@ -0,0 +1,451 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestQueuedRetry_DropOnPermanentError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(consumererror.Permanent(errors.New("bad data"))) + + qCfg := CreateDefaultQueuedSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_DropOnNoRetry(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueuedSettings() + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = true + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + <-time.After(200 * time.Millisecond) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_PartialError(t *testing.T) { + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan()) + mockP := newMockConcurrentExporter() + mockP.updateError(partialErr) + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 1 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + mockP.checkNumItems(t, 2+1) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_StopWhileWaiting(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 30 * time.Minute + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 1) + mockP.checkNumItems(t, 2) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_PreserveCancellation(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + ctx, cancelFunc := context.WithCancel(context.Background()) + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(ctx, 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + cancelFunc() + + // In the newMockConcurrentExporter we count requests and items even for failed requests. + mockP.checkNumRequests(t, 1) + mockP.checkNumItems(t, 2) + require.Zero(t, be.qSender.queue.Size()) + + // Stop should succeed and not retry. + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + + // In the newMockConcurrentExporter we count requests and items even for failed requests. + mockP.checkNumRequests(t, 1) + mockP.checkNumItems(t, 2) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_MaxElapsedTime(t *testing.T) { + mockP := newMockConcurrentExporter() + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 100 * time.Millisecond + rCfg.MaxElapsedTime = 1 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Add an item that will always fail. + droppedItems, err := be.sender.send(newErrorRequest(context.Background())) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests. 
+ mockP.checkNumRequests(t, 1) + mockP.checkNumItems(t, 2) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_ThrottleError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(NewThrottleRetry(errors.New("throttle error"), 1*time.Second)) + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 100 * time.Millisecond + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + start := time.Now() + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + // There is a small race condition in this test, but expect to execute this in less than 2 second. + mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // The initial backoff is 100ms, but because of the throttle this should wait at least 1 seconds. + assert.True(t, 1*time.Second < time.Since(start)) + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + mockP.checkNumItems(t, 4) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_RetryOnError(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueuedSettings() + qCfg.NumWorkers = 1 + qCfg.QueueSize = 1 + rCfg := CreateDefaultRetrySettings() + rCfg.Disabled = false + rCfg.InitialBackoff = 2 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.awaitAsyncProcessing() + + // There is a small race condition in this test, but expect to execute this in less than 2 second. 
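+	// Clearing the error and re-arming the wait group blocks awaitAsyncProcessing
+	// until the worker's retry has invoked the mock export again.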
+ mockP.updateError(nil) + mockP.waitGroup.Add(1) + mockP.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockP.checkNumRequests(t, 2) + mockP.checkNumItems(t, 4) + require.Zero(t, be.qSender.queue.Size()) +} + +func TestQueuedRetry_DropOnFull(t *testing.T) { + mockP := newMockConcurrentExporter() + mockP.updateError(errors.New("transient error")) + + qCfg := CreateDefaultQueuedSettings() + qCfg.QueueSize = 0 + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.Error(t, err) + assert.Equal(t, 2, droppedItems) +} + +func TestQueuedRetryHappyPath(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + mockP := newMockConcurrentExporter() + qCfg := CreateDefaultQueuedSettings() + rCfg := CreateDefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + mockP.stop() + assert.NoError(t, be.Shutdown(context.Background())) + }) + + wantRequests := 10 + for i := 0; i < wantRequests; i++ { + mockP.run(func() { + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + } + + // Wait until all batches received + mockP.awaitAsyncProcessing() + + mockP.checkNumRequests(t, wantRequests) + mockP.checkNumItems(t, 2*wantRequests) +} + +type mockErrorRequest struct { + baseRequest +} + +func (mer *mockErrorRequest) export(_ context.Context) (int, error) { + return 0, errors.New("transient error") +} + +func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request { + return mer +} + +func (mer *mockErrorRequest) count() int { + return 0 +} + +func newErrorRequest(ctx context.Context) request { + return &mockErrorRequest{ + baseRequest: baseRequest{ctx: ctx}, + } +} + +type mockRequest struct { + baseRequest + cnt int + mce *mockConcurrentExporter +} + +func (m *mockRequest) export(_ context.Context) (int, error) { + err := m.mce.export(m.cnt) + if err != nil { + return m.cnt, err + } + return 0, nil +} + +func (m *mockRequest) onPartialError(consumererror.PartialError) request { + return &mockRequest{ + baseRequest: m.baseRequest, + cnt: 1, + mce: m.mce, + } +} + +func (m *mockRequest) count() int { + return m.cnt +} + +func newMockRequest(ctx context.Context, cnt int, mce *mockConcurrentExporter) request { + return &mockRequest{ + baseRequest: baseRequest{ctx: ctx}, + cnt: cnt, + mce: mce, + } +} + +type mockConcurrentExporter struct { + waitGroup *sync.WaitGroup + mu sync.Mutex + consumeError error + requestCount int64 + itemsCount int64 + stopped int32 +} + +func newMockConcurrentExporter() *mockConcurrentExporter { + return &mockConcurrentExporter{waitGroup: new(sync.WaitGroup)} +} + +func (p *mockConcurrentExporter) export(cnt int) error { + if atomic.LoadInt32(&p.stopped) == 1 { + return nil + } + atomic.AddInt64(&p.requestCount, 1) + atomic.AddInt64(&p.itemsCount, int64(cnt)) + p.mu.Lock() + defer p.mu.Unlock() + defer p.waitGroup.Done() + return p.consumeError +} + +func (p 
*mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) +} + +func (p *mockConcurrentExporter) checkNumItems(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.itemsCount)) +} + +func (p *mockConcurrentExporter) updateError(err error) { + p.mu.Lock() + defer p.mu.Unlock() + p.consumeError = err +} + +func (p *mockConcurrentExporter) run(fn func()) { + p.waitGroup.Add(1) + fn() +} + +func (p *mockConcurrentExporter) awaitAsyncProcessing() { + p.waitGroup.Wait() +} + +func (p *mockConcurrentExporter) stop() { + atomic.StoreInt32(&p.stopped, 1) +} diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 463924fd2b2..74b95b1bcc4 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" ) @@ -28,32 +29,28 @@ import ( // returns the number of dropped spans. type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) -// traceDataPusher is a helper function that is similar to ConsumeTraceData but also -// returns the number of dropped spans. -type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) - -// traceExporterOld implements the exporter with additional helper internalOptions. +// traceExporterOld implements the nextSender with additional helper internalOptions. type traceExporterOld struct { - baseExporter + *baseExporter dataPusher traceDataPusherOld } -func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + _, err := texp.dataPusher(exporterCtx, td) return err } // NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every -// request with a Span. If no internalOptions are passed it just adds the exporter format as a +// request with a Span. If no internalOptions are passed it just adds the nextSender format as a // tag in the Context. func NewTraceExporterOld( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusherOld, options ...ExporterOption, ) (component.TraceExporterOld, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -61,10 +58,10 @@ func NewTraceExporterOld( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + dataPusher = dataPusher.withObservability(cfg.Name()) return &traceExporterOld{ - baseExporter: newBaseExporter(config.Name(), options...), + baseExporter: newBaseExporter(cfg, options...), dataPusher: dataPusher, }, nil } @@ -86,29 +83,57 @@ func (p traceDataPusherOld) withObservability(exporterName string) traceDataPush } } +// traceDataPusher is a helper function that is similar to ConsumeTraceData but also +// returns the number of dropped spans. 
+type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) + +type tracesRequest struct { + baseRequest + td pdata.Traces + pusher traceDataPusher +} + +func newTracesRequest(ctx context.Context, td pdata.Traces, pusher traceDataPusher) request { + return &tracesRequest{ + baseRequest: baseRequest{ctx: ctx}, + td: td, + pusher: pusher, + } +} + +func (req *tracesRequest) onPartialError(partialErr consumererror.PartialError) request { + return newTracesRequest(req.ctx, partialErr.GetTraces(), req.pusher) +} + +func (req *tracesRequest) export(ctx context.Context) (int, error) { + return req.pusher(ctx, req.td) +} + +func (req *tracesRequest) count() int { + return req.td.SpanCount() +} + type traceExporter struct { - baseExporter - dataPusher traceDataPusher + *baseExporter + pusher traceDataPusher } -func (te *traceExporter) ConsumeTraces( - ctx context.Context, - td pdata.Traces, -) error { - exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName) - _, err := te.dataPusher(exporterCtx, td) +func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + exporterCtx := obsreport.ExporterContext(ctx, texp.cfg.Name()) + req := newTracesRequest(exporterCtx, td, texp.pusher) + _, err := texp.sender.send(req) return err } // NewTraceExporter creates a TraceExporter that can record metrics and can wrap // every request with a Span. func NewTraceExporter( - config configmodels.Exporter, + cfg configmodels.Exporter, dataPusher traceDataPusher, options ...ExporterOption, ) (component.TraceExporter, error) { - if config == nil { + if cfg == nil { return nil, errNilConfig } @@ -116,27 +141,33 @@ func NewTraceExporter( return nil, errNilPushTraceData } - dataPusher = dataPusher.withObservability(config.Name()) + be := newBaseExporter(cfg, options...) + + // Record metrics on the consumer. + be.qSender.nextSender = &tracesExporterWithObservability{ + exporterName: cfg.Name(), + sender: be.qSender.nextSender, + } return &traceExporter{ - baseExporter: newBaseExporter(config.Name(), options...), - dataPusher: dataPusher, + baseExporter: be, + pusher: dataPusher, }, nil } -// withObservability wraps the current pusher into a function that records -// the observability signals during the pusher execution. -func (p traceDataPusher) withObservability(exporterName string) traceDataPusher { - return func(ctx context.Context, td pdata.Traces) (int, error) { - ctx = obsreport.StartTraceDataExportOp(ctx, exporterName) - // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := p(ctx, td) +type tracesExporterWithObservability struct { + exporterName string + sender requestSender +} - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. - numSpans := td.SpanCount() - obsreport.EndTraceDataExportOp(ctx, numSpans, droppedSpans, err) - return droppedSpans, err - } +func (tewo *tracesExporterWithObservability) send(req request) (int, error) { + req.setContext(obsreport.StartTraceDataExportOp(req.context(), tewo.exporterName)) + // Forward the data to the next consumer (this pusher is the next). + droppedSpans, err := tewo.sender.send(req) + + // TODO: this is not ideal: req should come from the next function itself. + // temporarily loading req from internal format. Once full switch is done + // to new metrics will remove this. 
+ obsreport.EndTraceDataExportOp(req.context(), req.count(), droppedSpans, err) + return droppedSpans, err } diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/tracehelper_test.go index eef81757b3d..9c18d6d7321 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/tracehelper_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -46,6 +47,13 @@ var ( } ) +func TestTracesRequest(t *testing.T) { + mr := newTracesRequest(context.Background(), testdata.GenerateTraceDataOneSpan(), nil) + + partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataEmpty()) + assert.EqualValues(t, newTracesRequest(context.Background(), testdata.GenerateTraceDataEmpty(), nil), mr.onPartialError(partialErr.(consumererror.PartialError))) +} + // TODO https://go.opentelemetry.io/collector/issues/266 // Migrate tests to use testify/assert instead of t.Fatal pattern. func TestTraceExporterOld_InvalidName(t *testing.T) { diff --git a/exporter/otlpexporter/README.md b/exporter/otlpexporter/README.md index 820659e7acf..59a8c25d731 100644 --- a/exporter/otlpexporter/README.md +++ b/exporter/otlpexporter/README.md @@ -28,6 +28,16 @@ The following settings can be optionally configured: [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#WithInsecure). - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). +- `timeout` (default = 5s): Is the timeout for each operation. +- `retry_settings` + - `disabled` (default = false) + - `initial_backoff` (default = 5s): Time to wait after the first failure before retrying; ignored if retry_on_failure is false + - `max_backoff` (default = 30s): Is the upper bound on backoff; ignored if retry_on_failure is false + - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if retry_on_failure is false +- `queued_settings` + - `disabled` (default = true) + - `num_workers` (default = 10): Number of workers that dequeue batches + - `queue_size` (default = 5000): Maximum number of batches kept in memory before data Example: diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index be4dfa65521..469d0ba1835 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -17,11 +17,15 @@ package otlpexporter import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Config defines configuration for OpenCensus exporter. type Config struct { - configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. 
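+	// Unlike the squashed timeout, the queued and retry options are nested under their own
+	// keys, so in YAML they appear as `queued_settings:` and `retry_settings:` blocks (see the
+	// README and testdata/config.yaml changes in this patch).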
+ exporterhelper.QueuedSettings `mapstructure:"queued_settings"` + exporterhelper.RetrySettings `mapstructure:"retry_settings"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 5433e59ea05..1ab79b67834 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestLoadConfig(t *testing.T) { @@ -49,6 +50,20 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/2", TypeVal: "otlp", }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Disabled: false, + InitialBackoff: 10 * time.Second, + MaxBackoff: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueuedSettings: exporterhelper.QueuedSettings{ + Disabled: false, + NumWorkers: 2, + QueueSize: 10, + }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 6134d066297..8d33b3b1ef1 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -39,11 +39,17 @@ func NewFactory() component.ExporterFactory { } func createDefaultConfig() configmodels.Exporter { + // TODO: Enable the queued settings. + qs := exporterhelper.CreateDefaultQueuedSettings() + qs.Disabled = true return &Config{ ExporterSettings: configmodels.ExporterSettings{ TypeVal: typeStr, NameVal: typeStr, }, + TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), + RetrySettings: exporterhelper.CreateDefaultRetrySettings(), + QueuedSettings: qs, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. 
@@ -61,9 +67,13 @@ func createTraceExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewTraceExporter( cfg, oce.pushTraceData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueued(oCfg.QueuedSettings), exporterhelper.WithShutdown(oce.shutdown)) if err != nil { return nil, err @@ -81,9 +91,13 @@ func createMetricsExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewMetricsExporter( cfg, oce.pushMetricsData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueued(oCfg.QueuedSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { @@ -102,9 +116,13 @@ func createLogExporter( if err != nil { return nil, err } + oCfg := cfg.(*Config) oexp, err := exporterhelper.NewLogsExporter( cfg, oce.pushLogData, + exporterhelper.WithTimeout(oCfg.TimeoutSettings), + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueued(oCfg.QueuedSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index f0c33704fcb..25586ff0d8d 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -18,20 +18,19 @@ import ( "context" "errors" "fmt" - "sync" "time" - "github.com/cenkalti/backoff" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/internal/data" otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" @@ -41,9 +40,7 @@ import ( type exporterImp struct { // Input configuration. config *Config - - stopOnce sync.Once - w sender + w sender } type sender interface { @@ -54,8 +51,7 @@ type sender interface { } var ( - errTimeout = errors.New("timeout") - errFatalError = errors.New("fatal error sending to server") + errPermanentError = consumererror.Permanent(errors.New("fatal error sending to server")) ) // Crete new exporter and start it. The exporter will begin connecting but @@ -78,12 +74,7 @@ func newExporter(cfg configmodels.Exporter) (*exporterImp, error) { } func (e *exporterImp) shutdown(context.Context) error { - err := componenterror.ErrAlreadyStopped - e.stopOnce.Do(func() { - // Close the connection. 
- err = e.w.stop() - }) - return err + return e.w.stop() } func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { @@ -160,24 +151,18 @@ func (gs *grpcSender) stop() error { } func (gs *grpcSender) exportTrace(ctx context.Context, request *otlptrace.ExportTraceServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.traceExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.traceExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) exportMetrics(ctx context.Context, request *otlpmetrics.ExportMetricsServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.metricExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.metricExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) exportLogs(ctx context.Context, request *otlplogs.ExportLogsServiceRequest) error { - return exportRequest(gs.enhanceContext(ctx), func(ctx context.Context) error { - _, err := gs.logExporter.Export(ctx, request, grpc.WaitForReady(gs.waitForReady)) - return err - }) + _, err := gs.logExporter.Export(gs.enhanceContext(ctx), request, grpc.WaitForReady(gs.waitForReady)) + return processError(err) } func (gs *grpcSender) enhanceContext(ctx context.Context) context.Context { @@ -190,63 +175,36 @@ func (gs *grpcSender) enhanceContext(ctx context.Context) context.Context { // Send a trace or metrics request to the server. "perform" function is expected to make // the actual gRPC unary call that sends the request. This function implements the // common OTLP logic around request handling such as retries and throttling. -func exportRequest(ctx context.Context, perform func(ctx context.Context) error) error { - - expBackoff := backoff.NewExponentialBackOff() - - // Spend max 15 mins on this operation. This is just a reasonable number that - // gives plenty of time for typical quick transient errors to resolve. - expBackoff.MaxElapsedTime = time.Minute * 15 - - for { - // Send to server. - err := perform(ctx) - - if err == nil { - // Request is successful, we are done. - return nil - } - - // We have an error, check gRPC status code. - - status := status.Convert(err) - - statusCode := status.Code() - if statusCode == codes.OK { - // Not really an error, still success. - return nil - } +func processError(err error) error { + if err == nil { + // Request is successful, we are done. + return nil + } - // Now, this is this a real error. + // We have an error, check gRPC status code. - if !shouldRetry(statusCode) { - // It is not a retryable error, we should not retry. - return errFatalError - } + st := status.Convert(err) + if st.Code() == codes.OK { + // Not really an error, still success. + return nil + } - // Need to retry. + // Now, this is this a real error. - // Check if server returned throttling information. - waitDuration := getThrottleDuration(status) - if waitDuration == 0 { - // No explicit throttle duration. Use exponential backoff strategy. - waitDuration = expBackoff.NextBackOff() - if waitDuration == backoff.Stop { - // We run out of max time allocated to this operation. - return errTimeout - } - } + if !shouldRetry(st.Code()) { + // It is not a retryable error, we should not retry. 
+ return errPermanentError + } - // Wait until one of the conditions below triggers. - select { - case <-ctx.Done(): - // This request is cancelled or timed out. - return errTimeout + // Need to retry. - case <-time.After(waitDuration): - // Time to try again. - } + // Check if server returned throttling information. + throttleDuration := getThrottleDuration(st) + if throttleDuration != 0 { + return exporterhelper.NewThrottleRetry(err, throttleDuration) } + + return err } func shouldRetry(code codes.Code) bool { diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index 737fabdefaf..253b8e66120 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -10,6 +10,16 @@ exporters: endpoint: "1.2.3.4:1234" compression: "on" ca_file: /var/lib/mycert.pem + timeout: 10s + queued_settings: + disabled: false + num_workers: 2 + queue_size: 10 + retry_settings: + disabled: false + initial_backoff: 10s + max_backoff: 60s + max_elapsed_time: 10m per_rpc_auth: type: bearer bearer_token: some-token From f7508a2271d01e740fb2d798fd1a63f3ecfcc26b Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 17 Jul 2020 10:42:29 -0700 Subject: [PATCH 2/2] First round of comments addressed --- exporter/exporterhelper/common.go | 160 ++++++++-------- exporter/exporterhelper/logshelper.go | 18 +- exporter/exporterhelper/metricshelper.go | 26 ++- exporter/exporterhelper/queued_retry.go | 128 +++++++------ exporter/exporterhelper/queued_retry_test.go | 183 ++++++++++++------- exporter/exporterhelper/tracehelper.go | 28 ++- exporter/opencensusexporter/factory_test.go | 2 +- exporter/otlpexporter/README.md | 19 +- exporter/otlpexporter/config.go | 4 +- exporter/otlpexporter/config_test.go | 16 +- exporter/otlpexporter/factory.go | 10 +- exporter/otlpexporter/factory_test.go | 2 +- exporter/otlpexporter/testdata/config.yaml | 10 +- processor/queuedprocessor/config.go | 2 +- 14 files changed, 337 insertions(+), 271 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index ef27874685d..2c287faabbb 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -31,38 +31,38 @@ var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) +// Settings for timeout. The timeout applies to individual attempts to send data to the backend. type TimeoutSettings struct { - // Timeout is the timeout for each operation. + // Timeout is the timeout for every attempt to send data to the backend. Timeout time.Duration `mapstructure:"timeout"` } +// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings. func CreateDefaultTimeoutSettings() TimeoutSettings { return TimeoutSettings{ Timeout: 5 * time.Second, } } -type settings struct { - configmodels.Exporter - TimeoutSettings - QueuedSettings - RetrySettings -} - +// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs). type request interface { + // context returns the Context of the requests. context() context.Context + // setContext updates the Context of the requests. setContext(context.Context) export(ctx context.Context) (int, error) - // Returns a new queue request that contains the items left to be exported. + // Returns a new request that contains the items left to be sent. onPartialError(consumererror.PartialError) request - // Returns the cnt of spans/metric points or log records. 
+ // Returns the count of spans/metric points or log records. count() int } +// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { send(req request) (int, error) } +// baseRequest is a base implementation for the request. type baseRequest struct { ctx context.Context } @@ -81,139 +81,151 @@ type Start func(context.Context, component.Host) error // Shutdown specifies the function invoked when the exporter is being shutdown. type Shutdown func(context.Context) error +// internalOptions represents all the options that users can configure. +type internalOptions struct { + TimeoutSettings + QueueSettings + RetrySettings + Start + Shutdown +} + +// fromConfiguredOptions returns the internal options starting from the default and applying all configured options. +func fromConfiguredOptions(options ...ExporterOption) *internalOptions { + // Start from the default options: + opts := &internalOptions{ + TimeoutSettings: CreateDefaultTimeoutSettings(), + // TODO: Enable queuing by default (call CreateDefaultQueueSettings) + QueueSettings: QueueSettings{Disabled: true}, + // TODO: Enable retry by default (call CreateDefaultRetrySettings) + RetrySettings: RetrySettings{Disabled: true}, + Start: func(ctx context.Context, host component.Host) error { return nil }, + Shutdown: func(ctx context.Context) error { return nil }, + } + + for _, op := range options { + op(opts) + } + + return opts +} + // ExporterOption apply changes to internalOptions. -type ExporterOption func(*baseExporter) +type ExporterOption func(*internalOptions) // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown Shutdown) ExporterOption { - return func(o *baseExporter) { - o.shutdown = shutdown + return func(o *internalOptions) { + o.Shutdown = shutdown } } // WithStart overrides the default Start function for an exporter. // The default shutdown function does nothing and always returns nil. func WithStart(start Start) ExporterOption { - return func(o *baseExporter) { - o.start = start + return func(o *internalOptions) { + o.Start = start } } -// WithShutdown overrides the default TimeoutSettings for an exporter. +// WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. -func WithTimeout(timeout TimeoutSettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.TimeoutSettings = timeout +func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption { + return func(o *internalOptions) { + o.TimeoutSettings = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. -func WithRetry(retry RetrySettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.RetrySettings = retry +func WithRetry(retrySettings RetrySettings) ExporterOption { + return func(o *internalOptions) { + o.RetrySettings = retrySettings } } -// WithQueued overrides the default QueuedSettings for an exporter. -// The default QueuedSettings is to disable queueing. -func WithQueued(queued QueuedSettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.QueuedSettings = queued +// WithQueue overrides the default QueueSettings for an exporter. +// The default QueueSettings is to disable queueing. 
+func WithQueue(queueSettings QueueSettings) ExporterOption { + return func(o *internalOptions) { + o.QueueSettings = queueSettings } } -// internalOptions contains internalOptions concerning how an Exporter is configured. +// baseExporter contains common fields between different exporter types. type baseExporter struct { - cfg *settings + cfg configmodels.Exporter sender requestSender - rSender *retrySender - qSender *queuedSender + qrSender *queuedRetrySender start Start shutdown Shutdown startOnce sync.Once shutdownOnce sync.Once } -// Construct the internalOptions from multiple ExporterOption. func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter { + opts := fromConfiguredOptions(options...) be := &baseExporter{ - cfg: &settings{ - Exporter: cfg, - TimeoutSettings: CreateDefaultTimeoutSettings(), - // TODO: Enable queuing by default (call CreateDefaultQueuedSettings - QueuedSettings: QueuedSettings{Disabled: true}, - // TODO: Enable retry by default (call CreateDefaultRetrySettings) - RetrySettings: RetrySettings{Disabled: true}, - }, + cfg: cfg, + start: opts.Start, + shutdown: opts.Shutdown, } - for _, op := range options { - op(be) - } - - if be.start == nil { - be.start = func(ctx context.Context, host component.Host) error { return nil } - } - - if be.shutdown == nil { - be.shutdown = func(ctx context.Context) error { return nil } - } - - be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings} - - be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender) - be.sender = be.rSender - - be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender) - be.sender = be.qSender + be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}) + be.sender = be.qrSender return be } +// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. +// This can be used to wrap with observability (create spans, record metrics) the consumer sender. +func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { + be.qrSender.consumerSender = f(be.qrSender.consumerSender) +} + +// Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { err := componenterror.ErrAlreadyStarted be.startOnce.Do(func() { - // First start the nextSender + // First start the wrapped exporter. err = be.start(ctx, host) if err != nil { + // TODO: Log errors, or check if it is recorded by the caller. return } - // If no error then start the queuedSender - be.qSender.start() + // If no error then start the queuedRetrySender. + be.qrSender.start() }) return err } -// Shutdown stops the nextSender and is invoked during shutdown. +// Shutdown all senders and exporter and is invoked during service shutdown. func (be *baseExporter) Shutdown(ctx context.Context) error { err := componenterror.ErrAlreadyStopped be.shutdownOnce.Do(func() { - // First stop the retry goroutines - be.rSender.shutdown() - - // All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped. - be.qSender.shutdown() - - // Last shutdown the nextSender itself. + // First shutdown the queued retry sender + be.qrSender.shutdown() + // Last shutdown the wrapped exporter itself. err = be.shutdown(ctx) }) return err } +// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender. 
type timeoutSender struct { - cfg *TimeoutSettings + cfg TimeoutSettings } -func (te *timeoutSender) send(req request) (int, error) { +// send implements the requestSender interface +func (ts *timeoutSender) send(req request) (int, error) { // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be // updated because this deadline most likely is before the next one. ctx := req.context() - if te.cfg.Timeout > 0 { + if ts.cfg.Timeout > 0 { var cancelFunc func() - ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout) + ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout) defer cancelFunc() } return req.export(ctx) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index 6d9d81c8922..71525f6d3de 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error { return err } -// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span. +// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { if cfg == nil { return nil, errNilConfig @@ -77,12 +77,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &logsExporterWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &logsExporter{ baseExporter: be, @@ -92,12 +92,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio type logsExporterWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (lewo *logsExporterWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName)) - numDroppedLogs, err := lewo.sender.send(req) + numDroppedLogs, err := lewo.nextSender.send(req) obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err) return numDroppedLogs, err } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 5e2b60330c4..0b1c8717bde 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu return err } -// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. +// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span. // TODO: Add support for retries. 
func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { if cfg == nil { @@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric return err } -// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. +// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { if cfg == nil { return nil, errNilConfig @@ -141,12 +139,12 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &metricsSenderWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &metricsExporter{ baseExporter: be, @@ -156,15 +154,15 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa type metricsSenderWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (mewo *metricsSenderWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName)) - numDroppedMetrics, err := mewo.sender.send(req) + numDroppedMetrics, err := mewo.nextSender.send(req) - // TODO: this is not ideal: req should come from the next function itself. - // temporarily loading req from internal format. Once full switch is done + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done // to new metrics will remove this. mReq := req.(*metricsRequest) numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index ba3bfae2ed4..aa4c4b01e34 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -25,21 +25,26 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" ) -// QueuedSettings defines configuration for queueing batches before sending to the nextSender. -type QueuedSettings struct { - // Disabled indicates whether to not enqueue batches before sending to the nextSender. +// QueueSettings defines configuration for queueing batches before sending to the consumerSender. +type QueueSettings struct { + // Disabled indicates whether to not enqueue batches before sending to the consumerSender. Disabled bool `mapstructure:"disabled"` - // NumWorkers is the number of consumers from the queue. - NumWorkers int `mapstructure:"num_workers"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. 
	QueueSize int `mapstructure:"queue_size"`
 }
 
-func CreateDefaultQueuedSettings() QueuedSettings {
-	return QueuedSettings{
-		Disabled:   false,
-		NumWorkers: 10,
-		QueueSize:  5000,
+// CreateDefaultQueueSettings returns the default settings for QueueSettings.
+func CreateDefaultQueueSettings() QueueSettings {
+	return QueueSettings{
+		Disabled:     false,
+		NumConsumers: 10,
+		// At 100 requests/sec, 5000 queue elements give about 50 sec of survival of a destination outage.
+		// This is a pretty decent value for production.
+		// Users should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
+		// and multiply that by the number of requests per second.
+		QueueSize: 5000,
 	}
 }
 
@@ -48,54 +53,64 @@ func CreateDefaultQueuedSettings() QueuedSettings {
 type RetrySettings struct {
 	// Disabled indicates whether to not retry sending batches in case of export failure.
 	Disabled bool `mapstructure:"disabled"`
-	// InitialBackoff the time to wait after the first failure before retrying
-	InitialBackoff time.Duration `mapstructure:"initial_backoff"`
-	// MaxBackoff is the upper bound on backoff.
-	MaxBackoff time.Duration `mapstructure:"max_backoff"`
-	// MaxElapsedTime is the maximum amount of time spent trying to send a batch.
+	// InitialInterval is the time to wait after the first failure before retrying.
+	InitialInterval time.Duration `mapstructure:"initial_interval"`
+	// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
+	// consecutive retries will always be `MaxInterval`.
+	MaxInterval time.Duration `mapstructure:"max_interval"`
+	// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
+	// Once this value is reached, the data is discarded.
 	MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
 }
 
+// CreateDefaultRetrySettings returns the default settings for RetrySettings.
 func CreateDefaultRetrySettings() RetrySettings {
 	return RetrySettings{
-		Disabled:       false,
-		InitialBackoff: 5 * time.Second,
-		MaxBackoff:     30 * time.Second,
-		MaxElapsedTime: 5 * time.Minute,
+		Disabled:        false,
+		InitialInterval: 5 * time.Second,
+		MaxInterval:     30 * time.Second,
+		MaxElapsedTime:  5 * time.Minute,
 	}
 }
 
-type queuedSender struct {
-	cfg        *QueuedSettings
-	nextSender requestSender
-	queue      *queue.BoundedQueue
+type queuedRetrySender struct {
+	cfg            QueueSettings
+	consumerSender requestSender
+	queue          *queue.BoundedQueue
+	retryStopCh    chan struct{}
 }
 
 var errorRefused = errors.New("failed to add to the queue")
 
-func newQueuedSender(cfg *QueuedSettings, nextSender requestSender) *queuedSender {
-	return &queuedSender{
-		cfg:        cfg,
-		nextSender: nextSender,
-		queue:      queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}),
+func newQueuedRetrySender(qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender) *queuedRetrySender {
+	retryStopCh := make(chan struct{})
+	return &queuedRetrySender{
+		cfg: qCfg,
+		consumerSender: &retrySender{
+			cfg:        rCfg,
+			nextSender: nextSender,
+			stopCh:     retryStopCh,
+		},
+		queue:       queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}),
+		retryStopCh: retryStopCh,
 	}
 }
 
 // start is invoked during service startup.
-func (sp *queuedSender) start() {
-	sp.queue.StartConsumers(sp.cfg.NumWorkers, func(item interface{}) {
+func (qrs *queuedRetrySender) start() {
+	qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
 		value := item.(request)
-		_, _ = sp.nextSender.send(value)
+		_, _ = qrs.consumerSender.send(value)
 	})
 }
 
-// ExportTraces implements the TExporter interface
-func (sp *queuedSender) send(req request) (int, error) {
-	if sp.cfg.Disabled {
-		return sp.nextSender.send(req)
+// send implements the requestSender interface
+func (qrs *queuedRetrySender) send(req request) (int, error) {
+	if qrs.cfg.Disabled {
+		return qrs.consumerSender.send(req)
 	}
 
-	if !sp.queue.Produce(req) {
+	if !qrs.queue.Produce(req) {
 		return req.count(), errorRefused
 	}
 
@@ -103,8 +118,13 @@ func (sp *queuedSender) send(req request) (int, error) {
 }
 
 // shutdown is invoked during service shutdown.
-func (sp *queuedSender) shutdown() {
-	sp.queue.Stop()
+func (qrs *queuedRetrySender) shutdown() {
+	// First stop the retry goroutines, so that the queue consumers are unblocked.
+	close(qrs.retryStopCh)
+
+	// Stop the queued sender; this drains the queue and calls the retry sender (already stopped), so each
+	// remaining request is attempted only once.
+	qrs.queue.Stop()
 }
 
 // TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries.
@@ -121,30 +141,23 @@ func NewThrottleRetry(err error, delay time.Duration) error {
 }
 
 type retrySender struct {
-	cfg        *RetrySettings
+	cfg        RetrySettings
 	nextSender requestSender
 	stopCh     chan struct{}
 }
 
-func newRetrySender(cfg *RetrySettings, nextSender requestSender) *retrySender {
-	return &retrySender{
-		cfg:        cfg,
-		nextSender: nextSender,
-		stopCh:     make(chan struct{}),
-	}
-}
-
-func (re *retrySender) send(req request) (int, error) {
-	if re.cfg.Disabled {
-		return re.nextSender.send(req)
+// send implements the requestSender interface
+func (rs *retrySender) send(req request) (int, error) {
+	if rs.cfg.Disabled {
+		return rs.nextSender.send(req)
 	}
 
 	expBackoff := backoff.NewExponentialBackOff()
-	expBackoff.InitialInterval = re.cfg.InitialBackoff
-	expBackoff.MaxInterval = re.cfg.MaxBackoff
-	expBackoff.MaxElapsedTime = re.cfg.MaxElapsedTime
+	expBackoff.InitialInterval = rs.cfg.InitialInterval
+	expBackoff.MaxInterval = rs.cfg.MaxInterval
+	expBackoff.MaxElapsedTime = rs.cfg.MaxElapsedTime
 
 	for {
-		droppedItems, err := re.nextSender.send(req)
+		droppedItems, err := rs.nextSender.send(req)
 
 		if err == nil {
 			return droppedItems, nil
@@ -175,18 +188,13 @@ func (re *retrySender) send(req request) (int, error) {
 		select {
 		case <-req.context().Done():
 			return req.count(), fmt.Errorf("request is cancelled or timed out %w", err)
-		case <-re.stopCh:
+		case <-rs.stopCh:
 			return req.count(), fmt.Errorf("interrupted due to shutdown %w", err)
 		case <-time.After(backoffDelay):
 		}
 	}
 }
 
-// shutdown is invoked during service shutdown.
-func (re *retrySender) shutdown() {
-	close(re.stopCh)
-}
-
 // max returns the larger of x or y.
func max(x, y time.Duration) time.Duration { if x < y { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 6f1e8c6a73e..995aeaa30ea 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -35,11 +35,11 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(consumererror.Permanent(errors.New("bad data"))) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -54,17 +54,17 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { }) mockP.awaitAsyncProcessing() <-time.After(200 * time.Millisecond) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() rCfg.Disabled = true - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -79,7 +79,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { }) mockP.awaitAsyncProcessing() <-time.After(200 * time.Millisecond) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_PartialError(t *testing.T) { @@ -87,12 +87,14 @@ func TestQueuedRetry_PartialError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(partialErr) - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -113,20 +115,23 @@ func TestQueuedRetry_PartialError(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 2+1) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_StopWhileWaiting(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 30 * time.Minute - 
be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 30 * time.Minute + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) mockP.run(func() { @@ -137,28 +142,40 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { }) mockP.awaitAsyncProcessing() + // Enqueue another request to ensure when calling shutdown we drain the queue. + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 3, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.stop() assert.NoError(t, be.Shutdown(context.Background())) - // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + // TODO: Ensure that queue is drained, and uncomment the next 3 lines. + // https://github.com/jaegertracing/jaeger/pull/2349 + // ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 2) + // require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_PreserveCancellation(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 30 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ctx, cancelFunc := context.WithCancel(context.Background()) + start := time.Now() mockP.run(func() { // This is asynchronous so it should just enqueue, no errors expected. droppedItems, err := be.sender.send(newMockRequest(ctx, 2, mockP)) @@ -169,31 +186,35 @@ func TestQueuedRetry_PreserveCancellation(t *testing.T) { cancelFunc() - // In the newMockConcurrentExporter we count requests and items even for failed requests. mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) // Stop should succeed and not retry. mockP.stop() assert.NoError(t, be.Shutdown(context.Background())) + // We should ensure that we actually did not wait for the initial backoff (30 sec). + assert.True(t, 5*time.Second > time.Since(start)) + // In the newMockConcurrentExporter we count requests and items even for failed requests. 
mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockP := newMockConcurrentExporter() - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 100 * time.Millisecond + rCfg.InitialInterval = 100 * time.Millisecond rCfg.MaxElapsedTime = 1 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -215,20 +236,23 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests. mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_ThrottleError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(NewThrottleRetry(errors.New("throttle error"), 1*time.Second)) - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 100 * time.Millisecond - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 100 * time.Millisecond + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -251,23 +275,25 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { // The initial backoff is 100ms, but because of the throttle this should wait at least 1 seconds. 
assert.True(t, 1*time.Second < time.Since(start)) - // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 4) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() - qCfg.NumWorkers = 1 + qCfg := CreateDefaultQueueSettings() + qCfg.NumConsumers = 1 qCfg.QueueSize = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false - rCfg.InitialBackoff = 2 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + rCfg.InitialInterval = 2 * time.Second + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -281,6 +307,8 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { assert.Equal(t, 0, droppedItems) }) mockP.awaitAsyncProcessing() + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 0) // There is a small race condition in this test, but expect to execute this in less than 2 second. mockP.updateError(nil) @@ -289,18 +317,21 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 4) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := CreateDefaultRetrySettings() - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -317,9 +348,11 @@ func TestQueuedRetryHappyPath(t *testing.T) { defer doneFn() mockP := newMockConcurrentExporter() - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -339,7 +372,8 @@ func TestQueuedRetryHappyPath(t *testing.T) { mockP.awaitAsyncProcessing() mockP.checkNumRequests(t, wantRequests) - mockP.checkNumItems(t, 2*wantRequests) + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) } type mockErrorRequest struct { @@ -355,7 +389,7 @@ func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request } func (mer *mockErrorRequest) count() int { - 
return 0 + return 7 } func newErrorRequest(ctx context.Context) request { @@ -371,7 +405,7 @@ type mockRequest struct { } func (m *mockRequest) export(_ context.Context) (int, error) { - err := m.mce.export(m.cnt) + err := m.mce.export() if err != nil { return m.cnt, err } @@ -403,7 +437,6 @@ type mockConcurrentExporter struct { mu sync.Mutex consumeError error requestCount int64 - itemsCount int64 stopped int32 } @@ -411,32 +444,27 @@ func newMockConcurrentExporter() *mockConcurrentExporter { return &mockConcurrentExporter{waitGroup: new(sync.WaitGroup)} } -func (p *mockConcurrentExporter) export(cnt int) error { +func (p *mockConcurrentExporter) export() error { if atomic.LoadInt32(&p.stopped) == 1 { return nil } atomic.AddInt64(&p.requestCount, 1) - atomic.AddInt64(&p.itemsCount, int64(cnt)) p.mu.Lock() defer p.mu.Unlock() defer p.waitGroup.Done() return p.consumeError } -func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { - assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) -} - -func (p *mockConcurrentExporter) checkNumItems(t *testing.T, want int) { - assert.EqualValues(t, want, atomic.LoadInt64(&p.itemsCount)) -} - func (p *mockConcurrentExporter) updateError(err error) { p.mu.Lock() defer p.mu.Unlock() p.consumeError = err } +func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) +} + func (p *mockConcurrentExporter) run(fn func()) { p.waitGroup.Add(1) fn() @@ -449,3 +477,24 @@ func (p *mockConcurrentExporter) awaitAsyncProcessing() { func (p *mockConcurrentExporter) stop() { atomic.StoreInt32(&p.stopped, 1) } + +type observabilityConsumerSender struct { + sentItemsCount int64 + droppedItemsCount int64 + nextSender requestSender +} + +func (ocs *observabilityConsumerSender) send(req request) (int, error) { + dic, err := ocs.nextSender.send(req) + atomic.AddInt64(&ocs.sentItemsCount, int64(req.count()-dic)) + atomic.AddInt64(&ocs.droppedItemsCount, int64(dic)) + return dic, err +} + +func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.sentItemsCount)) +} + +func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount)) +} diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 74b95b1bcc4..119d57c7ddf 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -29,7 +29,6 @@ import ( // returns the number of dropped spans. type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) -// traceExporterOld implements the nextSender with additional helper internalOptions. type traceExporterOld struct { *baseExporter dataPusher traceDataPusherOld @@ -41,9 +40,7 @@ func (texp *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerd return err } -// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every -// request with a Span. If no internalOptions are passed it just adds the nextSender format as a -// tag in the Context. +// NewTraceExporterOld creates an TraceExporterOld that records observability metrics and wraps every request with a Span. 
func NewTraceExporterOld( cfg configmodels.Exporter, dataPusher traceDataPusherOld, @@ -125,8 +122,7 @@ func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) e return err } -// NewTraceExporter creates a TraceExporter that can record metrics and can wrap -// every request with a Span. +// NewTraceExporter creates a TraceExporter that records observability metrics and wraps every request with a Span. func NewTraceExporter( cfg configmodels.Exporter, dataPusher traceDataPusher, @@ -142,12 +138,12 @@ func NewTraceExporter( } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &tracesExporterWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &traceExporter{ baseExporter: be, @@ -157,16 +153,16 @@ func NewTraceExporter( type tracesExporterWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (tewo *tracesExporterWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartTraceDataExportOp(req.context(), tewo.exporterName)) // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := tewo.sender.send(req) + droppedSpans, err := tewo.nextSender.send(req) - // TODO: this is not ideal: req should come from the next function itself. - // temporarily loading req from internal format. Once full switch is done + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done // to new metrics will remove this. obsreport.EndTraceDataExportOp(req.context(), req.count(), droppedSpans, err) return droppedSpans, err diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index 7d1854e6f6d..b7eee6bbb9f 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -138,7 +138,7 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "NumWorkers", + name: "NumConsumers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: rcvCfg.NetAddr.Endpoint, diff --git a/exporter/otlpexporter/README.md b/exporter/otlpexporter/README.md index 59a8c25d731..107773dc4e1 100644 --- a/exporter/otlpexporter/README.md +++ b/exporter/otlpexporter/README.md @@ -28,16 +28,19 @@ The following settings can be optionally configured: [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#WithInsecure). - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). -- `timeout` (default = 5s): Is the timeout for each operation. -- `retry_settings` +- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. 
+- `retry_on_failure`
   - `disabled` (default = false)
-  - `initial_backoff` (default = 5s): Time to wait after the first failure before retrying; ignored if retry_on_failure is false
-  - `max_backoff` (default = 30s): Is the upper bound on backoff; ignored if retry_on_failure is false
-  - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if retry_on_failure is false
-- `queued_settings`
+  - `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true`
+  - `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true`
+  - `max_elapsed_time` (default = 5m): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true`
+- `sending_queue`
   - `disabled` (default = true)
-  - `num_workers` (default = 10): Number of workers that dequeue batches
-  - `queue_size` (default = 5000): Maximum number of batches kept in memory before data
+  - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `disabled` is `true`
+  - `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `disabled` is `true`;
+    Users should calculate this as `num_seconds * requests_per_second` where:
+    - `num_seconds` is the number of seconds to buffer in case of a backend outage
+    - `requests_per_second` is the average number of requests per second.
 
 Example:
diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go
index 469d0ba1835..fe9c3f0ded8 100644
--- a/exporter/otlpexporter/config.go
+++ b/exporter/otlpexporter/config.go
@@ -24,8 +24,8 @@ import (
 type Config struct {
 	configmodels.ExporterSettings  `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
 	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
-	exporterhelper.QueuedSettings  `mapstructure:"queued_settings"`
-	exporterhelper.RetrySettings   `mapstructure:"retry_settings"`
+	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
+	exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
 	configgrpc.GRPCClientSettings  `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
} diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 1ab79b67834..821e295494d 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -54,15 +54,15 @@ func TestLoadConfig(t *testing.T) { Timeout: 10 * time.Second, }, RetrySettings: exporterhelper.RetrySettings{ - Disabled: false, - InitialBackoff: 10 * time.Second, - MaxBackoff: 1 * time.Minute, - MaxElapsedTime: 10 * time.Minute, + Disabled: false, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, }, - QueuedSettings: exporterhelper.QueuedSettings{ - Disabled: false, - NumWorkers: 2, - QueueSize: 10, + QueueSettings: exporterhelper.QueueSettings{ + Disabled: false, + NumConsumers: 2, + QueueSize: 10, }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 8d33b3b1ef1..4b9f28e9c2f 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -40,7 +40,7 @@ func NewFactory() component.ExporterFactory { func createDefaultConfig() configmodels.Exporter { // TODO: Enable the queued settings. - qs := exporterhelper.CreateDefaultQueuedSettings() + qs := exporterhelper.CreateDefaultQueueSettings() qs.Disabled = true return &Config{ ExporterSettings: configmodels.ExporterSettings{ @@ -49,7 +49,7 @@ func createDefaultConfig() configmodels.Exporter { }, TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), RetrySettings: exporterhelper.CreateDefaultRetrySettings(), - QueuedSettings: qs, + QueueSettings: qs, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. 
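For comparison with the OTLP wiring in the surrounding hunks, here is a minimal, illustrative sketch (not part of this patch) of how a hypothetical exporter could opt into the same helper behavior. The package name, the push function, and the component.TraceExporter return type are assumptions made for illustration; the option and settings names come from the exporterhelper changes above, and the queue size simply applies the README sizing rule (seconds to buffer during an outage multiplied by requests per second, e.g. 50 * 100 = 5000).

package exampleexporter // hypothetical package, shown only to illustrate the new exporterhelper options

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configmodels"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// examplePushTraceData stands in for a real push function that sends td to a backend.
func examplePushTraceData(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) {
	return 0, nil
}

// newExampleTraceExporter wires timeout, retry and queue behavior through the helper.
func newExampleTraceExporter(cfg configmodels.Exporter) (component.TraceExporter, error) {
	qCfg := exporterhelper.CreateDefaultQueueSettings()
	qCfg.NumConsumers = 10
	// Buffer ~50s of a backend outage at ~100 requests/sec: 50 * 100 = 5000 batches.
	qCfg.QueueSize = 5000

	rCfg := exporterhelper.CreateDefaultRetrySettings()
	rCfg.InitialInterval = 5 * time.Second
	rCfg.MaxInterval = 30 * time.Second
	rCfg.MaxElapsedTime = 5 * time.Minute

	return exporterhelper.NewTraceExporter(
		cfg,
		examplePushTraceData,
		exporterhelper.WithTimeout(exporterhelper.CreateDefaultTimeoutSettings()),
		exporterhelper.WithRetry(rCfg),
		exporterhelper.WithQueue(qCfg),
	)
}

The createTraceExporter hunk below passes the user-configured settings through these same options.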
@@ -73,7 +73,7 @@ func createTraceExporter( oce.pushTraceData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown)) if err != nil { return nil, err @@ -97,7 +97,7 @@ func createMetricsExporter( oce.pushMetricsData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { @@ -122,7 +122,7 @@ func createLogExporter( oce.pushLogData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index c554592748f..6cf00d55959 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -111,7 +111,7 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "NumWorkers", + name: "NumConsumers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: endpoint, diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index 253b8e66120..38787c3b0a4 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -11,14 +11,14 @@ exporters: compression: "on" ca_file: /var/lib/mycert.pem timeout: 10s - queued_settings: + sending_queue: disabled: false - num_workers: 2 + num_consumers: 2 queue_size: 10 - retry_settings: + retry_on_failure: disabled: false - initial_backoff: 10s - max_backoff: 60s + initial_interval: 10s + max_interval: 60s max_elapsed_time: 10m per_rpc_auth: type: bearer diff --git a/processor/queuedprocessor/config.go b/processor/queuedprocessor/config.go index 9b71ed78d59..c7d1c6335a0 100644 --- a/processor/queuedprocessor/config.go +++ b/processor/queuedprocessor/config.go @@ -24,7 +24,7 @@ import ( type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` - // NumWorkers is the number of queue workers that dequeue batches and send them out. + // NumConsumers is the number of queue workers that dequeue batches and send them out. NumWorkers int `mapstructure:"num_workers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"`
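To close, a hedged sketch of the error contract that the queued retry sender expects from push functions, mirroring what processError does for OTLP in this patch: permanent errors are dropped immediately, throttle errors delay the next attempt by at least the indicated duration, and any other error is retried per RetrySettings. The function, parameters, and package name here are hypothetical; only consumererror.Permanent and exporterhelper.NewThrottleRetry come from the patch itself.

package exampleexporter // hypothetical, for illustration only

import (
	"time"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// classifyExampleError shows how a push function can signal retry behavior to the queued retry sender.
func classifyExampleError(err error, retryable bool, throttleDelay time.Duration) error {
	if err == nil {
		return nil
	}
	if !retryable {
		// Dropped without retrying (see TestQueuedRetry_DropOnPermanentError).
		return consumererror.Permanent(err)
	}
	if throttleDelay > 0 {
		// Retried no sooner than throttleDelay (see TestQueuedRetry_ThrottleError).
		return exporterhelper.NewThrottleRetry(err, throttleDelay)
	}
	// Retried with exponential backoff per RetrySettings (InitialInterval/MaxInterval/MaxElapsedTime).
	return err
}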