From 69215b9e38fcbb24a3abce1671899565eb0d6d42 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 31 May 2021 12:11:53 -0700 Subject: [PATCH 1/2] Add telemetry for dropped data due to exporter sending queue overflow This change adds internal metrics for dropped spans, metric points and log records when exporter sending queue is full: - exporter/enqueue_failed_metric_points - exporter/enqueue_failed_spans - exporter/enqueue_failed_log_records --- CHANGELOG.md | 1 + exporter/exporterhelper/common.go | 7 ++++++ exporter/exporterhelper/logs.go | 14 ++++++----- exporter/exporterhelper/logs_test.go | 24 +++++++++++++++++++ exporter/exporterhelper/metrics.go | 14 ++++++----- exporter/exporterhelper/metrics_test.go | 24 +++++++++++++++++++ exporter/exporterhelper/queued_retry.go | 4 +++- exporter/exporterhelper/traces.go | 15 ++++++------ exporter/exporterhelper/traces_test.go | 24 +++++++++++++++++++ .../obsmetrics/obs_exporter.go | 18 ++++++++++++++ internal/obsreportconfig/obsreportconfig.go | 3 +++ obsreport/obsreport_exporter.go | 15 ++++++++++++ obsreport/obsreport_test.go | 20 ++++++++++++++++ obsreport/obsreporttest/obsreporttest.go | 21 ++++++++++++++++ 14 files changed, 184 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 207495f6010..0487b6a55f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Add `doc.go` files to the consumer package and its subpackages (#3270) - Automate triggering of doc-update on release (#3234) - Enable Dependabot for Github Actions (#3312) +- Add telemetry for dropped data due to exporter sending queue overflow (#3328) ## v0.27.0 Beta diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 27d533f5fd8..6dd023f3cbe 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -23,8 +23,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenthelper" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerhelper" + "go.opentelemetry.io/collector/obsreport" ) // TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend. @@ -164,6 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. type baseExporter struct { component.Component + obsrep *obsreport.Exporter sender requestSender qrSender *queuedRetrySender } @@ -173,6 +176,10 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) Component: componenthelper.New(bs.componentOptions...), } + be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{ + Level: configtelemetry.GetMetricsLevelFlagValue(), + ExporterID: cfg.ID(), + }) be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger) be.sender = be.qrSender diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 0e822cf11c5..14a62dfc54d 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -87,16 +87,18 @@ func NewLogsExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &logsExporterWithObservability{ - obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } }) lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error { - return be.sender.send(newLogsRequest(ctx, ld, pusher)) + req := newLogsRequest(ctx, ld, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) return &logsExporter{ diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 7333c727b55..f25d231c04c 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -121,6 +121,30 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { checkRecordedMetricsForLogsExporter(t, le, want) } +func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeLogs(context.Background(), md) + } + + // 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15)) +} + func TestLogsExporter_WithSpan(t *testing.T) { le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil)) require.Nil(t, err) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 7f462e6706e..a2a2d4c513c 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -88,10 +88,7 @@ func NewMetricsExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &metricsSenderWithObservability{ - obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } }) @@ -100,7 +97,12 @@ func NewMetricsExporter( if bs.ResourceToTelemetrySettings.Enabled { md = convertResourceToLabels(md) } - return be.sender.send(newMetricsRequest(ctx, md, pusher)) + req := newMetricsRequest(ctx, md, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) return &metricsExporter{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 61baee86517..ccc3f09c72e 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -120,6 +120,30 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, me, want) } +func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + md := testdata.GenerateMetricsOneMetricOneDataPoint() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeMetrics(context.Background(), md) + } + + // 2 batched must be in queue, and 5 metric points rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5)) +} + func TestMetricsExporter_WithSpan(t *testing.T) { me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) require.NoError(t, err) diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 313a13b241e..c413e8495a4 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -41,6 +41,8 @@ var ( metric.WithDescription("Current size of the retry queue (in batches)"), metric.WithLabelKeys(obsmetrics.ExporterKey), metric.WithUnit(metricdata.UnitDimensionless)) + + errSendingQueueIsFull = errors.New("sending_queue is full") ) func init() { @@ -189,7 +191,7 @@ func (qrs *queuedRetrySender) send(req request) error { zap.Int("dropped_items", req.count()), ) span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.") - return errors.New("sending_queue is full") + return errSendingQueueIsFull } span.Annotate(qrs.traceAttributes, "Enqueued item.") diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 48d243cc9d6..0df0c516af9 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -16,12 +16,12 @@ package exporterhelper import ( "context" + "errors" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" @@ -88,17 +88,18 @@ func NewTracesExporter( be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &tracesExporterWithObservability{ - obsrep: obsreport.NewExporter( - obsreport.ExporterSettings{ - Level: configtelemetry.GetMetricsLevelFlagValue(), - ExporterID: cfg.ID(), - }), + obsrep: be.obsrep, nextSender: nextSender, } }) tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { - return be.sender.send(newTracesRequest(ctx, td, pusher)) + req := newTracesRequest(ctx, td, pusher) + err := be.sender.send(req) + if errors.Is(err, errSendingQueueIsFull) { + be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count()) + } + return err }, bs.consumerOptions...) return &traceExporter{ diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 4fc1f0b38f7..19d36d6a3fb 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -131,6 +131,30 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForTracesExporter(t, te, want) } +func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + rCfg := DefaultRetrySettings() + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 2 + wantErr := errors.New("some-error") + te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(wantErr), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NotNil(t, te) + + td := testdata.GenerateTracesTwoSpansSameResource() + const numBatches = 7 + for i := 0; i < numBatches; i++ { + te.ConsumeTraces(context.Background(), td) + } + + // 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow + obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10)) +} + func TestTracesExporter_WithSpan(t *testing.T) { te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) require.NoError(t, err) diff --git a/internal/obsreportconfig/obsmetrics/obs_exporter.go b/internal/obsreportconfig/obsmetrics/obs_exporter.go index 01f88ece936..e85f6ef119f 100644 --- a/internal/obsreportconfig/obsmetrics/obs_exporter.go +++ b/internal/obsreportconfig/obsmetrics/obs_exporter.go @@ -27,16 +27,22 @@ const ( SentSpansKey = "sent_spans" // FailedToSendSpansKey used to track spans that failed to be sent by exporters. FailedToSendSpansKey = "send_failed_spans" + // FailedToEnqueueSpansKey used to track spans that failed to be added to the sending queue. + FailedToEnqueueSpansKey = "enqueue_failed_spans" // SentMetricPointsKey used to track metric points sent by exporters. SentMetricPointsKey = "sent_metric_points" // FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters. FailedToSendMetricPointsKey = "send_failed_metric_points" + // FailedToEnqueueMetricPointsKey used to track metric points that failed to be added to the sending queue. + FailedToEnqueueMetricPointsKey = "enqueue_failed_metric_points" // SentLogRecordsKey used to track logs sent by exporters. SentLogRecordsKey = "sent_log_records" // FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters. FailedToSendLogRecordsKey = "send_failed_log_records" + // FailedToEnqueueLogRecordsKey used to track logs records that failed to be added to the sending queue. + FailedToEnqueueLogRecordsKey = "enqueue_failed_log_records" ) var ( @@ -60,6 +66,10 @@ var ( ExporterPrefix+FailedToSendSpansKey, "Number of spans in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueSpans = stats.Int64( + ExporterPrefix+FailedToEnqueueSpansKey, + "Number of spans failed to be added to the sending queue.", + stats.UnitDimensionless) ExporterSentMetricPoints = stats.Int64( ExporterPrefix+SentMetricPointsKey, "Number of metric points successfully sent to destination.", @@ -68,6 +78,10 @@ var ( ExporterPrefix+FailedToSendMetricPointsKey, "Number of metric points in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueMetricPoints = stats.Int64( + ExporterPrefix+FailedToEnqueueMetricPointsKey, + "Number of metric points failed to be added to the sending queue.", + stats.UnitDimensionless) ExporterSentLogRecords = stats.Int64( ExporterPrefix+SentLogRecordsKey, "Number of log record successfully sent to destination.", @@ -76,4 +90,8 @@ var ( ExporterPrefix+FailedToSendLogRecordsKey, "Number of log records in failed attempts to send to destination.", stats.UnitDimensionless) + ExporterFailedToEnqueueLogRecords = stats.Int64( + ExporterPrefix+FailedToEnqueueLogRecordsKey, + "Number of log records failed to be added to the sending queue.", + stats.UnitDimensionless) ) diff --git a/internal/obsreportconfig/obsreportconfig.go b/internal/obsreportconfig/obsreportconfig.go index da1ce262ff7..cf51bf6ecfa 100644 --- a/internal/obsreportconfig/obsreportconfig.go +++ b/internal/obsreportconfig/obsreportconfig.go @@ -77,10 +77,13 @@ func allViews() *ObsMetrics { measures = []*stats.Int64Measure{ obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans, + obsmetrics.ExporterFailedToEnqueueSpans, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints, + obsmetrics.ExporterFailedToEnqueueMetricPoints, obsmetrics.ExporterSentLogRecords, obsmetrics.ExporterFailedToSendLogRecords, + obsmetrics.ExporterFailedToEnqueueLogRecords, } tagKeys = []tag.Key{obsmetrics.TagKeyExporter} views = append(views, genViews(measures, tagKeys, view.Sum())...) diff --git a/obsreport/obsreport_exporter.go b/obsreport/obsreport_exporter.go index 784e3a4f8c6..c82d6149493 100644 --- a/obsreport/obsreport_exporter.go +++ b/obsreport/obsreport_exporter.go @@ -63,6 +63,11 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) { endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } +// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. +func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) +} + // StartMetricsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -78,6 +83,11 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey) } +// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. +func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) +} + // StartLogsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -92,6 +102,11 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey) } +// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. +func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) +} + // startSpan creates the span used to trace the operation. Returning // the updated context and the created span. func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context { diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 67d1949a61b..f12f38d7739 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -436,6 +436,26 @@ func TestExportLogsOp(t *testing.T) { obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords)) } +func TestExportEnqueueFailure(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + + logRecords := 7 + obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords) + obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords)) + + spans := 12 + obsrep.RecordTracesEnqueueFailure(context.Background(), spans) + obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans)) + + metricPoints := 21 + obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints) + obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints)) +} + func TestReceiveWithLongLivedCtx(t *testing.T) { ss := &spanStore{} trace.RegisterExporter(ss) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index e0ca299cd0d..01eefc6ced9 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -65,6 +65,13 @@ func CheckExporterTraces(t *testing.T, exporter config.ComponentID, acceptedSpan checkValueForView(t, exporterTags, droppedSpans, "exporter/send_failed_spans") } +// CheckExporterEnqueueFailedTraces checks that reported number of spans failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedTraces(t *testing.T, exporter config.ComponentID, spans int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") +} + // CheckExporterMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMetricsPoints, droppedMetricsPoints int64) { @@ -73,6 +80,13 @@ func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMet checkValueForView(t, exporterTags, droppedMetricsPoints, "exporter/send_failed_metric_points") } +// CheckExporterEnqueueFailedMetrics checks that reported number of metric points failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedMetrics(t *testing.T, exporter config.ComponentID, metricPoints int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") +} + // CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRecords, droppedLogRecords int64) { @@ -81,6 +95,13 @@ func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRec checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records") } +// CheckExporterEnqueueFailedLogs checks that reported number of log records failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterEnqueueFailedLogs(t *testing.T, exporter config.ComponentID, logRecords int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") +} + // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckProcessorTraces(t *testing.T, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) { From c882e64a6b46b326e0fef8d5037f7f51183a8c14 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 2 Jun 2021 12:56:43 -0700 Subject: [PATCH 2/2] Make report*EnqueueFailure methods private By moving them to the package where they are being used. It requires some code duplication --- exporter/exporterhelper/common.go | 4 +- exporter/exporterhelper/logs.go | 5 +- exporter/exporterhelper/logs_test.go | 2 +- exporter/exporterhelper/metrics.go | 5 +- exporter/exporterhelper/metrics_test.go | 2 +- exporter/exporterhelper/obsreport.go | 58 ++++++++++++ exporter/exporterhelper/obsreport_test.go | 109 ++++++++++++++++++++++ exporter/exporterhelper/traces.go | 5 +- exporter/exporterhelper/traces_test.go | 2 +- obsreport/obsreport_exporter.go | 15 --- obsreport/obsreport_test.go | 20 ---- obsreport/obsreporttest/obsreporttest.go | 21 ----- 12 files changed, 178 insertions(+), 70 deletions(-) create mode 100644 exporter/exporterhelper/obsreport.go create mode 100644 exporter/exporterhelper/obsreport_test.go diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 6dd023f3cbe..2c7edc8e095 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -166,7 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. type baseExporter struct { component.Component - obsrep *obsreport.Exporter + obsrep *obsExporter sender requestSender qrSender *queuedRetrySender } @@ -176,7 +176,7 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) Component: componenthelper.New(bs.componentOptions...), } - be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{ + be.obsrep = newObsExporter(obsreport.ExporterSettings{ Level: configtelemetry.GetMetricsLevelFlagValue(), ExporterID: cfg.ID(), }) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 14a62dfc54d..d2797c6b477 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type logsRequest struct { @@ -96,7 +95,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count()) + be.obsrep.recordLogsEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -108,7 +107,7 @@ func NewLogsExporter( } type logsExporterWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index f25d231c04c..3767625444e 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -142,7 +142,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15)) + checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15)) } func TestLogsExporter_WithSpan(t *testing.T) { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index a2a2d4c513c..1a23b8e6a43 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type metricsRequest struct { @@ -100,7 +99,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count()) + be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -112,7 +111,7 @@ func NewMetricsExporter( } type metricsSenderWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index ccc3f09c72e..d8ce822ec54 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -141,7 +141,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 metric points rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5)) + checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5)) } func TestMetricsExporter_WithSpan(t *testing.T) { diff --git a/exporter/exporterhelper/obsreport.go b/exporter/exporterhelper/obsreport.go new file mode 100644 index 00000000000..dc799156a17 --- /dev/null +++ b/exporter/exporterhelper/obsreport.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" + "go.opentelemetry.io/collector/obsreport" +) + +// TODO: Incorporate this functionality along with tests from obsreport_test.go +// into existing `obsreport` package once its functionally is not exposed +// as public API. For now this part is kept private. + +// obsExporter is a helper to add observability to a component.Exporter. +type obsExporter struct { + *obsreport.Exporter + mutators []tag.Mutator +} + +// newObsExporter creates a new observability exporter. +func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter { + return &obsExporter{ + obsreport.NewExporter(cfg), + []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))}, + } +} + +// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. +func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) +} + +// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. +func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) +} + +// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. +func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) +} diff --git a/exporter/exporterhelper/obsreport_test.go b/exporter/exporterhelper/obsreport_test.go new file mode 100644 index 00000000000..be7b09d3055 --- /dev/null +++ b/exporter/exporterhelper/obsreport_test.go @@ -0,0 +1,109 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "context" + "reflect" + "sort" + "testing" + + "github.com/stretchr/testify/require" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestExportEnqueueFailure(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + exporter := config.NewID("fakeExporter") + + obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + + logRecords := 7 + obsrep.recordLogsEnqueueFailure(context.Background(), logRecords) + checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords)) + + spans := 12 + obsrep.recordTracesEnqueueFailure(context.Background(), spans) + checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans)) + + metricPoints := 21 + obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints) + checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints)) +} + +// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") +} + +// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") +} + +// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") +} + +// checkValueForView checks that for the current exported value in the view with the given name +// for {LegacyTagKeyReceiver: receiverName} is equal to "value". +func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) { + // Make sure the tags slice is sorted by tag keys. + sortTags(wantTags) + + rows, err := view.RetrieveData(vName) + require.NoError(t, err) + + for _, row := range rows { + // Make sure the tags slice is sorted by tag keys. + sortTags(row.Tags) + if reflect.DeepEqual(wantTags, row.Tags) { + sum := row.Data.(*view.SumData) + require.Equal(t, float64(value), sum.Value) + return + } + } + + require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows) +} + +// tagsForExporterView returns the tags that are needed for the exporter views. +func tagsForExporterView(exporter config.ComponentID) []tag.Tag { + return []tag.Tag{ + {Key: exporterTag, Value: exporter.String()}, + } +} + +func sortTags(tags []tag.Tag) { + sort.SliceStable(tags, func(i, j int) bool { + return tags[i].Key.Name() < tags[j].Key.Name() + }) +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 0df0c516af9..8f67b4484a3 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type tracesRequest struct { @@ -97,7 +96,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count()) + be.obsrep.recordTracesEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -109,7 +108,7 @@ func NewTracesExporter( } type tracesExporterWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 19d36d6a3fb..39e93f441f0 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -152,7 +152,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10)) + checkExporterEnqueueFailedTracesStats(t, fakeTracesExporterName, int64(10)) } func TestTracesExporter_WithSpan(t *testing.T) { diff --git a/obsreport/obsreport_exporter.go b/obsreport/obsreport_exporter.go index c82d6149493..784e3a4f8c6 100644 --- a/obsreport/obsreport_exporter.go +++ b/obsreport/obsreport_exporter.go @@ -63,11 +63,6 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) { endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } -// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. -func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) -} - // StartMetricsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -83,11 +78,6 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey) } -// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. -func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) -} - // StartLogsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -102,11 +92,6 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey) } -// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. -func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) -} - // startSpan creates the span used to trace the operation. Returning // the updated context and the created span. func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context { diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index f12f38d7739..67d1949a61b 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -436,26 +436,6 @@ func TestExportLogsOp(t *testing.T) { obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords)) } -func TestExportEnqueueFailure(t *testing.T) { - doneFn, err := obsreporttest.SetupRecordedMetricsTest() - require.NoError(t, err) - defer doneFn() - - obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) - - logRecords := 7 - obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords) - obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords)) - - spans := 12 - obsrep.RecordTracesEnqueueFailure(context.Background(), spans) - obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans)) - - metricPoints := 21 - obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints) - obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints)) -} - func TestReceiveWithLongLivedCtx(t *testing.T) { ss := &spanStore{} trace.RegisterExporter(ss) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index 01eefc6ced9..e0ca299cd0d 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -65,13 +65,6 @@ func CheckExporterTraces(t *testing.T, exporter config.ComponentID, acceptedSpan checkValueForView(t, exporterTags, droppedSpans, "exporter/send_failed_spans") } -// CheckExporterEnqueueFailedTraces checks that reported number of spans failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedTraces(t *testing.T, exporter config.ComponentID, spans int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") -} - // CheckExporterMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMetricsPoints, droppedMetricsPoints int64) { @@ -80,13 +73,6 @@ func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMet checkValueForView(t, exporterTags, droppedMetricsPoints, "exporter/send_failed_metric_points") } -// CheckExporterEnqueueFailedMetrics checks that reported number of metric points failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedMetrics(t *testing.T, exporter config.ComponentID, metricPoints int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") -} - // CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRecords, droppedLogRecords int64) { @@ -95,13 +81,6 @@ func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRec checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records") } -// CheckExporterEnqueueFailedLogs checks that reported number of log records failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedLogs(t *testing.T, exporter config.ComponentID, logRecords int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") -} - // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckProcessorTraces(t *testing.T, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) {