diff --git a/CHANGELOG.md b/CHANGELOG.md index 007f97677b9c..184ca008e687 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ ## 💡 Enhancements 💡 - Add `doc.go` files to the consumer package and its subpackages (#3270) +- Add telemetry for dropped data due to exporter sending queue overflow (#3328) ## v0.27.0 Beta diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 27d533f5fd8c..6dd023f3cbe4 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -23,8 +23,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenthelper" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerhelper" + "go.opentelemetry.io/collector/obsreport" ) // TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend. @@ -164,6 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. 
type baseExporter struct { component.Component + obsrep *obsreport.Exporter sender requestSender qrSender *queuedRetrySender } @@ -173,6 +176,10 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) Component: componenthelper.New(bs.componentOptions...), } + be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{ + Level: configtelemetry.GetMetricsLevelFlagValue(), + ExporterID: cfg.ID(), + }) be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger) be.sender = be.qrSender diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 0e822cf11c59..14a62dfc54df 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -87,16 +87,18 @@ func NewLogsExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &logsExporterWithObservability{ - obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } }) lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error { - return be.sender.send(newLogsRequest(ctx, ld, pusher)) + req := newLogsRequest(ctx, ld, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) 
return &logsExporter{ diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 7333c727b557..f25d231c04cd 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -121,6 +121,30 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { checkRecordedMetricsForLogsExporter(t, le, want) } +func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeLogs(context.Background(), md) + } + + // 2 batches must be in queue, and 5 batches (15 log records) rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15)) +} + func TestLogsExporter_WithSpan(t *testing.T) { le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil)) require.Nil(t, err) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 7f462e6706e4..a2a2d4c513c9 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -88,10 +88,7 @@ 
func NewMetricsExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &metricsSenderWithObservability{ - obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } }) @@ -100,7 +97,12 @@ func NewMetricsExporter( if bs.ResourceToTelemetrySettings.Enabled { md = convertResourceToLabels(md) } - return be.sender.send(newMetricsRequest(ctx, md, pusher)) + req := newMetricsRequest(ctx, md, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) return &metricsExporter{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 61baee865176..ccc3f09c72e6 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -120,6 +120,30 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, me, want) } +func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateMetricsOneMetricOneDataPoint() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeMetrics(context.Background(), md) + } + + // 2 batches must be in queue, and 5 metric points rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedMetrics(t, 
fakeMetricsExporterName, int64(5)) +} + func TestMetricsExporter_WithSpan(t *testing.T) { me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) require.NoError(t, err) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 313a13b241e5..c413e8495a40 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -41,6 +41,8 @@ var ( metric.WithDescription("Current size of the retry queue (in batches)"), metric.WithLabelKeys(obsmetrics.ExporterKey), metric.WithUnit(metricdata.UnitDimensionless)) + + errSendingQueueIsFull = errors.New("sending_queue is full") ) func init() { @@ -189,7 +191,7 @@ func (qrs *queuedRetrySender) send(req request) error { zap.Int("dropped_items", req.count()), ) span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.") - return errors.New("sending_queue is full") + return errSendingQueueIsFull } span.Annotate(qrs.traceAttributes, "Enqueued item.") diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 48d243cc9d61..0df0c516af91 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -88,17 +88,18 @@ func NewTracesExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &tracesExporterWithObservability{ - obsrep: obsreport.NewExporter( - obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } 
}) tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { - return be.sender.send(newTracesRequest(ctx, td, pusher)) + req := newTracesRequest(ctx, td, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) return &traceExporter{ diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 4fc1f0b38f7c..19d36d6a3fb7 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -131,6 +131,30 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForTracesExporter(t, te, want) } +func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + td := testdata.GenerateTracesTwoSpansSameResource() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeTraces(context.Background(), td) + } + + // 2 batches must be in queue, and 5 batches (10 spans) rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10)) +} + func TestTracesExporter_WithSpan(t *testing.T) { te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) require.NoError(t, err) diff --git a/internal/obsreportconfig/obsmetrics/obs_exporter.go b/internal/obsreportconfig/obsmetrics/obs_exporter.go index 01f88ece9362..e85f6ef119f3 100644 --- 
a/internal/obsreportconfig/obsmetrics/obs_exporter.go +++ b/internal/obsreportconfig/obsmetrics/obs_exporter.go @@ -27,16 +27,22 @@ const ( SentSpansKey = "sent_spans" // FailedToSendSpansKey used to track spans that failed to be sent by exporters. FailedToSendSpansKey = "send_failed_spans" + // FailedToEnqueueSpansKey used to track spans that failed to be added to the sending queue. + FailedToEnqueueSpansKey = "enqueue_failed_spans" // SentMetricPointsKey used to track metric points sent by exporters. SentMetricPointsKey = "sent_metric_points" // FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters. FailedToSendMetricPointsKey = "send_failed_metric_points" + // FailedToEnqueueMetricPointsKey used to track metric points that failed to be added to the sending queue. + FailedToEnqueueMetricPointsKey = "enqueue_failed_metric_points" // SentLogRecordsKey used to track logs sent by exporters. SentLogRecordsKey = "sent_log_records" // FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters. FailedToSendLogRecordsKey = "send_failed_log_records" + // FailedToEnqueueLogRecordsKey used to track log records that failed to be added to the sending queue. 
+ FailedToEnqueueLogRecordsKey = "enqueue_failed_log_records" ) var ( @@ -60,6 +66,10 @@ var ( ExporterPrefix+FailedToSendSpansKey, "Number of spans in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueSpans = stats.Int64( + ExporterPrefix+FailedToEnqueueSpansKey, + "Number of spans failed to be added to the sending queue.", + stats.UnitDimensionless) ExporterSentMetricPoints = stats.Int64( ExporterPrefix+SentMetricPointsKey, "Number of metric points successfully sent to destination.", @@ -68,6 +78,10 @@ var ( ExporterPrefix+FailedToSendMetricPointsKey, "Number of metric points in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueMetricPoints = stats.Int64( + ExporterPrefix+FailedToEnqueueMetricPointsKey, + "Number of metric points failed to be added to the sending queue.", + stats.UnitDimensionless) ExporterSentLogRecords = stats.Int64( ExporterPrefix+SentLogRecordsKey, "Number of log record successfully sent to destination.", @@ -76,4 +90,8 @@ var ( ExporterPrefix+FailedToSendLogRecordsKey, "Number of log records in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueLogRecords = stats.Int64( + ExporterPrefix+FailedToEnqueueLogRecordsKey, + "Number of log records failed to be added to the sending queue.", + stats.UnitDimensionless) ) diff --git a/internal/obsreportconfig/obsreportconfig.go b/internal/obsreportconfig/obsreportconfig.go index da1ce262ff73..cf51bf6ecfa4 100644 --- a/internal/obsreportconfig/obsreportconfig.go +++ b/internal/obsreportconfig/obsreportconfig.go @@ -77,10 +77,13 @@ func allViews() *ObsMetrics { measures = []*stats.Int64Measure{ obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans, + obsmetrics.ExporterFailedToEnqueueSpans, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints, + obsmetrics.ExporterFailedToEnqueueMetricPoints, obsmetrics.ExporterSentLogRecords, 
obsmetrics.ExporterFailedToSendLogRecords, + obsmetrics.ExporterFailedToEnqueueLogRecords, } tagKeys = []tag.Key{obsmetrics.TagKeyExporter} views = append(views, genViews(measures, tagKeys, view.Sum())...) diff --git a/obsreport/obsreport_exporter.go b/obsreport/obsreport_exporter.go index 784e3a4f8c65..c82d61494939 100644 --- a/obsreport/obsreport_exporter.go +++ b/obsreport/obsreport_exporter.go @@ -63,6 +63,11 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) { endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } +// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. +func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) +} + // StartMetricsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -78,6 +83,11 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey) } +// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. +func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) +} + // StartLogsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. 
@@ -92,6 +102,11 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey) } +// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. +func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) +} + // startSpan creates the span used to trace the operation. Returning // the updated context and the created span. func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context { diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 67d1949a61bc..f12f38d7739b 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -436,6 +436,26 @@ func TestExportLogsOp(t *testing.T) { obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords)) } +func TestExportEnqueueFailure(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + + logRecords := 7 + obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords) + obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords)) + + spans := 12 + obsrep.RecordTracesEnqueueFailure(context.Background(), spans) + obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans)) + + metricPoints := 21 + obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints) + obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints)) +} + func TestReceiveWithLongLivedCtx(t *testing.T) { ss := &spanStore{} trace.RegisterExporter(ss) diff --git a/obsreport/obsreporttest/obsreporttest.go 
b/obsreport/obsreporttest/obsreporttest.go index e0ca299cd0d2..01eefc6ced9c 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -65,6 +65,13 @@ func CheckExporterTraces(t *testing.T, exporter config.ComponentID, acceptedSpan checkValueForView(t, exporterTags, droppedSpans, "exporter/send_failed_spans") } +// CheckExporterEnqueueFailedTraces checks that reported number of spans failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedTraces(t *testing.T, exporter config.ComponentID, spans int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") +} + // CheckExporterMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMetricsPoints, droppedMetricsPoints int64) { @@ -73,6 +80,13 @@ func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMet checkValueForView(t, exporterTags, droppedMetricsPoints, "exporter/send_failed_metric_points") } +// CheckExporterEnqueueFailedMetrics checks that reported number of metric points failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedMetrics(t *testing.T, exporter config.ComponentID, metricPoints int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") +} + // CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values. 
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRecords, droppedLogRecords int64) { @@ -81,6 +95,13 @@ func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRec checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records") } +// CheckExporterEnqueueFailedLogs checks that reported number of log records failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedLogs(t *testing.T, exporter config.ComponentID, logRecords int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") +} + // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckProcessorTraces(t *testing.T, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) {