From 224eb93ec05f03d34969b2c59da6ad9424eacb55 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Thu, 7 Sep 2023 12:09:23 -0700 Subject: [PATCH] [chore] [exporterhelper] Refactor options and baseExporter Separate all the parts of the baseExporter into an explicit chain of request senders. It makes it easier to follow the data flow and add additional senders. This change also removes the baseSettings, because it's not needed to keep the settings anymore. All the options are now applied on the baseExporter and update the internal senders in place. --- .chloggen/exporterhelper-avoid-messages.yaml | 20 ++ exporter/exporterhelper/common.go | 226 ++++++++++++------- exporter/exporterhelper/common_test.go | 20 +- exporter/exporterhelper/logs.go | 36 ++- exporter/exporterhelper/metrics.go | 36 ++- exporter/exporterhelper/queued_retry.go | 123 ++++------ exporter/exporterhelper/queued_retry_test.go | 175 +++++++------- exporter/exporterhelper/traces.go | 36 ++- 8 files changed, 364 insertions(+), 308 deletions(-) create mode 100644 .chloggen/exporterhelper-avoid-messages.yaml diff --git a/.chloggen/exporterhelper-avoid-messages.yaml b/.chloggen/exporterhelper-avoid-messages.yaml new file mode 100644 index 00000000000..beec973dce2 --- /dev/null +++ b/.chloggen/exporterhelper-avoid-messages.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Stop logging error messages suggesting user to enable `retry_on_failure` or `sending_queue` when they are not available. + +# One or more tracking issues or pull requests related to the change +issues: [8369] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 3dfe4c41311..443fc477865 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -7,6 +7,9 @@ import ( "context" "time" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -29,9 +32,34 @@ func NewDefaultTimeoutSettings() TimeoutSettings { // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { + start(ctx context.Context, host component.Host, set exporter.CreateSettings) error + shutdown() send(req internal.Request) error + setNextSender(nextSender requestSender) +} + +type baseRequestSender struct { + nextSender requestSender +} + +var _ requestSender = (*baseRequestSender)(nil) + +func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { + return nil +} + +func (b *baseRequestSender) shutdown() {} + +func (b *baseRequestSender) send(req internal.Request) error { + return b.nextSender.send(req) +} + +func (b *baseRequestSender) setNextSender(nextSender requestSender) { + b.nextSender = nextSender } +type obsrepSenderFactory func(obsrep *obsExporter) requestSender + // baseRequest is a base implementation for the internal.Request. type baseRequest struct { ctx context.Context @@ -56,47 +84,13 @@ func (req *baseRequest) OnProcessingFinished() { } } -// baseSettings represents all the options that users can configure. -type baseSettings struct { - component.StartFunc - component.ShutdownFunc - consumerOptions []consumer.Option - TimeoutSettings - queue internal.ProducerConsumerQueue - RetrySettings - requestExporter bool - marshaler internal.RequestMarshaler - unmarshaler internal.RequestUnmarshaler -} - -// newBaseSettings returns the baseSettings starting from the default and applying all configured options. -// requestExporter indicates whether the base settings are for a new request exporter or not. -// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones. -func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler, - unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings { - bs := &baseSettings{ - requestExporter: requestExporter, - TimeoutSettings: NewDefaultTimeoutSettings(), - // TODO: Enable retry by default (call DefaultRetrySettings) - RetrySettings: RetrySettings{Enabled: false}, - marshaler: marshaler, - unmarshaler: unmarshaler, - } - - for _, op := range options { - op(bs) - } - - return bs -} - -// Option apply changes to baseSettings. -type Option func(*baseSettings) +// Option apply changes to baseExporter. +type Option func(*baseExporter) // WithStart overrides the default Start function for an exporter. // The default start function does nothing and always returns nil. func WithStart(start component.StartFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.StartFunc = start } } @@ -104,7 +98,7 @@ func WithStart(start component.StartFunc) Option { // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown component.ShutdownFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.ShutdownFunc = shutdown } } @@ -112,16 +106,16 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. func WithTimeout(timeoutSettings TimeoutSettings) Option { - return func(o *baseSettings) { - o.TimeoutSettings = timeoutSettings + return func(o *baseExporter) { + o.timeoutSender.cfg = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. func WithRetry(retrySettings RetrySettings) Option { - return func(o *baseSettings) { - o.RetrySettings = retrySettings + return func(o *baseExporter) { + o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure) } } @@ -129,18 +123,21 @@ func WithRetry(retrySettings RetrySettings) Option { // The default QueueSettings is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. func WithQueue(config QueueSettings) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { if o.requestExporter { panic("queueing is not available for the new request exporters yet") } - if !config.Enabled { - return + var queue internal.ProducerConsumerQueue + if config.Enabled { + if config.StorageID == nil { + queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) + } else { + queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + } } - if config.StorageID == nil { - o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) - return - } - o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + o.queueSender = qs + o.setOnTemporaryFailure(qs.onTemporaryFailure) } } @@ -148,7 +145,7 @@ func WithQueue(config QueueSettings) Option { // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors. func WithCapabilities(capabilities consumer.Capabilities) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) } } @@ -157,48 +154,106 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { type baseExporter struct { component.StartFunc component.ShutdownFunc - obsrep *obsExporter - sender requestSender - qrSender *queuedRetrySender + + requestExporter bool + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler + signal component.DataType + + set exporter.CreateSettings + obsrep *obsExporter + sampledLogger *zap.Logger + + // Chain of senders that the exporter helper applies before passing the data to the actual exporter. + // The data is handled by each sender in the respective order starting from the queueSender. + // Most of the senders are optional, and initialized with a no-op path-through sender. + queueSender requestSender + obsrepSender requestSender + retrySender requestSender + timeoutSender *timeoutSender // timeoutSender is always initialized. + + // onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer. + onTemporaryFailure onRequestHandlingFinishedFunc + + consumerOptions []consumer.Option } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { - be := &baseExporter{} +// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, + unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - var err error - be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) + obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) if err != nil { return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) - be.sender = be.qrSender - be.StartFunc = func(ctx context.Context, host component.Host) error { - // First start the wrapped exporter. - if err := bs.StartFunc.Start(ctx, host); err != nil { - return err - } + be := &baseExporter{ + requestExporter: requestExporter, + marshaler: marshaler, + unmarshaler: unmarshaler, + signal: signal, + + queueSender: &baseRequestSender{}, + obsrepSender: osf(obsrep), + retrySender: &baseRequestSender{}, + timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, - // If no error then start the queuedRetrySender. - return be.qrSender.start(ctx, host, set) + set: set, + obsrep: obsrep, + sampledLogger: createSampledLogger(set.Logger), } - be.ShutdownFunc = func(ctx context.Context) error { - // First shutdown the queued retry sender - be.qrSender.shutdown() - // Last shutdown the wrapped exporter itself. - return bs.ShutdownFunc.Shutdown(ctx) + + for _, op := range options { + op(be) } + be.connectSenders() + return be, nil } -// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. -// This can be used to wrap with observability (create spans, record metrics) the consumer sender. -func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { - be.qrSender.consumerSender = f(be.qrSender.consumerSender) +// send sends the request using the first sender in the chain. +func (be *baseExporter) send(req internal.Request) error { + return be.queueSender.send(req) +} + +// connectSenders connects the senders in the predefined order. +func (be *baseExporter) connectSenders() { + be.queueSender.setNextSender(be.obsrepSender) + be.obsrepSender.setNextSender(be.retrySender) + be.retrySender.setNextSender(be.timeoutSender) +} + +func (be *baseExporter) Start(ctx context.Context, host component.Host) error { + // First start the wrapped exporter. + if err := be.StartFunc.Start(ctx, host); err != nil { + return err + } + + // If no error then start the queueSender. + return be.queueSender.start(ctx, host, be.set) +} + +func (be *baseExporter) Shutdown(ctx context.Context) error { + // First shutdown the retry sender, so it can push any pending requests to back the queue. + be.retrySender.shutdown() + + // Then shutdown the queue sender. + be.queueSender.shutdown() + + // Last shutdown the wrapped exporter itself. + return be.ShutdownFunc.Shutdown(ctx) +} + +func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { + be.onTemporaryFailure = onTemporaryFailure + if rs, ok := be.retrySender.(*retrySender); ok { + rs.onTemporaryFailure = onTemporaryFailure + } } // timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type timeoutSender struct { + baseRequestSender cfg TimeoutSettings } @@ -213,3 +268,22 @@ func (ts *timeoutSender) send(req internal.Request) error { } return req.Export(ctx) } + +func createSampledLogger(logger *zap.Logger) *zap.Logger { + if logger.Core().Enabled(zapcore.DebugLevel) { + // Debugging is enabled. Don't do any sampling. + return logger + } + + // Create a logger that samples all messages to 1 per 10 seconds initially, + // and 1/100 of messages after that. + opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions( + core, + 10*time.Second, + 1, + 100, + ) + }) + return logger.WithOptions(opts) +} diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 076cd55b00e..d505a3a35be 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -31,12 +32,16 @@ var ( } ) +func newNoopObsrepSender(_ *obsExporter) requestSender { + return &baseRequestSender{} +} + func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) - be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "") + be, err = newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,13 +50,10 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, - newBaseSettings( - false, nil, nil, - WithStart(func(ctx context.Context, host component.Host) error { return want }), - WithShutdown(func(ctx context.Context) error { return want }), - WithTimeout(NewDefaultTimeoutSettings())), - "", + defaultSettings, "", false, nil, nil, newNoopObsrepSender, + WithStart(func(ctx context.Context, host component.Host) error { return want }), + WithShutdown(func(ctx context.Context) error { return want }), + WithTimeout(NewDefaultTimeoutSettings()), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index ef22d50dd02..a6a6e162710 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -89,26 +89,20 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := newBaseSettings(false, logsRequestMarshaler, newLogsRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler, + newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ctx, ld, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewLogsRequestExporter( return nil, errNilLogsConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req, cErr := converter.RequestFromLogs(ctx, ld) @@ -166,12 +152,12 @@ func NewLogsRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewLogsRequestExporter( } type logsExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newLogsExporterWithObservability(obsrep *obsExporter) requestSender { + return &logsExporterWithObservability{obsrep: obsrep} } func (lewo *logsExporterWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index a13b010e955..a678ebeebb5 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -89,26 +89,20 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := newBaseSettings(false, metricsRequestMarshaler, newMetricsRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler, + newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(ctx, md, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &metricsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewMetricsRequestExporter( return nil, errNilMetricsConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req, cErr := converter.RequestFromMetrics(ctx, md) @@ -166,12 +152,12 @@ func NewMetricsRequestExporter( Request: req, baseRequest: baseRequest{ctx: ctx}, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &metricsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewMetricsRequestExporter( } type metricsSenderWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender { + return &metricsSenderWithObservability{obsrep: obsrep} } func (mewo *metricsSenderWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2e2bc1f9a82..3aceb411745 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" @@ -65,51 +64,32 @@ func (qCfg *QueueSettings) Validate() error { return nil } -type queuedRetrySender struct { +type queueSender struct { + baseRequestSender fullName string id component.ID signal component.DataType - consumerSender requestSender queue internal.ProducerConsumerQueue - retryStopCh chan struct{} traceAttribute attribute.KeyValue logger *zap.Logger requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, - rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { - retryStopCh := make(chan struct{}) - sampledLogger := createSampledLogger(logger) - traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) - - qrs := &queuedRetrySender{ +func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender { + return &queueSender{ fullName: id.String(), id: id, signal: signal, queue: queue, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + logger: logger, // TODO: this can be further exposed as a config param rather than relying on a type of queue requeuingEnabled: queue != nil && queue.IsPersistent(), } - - qrs.consumerSender = &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, - // Following three functions actually depend on queuedRetrySender - onTemporaryFailure: qrs.onTemporaryFailure, - } - - return qrs } -func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { - if !qrs.requeuingEnabled || qrs.queue == nil { +func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { + if !qs.requeuingEnabled || qs.queue == nil { logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), @@ -118,7 +98,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna return err } - if qrs.queue.Produce(req) { + if qs.queue.Produce(req) { logger.Error( "Exporting failed. Putting back to the end of the queue.", zap.Error(err), @@ -134,16 +114,16 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna } // start is invoked during service startup. -func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { - if qrs.queue == nil { +func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { + if qs.queue == nil { return nil } - err := qrs.queue.Start(ctx, host, internal.QueueSettings{ + err := qs.queue.Start(ctx, host, internal.QueueSettings{ CreateSettings: set, - DataType: qrs.signal, + DataType: qs.signal, Callback: func(item internal.Request) { - _ = qrs.consumerSender.send(item) + _ = qs.nextSender.send(item) item.OnProcessingFinished() }, }) @@ -153,14 +133,14 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se // Start reporting queue length metric err = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(qrs.queue.Size()) - }, metricdata.NewLabelValue(qrs.fullName)) + return int64(qs.queue.Size()) + }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.queue.Capacity()) - }, metricdata.NewLabelValue(qrs.fullName)) + return int64(qs.queue.Capacity()) + }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) } @@ -169,19 +149,16 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se } // shutdown is invoked during service shutdown. -func (qrs *queuedRetrySender) shutdown() { - // First Stop the retry goroutines, so that unblocks the queue numWorkers. - close(qrs.retryStopCh) - - if qrs.queue != nil { +func (qs *queueSender) shutdown() { + if qs.queue != nil { // Cleanup queue metrics reporting _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) - }, metricdata.NewLabelValue(qrs.fullName)) + }, metricdata.NewLabelValue(qs.fullName)) // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - qrs.queue.Stop() + qs.queue.Stop() } } @@ -217,31 +194,12 @@ func NewDefaultRetrySettings() RetrySettings { } } -func createSampledLogger(logger *zap.Logger) *zap.Logger { - if logger.Core().Enabled(zapcore.DebugLevel) { - // Debugging is enabled. Don't do any sampling. - return logger - } - - // Create a logger that samples all messages to 1 per 10 seconds initially, - // and 1/100 of messages after that. - opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions( - core, - 10*time.Second, - 1, - 100, - ) - }) - return logger.WithOptions(opts) -} - // send implements the requestSender interface -func (qrs *queuedRetrySender) send(req internal.Request) error { - if qrs.queue == nil { - err := qrs.consumerSender.send(req) +func (qs *queueSender) send(req internal.Request) error { + if qs.queue == nil { + err := qs.nextSender.send(req) if err != nil { - qrs.logger.Error( + qs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", zap.Int("dropped_items", req.Count()), ) @@ -254,16 +212,16 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { req.SetContext(noCancellationContext{Context: req.Context()}) span := trace.SpanFromContext(req.Context()) - if !qrs.queue.Produce(req) { - qrs.logger.Error( + if !qs.queue.Produce(req) { + qs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", zap.Int("dropped_items", req.Count()), ) - span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) + span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) return errSendingQueueIsFull } - span.AddEvent("Enqueued item.", trace.WithAttributes(qrs.traceAttribute)) + span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) return nil } @@ -292,14 +250,33 @@ func NewThrottleRetry(err error, delay time.Duration) error { type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error type retrySender struct { + baseRequestSender traceAttribute attribute.KeyValue cfg RetrySettings - nextSender requestSender stopCh chan struct{} logger *zap.Logger onTemporaryFailure onRequestHandlingFinishedFunc } +func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender { + if onTemporaryFailure == nil { + onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error { + return err + } + } + return &retrySender{ + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + cfg: rCfg, + stopCh: make(chan struct{}), + logger: logger, + onTemporaryFailure: onTemporaryFailure, + } +} + +func (rs *retrySender) shutdown() { + close(rs.stopCh) +} + // send implements the requestSender interface func (rs *retrySender) send(req internal.Request) error { if !rs.cfg.Enabled { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index cb0885751b3..7472cdeff49 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -41,11 +41,9 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -53,7 +51,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -66,13 +64,11 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - bs := newBaseSettings(false, mockRequestMarshaler, + be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))), - WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -81,7 +77,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -95,10 +91,8 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -106,9 +100,10 @@ func TestQueuedRetry_OnError(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 2, traceErr) + ocs := be.obsrepSender.(*observabilityConsumerSender) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -122,23 +117,22 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) firstMockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(firstMockR)) + require.NoError(t, be.send(firstMockR)) }) // Enqueue another request to ensure when calling shutdown we drain the queue. secondMockR := newMockRequest(context.Background(), 3, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(secondMockR)) + require.NoError(t, be.send(secondMockR)) }) assert.NoError(t, be.Shutdown(context.Background())) @@ -156,10 +150,9 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -170,14 +163,14 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { mockR := newMockRequest(ctx, 2, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { @@ -186,10 +179,9 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -197,14 +189,14 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { ocs.run(func() { // Add an item that will always fail. - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.send(newErrorRequest(context.Background()))) }) mockR := newMockRequest(context.Background(), 2, nil) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -217,7 +209,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } type wrappedError struct { @@ -233,10 +225,9 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -247,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -257,7 +248,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { @@ -266,10 +257,9 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -278,7 +268,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -286,23 +276,22 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) ocs.run(func() { - require.Error(t, be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) + require.Error(t, be.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) }) } @@ -314,10 +303,9 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -329,7 +317,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.run(func() { req := newMockRequest(context.Background(), 2, nil) reqs = append(reqs, req) - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.send(req)) }) } @@ -349,13 +337,13 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), "exporter/queue_capacity") for i := 0; i < 7; i++ { - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.send(newErrorRequest(context.Background()))) } checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size") @@ -397,11 +385,10 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs - be.qrSender.requeuingEnabled = true + ocs := be.obsrepSender.(*observabilityConsumerSender) + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -411,7 +398,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { mockR := newMockRequest(context.Background(), 1, traceErr) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing }) ocs.awaitAsyncProcessing() @@ -429,9 +416,9 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -440,18 +427,46 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 1, traceErr) - require.Error(t, be.qrSender.consumerSender.send(mockR), "sending_queue is full") + require.Error(t, be.retrySender.send(mockR), "sending_queue is full") mockR.checkNumRequests(t, 1) } func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false - bs := newBaseSettings(false, nil, nil, WithQueue(qs)) - require.Nil(t, bs.queue) - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) + require.Nil(t, be.queueSender.(*queueSender).queue) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.Error(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestQueueRetryWithNoQueue(t *testing.T) { + rCfg := NewDefaultRetrySettings() + rCfg.MaxElapsedTime = time.Nanosecond // fail fast + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.Error(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) require.NoError(t, be.Shutdown(context.Background())) } @@ -465,7 +480,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -489,10 +504,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - bs.marshaler = mockRequestMarshaler - bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) - be, err := newBaseExporter(set, bs, "") + be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -516,27 +528,25 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) // wraps original queue so we can count operations - be.qrSender.queue = &producerConsumerQueueWithCounter{ - ProducerConsumerQueue: be.qrSender.queue, + be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ + ProducerConsumerQueue: be.queueSender.(*queueSender).queue, produceCounter: produceCounter, } - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true // replace nextSender inside retrySender to always return error so it doesn't exit send loop - castedSender, ok := be.qrSender.consumerSender.(*retrySender) - require.True(t, ok, "consumerSender should be a retrySender type") - castedSender.nextSender = &errorRequestSender{ + be.retrySender.setNextSender(&errorRequestSender{ errToReturn: errors.New("some error"), - } + }) // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.send(req)) // first wait for the item to be produced to the queue initially assert.Eventually(t, func() bool { @@ -551,10 +561,13 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { } func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { - bs := newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings())) + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings())) + assert.Nil(t, err) assert.True(t, bs.requestExporter) assert.Panics(t, func() { - _ = newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) }) } @@ -630,16 +643,15 @@ func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockReque } type observabilityConsumerSender struct { + baseRequestSender waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 - nextSender requestSender } -func newObservabilityConsumerSender(nextSender requestSender) *observabilityConsumerSender { +func newObservabilityConsumerSender(_ *obsExporter) requestSender { return &observabilityConsumerSender{ waitGroup: new(sync.WaitGroup), - nextSender: nextSender, droppedItemsCount: &atomic.Int64{}, sentItemsCount: &atomic.Int64{}, } @@ -737,6 +749,7 @@ func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool } type errorRequestSender struct { + baseRequestSender errToReturn error } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 03b0fefb95e..7e855b64bfc 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -89,26 +89,20 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := newBaseSettings(false, tracesRequestMarshaler, newTraceRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, false, tracesRequestMarshaler, + newTraceRequestUnmarshalerFunc(pusher), newTracesExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(ctx, td, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewTracesRequestExporter( return nil, errNilTracesConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, true, nil, nil, newTracesExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req, cErr := converter.RequestFromTraces(ctx, td) @@ -166,12 +152,12 @@ func NewTracesRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewTracesRequestExporter( } type tracesExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newTracesExporterWithObservability(obsrep *obsExporter) requestSender { + return &tracesExporterWithObservability{obsrep: obsrep} } func (tewo *tracesExporterWithObservability) send(req internal.Request) error {