From c882e64a6b46b326e0fef8d5037f7f51183a8c14 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 2 Jun 2021 12:56:43 -0700 Subject: [PATCH] Make report*EnqueueFailure methods private By moving them to the package where they are being used. It requires some code duplication --- exporter/exporterhelper/common.go | 4 +- exporter/exporterhelper/logs.go | 5 +- exporter/exporterhelper/logs_test.go | 2 +- exporter/exporterhelper/metrics.go | 5 +- exporter/exporterhelper/metrics_test.go | 2 +- exporter/exporterhelper/obsreport.go | 58 ++++++++++++ exporter/exporterhelper/obsreport_test.go | 109 ++++++++++++++++++++++ exporter/exporterhelper/traces.go | 5 +- exporter/exporterhelper/traces_test.go | 2 +- obsreport/obsreport_exporter.go | 15 --- obsreport/obsreport_test.go | 20 ---- obsreport/obsreporttest/obsreporttest.go | 21 ----- 12 files changed, 178 insertions(+), 70 deletions(-) create mode 100644 exporter/exporterhelper/obsreport.go create mode 100644 exporter/exporterhelper/obsreport_test.go diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 6dd023f3cbe..2c7edc8e095 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -166,7 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. type baseExporter struct { component.Component - obsrep *obsreport.Exporter + obsrep *obsExporter sender requestSender qrSender *queuedRetrySender } @@ -176,7 +176,7 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) Component: componenthelper.New(bs.componentOptions...), } - be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{ + be.obsrep = newObsExporter(obsreport.ExporterSettings{ Level: configtelemetry.GetMetricsLevelFlagValue(), ExporterID: cfg.ID(), }) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 14a62dfc54d..d2797c6b477 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type logsRequest struct { @@ -96,7 +95,7 @@ func NewLogsExporter( req := newLogsRequest(ctx, ld, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count()) + be.obsrep.recordLogsEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -108,7 +107,7 @@ func NewLogsExporter( } type logsExporterWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index f25d231c04c..3767625444e 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -142,7 +142,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15)) + checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15)) } func TestLogsExporter_WithSpan(t *testing.T) { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index a2a2d4c513c..1a23b8e6a43 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type metricsRequest struct { @@ -100,7 +99,7 @@ func NewMetricsExporter( req := newMetricsRequest(ctx, md, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count()) + be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -112,7 +111,7 @@ func NewMetricsExporter( } type metricsSenderWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index ccc3f09c72e..d8ce822ec54 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -141,7 +141,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 metric points rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5)) + checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5)) } func TestMetricsExporter_WithSpan(t *testing.T) { diff --git a/exporter/exporterhelper/obsreport.go b/exporter/exporterhelper/obsreport.go new file mode 100644 index 00000000000..dc799156a17 --- /dev/null +++ b/exporter/exporterhelper/obsreport.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" + "go.opentelemetry.io/collector/obsreport" +) + +// TODO: Incorporate this functionality along with tests from obsreport_test.go +// into existing `obsreport` package once its functionally is not exposed +// as public API. For now this part is kept private. + +// obsExporter is a helper to add observability to a component.Exporter. +type obsExporter struct { + *obsreport.Exporter + mutators []tag.Mutator +} + +// newObsExporter creates a new observability exporter. +func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter { + return &obsExporter{ + obsreport.NewExporter(cfg), + []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))}, + } +} + +// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. +func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) +} + +// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. +func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) +} + +// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. +func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { + _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) +} diff --git a/exporter/exporterhelper/obsreport_test.go b/exporter/exporterhelper/obsreport_test.go new file mode 100644 index 00000000000..be7b09d3055 --- /dev/null +++ b/exporter/exporterhelper/obsreport_test.go @@ -0,0 +1,109 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporterhelper + +import ( + "context" + "reflect" + "sort" + "testing" + + "github.com/stretchr/testify/require" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/obsreport/obsreporttest" +) + +func TestExportEnqueueFailure(t *testing.T) { + doneFn, err := obsreporttest.SetupRecordedMetricsTest() + require.NoError(t, err) + defer doneFn() + + exporter := config.NewID("fakeExporter") + + obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + + logRecords := 7 + obsrep.recordLogsEnqueueFailure(context.Background(), logRecords) + checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords)) + + spans := 12 + obsrep.recordTracesEnqueueFailure(context.Background(), spans) + checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans)) + + metricPoints := 21 + obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints) + checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints)) +} + +// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") +} + +// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") +} + +// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") +} + +// checkValueForView checks that for the current exported value in the view with the given name +// for {LegacyTagKeyReceiver: receiverName} is equal to "value". +func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) { + // Make sure the tags slice is sorted by tag keys. + sortTags(wantTags) + + rows, err := view.RetrieveData(vName) + require.NoError(t, err) + + for _, row := range rows { + // Make sure the tags slice is sorted by tag keys. + sortTags(row.Tags) + if reflect.DeepEqual(wantTags, row.Tags) { + sum := row.Data.(*view.SumData) + require.Equal(t, float64(value), sum.Value) + return + } + } + + require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows) +} + +// tagsForExporterView returns the tags that are needed for the exporter views. +func tagsForExporterView(exporter config.ComponentID) []tag.Tag { + return []tag.Tag{ + {Key: exporterTag, Value: exporter.String()}, + } +} + +func sortTags(tags []tag.Tag) { + sort.SliceStable(tags, func(i, j int) bool { + return tags[i].Key.Name() < tags[j].Key.Name() + }) +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 0df0c516af9..8f67b4484a3 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" ) type tracesRequest struct { @@ -97,7 +96,7 @@ func NewTracesExporter( req := newTracesRequest(ctx, td, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { - be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count()) + be.obsrep.recordTracesEnqueueFailure(req.context(), req.count()) } return err }, bs.consumerOptions...) @@ -109,7 +108,7 @@ func NewTracesExporter( } type tracesExporterWithObservability struct { - obsrep *obsreport.Exporter + obsrep *obsExporter nextSender requestSender } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 19d36d6a3fb..39e93f441f0 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -152,7 +152,7 @@ func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { } // 2 batched must be in queue, and 5 batches (10 spans) rejected due to queue overflow - obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10)) + checkExporterEnqueueFailedTracesStats(t, fakeTracesExporterName, int64(10)) } func TestTracesExporter_WithSpan(t *testing.T) { diff --git a/obsreport/obsreport_exporter.go b/obsreport/obsreport_exporter.go index c82d6149493..784e3a4f8c6 100644 --- a/obsreport/obsreport_exporter.go +++ b/obsreport/obsreport_exporter.go @@ -63,11 +63,6 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) { endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } -// RecordTracesEnqueueFailure records number of spans that failed to be added to the sending queue. -func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans))) -} - // StartMetricsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -83,11 +78,6 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey) } -// RecordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue. -func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints))) -} - // StartLogsOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. @@ -102,11 +92,6 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey) } -// RecordLogsEnqueueFailure records number of log records that failed to be added to the sending queue. -func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) { - _ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords))) -} - // startSpan creates the span used to trace the operation. Returning // the updated context and the created span. func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context { diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index f12f38d7739..67d1949a61b 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -436,26 +436,6 @@ func TestExportLogsOp(t *testing.T) { obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords)) } -func TestExportEnqueueFailure(t *testing.T) { - doneFn, err := obsreporttest.SetupRecordedMetricsTest() - require.NoError(t, err) - defer doneFn() - - obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) - - logRecords := 7 - obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords) - obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords)) - - spans := 12 - obsrep.RecordTracesEnqueueFailure(context.Background(), spans) - obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans)) - - metricPoints := 21 - obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints) - obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints)) -} - func TestReceiveWithLongLivedCtx(t *testing.T) { ss := &spanStore{} trace.RegisterExporter(ss) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index 01eefc6ced9..e0ca299cd0d 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -65,13 +65,6 @@ func CheckExporterTraces(t *testing.T, exporter config.ComponentID, acceptedSpan checkValueForView(t, exporterTags, droppedSpans, "exporter/send_failed_spans") } -// CheckExporterEnqueueFailedTraces checks that reported number of spans failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedTraces(t *testing.T, exporter config.ComponentID, spans int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans") -} - // CheckExporterMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMetricsPoints, droppedMetricsPoints int64) { @@ -80,13 +73,6 @@ func CheckExporterMetrics(t *testing.T, exporter config.ComponentID, acceptedMet checkValueForView(t, exporterTags, droppedMetricsPoints, "exporter/send_failed_metric_points") } -// CheckExporterEnqueueFailedMetrics checks that reported number of metric points failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedMetrics(t *testing.T, exporter config.ComponentID, metricPoints int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points") -} - // CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRecords, droppedLogRecords int64) { @@ -95,13 +81,6 @@ func CheckExporterLogs(t *testing.T, exporter config.ComponentID, acceptedLogRec checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records") } -// CheckExporterEnqueueFailedLogs checks that reported number of log records failed to enqueue match given values. -// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. -func CheckExporterEnqueueFailedLogs(t *testing.T, exporter config.ComponentID, logRecords int64) { - exporterTags := tagsForExporterView(exporter) - checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records") -} - // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckProcessorTraces(t *testing.T, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) {