diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 3dfe4c41311..8d7a40cf6dd 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -7,6 +7,9 @@ import ( "context" "time" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -29,9 +32,48 @@ func NewDefaultTimeoutSettings() TimeoutSettings { // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { + start(ctx context.Context, host component.Host, set exporter.CreateSettings) error + shutdown() send(req internal.Request) error + setNextSender(nextSender requestSender) +} + +type baseRequestSender struct { + nextSender requestSender + startFunc func(ctx context.Context, host component.Host, set exporter.CreateSettings) error + shutdownFunc func() + sendFunc func(req internal.Request) error +} + +var _ requestSender = (*baseRequestSender)(nil) + +func (b *baseRequestSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { + if b.startFunc == nil { + return nil + } + return b.startFunc(ctx, host, set) +} + +func (b *baseRequestSender) send(req internal.Request) error { + if b.nextSender != nil { + return b.nextSender.send(req) + } + return req.Export(req.Context()) +} + +func (b *baseRequestSender) shutdown() { + if b.shutdownFunc == nil { + return + } + b.shutdownFunc() +} + +func (b *baseRequestSender) setNextSender(nextSender requestSender) { + b.nextSender = nextSender } +type obsrepSenderFactory func(obsrep *obsExporter) requestSender + // baseRequest is a base implementation for the internal.Request. type baseRequest struct { ctx context.Context @@ -56,47 +98,13 @@ func (req *baseRequest) OnProcessingFinished() { } } -// baseSettings represents all the options that users can configure. -type baseSettings struct { - component.StartFunc - component.ShutdownFunc - consumerOptions []consumer.Option - TimeoutSettings - queue internal.ProducerConsumerQueue - RetrySettings - requestExporter bool - marshaler internal.RequestMarshaler - unmarshaler internal.RequestUnmarshaler -} - -// newBaseSettings returns the baseSettings starting from the default and applying all configured options. -// requestExporter indicates whether the base settings are for a new request exporter or not. -// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones. -func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler, - unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings { - bs := &baseSettings{ - requestExporter: requestExporter, - TimeoutSettings: NewDefaultTimeoutSettings(), - // TODO: Enable retry by default (call DefaultRetrySettings) - RetrySettings: RetrySettings{Enabled: false}, - marshaler: marshaler, - unmarshaler: unmarshaler, - } - - for _, op := range options { - op(bs) - } - - return bs -} - // Option apply changes to baseSettings. -type Option func(*baseSettings) +type Option func(*baseExporter) // WithStart overrides the default Start function for an exporter. // The default start function does nothing and always returns nil. 
func WithStart(start component.StartFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.StartFunc = start } } @@ -104,7 +112,7 @@ func WithStart(start component.StartFunc) Option { // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown component.ShutdownFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.ShutdownFunc = shutdown } } @@ -112,16 +120,16 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. func WithTimeout(timeoutSettings TimeoutSettings) Option { - return func(o *baseSettings) { - o.TimeoutSettings = timeoutSettings + return func(o *baseExporter) { + o.timeoutSender.(*timeoutSender).cfg = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. func WithRetry(retrySettings RetrySettings) Option { - return func(o *baseSettings) { - o.RetrySettings = retrySettings + return func(o *baseExporter) { + o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure) } } @@ -129,18 +137,21 @@ func WithRetry(retrySettings RetrySettings) Option { // The default QueueSettings is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. func WithQueue(config QueueSettings) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { if o.requestExporter { panic("queueing is not available for the new request exporters yet") } - if !config.Enabled { - return + var queue internal.ProducerConsumerQueue + if config.Enabled { + if config.StorageID == nil { + queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) + } else { + queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + } } - if config.StorageID == nil { - o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) - return - } - o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + o.queueSender = qs + o.setOnTemporaryFailure(qs.onTemporaryFailure) } } @@ -148,7 +159,7 @@ func WithQueue(config QueueSettings) Option { // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors. func WithCapabilities(capabilities consumer.Capabilities) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) } } @@ -157,48 +168,98 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { type baseExporter struct { component.StartFunc component.ShutdownFunc - obsrep *obsExporter - sender requestSender - qrSender *queuedRetrySender + + requestExporter bool + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler + signal component.DataType + + set exporter.CreateSettings + obsrep *obsExporter + sampledLogger *zap.Logger + + // Chain of senders that the exporter helper applies before passing the data to the actual exporter. + // The data is handled by each sender in the respective order starting from the queueSender. 
+ queueSender requestSender + obsrepSender requestSender + retrySender requestSender + timeoutSender requestSender + + // onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer. + onTemporaryFailure onRequestHandlingFinishedFunc + + consumerOptions []consumer.Option } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { - be := &baseExporter{} +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, + unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - var err error - be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) + obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) if err != nil { return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) - be.sender = be.qrSender - be.StartFunc = func(ctx context.Context, host component.Host) error { - // First start the wrapped exporter. - if err := bs.StartFunc.Start(ctx, host); err != nil { - return err - } + be := &baseExporter{ + requestExporter: requestExporter, + marshaler: marshaler, + unmarshaler: unmarshaler, + signal: signal, - // If no error then start the queuedRetrySender. - return be.qrSender.start(ctx, host, set) + queueSender: &baseRequestSender{}, + obsrepSender: osf(obsrep), + retrySender: &baseRequestSender{}, + timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, + + set: set, + obsrep: obsrep, + sampledLogger: createSampledLogger(set.Logger), } - be.ShutdownFunc = func(ctx context.Context) error { - // First shutdown the queued retry sender - be.qrSender.shutdown() - // Last shutdown the wrapped exporter itself. - return bs.ShutdownFunc.Shutdown(ctx) + + for _, op := range options { + op(be) } + be.connectSenders() + return be, nil } -// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. -// This can be used to wrap with observability (create spans, record metrics) the consumer sender. -func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { - be.qrSender.consumerSender = f(be.qrSender.consumerSender) +func (be *baseExporter) connectSenders() { + be.queueSender.setNextSender(be.obsrepSender) + be.obsrepSender.setNextSender(be.retrySender) + be.retrySender.setNextSender(be.timeoutSender) +} + +func (be *baseExporter) Start(ctx context.Context, host component.Host) error { + // First start the wrapped exporter. + if err := be.StartFunc.Start(ctx, host); err != nil { + return err + } + + // If no error then start the queueSender. + return be.queueSender.start(ctx, host, be.set) +} + +func (be *baseExporter) Shutdown(ctx context.Context) error { + // First shutdown the retry sender + be.retrySender.shutdown() + + // Then shutdown the queue sender + be.queueSender.shutdown() + + // Last shutdown the wrapped exporter itself. 
+ return be.ShutdownFunc.Shutdown(ctx) +} + +func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { + be.onTemporaryFailure = onTemporaryFailure + if rs, ok := be.retrySender.(*retrySender); ok { + rs.onTemporaryFailure = onTemporaryFailure + } } // timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type timeoutSender struct { + *baseRequestSender cfg TimeoutSettings } @@ -213,3 +274,22 @@ func (ts *timeoutSender) send(req internal.Request) error { } return req.Export(ctx) } + +func createSampledLogger(logger *zap.Logger) *zap.Logger { + if logger.Core().Enabled(zapcore.DebugLevel) { + // Debugging is enabled. Don't do any sampling. + return logger + } + + // Create a logger that samples all messages to 1 per 10 seconds initially, + // and 1/100 of messages after that. + opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions( + core, + 10*time.Second, + 1, + 100, + ) + }) + return logger.WithOptions(opts) +} diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 076cd55b00e..d505a3a35be 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -31,12 +32,16 @@ var ( } ) +func newNoopObsrepSender(_ *obsExporter) requestSender { + return &baseRequestSender{} +} + func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) - be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "") + be, err = newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,13 +50,10 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, - newBaseSettings( - false, nil, nil, - WithStart(func(ctx context.Context, host component.Host) error { return want }), - WithShutdown(func(ctx context.Context) error { return want }), - WithTimeout(NewDefaultTimeoutSettings())), - "", + defaultSettings, "", false, nil, nil, newNoopObsrepSender, + WithStart(func(ctx context.Context, host component.Host) error { return want }), + WithShutdown(func(ctx context.Context) error { return want }), + WithTimeout(NewDefaultTimeoutSettings()), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index ef22d50dd02..0f36cb80e1d 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -89,26 +89,20 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := newBaseSettings(false, logsRequestMarshaler, newLogsRequestUnmarshalerFunc(pusher), options...) 
- be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler, + newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ctx, ld, pusher) - serr := be.sender.send(req) + serr := be.queueSender.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewLogsRequestExporter( return nil, errNilLogsConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req, cErr := converter.RequestFromLogs(ctx, ld) @@ -166,12 +152,12 @@ func NewLogsRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.queueSender.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewLogsRequestExporter( } type logsExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newLogsExporterWithObservability(obsrep *obsExporter) requestSender { + return &logsExporterWithObservability{obsrep: obsrep} } func (lewo *logsExporterWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index a13b010e955..d8c29618d68 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -89,26 +89,20 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := newBaseSettings(false, metricsRequestMarshaler, newMetricsRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler, + newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(ctx, md, pusher) - serr := be.sender.send(req) + serr := be.queueSender.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) 
return &metricsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewMetricsRequestExporter( return nil, errNilMetricsConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req, cErr := converter.RequestFromMetrics(ctx, md) @@ -166,12 +152,12 @@ func NewMetricsRequestExporter( Request: req, baseRequest: baseRequest{ctx: ctx}, } - sErr := be.sender.send(r) + sErr := be.queueSender.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &metricsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewMetricsRequestExporter( } type metricsSenderWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender { + return &metricsSenderWithObservability{obsrep: obsrep} } func (mewo *metricsSenderWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2e2bc1f9a82..472d96582a7 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" @@ -65,11 +64,11 @@ func (qCfg *QueueSettings) Validate() error { return nil } -type queuedRetrySender struct { +type queueSender struct { + baseRequestSender fullName string id component.ID signal component.DataType - consumerSender requestSender queue internal.ProducerConsumerQueue retryStopCh chan struct{} traceAttribute attribute.KeyValue @@ -77,38 +76,20 @@ type queuedRetrySender struct { requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, - rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { - retryStopCh := make(chan struct{}) - sampledLogger := createSampledLogger(logger) - traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) - - qrs := &queuedRetrySender{ +func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender { + return &queueSender{ fullName: id.String(), id: id, signal: signal, queue: queue, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + logger: logger, // TODO: this can be further exposed as a config param rather than relying on a type of queue requeuingEnabled: queue != nil && queue.IsPersistent(), } - - qrs.consumerSender = &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, - // Following three functions actually depend on 
queuedRetrySender - onTemporaryFailure: qrs.onTemporaryFailure, - } - - return qrs } -func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { +func (qrs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { if !qrs.requeuingEnabled || qrs.queue == nil { logger.Error( "Exporting failed. No more retries left. Dropping data.", @@ -134,7 +115,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna } // start is invoked during service startup. -func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { +func (qrs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { if qrs.queue == nil { return nil } @@ -143,7 +124,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se CreateSettings: set, DataType: qrs.signal, Callback: func(item internal.Request) { - _ = qrs.consumerSender.send(item) + _ = qrs.nextSender.send(item) item.OnProcessingFinished() }, }) @@ -169,10 +150,7 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se } // shutdown is invoked during service shutdown. -func (qrs *queuedRetrySender) shutdown() { - // First Stop the retry goroutines, so that unblocks the queue numWorkers. - close(qrs.retryStopCh) - +func (qrs *queueSender) shutdown() { if qrs.queue != nil { // Cleanup queue metrics reporting _ = globalInstruments.queueSize.UpsertEntry(func() int64 { @@ -217,29 +195,10 @@ func NewDefaultRetrySettings() RetrySettings { } } -func createSampledLogger(logger *zap.Logger) *zap.Logger { - if logger.Core().Enabled(zapcore.DebugLevel) { - // Debugging is enabled. Don't do any sampling. - return logger - } - - // Create a logger that samples all messages to 1 per 10 seconds initially, - // and 1/100 of messages after that. - opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions( - core, - 10*time.Second, - 1, - 100, - ) - }) - return logger.WithOptions(opts) -} - // send implements the requestSender interface -func (qrs *queuedRetrySender) send(req internal.Request) error { +func (qrs *queueSender) send(req internal.Request) error { if qrs.queue == nil { - err := qrs.consumerSender.send(req) + err := qrs.nextSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. 
Try enabling sending_queue to survive temporary failures.", @@ -292,14 +251,33 @@ func NewThrottleRetry(err error, delay time.Duration) error { type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error type retrySender struct { + baseRequestSender traceAttribute attribute.KeyValue cfg RetrySettings - nextSender requestSender stopCh chan struct{} logger *zap.Logger onTemporaryFailure onRequestHandlingFinishedFunc } +func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender { + if onTemporaryFailure == nil { + onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error { + return nil + } + } + return &retrySender{ + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + cfg: rCfg, + stopCh: make(chan struct{}), + logger: logger, + onTemporaryFailure: onTemporaryFailure, + } +} + +func (rs *retrySender) shutdown() { + close(rs.stopCh) +} + // send implements the requestSender interface func (rs *retrySender) send(req internal.Request) error { if !rs.cfg.Enabled { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index cb0885751b3..f94bf460a26 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -41,11 +41,9 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -53,7 +51,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -66,13 +64,11 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - bs := newBaseSettings(false, mockRequestMarshaler, + be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))), - WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -81,7 +77,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -95,10 +91,8 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -106,9 +100,10 @@ func TestQueuedRetry_OnError(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 2, traceErr) + ocs := be.obsrepSender.(*observabilityConsumerSender) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -122,23 +117,22 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) firstMockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(firstMockR)) + require.NoError(t, be.queueSender.send(firstMockR)) }) // Enqueue another request to ensure when calling shutdown we drain the queue. secondMockR := newMockRequest(context.Background(), 3, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(secondMockR)) + require.NoError(t, be.queueSender.send(secondMockR)) }) assert.NoError(t, be.Shutdown(context.Background())) @@ -156,10 +150,9 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -170,14 +163,14 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { mockR := newMockRequest(ctx, 2, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { @@ -186,10 +179,9 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -197,14 +189,14 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { ocs.run(func() { // Add an item that will always fail. - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.queueSender.send(newErrorRequest(context.Background()))) }) mockR := newMockRequest(context.Background(), 2, nil) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -217,7 +209,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } type wrappedError struct { @@ -233,10 +225,9 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -247,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -257,7 +248,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { @@ -266,10 +257,9 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -278,7 +268,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -286,23 +276,22 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) ocs.run(func() { - require.Error(t, be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) + require.Error(t, be.queueSender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) }) } @@ -314,10 +303,9 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -329,7 +317,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.run(func() { req := newMockRequest(context.Background(), 2, nil) reqs = append(reqs, req) - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.queueSender.send(req)) }) } @@ -349,13 +337,13 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), "exporter/queue_capacity") for i := 0; i < 7; i++ { - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.queueSender.send(newErrorRequest(context.Background()))) } checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size") @@ -397,11 +385,10 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := 
newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs - be.qrSender.requeuingEnabled = true + ocs := be.obsrepSender.(*observabilityConsumerSender) + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -411,7 +398,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { mockR := newMockRequest(context.Background(), 1, traceErr) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.queueSender.send(mockR)) ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing }) ocs.awaitAsyncProcessing() @@ -429,9 +416,9 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -440,16 +427,15 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 1, traceErr) - require.Error(t, be.qrSender.consumerSender.send(mockR), "sending_queue is full") + require.Error(t, be.retrySender.send(mockR), "sending_queue is full") mockR.checkNumRequests(t, 1) } func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false - bs := newBaseSettings(false, nil, nil, WithQueue(qs)) - require.Nil(t, bs.queue) - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) + require.Nil(t, be.queueSender.(*queueSender).queue) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -465,7 +451,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -489,10 +475,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg 
:= NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - bs.marshaler = mockRequestMarshaler - bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) - be, err := newBaseExporter(set, bs, "") + be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -516,27 +499,25 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) // wraps original queue so we can count operations - be.qrSender.queue = &producerConsumerQueueWithCounter{ - ProducerConsumerQueue: be.qrSender.queue, + be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ + ProducerConsumerQueue: be.queueSender.(*queueSender).queue, produceCounter: produceCounter, } - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true // replace nextSender inside retrySender to always return error so it doesn't exit send loop - castedSender, ok := be.qrSender.consumerSender.(*retrySender) - require.True(t, ok, "consumerSender should be a retrySender type") - castedSender.nextSender = &errorRequestSender{ + be.retrySender.setNextSender(&errorRequestSender{ errToReturn: errors.New("some error"), - } + }) // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.queueSender.send(req)) // first wait for the item to be produced to the queue initially assert.Eventually(t, func() bool { @@ -551,10 +532,13 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { } func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { - bs := newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings())) + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings())) + assert.Nil(t, err) assert.True(t, bs.requestExporter) assert.Panics(t, func() { - _ = newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) }) } @@ -630,16 +614,15 @@ func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockReque } type observabilityConsumerSender struct { + baseRequestSender waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 - nextSender requestSender } -func newObservabilityConsumerSender(nextSender requestSender) *observabilityConsumerSender { +func newObservabilityConsumerSender(_ *obsExporter) requestSender { return &observabilityConsumerSender{ waitGroup: new(sync.WaitGroup), - nextSender: nextSender, droppedItemsCount: &atomic.Int64{}, sentItemsCount: &atomic.Int64{}, } @@ -737,6 +720,7 @@ func (pcq *producerConsumerQueueWithCounter) 
Produce(item internal.Request) bool } type errorRequestSender struct { + baseRequestSender errToReturn error } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 03b0fefb95e..54d3e64fa31 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -89,26 +89,20 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := newBaseSettings(false, tracesRequestMarshaler, newTraceRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, false, tracesRequestMarshaler, + newTraceRequestUnmarshalerFunc(pusher), newTracesExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(ctx, td, pusher) - serr := be.sender.send(req) + serr := be.queueSender.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewTracesRequestExporter( return nil, errNilTracesConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, true, nil, nil, newTracesExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req, cErr := converter.RequestFromTraces(ctx, td) @@ -166,12 +152,12 @@ func NewTracesRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.queueSender.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewTracesRequestExporter( } type tracesExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newTracesExporterWithObservability(obsrep *obsExporter) requestSender { + return &tracesExporterWithObservability{obsrep: obsrep} } func (tewo *tracesExporterWithObservability) send(req internal.Request) error {
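
The patch above replaces the monolithic queuedRetrySender with a chain of single-purpose senders (queueSender -> obsrepSender -> retrySender -> timeoutSender) that connectSenders wires together and that forward requests via setNextSender/send. The standalone Go program below is a minimal sketch of that chaining pattern only, assuming simplified stand-in types (request, obsSender, exportSender, maxAttempts, failFirst are illustrative and not part of the exporterhelper package).

// sender_chain_sketch.go - simplified illustration of the sender-chain pattern;
// not the actual exporterhelper implementation.
package main

import (
	"errors"
	"fmt"
)

// request is a stand-in for internal.Request.
type request struct{ items int }

// requestSender mirrors the shape of the interface in common.go: every sender
// can forward a request to the next sender in the chain.
type requestSender interface {
	send(req request) error
	setNextSender(next requestSender)
}

// baseRequestSender provides default pass-through behavior, analogous to the
// embedded baseRequestSender introduced by the patch.
type baseRequestSender struct {
	nextSender requestSender
}

func (b *baseRequestSender) send(req request) error {
	if b.nextSender != nil {
		return b.nextSender.send(req)
	}
	return nil
}

func (b *baseRequestSender) setNextSender(next requestSender) { b.nextSender = next }

// obsSender counts items that were successfully handed off downstream,
// playing the role of the obsrep sender.
type obsSender struct {
	baseRequestSender
	sent int
}

func (o *obsSender) send(req request) error {
	err := o.baseRequestSender.send(req)
	if err == nil {
		o.sent += req.items
	}
	return err
}

// retrySender retries the downstream sender a fixed number of times.
type retrySender struct {
	baseRequestSender
	maxAttempts int
}

func (r *retrySender) send(req request) error {
	var err error
	for i := 0; i < r.maxAttempts; i++ {
		if err = r.baseRequestSender.send(req); err == nil {
			return nil
		}
	}
	return err
}

// exportSender is the terminal sender that performs the actual export;
// it fails once to show the retry path.
type exportSender struct {
	baseRequestSender
	failFirst bool
}

func (e *exportSender) send(req request) error {
	if e.failFirst {
		e.failFirst = false
		return errors.New("temporary failure")
	}
	return nil
}

func main() {
	obs := &obsSender{}
	retry := &retrySender{maxAttempts: 3}
	exp := &exportSender{failFirst: true}

	// connectSenders-style wiring: obs -> retry -> export.
	obs.setNextSender(retry)
	retry.setNextSender(exp)

	if err := obs.send(request{items: 5}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("items sent:", obs.sent) // items sent: 5
}

Each sender only knows about its immediate successor, which is why options such as WithRetry or WithQueue in the patch can swap a single link in the chain without the other senders changing.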