From 9d5992431d7f32f41f01f194bc4844b6313343dc Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 17 Jul 2020 10:42:29 -0700 Subject: [PATCH] First round of comments addressed --- exporter/exporterhelper/common.go | 158 +++++++++--------- exporter/exporterhelper/logshelper.go | 18 +-- exporter/exporterhelper/metricshelper.go | 26 ++- exporter/exporterhelper/queued_retry.go | 108 +++++++------ exporter/exporterhelper/queued_retry_test.go | 160 +++++++++++++------ exporter/exporterhelper/tracehelper.go | 28 ++-- exporter/otlpexporter/README.md | 19 ++- exporter/otlpexporter/config.go | 4 +- exporter/otlpexporter/config_test.go | 2 +- exporter/otlpexporter/factory.go | 10 +- exporter/otlpexporter/testdata/config.yaml | 4 +- 11 files changed, 303 insertions(+), 234 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index ef27874685d..52cf3b225db 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -31,38 +31,36 @@ var ( okStatus = trace.Status{Code: trace.StatusCodeOK} ) +// Settings for timeout. The timeout applies to individual attempts to send data to the backend. type TimeoutSettings struct { - // Timeout is the timeout for each operation. + // Timeout is the timeout for every attempt to send data to the backend. Timeout time.Duration `mapstructure:"timeout"` } +// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings. func CreateDefaultTimeoutSettings() TimeoutSettings { return TimeoutSettings{ Timeout: 5 * time.Second, } } -type settings struct { - configmodels.Exporter - TimeoutSettings - QueuedSettings - RetrySettings -} - +// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs). type request interface { context() context.Context setContext(context.Context) export(ctx context.Context) (int, error) - // Returns a new queue request that contains the items left to be exported. + // Returns a new request that contains the items left to be sent. onPartialError(consumererror.PartialError) request - // Returns the cnt of spans/metric points or log records. + // Returns the count of spans/metric points or log records. count() int } +// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { send(req request) (int, error) } +// baseRequest is a base implementation for the request. type baseRequest struct { ctx context.Context } @@ -81,139 +79,151 @@ type Start func(context.Context, component.Host) error // Shutdown specifies the function invoked when the exporter is being shutdown. type Shutdown func(context.Context) error +// internalOptions represents all the options that users can configured. +type internalOptions struct { + TimeoutSettings + QueueSettings + RetrySettings + Start + Shutdown +} + +// fromConfiguredOptions returns the internal options starting from the default and applying all configured options. +func fromConfiguredOptions(options ...ExporterOption) *internalOptions { + // Start from the default options: + opts := &internalOptions{ + TimeoutSettings: CreateDefaultTimeoutSettings(), + // TODO: Enable queuing by default (call CreateDefaultQueueSettings) + QueueSettings: QueueSettings{Disabled: true}, + // TODO: Enable retry by default (call CreateDefaultRetrySettings) + RetrySettings: RetrySettings{Disabled: true}, + Start: func(ctx context.Context, host component.Host) error { return nil }, + Shutdown: func(ctx context.Context) error { return nil }, + } + + for _, op := range options { + op(opts) + } + + return opts +} + // ExporterOption apply changes to internalOptions. -type ExporterOption func(*baseExporter) +type ExporterOption func(*internalOptions) // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown Shutdown) ExporterOption { - return func(o *baseExporter) { - o.shutdown = shutdown + return func(o *internalOptions) { + o.Shutdown = shutdown } } // WithStart overrides the default Start function for an exporter. // The default shutdown function does nothing and always returns nil. func WithStart(start Start) ExporterOption { - return func(o *baseExporter) { - o.start = start + return func(o *internalOptions) { + o.Start = start } } -// WithShutdown overrides the default TimeoutSettings for an exporter. +// WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. -func WithTimeout(timeout TimeoutSettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.TimeoutSettings = timeout +func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption { + return func(o *internalOptions) { + o.TimeoutSettings = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. -func WithRetry(retry RetrySettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.RetrySettings = retry +func WithRetry(retrySettings RetrySettings) ExporterOption { + return func(o *internalOptions) { + o.RetrySettings = retrySettings } } -// WithQueued overrides the default QueuedSettings for an exporter. -// The default QueuedSettings is to disable queueing. -func WithQueued(queued QueuedSettings) ExporterOption { - return func(o *baseExporter) { - o.cfg.QueuedSettings = queued +// WithQueue overrides the default QueueSettings for an exporter. +// The default QueueSettings is to disable queueing. +func WithQueue(queueSettings QueueSettings) ExporterOption { + return func(o *internalOptions) { + o.QueueSettings = queueSettings } } -// internalOptions contains internalOptions concerning how an Exporter is configured. +// baseExporter contains common fields between different exporter types. type baseExporter struct { - cfg *settings + cfg configmodels.Exporter sender requestSender - rSender *retrySender - qSender *queuedSender + qrSender *queuedRetrySender start Start shutdown Shutdown startOnce sync.Once shutdownOnce sync.Once } -// Construct the internalOptions from multiple ExporterOption. func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter { + opts := fromConfiguredOptions(options...) be := &baseExporter{ - cfg: &settings{ - Exporter: cfg, - TimeoutSettings: CreateDefaultTimeoutSettings(), - // TODO: Enable queuing by default (call CreateDefaultQueuedSettings - QueuedSettings: QueuedSettings{Disabled: true}, - // TODO: Enable retry by default (call CreateDefaultRetrySettings) - RetrySettings: RetrySettings{Disabled: true}, - }, + cfg: cfg, + start: opts.Start, + shutdown: opts.Shutdown, } - for _, op := range options { - op(be) - } - - if be.start == nil { - be.start = func(ctx context.Context, host component.Host) error { return nil } - } - - if be.shutdown == nil { - be.shutdown = func(ctx context.Context) error { return nil } - } - - be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings} - - be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender) - be.sender = be.rSender - - be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender) - be.sender = be.qSender + be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}) + be.sender = be.qrSender return be } +// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. +// This can be used to wrap with observability (create spans, record metrics) the consumer sender. +func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { + be.qrSender.consumerSender = f(be.qrSender.consumerSender) +} + +// Start all senders and exporter and is invoked during service start. func (be *baseExporter) Start(ctx context.Context, host component.Host) error { err := componenterror.ErrAlreadyStarted be.startOnce.Do(func() { - // First start the nextSender + // First start the wrapped exporter. err = be.start(ctx, host) if err != nil { + // TODO: Log errors, or check if it is recorded by the caller. return } - // If no error then start the queuedSender - be.qSender.start() + // If no error then start the queuedRetrySender. + be.qrSender.start() }) return err } -// Shutdown stops the nextSender and is invoked during shutdown. +// Shutdown all senders and exporter and is invoked during service shutdown. func (be *baseExporter) Shutdown(ctx context.Context) error { err := componenterror.ErrAlreadyStopped be.shutdownOnce.Do(func() { - // First stop the retry goroutines - be.rSender.shutdown() - - // All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped. - be.qSender.shutdown() - - // Last shutdown the nextSender itself. + // First shutdown the queued retry sender + be.qrSender.shutdown() + // Last shutdown the wrapped exporter itself. err = be.shutdown(ctx) }) return err } +// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender. type timeoutSender struct { - cfg *TimeoutSettings + cfg TimeoutSettings } -func (te *timeoutSender) send(req request) (int, error) { +// send implements the requestSender interface +func (ts *timeoutSender) send(req request) (int, error) { // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be // updated because this deadline most likely is before the next one. ctx := req.context() - if te.cfg.Timeout > 0 { + if ts.cfg.Timeout > 0 { var cancelFunc func() - ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout) + ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout) defer cancelFunc() } return req.export(ctx) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index 6d9d81c8922..71525f6d3de 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error { return err } -// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span. +// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) { if cfg == nil { return nil, errNilConfig @@ -77,12 +77,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &logsExporterWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &logsExporter{ baseExporter: be, @@ -92,12 +92,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio type logsExporterWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (lewo *logsExporterWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName)) - numDroppedLogs, err := lewo.sender.send(req) + numDroppedLogs, err := lewo.nextSender.send(req) obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err) return numDroppedLogs, err } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 5e2b60330c4..0b1c8717bde 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu return err } -// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. +// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span. // TODO: Add support for retries. func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) { if cfg == nil { @@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric return err } -// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span. -// If no internalOptions are passed it just adds the nextSender format as a tag in the Context. +// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) { if cfg == nil { return nil, errNilConfig @@ -141,12 +139,12 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &metricsSenderWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &metricsExporter{ baseExporter: be, @@ -156,15 +154,15 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa type metricsSenderWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (mewo *metricsSenderWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName)) - numDroppedMetrics, err := mewo.sender.send(req) + numDroppedMetrics, err := mewo.nextSender.send(req) - // TODO: this is not ideal: req should come from the next function itself. - // temporarily loading req from internal format. Once full switch is done + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done // to new metrics will remove this. mReq := req.(*metricsRequest) numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index ba3bfae2ed4..5fed8817e7b 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -25,9 +25,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" ) -// QueuedSettings defines configuration for queueing batches before sending to the nextSender. -type QueuedSettings struct { - // Disabled indicates whether to not enqueue batches before sending to the nextSender. +// QueueSettings defines configuration for queueing batches before sending to the consumerSender. +type QueueSettings struct { + // Disabled indicates whether to not enqueue batches before sending to the consumerSender. Disabled bool `mapstructure:"disabled"` // NumWorkers is the number of consumers from the queue. NumWorkers int `mapstructure:"num_workers"` @@ -35,11 +35,16 @@ type QueuedSettings struct { QueueSize int `mapstructure:"queue_size"` } -func CreateDefaultQueuedSettings() QueuedSettings { - return QueuedSettings{ +// CreateDefaultQueueSettings returns the default settings for QueueSettings. +func CreateDefaultQueueSettings() QueueSettings { + return QueueSettings{ Disabled: false, NumWorkers: 10, - QueueSize: 5000, + // For 5000 queue elements at 100 requests/sec gives about 50 sec of survival of destination outage. + // This is a pretty decent value for production. + // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, + // multiply that by the number of requests per seconds. + QueueSize: 5000, } } @@ -48,14 +53,17 @@ func CreateDefaultQueuedSettings() QueuedSettings { type RetrySettings struct { // Disabled indicates whether to not retry sending batches in case of export failure. Disabled bool `mapstructure:"disabled"` - // InitialBackoff the time to wait after the first failure before retrying + // InitialBackoff the time to wait after the first failure before retrying. InitialBackoff time.Duration `mapstructure:"initial_backoff"` - // MaxBackoff is the upper bound on backoff. + // MaxBackoff is the upper bound on backoff. Once this value is reached the delay between + // consecutive retries will always be `MaxBackoff`. MaxBackoff time.Duration `mapstructure:"max_backoff"` - // MaxElapsedTime is the maximum amount of time spent trying to send a batch. + // MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch. + // Once this value is reached, the data is discarded. MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"` } +// CreateDefaultRetrySettings returns the default settings for RetrySettings. func CreateDefaultRetrySettings() RetrySettings { return RetrySettings{ Disabled: false, @@ -65,37 +73,44 @@ func CreateDefaultRetrySettings() RetrySettings { } } -type queuedSender struct { - cfg *QueuedSettings - nextSender requestSender - queue *queue.BoundedQueue +type queuedRetrySender struct { + cfg QueueSettings + consumerSender requestSender + queue *queue.BoundedQueue + retryStopCh chan struct{} } var errorRefused = errors.New("failed to add to the queue") -func newQueuedSender(cfg *QueuedSettings, nextSender requestSender) *queuedSender { - return &queuedSender{ - cfg: cfg, - nextSender: nextSender, - queue: queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}), +func newQueuedRetrySender(qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender) *queuedRetrySender { + retryStopCh := make(chan struct{}) + return &queuedRetrySender{ + cfg: qCfg, + consumerSender: &retrySender{ + cfg: rCfg, + nextSender: nextSender, + stopCh: retryStopCh, + }, + queue: queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}), + retryStopCh: retryStopCh, } } // start is invoked during service startup. -func (sp *queuedSender) start() { - sp.queue.StartConsumers(sp.cfg.NumWorkers, func(item interface{}) { +func (qrs *queuedRetrySender) start() { + qrs.queue.StartConsumers(qrs.cfg.NumWorkers, func(item interface{}) { value := item.(request) - _, _ = sp.nextSender.send(value) + _, _ = qrs.consumerSender.send(value) }) } -// ExportTraces implements the TExporter interface -func (sp *queuedSender) send(req request) (int, error) { - if sp.cfg.Disabled { - return sp.nextSender.send(req) +// send implements the requestSender interface +func (qrs *queuedRetrySender) send(req request) (int, error) { + if qrs.cfg.Disabled { + return qrs.consumerSender.send(req) } - if !sp.queue.Produce(req) { + if !qrs.queue.Produce(req) { return req.count(), errorRefused } @@ -103,8 +118,13 @@ func (sp *queuedSender) send(req request) (int, error) { } // shutdown is invoked during service shutdown. -func (sp *queuedSender) shutdown() { - sp.queue.Stop() +func (qrs *queuedRetrySender) shutdown() { + // First stop the retry goroutines, so that unblocks the queue workers. + close(qrs.retryStopCh) + + // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only + // try once every request. + qrs.queue.Stop() } // TODO: Clean this by forcing all exporters to return an internal error type that always include the information about retries. @@ -121,30 +141,23 @@ func NewThrottleRetry(err error, delay time.Duration) error { } type retrySender struct { - cfg *RetrySettings + cfg RetrySettings nextSender requestSender stopCh chan struct{} } -func newRetrySender(cfg *RetrySettings, nextSender requestSender) *retrySender { - return &retrySender{ - cfg: cfg, - nextSender: nextSender, - stopCh: make(chan struct{}), - } -} - -func (re *retrySender) send(req request) (int, error) { - if re.cfg.Disabled { - return re.nextSender.send(req) +// send implements the requestSender interface +func (rs *retrySender) send(req request) (int, error) { + if rs.cfg.Disabled { + return rs.nextSender.send(req) } expBackoff := backoff.NewExponentialBackOff() - expBackoff.InitialInterval = re.cfg.InitialBackoff - expBackoff.MaxInterval = re.cfg.MaxBackoff - expBackoff.MaxElapsedTime = re.cfg.MaxElapsedTime + expBackoff.InitialInterval = rs.cfg.InitialBackoff + expBackoff.MaxInterval = rs.cfg.MaxBackoff + expBackoff.MaxElapsedTime = rs.cfg.MaxElapsedTime for { - droppedItems, err := re.nextSender.send(req) + droppedItems, err := rs.nextSender.send(req) if err == nil { return droppedItems, nil @@ -175,18 +188,13 @@ func (re *retrySender) send(req request) (int, error) { select { case <-req.context().Done(): return req.count(), fmt.Errorf("request is cancelled or timed out %w", err) - case <-re.stopCh: + case <-rs.stopCh: return req.count(), fmt.Errorf("interrupted due to shutdown %w", err) case <-time.After(backoffDelay): } } } -// shutdown is invoked during service shutdown. -func (re *retrySender) shutdown() { - close(re.stopCh) -} - // max returns the larger of x or y. func max(x, y time.Duration) time.Duration { if x < y { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 6f1e8c6a73e..a164ecf36c8 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -35,11 +35,11 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(consumererror.Permanent(errors.New("bad data"))) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -54,17 +54,17 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { }) mockP.awaitAsyncProcessing() <-time.After(200 * time.Millisecond) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() rCfg.Disabled = true - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -79,7 +79,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { }) mockP.awaitAsyncProcessing() <-time.After(200 * time.Millisecond) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_PartialError(t *testing.T) { @@ -87,12 +87,14 @@ func TestQueuedRetry_PartialError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(partialErr) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -113,20 +115,23 @@ func TestQueuedRetry_PartialError(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 2+1) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_StopWhileWaiting(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 30 * time.Minute - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) mockP.run(func() { @@ -137,28 +142,40 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { }) mockP.awaitAsyncProcessing() + // Enqueue another request to ensure when calling shutdown we drain the queue. + mockP.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + droppedItems, err := be.sender.send(newMockRequest(context.Background(), 3, mockP)) + require.NoError(t, err) + assert.Equal(t, 0, droppedItems) + }) + mockP.stop() assert.NoError(t, be.Shutdown(context.Background())) - // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + // TODO: Ensure that queue is drained, and uncomment the next 3 lines. + // https://github.com/jaegertracing/jaeger/pull/2349 + // ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 2) + // require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_PreserveCancellation(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 30 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ctx, cancelFunc := context.WithCancel(context.Background()) + start := time.Now() mockP.run(func() { // This is asynchronous so it should just enqueue, no errors expected. droppedItems, err := be.sender.send(newMockRequest(ctx, 2, mockP)) @@ -169,31 +186,35 @@ func TestQueuedRetry_PreserveCancellation(t *testing.T) { cancelFunc() - // In the newMockConcurrentExporter we count requests and items even for failed requests. mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + require.Zero(t, be.qrSender.queue.Size()) // Stop should succeed and not retry. mockP.stop() assert.NoError(t, be.Shutdown(context.Background())) + // We should ensure that we actually did not wait for the initial backoff (30 sec). + assert.True(t, 5*time.Second > time.Since(start)) + // In the newMockConcurrentExporter we count requests and items even for failed requests. mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockP := newMockConcurrentExporter() - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 100 * time.Millisecond rCfg.MaxElapsedTime = 1 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -215,20 +236,23 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests. mockP.checkNumRequests(t, 1) - mockP.checkNumItems(t, 2) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_ThrottleError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(NewThrottleRetry(errors.New("throttle error"), 1*time.Second)) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 100 * time.Millisecond - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -251,23 +275,25 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { // The initial backoff is 100ms, but because of the throttle this should wait at least 1 seconds. assert.True(t, 1*time.Second < time.Since(start)) - // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 4) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.NumWorkers = 1 qCfg.QueueSize = 1 rCfg := CreateDefaultRetrySettings() rCfg.Disabled = false rCfg.InitialBackoff = 2 * time.Second - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -281,6 +307,8 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { assert.Equal(t, 0, droppedItems) }) mockP.awaitAsyncProcessing() + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 0) // There is a small race condition in this test, but expect to execute this in less than 2 second. mockP.updateError(nil) @@ -289,18 +317,21 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { // In the newMockConcurrentExporter we count requests and items even for failed requests mockP.checkNumRequests(t, 2) - mockP.checkNumItems(t, 4) - require.Zero(t, be.qSender.queue.Size()) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.qrSender.queue.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { mockP := newMockConcurrentExporter() mockP.updateError(errors.New("transient error")) - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := CreateDefaultRetrySettings() - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -317,9 +348,11 @@ func TestQueuedRetryHappyPath(t *testing.T) { defer doneFn() mockP := newMockConcurrentExporter() - qCfg := CreateDefaultQueuedSettings() + qCfg := CreateDefaultQueueSettings() rCfg := CreateDefaultRetrySettings() - be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueued(qCfg)) + be := newBaseExporter(defaultExporterCfg, WithRetry(rCfg), WithQueue(qCfg)) + ocs := &observabilityConsumerSender{nextSender: be.qrSender.consumerSender} + be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { mockP.stop() @@ -339,7 +372,8 @@ func TestQueuedRetryHappyPath(t *testing.T) { mockP.awaitAsyncProcessing() mockP.checkNumRequests(t, wantRequests) - mockP.checkNumItems(t, 2*wantRequests) + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) } type mockErrorRequest struct { @@ -355,7 +389,7 @@ func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request } func (mer *mockErrorRequest) count() int { - return 0 + return 7 } func newErrorRequest(ctx context.Context) request { @@ -371,7 +405,7 @@ type mockRequest struct { } func (m *mockRequest) export(_ context.Context) (int, error) { - err := m.mce.export(m.cnt) + err := m.mce.export() if err != nil { return m.cnt, err } @@ -411,32 +445,27 @@ func newMockConcurrentExporter() *mockConcurrentExporter { return &mockConcurrentExporter{waitGroup: new(sync.WaitGroup)} } -func (p *mockConcurrentExporter) export(cnt int) error { +func (p *mockConcurrentExporter) export() error { if atomic.LoadInt32(&p.stopped) == 1 { return nil } atomic.AddInt64(&p.requestCount, 1) - atomic.AddInt64(&p.itemsCount, int64(cnt)) p.mu.Lock() defer p.mu.Unlock() defer p.waitGroup.Done() return p.consumeError } -func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { - assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) -} - -func (p *mockConcurrentExporter) checkNumItems(t *testing.T, want int) { - assert.EqualValues(t, want, atomic.LoadInt64(&p.itemsCount)) -} - func (p *mockConcurrentExporter) updateError(err error) { p.mu.Lock() defer p.mu.Unlock() p.consumeError = err } +func (p *mockConcurrentExporter) checkNumRequests(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.requestCount)) +} + func (p *mockConcurrentExporter) run(fn func()) { p.waitGroup.Add(1) fn() @@ -449,3 +478,28 @@ func (p *mockConcurrentExporter) awaitAsyncProcessing() { func (p *mockConcurrentExporter) stop() { atomic.StoreInt32(&p.stopped, 1) } + +type observabilityConsumerSender struct { + sentItemsCount int64 + droppedItemsCount int64 + nextSender requestSender +} + +func newObservabilityConsumerSender(nextSender requestSender) requestSender { + return &observabilityConsumerSender{nextSender: nextSender} +} + +func (ocs *observabilityConsumerSender) send(req request) (int, error) { + dic, err := ocs.nextSender.send(req) + atomic.AddInt64(&ocs.sentItemsCount, int64(req.count()-dic)) + atomic.AddInt64(&ocs.droppedItemsCount, int64(dic)) + return dic, err +} + +func (p *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.sentItemsCount)) +} + +func (p *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) { + assert.EqualValues(t, want, atomic.LoadInt64(&p.droppedItemsCount)) +} diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 74b95b1bcc4..119d57c7ddf 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -29,7 +29,6 @@ import ( // returns the number of dropped spans. type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error) -// traceExporterOld implements the nextSender with additional helper internalOptions. type traceExporterOld struct { *baseExporter dataPusher traceDataPusherOld @@ -41,9 +40,7 @@ func (texp *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerd return err } -// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every -// request with a Span. If no internalOptions are passed it just adds the nextSender format as a -// tag in the Context. +// NewTraceExporterOld creates an TraceExporterOld that records observability metrics and wraps every request with a Span. func NewTraceExporterOld( cfg configmodels.Exporter, dataPusher traceDataPusherOld, @@ -125,8 +122,7 @@ func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) e return err } -// NewTraceExporter creates a TraceExporter that can record metrics and can wrap -// every request with a Span. +// NewTraceExporter creates a TraceExporter that records observability metrics and wraps every request with a Span. func NewTraceExporter( cfg configmodels.Exporter, dataPusher traceDataPusher, @@ -142,12 +138,12 @@ func NewTraceExporter( } be := newBaseExporter(cfg, options...) - - // Record metrics on the consumer. - be.qSender.nextSender = &tracesExporterWithObservability{ - exporterName: cfg.Name(), - sender: be.qSender.nextSender, - } + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + exporterName: cfg.Name(), + nextSender: nextSender, + } + }) return &traceExporter{ baseExporter: be, @@ -157,16 +153,16 @@ func NewTraceExporter( type tracesExporterWithObservability struct { exporterName string - sender requestSender + nextSender requestSender } func (tewo *tracesExporterWithObservability) send(req request) (int, error) { req.setContext(obsreport.StartTraceDataExportOp(req.context(), tewo.exporterName)) // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := tewo.sender.send(req) + droppedSpans, err := tewo.nextSender.send(req) - // TODO: this is not ideal: req should come from the next function itself. - // temporarily loading req from internal format. Once full switch is done + // TODO: this is not ideal: it should come from the next function itself. + // temporarily loading it from internal format. Once full switch is done // to new metrics will remove this. obsreport.EndTraceDataExportOp(req.context(), req.count(), droppedSpans, err) return droppedSpans, err diff --git a/exporter/otlpexporter/README.md b/exporter/otlpexporter/README.md index 59a8c25d731..2a262985675 100644 --- a/exporter/otlpexporter/README.md +++ b/exporter/otlpexporter/README.md @@ -28,16 +28,19 @@ The following settings can be optionally configured: [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#WithInsecure). - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). -- `timeout` (default = 5s): Is the timeout for each operation. -- `retry_settings` +- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. +- `retry_on_failure` - `disabled` (default = false) - - `initial_backoff` (default = 5s): Time to wait after the first failure before retrying; ignored if retry_on_failure is false - - `max_backoff` (default = 30s): Is the upper bound on backoff; ignored if retry_on_failure is false - - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if retry_on_failure is false -- `queued_settings` + - `initial_backoff` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true` + - `max_backoff` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true` + - `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true` +- `sending_queue` - `disabled` (default = true) - - `num_workers` (default = 10): Number of workers that dequeue batches - - `queue_size` (default = 5000): Maximum number of batches kept in memory before data + - `num_workers` (default = 10): Number of workers that dequeue batches; ignored if `disabled` is `true` + - `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `disabled` is `true`; + User should calculate this as `num_seconds * requests_per_second` where: + - `num_seconds` is the number of seconds to buffer in case of a backend outage + - `requests_per_second` is the average number of requests per seconds. Example: diff --git a/exporter/otlpexporter/config.go b/exporter/otlpexporter/config.go index 469d0ba1835..fe9c3f0ded8 100644 --- a/exporter/otlpexporter/config.go +++ b/exporter/otlpexporter/config.go @@ -24,8 +24,8 @@ import ( type Config struct { configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - exporterhelper.QueuedSettings `mapstructure:"queued_settings"` - exporterhelper.RetrySettings `mapstructure:"retry_settings"` + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 1ab79b67834..a603f809d38 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -59,7 +59,7 @@ func TestLoadConfig(t *testing.T) { MaxBackoff: 1 * time.Minute, MaxElapsedTime: 10 * time.Minute, }, - QueuedSettings: exporterhelper.QueuedSettings{ + QueueSettings: exporterhelper.QueueSettings{ Disabled: false, NumWorkers: 2, QueueSize: 10, diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 8d33b3b1ef1..4b9f28e9c2f 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -40,7 +40,7 @@ func NewFactory() component.ExporterFactory { func createDefaultConfig() configmodels.Exporter { // TODO: Enable the queued settings. - qs := exporterhelper.CreateDefaultQueuedSettings() + qs := exporterhelper.CreateDefaultQueueSettings() qs.Disabled = true return &Config{ ExporterSettings: configmodels.ExporterSettings{ @@ -49,7 +49,7 @@ func createDefaultConfig() configmodels.Exporter { }, TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(), RetrySettings: exporterhelper.CreateDefaultRetrySettings(), - QueuedSettings: qs, + QueueSettings: qs, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. @@ -73,7 +73,7 @@ func createTraceExporter( oce.pushTraceData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown)) if err != nil { return nil, err @@ -97,7 +97,7 @@ func createMetricsExporter( oce.pushMetricsData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { @@ -122,7 +122,7 @@ func createLogExporter( oce.pushLogData, exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueued(oCfg.QueuedSettings), + exporterhelper.WithQueue(oCfg.QueueSettings), exporterhelper.WithShutdown(oce.shutdown), ) if err != nil { diff --git a/exporter/otlpexporter/testdata/config.yaml b/exporter/otlpexporter/testdata/config.yaml index 253b8e66120..9fc8e593aed 100644 --- a/exporter/otlpexporter/testdata/config.yaml +++ b/exporter/otlpexporter/testdata/config.yaml @@ -11,11 +11,11 @@ exporters: compression: "on" ca_file: /var/lib/mycert.pem timeout: 10s - queued_settings: + sending_queue: disabled: false num_workers: 2 queue_size: 10 - retry_settings: + retry_on_failure: disabled: false initial_backoff: 10s max_backoff: 60s