From b7f9dcb856124c6cf8ef5eb39b6b199da48d8fa3 Mon Sep 17 00:00:00 2001 From: xiami Date: Fri, 21 May 2021 15:39:32 -0700 Subject: [PATCH] Hide OpenCensus reference from public APIs in obsreport package --- exporter/exporterhelper/logs_test.go | 6 +- exporter/exporterhelper/metrics_test.go | 6 +- exporter/exporterhelper/queued_retry.go | 8 +- exporter/exporterhelper/traces_test.go | 7 +- .../obsmetrics/obs_exporter.go | 79 +++++++++ .../obsmetrics/obs_processor.go | 79 +++++++++ .../obsmetrics/obs_receiver.go | 86 +++++++++ .../obsreportconfig/obsmetrics/obs_scraper.go | 50 ++++++ .../obsreportconfig/obsmetrics/obsmetrics.go | 22 +++ internal/obsreportconfig/obsreportconfig.go | 124 +++++++++++++ .../obsreportconfig/obsreportconfig_test.go | 58 ++++++ obsreport/obsreport.go | 102 +---------- obsreport/obsreport_exporter.go | 85 ++------- obsreport/obsreport_processor.go | 131 ++++---------- obsreport/obsreport_receiver.go | 116 +++--------- obsreport/obsreport_scraper.go | 50 ++---- obsreport/obsreport_test.go | 165 +++++++----------- obsreport/obsreporttest/obsreporttest.go | 5 +- processor/batchprocessor/metrics.go | 18 +- processor/processorhelper/processor.go | 4 +- service/telemetry.go | 5 +- 21 files changed, 688 insertions(+), 518 deletions(-) create mode 100644 internal/obsreportconfig/obsmetrics/obs_exporter.go create mode 100644 internal/obsreportconfig/obsmetrics/obs_processor.go create mode 100644 internal/obsreportconfig/obsmetrics/obs_receiver.go create mode 100644 internal/obsreportconfig/obsmetrics/obs_scraper.go create mode 100644 internal/obsreportconfig/obsmetrics/obsmetrics.go create mode 100644 internal/obsreportconfig/obsreportconfig.go create mode 100644 internal/obsreportconfig/obsreportconfig_test.go diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index cbbd2d4628e..7333c727b55 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -30,8 +30,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/obsreport/obsreporttest" ) @@ -222,7 +222,7 @@ func checkWrapSpanForLogsExporter(t *testing.T, le component.LogsExporter, wantE sentLogRecords = 0 failedToSendLogRecords = numLogRecords } - require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd) - require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, sentLogRecords, sd.Attributes[obsmetrics.SentLogRecordsKey], "SpanData %v", sd) + require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsmetrics.FailedToSendLogRecordsKey], "SpanData %v", sd) } } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 2f1ee8d3e73..61baee86517 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -30,8 +30,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/obsreport/obsreporttest" ) @@ -246,7 +246,7 @@ func checkWrapSpanForMetricsExporter(t *testing.T, me component.MetricsExporter, sentMetricPoints = 0 failedToSendMetricPoints = numMetricPoints } - require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentMetricPointsKey], "SpanData %v", sd) - require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendMetricPointsKey], "SpanData %v", sd) + require.Equalf(t, sentMetricPoints, sd.Attributes[obsmetrics.SentMetricPointsKey], "SpanData %v", sd) + require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsmetrics.FailedToSendMetricPointsKey], "SpanData %v", sd) } } diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index ae68dafc467..313a13b241e 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -30,16 +30,16 @@ import ( "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) var ( r = metric.NewRegistry() queueSizeGauge, _ = r.AddInt64DerivedGauge( - obsreport.ExporterKey+"/queue_size", + obsmetrics.ExporterKey+"/queue_size", metric.WithDescription("Current size of the retry queue (in batches)"), - metric.WithLabelKeys(obsreport.ExporterKey), + metric.WithLabelKeys(obsmetrics.ExporterKey), metric.WithUnit(metricdata.UnitDimensionless)) ) @@ -127,7 +127,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger { func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { retryStopCh := make(chan struct{}) sampledLogger := createSampledLogger(logger) - traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName) + traceAttr := trace.StringAttribute(obsmetrics.ExporterKey, fullName) return &queuedRetrySender{ fullName: fullName, cfg: qCfg, diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 1ce9d5dbfe6..65a6345f507 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -16,6 +16,7 @@ package exporterhelper import ( "context" "errors" + "sync" "testing" @@ -31,8 +32,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/obsreport/obsreporttest" ) @@ -239,7 +240,7 @@ func checkWrapSpanForTracesExporter(t *testing.T, te component.TracesExporter, w failedToSendSpans = numSpans } - require.Equalf(t, sentSpans, sd.Attributes[obsreport.SentSpansKey], "SpanData %v", sd) - require.Equalf(t, failedToSendSpans, sd.Attributes[obsreport.FailedToSendSpansKey], "SpanData %v", sd) + require.Equalf(t, sentSpans, sd.Attributes[obsmetrics.SentSpansKey], "SpanData %v", sd) + require.Equalf(t, failedToSendSpans, sd.Attributes[obsmetrics.FailedToSendSpansKey], "SpanData %v", sd) } } diff --git a/internal/obsreportconfig/obsmetrics/obs_exporter.go b/internal/obsreportconfig/obsmetrics/obs_exporter.go new file mode 100644 index 00000000000..01f88ece936 --- /dev/null +++ b/internal/obsreportconfig/obsmetrics/obs_exporter.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsmetrics + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/tag" +) + +const ( + // ExporterKey used to identify exporters in metrics and traces. + ExporterKey = "exporter" + + // SentSpansKey used to track spans sent by exporters. + SentSpansKey = "sent_spans" + // FailedToSendSpansKey used to track spans that failed to be sent by exporters. + FailedToSendSpansKey = "send_failed_spans" + + // SentMetricPointsKey used to track metric points sent by exporters. + SentMetricPointsKey = "sent_metric_points" + // FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters. + FailedToSendMetricPointsKey = "send_failed_metric_points" + + // SentLogRecordsKey used to track logs sent by exporters. + SentLogRecordsKey = "sent_log_records" + // FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters. + FailedToSendLogRecordsKey = "send_failed_log_records" +) + +var ( + TagKeyExporter, _ = tag.NewKey(ExporterKey) + + ExporterPrefix = ExporterKey + NameSep + ExportTraceDataOperationSuffix = NameSep + "traces" + ExportMetricsOperationSuffix = NameSep + "metrics" + ExportLogsOperationSuffix = NameSep + "logs" + + // Exporter metrics. Any count of data items below is in the final format + // that they were sent, reasoning: reconciliation is easier if measurements + // on backend and exporter are expected to be the same. Translation issues + // that result in a different number of elements should be reported in a + // separate way. + ExporterSentSpans = stats.Int64( + ExporterPrefix+SentSpansKey, + "Number of spans successfully sent to destination.", + stats.UnitDimensionless) + ExporterFailedToSendSpans = stats.Int64( + ExporterPrefix+FailedToSendSpansKey, + "Number of spans in failed attempts to send to destination.", + stats.UnitDimensionless) + ExporterSentMetricPoints = stats.Int64( + ExporterPrefix+SentMetricPointsKey, + "Number of metric points successfully sent to destination.", + stats.UnitDimensionless) + ExporterFailedToSendMetricPoints = stats.Int64( + ExporterPrefix+FailedToSendMetricPointsKey, + "Number of metric points in failed attempts to send to destination.", + stats.UnitDimensionless) + ExporterSentLogRecords = stats.Int64( + ExporterPrefix+SentLogRecordsKey, + "Number of log record successfully sent to destination.", + stats.UnitDimensionless) + ExporterFailedToSendLogRecords = stats.Int64( + ExporterPrefix+FailedToSendLogRecordsKey, + "Number of log records in failed attempts to send to destination.", + stats.UnitDimensionless) +) diff --git a/internal/obsreportconfig/obsmetrics/obs_processor.go b/internal/obsreportconfig/obsmetrics/obs_processor.go new file mode 100644 index 00000000000..ea4a9a51751 --- /dev/null +++ b/internal/obsreportconfig/obsmetrics/obs_processor.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsmetrics + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/tag" +) + +const ( + // ProcessorKey is the key used to identify processors in metrics and traces. + ProcessorKey = "processor" + + // DroppedSpansKey is the key used to identify spans dropped by the Collector. + DroppedSpansKey = "dropped_spans" + + // DroppedMetricPointsKey is the key used to identify metric points dropped by the Collector. + DroppedMetricPointsKey = "dropped_metric_points" + + // DroppedLogRecordsKey is the key used to identify log records dropped by the Collector. + DroppedLogRecordsKey = "dropped_log_records" +) + +var ( + TagKeyProcessor, _ = tag.NewKey(ProcessorKey) + + ProcessorPrefix = ProcessorKey + NameSep + + // Processor metrics. Any count of data items below is in the internal format + // of the collector since processors only deal with internal format. + ProcessorAcceptedSpans = stats.Int64( + ProcessorPrefix+AcceptedSpansKey, + "Number of spans successfully pushed into the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorRefusedSpans = stats.Int64( + ProcessorPrefix+RefusedSpansKey, + "Number of spans that were rejected by the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorDroppedSpans = stats.Int64( + ProcessorPrefix+DroppedSpansKey, + "Number of spans that were dropped.", + stats.UnitDimensionless) + ProcessorAcceptedMetricPoints = stats.Int64( + ProcessorPrefix+AcceptedMetricPointsKey, + "Number of metric points successfully pushed into the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorRefusedMetricPoints = stats.Int64( + ProcessorPrefix+RefusedMetricPointsKey, + "Number of metric points that were rejected by the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorDroppedMetricPoints = stats.Int64( + ProcessorPrefix+DroppedMetricPointsKey, + "Number of metric points that were dropped.", + stats.UnitDimensionless) + ProcessorAcceptedLogRecords = stats.Int64( + ProcessorPrefix+AcceptedLogRecordsKey, + "Number of log records successfully pushed into the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorRefusedLogRecords = stats.Int64( + ProcessorPrefix+RefusedLogRecordsKey, + "Number of log records that were rejected by the next component in the pipeline.", + stats.UnitDimensionless) + ProcessorDroppedLogRecords = stats.Int64( + ProcessorPrefix+DroppedLogRecordsKey, + "Number of log records that were dropped.", + stats.UnitDimensionless) +) diff --git a/internal/obsreportconfig/obsmetrics/obs_receiver.go b/internal/obsreportconfig/obsmetrics/obs_receiver.go new file mode 100644 index 00000000000..8cc7bf8b296 --- /dev/null +++ b/internal/obsreportconfig/obsmetrics/obs_receiver.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsmetrics + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/tag" +) + +const ( + // ReceiverKey used to identify receivers in metrics and traces. + ReceiverKey = "receiver" + // TransportKey used to identify the transport used to received the data. + TransportKey = "transport" + // FormatKey used to identify the format of the data received. + FormatKey = "format" + + // AcceptedSpansKey used to identify spans accepted by the Collector. + AcceptedSpansKey = "accepted_spans" + // RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector. + RefusedSpansKey = "refused_spans" + + // AcceptedMetricPointsKey used to identify metric points accepted by the Collector. + AcceptedMetricPointsKey = "accepted_metric_points" + // RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the + // Collector. + RefusedMetricPointsKey = "refused_metric_points" + + // AcceptedLogRecordsKey used to identify log records accepted by the Collector. + AcceptedLogRecordsKey = "accepted_log_records" + // RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the + // Collector. + RefusedLogRecordsKey = "refused_log_records" +) + +var ( + TagKeyReceiver, _ = tag.NewKey(ReceiverKey) + TagKeyTransport, _ = tag.NewKey(TransportKey) + + ReceiverPrefix = ReceiverKey + NameSep + ReceiveTraceDataOperationSuffix = NameSep + "TraceDataReceived" + ReceiverMetricsOperationSuffix = NameSep + "MetricsReceived" + ReceiverLogsOperationSuffix = NameSep + "LogsReceived" + + // Receiver metrics. Any count of data items below is in the original format + // that they were received, reasoning: reconciliation is easier if measurement + // on clients and receiver are expected to be the same. Translation issues + // that result in a different number of elements should be reported in a + // separate way. + ReceiverAcceptedSpans = stats.Int64( + ReceiverPrefix+AcceptedSpansKey, + "Number of spans successfully pushed into the pipeline.", + stats.UnitDimensionless) + ReceiverRefusedSpans = stats.Int64( + ReceiverPrefix+RefusedSpansKey, + "Number of spans that could not be pushed into the pipeline.", + stats.UnitDimensionless) + ReceiverAcceptedMetricPoints = stats.Int64( + ReceiverPrefix+AcceptedMetricPointsKey, + "Number of metric points successfully pushed into the pipeline.", + stats.UnitDimensionless) + ReceiverRefusedMetricPoints = stats.Int64( + ReceiverPrefix+RefusedMetricPointsKey, + "Number of metric points that could not be pushed into the pipeline.", + stats.UnitDimensionless) + ReceiverAcceptedLogRecords = stats.Int64( + ReceiverPrefix+AcceptedLogRecordsKey, + "Number of log records successfully pushed into the pipeline.", + stats.UnitDimensionless) + ReceiverRefusedLogRecords = stats.Int64( + ReceiverPrefix+RefusedLogRecordsKey, + "Number of log records that could not be pushed into the pipeline.", + stats.UnitDimensionless) +) diff --git a/internal/obsreportconfig/obsmetrics/obs_scraper.go b/internal/obsreportconfig/obsmetrics/obs_scraper.go new file mode 100644 index 00000000000..dedbe4953ef --- /dev/null +++ b/internal/obsreportconfig/obsmetrics/obs_scraper.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsmetrics + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/tag" +) + +const ( + // ScraperKey used to identify scrapers in metrics and traces. + ScraperKey = "scraper" + + // ScrapedMetricPointsKey used to identify metric points scraped by the + // Collector. + ScrapedMetricPointsKey = "scraped_metric_points" + // ErroredMetricPointsKey used to identify metric points errored (i.e. + // unable to be scraped) by the Collector. + ErroredMetricPointsKey = "errored_metric_points" +) + +const ( + ScraperPrefix = ScraperKey + NameSep + ScraperMetricsOperationSuffix = NameSep + "MetricsScraped" +) + +var ( + TagKeyScraper, _ = tag.NewKey(ScraperKey) + + ScraperScrapedMetricPoints = stats.Int64( + ScraperPrefix+ScrapedMetricPointsKey, + "Number of metric points successfully scraped.", + stats.UnitDimensionless) + ScraperErroredMetricPoints = stats.Int64( + ScraperPrefix+ErroredMetricPointsKey, + "Number of metric points that were unable to be scraped.", + stats.UnitDimensionless) +) diff --git a/internal/obsreportconfig/obsmetrics/obsmetrics.go b/internal/obsreportconfig/obsmetrics/obsmetrics.go new file mode 100644 index 00000000000..fe7ea208116 --- /dev/null +++ b/internal/obsreportconfig/obsmetrics/obsmetrics.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package obsmetrics defines the obsreport metrics for each components +// all the metrics is in OpenCensus format which will be replaced with OTEL Metrics +// in the future +package obsmetrics + +const ( + NameSep = "/" +) diff --git a/internal/obsreportconfig/obsreportconfig.go b/internal/obsreportconfig/obsreportconfig.go new file mode 100644 index 00000000000..da1ce262ff7 --- /dev/null +++ b/internal/obsreportconfig/obsreportconfig.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsreportconfig + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" +) + +var ( + Level = configtelemetry.LevelBasic +) + +// ObsMetrics wraps OpenCensus View for Collector observability metrics +type ObsMetrics struct { + Views []*view.View +} + +// Configure is used to control the settings that will be used by the obsreport +// package. +func Configure(level configtelemetry.Level) *ObsMetrics { + Level = level + var views []*view.View + + if Level != configtelemetry.LevelNone { + obsMetricViews := allViews() + views = append(views, obsMetricViews.Views...) + } + + return &ObsMetrics{ + Views: views, + } +} + +// allViews return the list of all views that needs to be configured. +func allViews() *ObsMetrics { + var views []*view.View + // Receiver views. + measures := []*stats.Int64Measure{ + obsmetrics.ReceiverAcceptedSpans, + obsmetrics.ReceiverRefusedSpans, + obsmetrics.ReceiverAcceptedMetricPoints, + obsmetrics.ReceiverRefusedMetricPoints, + obsmetrics.ReceiverAcceptedLogRecords, + obsmetrics.ReceiverRefusedLogRecords, + } + tagKeys := []tag.Key{ + obsmetrics.TagKeyReceiver, obsmetrics.TagKeyTransport, + } + views = append(views, genViews(measures, tagKeys, view.Sum())...) + + // Scraper views. + measures = []*stats.Int64Measure{ + obsmetrics.ScraperScrapedMetricPoints, + obsmetrics.ScraperErroredMetricPoints, + } + tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper} + views = append(views, genViews(measures, tagKeys, view.Sum())...) + + // Exporter views. + measures = []*stats.Int64Measure{ + obsmetrics.ExporterSentSpans, + obsmetrics.ExporterFailedToSendSpans, + obsmetrics.ExporterSentMetricPoints, + obsmetrics.ExporterFailedToSendMetricPoints, + obsmetrics.ExporterSentLogRecords, + obsmetrics.ExporterFailedToSendLogRecords, + } + tagKeys = []tag.Key{obsmetrics.TagKeyExporter} + views = append(views, genViews(measures, tagKeys, view.Sum())...) + + // Processor views. + measures = []*stats.Int64Measure{ + obsmetrics.ProcessorAcceptedSpans, + obsmetrics.ProcessorRefusedSpans, + obsmetrics.ProcessorDroppedSpans, + obsmetrics.ProcessorAcceptedMetricPoints, + obsmetrics.ProcessorRefusedMetricPoints, + obsmetrics.ProcessorDroppedMetricPoints, + obsmetrics.ProcessorAcceptedLogRecords, + obsmetrics.ProcessorRefusedLogRecords, + obsmetrics.ProcessorDroppedLogRecords, + } + tagKeys = []tag.Key{obsmetrics.TagKeyProcessor} + views = append(views, genViews(measures, tagKeys, view.Sum())...) + + return &ObsMetrics{ + Views: views, + } +} + +func genViews( + measures []*stats.Int64Measure, + tagKeys []tag.Key, + aggregation *view.Aggregation, +) []*view.View { + views := make([]*view.View, 0, len(measures)) + for _, measure := range measures { + views = append(views, &view.View{ + Name: measure.Name(), + Description: measure.Description(), + TagKeys: tagKeys, + Measure: measure, + Aggregation: aggregation, + }) + } + return views +} diff --git a/internal/obsreportconfig/obsreportconfig_test.go b/internal/obsreportconfig/obsreportconfig_test.go new file mode 100644 index 00000000000..91a011b1902 --- /dev/null +++ b/internal/obsreportconfig/obsreportconfig_test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package obsreportconfig + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opencensus.io/stats/view" + + "go.opentelemetry.io/collector/config/configtelemetry" +) + +func TestConfigure(t *testing.T) { + tests := []struct { + name string + level configtelemetry.Level + wantViews []*view.View + }{ + { + name: "none", + level: configtelemetry.LevelNone, + }, + { + name: "basic", + level: configtelemetry.LevelBasic, + wantViews: allViews().Views, + }, + { + name: "normal", + level: configtelemetry.LevelNormal, + wantViews: allViews().Views, + }, + { + name: "detailed", + level: configtelemetry.LevelDetailed, + wantViews: allViews().Views, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotViews := Configure(tt.level) + assert.Equal(t, tt.wantViews, gotViews.Views) + }) + } +} diff --git a/obsreport/obsreport.go b/obsreport/obsreport.go index 8bab9ca1c55..c1e12804eb7 100644 --- a/obsreport/obsreport.go +++ b/obsreport/obsreport.go @@ -18,21 +18,12 @@ import ( "context" "strings" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" "go.opencensus.io/trace" - "go.opentelemetry.io/collector/config/configtelemetry" -) - -const ( - nameSep = "/" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) var ( - gLevel = configtelemetry.LevelBasic - okStatus = trace.Status{Code: trace.StatusCodeOK} ) @@ -60,99 +51,14 @@ func setParentLink(parentCtx context.Context, childSpan *trace.Span) bool { return true } -// Configure is used to control the settings that will be used by the obsreport -// package. -func Configure(level configtelemetry.Level) (views []*view.View) { - gLevel = level - - if gLevel != configtelemetry.LevelNone { - gProcessor.level = level - views = append(views, AllViews()...) - } - - return views -} - func buildComponentPrefix(componentPrefix, configType string) string { - if !strings.HasSuffix(componentPrefix, nameSep) { - componentPrefix += nameSep + if !strings.HasSuffix(componentPrefix, obsmetrics.NameSep) { + componentPrefix += obsmetrics.NameSep } if configType == "" { return componentPrefix } - return componentPrefix + configType + nameSep -} - -// AllViews return the list of all views that needs to be configured. -func AllViews() (views []*view.View) { - // Receiver views. - measures := []*stats.Int64Measure{ - mReceiverAcceptedSpans, - mReceiverRefusedSpans, - mReceiverAcceptedMetricPoints, - mReceiverRefusedMetricPoints, - mReceiverAcceptedLogRecords, - mReceiverRefusedLogRecords, - } - tagKeys := []tag.Key{ - tagKeyReceiver, tagKeyTransport, - } - views = append(views, genViews(measures, tagKeys, view.Sum())...) - - // Scraper views. - measures = []*stats.Int64Measure{ - mScraperScrapedMetricPoints, - mScraperErroredMetricPoints, - } - tagKeys = []tag.Key{tagKeyReceiver, tagKeyScraper} - views = append(views, genViews(measures, tagKeys, view.Sum())...) - - // Exporter views. - measures = []*stats.Int64Measure{ - mExporterSentSpans, - mExporterFailedToSendSpans, - mExporterSentMetricPoints, - mExporterFailedToSendMetricPoints, - mExporterSentLogRecords, - mExporterFailedToSendLogRecords, - } - tagKeys = []tag.Key{tagKeyExporter} - views = append(views, genViews(measures, tagKeys, view.Sum())...) - - // Processor views. - measures = []*stats.Int64Measure{ - mProcessorAcceptedSpans, - mProcessorRefusedSpans, - mProcessorDroppedSpans, - mProcessorAcceptedMetricPoints, - mProcessorRefusedMetricPoints, - mProcessorDroppedMetricPoints, - mProcessorAcceptedLogRecords, - mProcessorRefusedLogRecords, - mProcessorDroppedLogRecords, - } - tagKeys = []tag.Key{tagKeyProcessor} - views = append(views, genViews(measures, tagKeys, view.Sum())...) - - return views -} - -func genViews( - measures []*stats.Int64Measure, - tagKeys []tag.Key, - aggregation *view.Aggregation, -) []*view.View { - views := make([]*view.View, 0, len(measures)) - for _, measure := range measures { - views = append(views, &view.View{ - Name: measure.Name(), - Description: measure.Description(), - TagKeys: tagKeys, - Measure: measure, - Aggregation: aggregation, - }) - } - return views + return componentPrefix + configType + obsmetrics.NameSep } func errToStatus(err error) trace.Status { diff --git a/obsreport/obsreport_exporter.go b/obsreport/obsreport_exporter.go index 1d8f5497632..9671dce146e 100644 --- a/obsreport/obsreport_exporter.go +++ b/obsreport/obsreport_exporter.go @@ -23,65 +23,8 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" -) - -const ( - // ExporterKey used to identify exporters in metrics and traces. - ExporterKey = "exporter" - - // SentSpansKey used to track spans sent by exporters. - SentSpansKey = "sent_spans" - // FailedToSendSpansKey used to track spans that failed to be sent by exporters. - FailedToSendSpansKey = "send_failed_spans" - - // SentMetricPointsKey used to track metric points sent by exporters. - SentMetricPointsKey = "sent_metric_points" - // FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters. - FailedToSendMetricPointsKey = "send_failed_metric_points" - - // SentLogRecordsKey used to track logs sent by exporters. - SentLogRecordsKey = "sent_log_records" - // FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters. - FailedToSendLogRecordsKey = "send_failed_log_records" -) - -var ( - tagKeyExporter, _ = tag.NewKey(ExporterKey) - - exporterPrefix = ExporterKey + nameSep - exportTraceDataOperationSuffix = nameSep + "traces" - exportMetricsOperationSuffix = nameSep + "metrics" - exportLogsOperationSuffix = nameSep + "logs" - - // Exporter metrics. Any count of data items below is in the final format - // that they were sent, reasoning: reconciliation is easier if measurements - // on backend and exporter are expected to be the same. Translation issues - // that result in a different number of elements should be reported in a - // separate way. - mExporterSentSpans = stats.Int64( - exporterPrefix+SentSpansKey, - "Number of spans successfully sent to destination.", - stats.UnitDimensionless) - mExporterFailedToSendSpans = stats.Int64( - exporterPrefix+FailedToSendSpansKey, - "Number of spans in failed attempts to send to destination.", - stats.UnitDimensionless) - mExporterSentMetricPoints = stats.Int64( - exporterPrefix+SentMetricPointsKey, - "Number of metric points successfully sent to destination.", - stats.UnitDimensionless) - mExporterFailedToSendMetricPoints = stats.Int64( - exporterPrefix+FailedToSendMetricPointsKey, - "Number of metric points in failed attempts to send to destination.", - stats.UnitDimensionless) - mExporterSentLogRecords = stats.Int64( - exporterPrefix+SentLogRecordsKey, - "Number of log record successfully sent to destination.", - stats.UnitDimensionless) - mExporterFailedToSendLogRecords = stats.Int64( - exporterPrefix+FailedToSendLogRecordsKey, - "Number of log records in failed attempts to send to destination.", - stats.UnitDimensionless) + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) // Exporter is a helper to add observability to a component.Exporter. @@ -102,7 +45,7 @@ func NewExporter(cfg ExporterSettings) *Exporter { return &Exporter{ level: cfg.Level, exporterName: cfg.ExporterID.String(), - mutators: []tag.Mutator{tag.Upsert(tagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))}, + mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))}, } } @@ -110,55 +53,55 @@ func NewExporter(cfg ExporterSettings) *Exporter { // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. func (eor *Exporter) StartTracesExportOp(ctx context.Context) context.Context { - return eor.startSpan(ctx, exportTraceDataOperationSuffix) + return eor.startSpan(ctx, obsmetrics.ExportTraceDataOperationSuffix) } // EndTracesExportOp completes the export operation that was started with StartTracesExportOp. func (eor *Exporter) EndTracesExportOp(ctx context.Context, numSpans int, err error) { numSent, numFailedToSend := toNumItems(numSpans, err) - eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentSpans, mExporterFailedToSendSpans) - endSpan(ctx, err, numSent, numFailedToSend, SentSpansKey, FailedToSendSpansKey) + eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans) + endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } // StartMetricsExportOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. func (eor *Exporter) StartMetricsExportOp(ctx context.Context) context.Context { - return eor.startSpan(ctx, exportMetricsOperationSuffix) + return eor.startSpan(ctx, obsmetrics.ExportMetricsOperationSuffix) } // EndMetricsExportOp completes the export operation that was started with // StartMetricsExportOp. func (eor *Exporter) EndMetricsExportOp(ctx context.Context, numMetricPoints int, err error) { numSent, numFailedToSend := toNumItems(numMetricPoints, err) - eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentMetricPoints, mExporterFailedToSendMetricPoints) - endSpan(ctx, err, numSent, numFailedToSend, SentMetricPointsKey, FailedToSendMetricPointsKey) + eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints) + endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey) } // StartLogsExportOp is called at the start of an Export operation. // The returned context should be used in other calls to the Exporter functions // dealing with the same export operation. func (eor *Exporter) StartLogsExportOp(ctx context.Context) context.Context { - return eor.startSpan(ctx, exportLogsOperationSuffix) + return eor.startSpan(ctx, obsmetrics.ExportLogsOperationSuffix) } // EndLogsExportOp completes the export operation that was started with StartLogsExportOp. func (eor *Exporter) EndLogsExportOp(ctx context.Context, numLogRecords int, err error) { numSent, numFailedToSend := toNumItems(numLogRecords, err) - eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentLogRecords, mExporterFailedToSendLogRecords) - endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey) + eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentLogRecords, obsmetrics.ExporterFailedToSendLogRecords) + endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey) } // startSpan creates the span used to trace the operation. Returning // the updated context and the created span. func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context { - spanName := exporterPrefix + eor.exporterName + operationSuffix + spanName := obsmetrics.ExporterPrefix + eor.exporterName + operationSuffix ctx, _ = trace.StartSpan(ctx, spanName) return ctx } func (eor *Exporter) recordMetrics(ctx context.Context, numSent, numFailedToSend int64, sentMeasure, failedToSendMeasure *stats.Int64Measure) { - if gLevel == configtelemetry.LevelNone { + if obsreportconfig.Level == configtelemetry.LevelNone { return } // Ignore the error for now. This should not happen. diff --git a/obsreport/obsreport_processor.go b/obsreport/obsreport_processor.go index e048f329bec..83a90f1bdca 100644 --- a/obsreport/obsreport_processor.go +++ b/obsreport/obsreport_processor.go @@ -23,79 +23,22 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" -) - -const ( - // ProcessorKey is the key used to identify processors in metrics and traces. - ProcessorKey = "processor" - - // DroppedSpansKey is the key used to identify spans dropped by the Collector. - DroppedSpansKey = "dropped_spans" - - // DroppedMetricPointsKey is the key used to identify metric points dropped by the Collector. - DroppedMetricPointsKey = "dropped_metric_points" - - // DroppedLogRecordsKey is the key used to identify log records dropped by the Collector. - DroppedLogRecordsKey = "dropped_log_records" -) - -var ( - tagKeyProcessor, _ = tag.NewKey(ProcessorKey) - - processorPrefix = ProcessorKey + nameSep - - // Processor metrics. Any count of data items below is in the internal format - // of the collector since processors only deal with internal format. - mProcessorAcceptedSpans = stats.Int64( - processorPrefix+AcceptedSpansKey, - "Number of spans successfully pushed into the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorRefusedSpans = stats.Int64( - processorPrefix+RefusedSpansKey, - "Number of spans that were rejected by the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorDroppedSpans = stats.Int64( - processorPrefix+DroppedSpansKey, - "Number of spans that were dropped.", - stats.UnitDimensionless) - mProcessorAcceptedMetricPoints = stats.Int64( - processorPrefix+AcceptedMetricPointsKey, - "Number of metric points successfully pushed into the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorRefusedMetricPoints = stats.Int64( - processorPrefix+RefusedMetricPointsKey, - "Number of metric points that were rejected by the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorDroppedMetricPoints = stats.Int64( - processorPrefix+DroppedMetricPointsKey, - "Number of metric points that were dropped.", - stats.UnitDimensionless) - mProcessorAcceptedLogRecords = stats.Int64( - processorPrefix+AcceptedLogRecordsKey, - "Number of log records successfully pushed into the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorRefusedLogRecords = stats.Int64( - processorPrefix+RefusedLogRecordsKey, - "Number of log records that were rejected by the next component in the pipeline.", - stats.UnitDimensionless) - mProcessorDroppedLogRecords = stats.Int64( - processorPrefix+DroppedLogRecordsKey, - "Number of log records that were dropped.", - stats.UnitDimensionless) + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) // BuildProcessorCustomMetricName is used to be build a metric name following // the standards used in the Collector. The configType should be the same // value used to identify the type on the config. func BuildProcessorCustomMetricName(configType, metric string) string { - return buildComponentPrefix(processorPrefix, configType) + metric + return buildComponentPrefix(obsmetrics.ProcessorPrefix, configType) + metric } // ProcessorMetricViews builds the metric views for custom metrics of processors. -func ProcessorMetricViews(configType string, legacyViews []*view.View) []*view.View { +func ProcessorMetricViews(configType string, legacyViews *obsreportconfig.ObsMetrics) *obsreportconfig.ObsMetrics { var allViews []*view.View - if gLevel != configtelemetry.LevelNone { - for _, legacyView := range legacyViews { + if obsreportconfig.Level != configtelemetry.LevelNone { + for _, legacyView := range legacyViews.Views { // Ignore any nil entry and views without measure or aggregation. // These can't be registered but some code registering legacy views may // ignore the errors. @@ -112,11 +55,11 @@ func ProcessorMetricViews(configType string, legacyViews []*view.View) []*view.V } } - return allViews + return &obsreportconfig.ObsMetrics{ + Views: allViews, + } } -var gProcessor = &Processor{level: configtelemetry.LevelNone} - // Processor is a helper to add observability to a component.Processor. type Processor struct { level configtelemetry.Level @@ -133,7 +76,7 @@ type ProcessorSettings struct { func NewProcessor(cfg ProcessorSettings) *Processor { return &Processor{ level: cfg.Level, - mutators: []tag.Mutator{tag.Upsert(tagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))}, + mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))}, } } @@ -143,9 +86,9 @@ func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedSpans.M(int64(numSpans)), - mProcessorRefusedSpans.M(0), - mProcessorDroppedSpans.M(0), + obsmetrics.ProcessorAcceptedSpans.M(int64(numSpans)), + obsmetrics.ProcessorRefusedSpans.M(0), + obsmetrics.ProcessorDroppedSpans.M(0), ) } } @@ -156,9 +99,9 @@ func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedSpans.M(0), - mProcessorRefusedSpans.M(int64(numSpans)), - mProcessorDroppedSpans.M(0), + obsmetrics.ProcessorAcceptedSpans.M(0), + obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)), + obsmetrics.ProcessorDroppedSpans.M(0), ) } } @@ -169,9 +112,9 @@ func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedSpans.M(0), - mProcessorRefusedSpans.M(0), - mProcessorDroppedSpans.M(int64(numSpans)), + obsmetrics.ProcessorAcceptedSpans.M(0), + obsmetrics.ProcessorRefusedSpans.M(0), + obsmetrics.ProcessorDroppedSpans.M(int64(numSpans)), ) } } @@ -182,9 +125,9 @@ func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedMetricPoints.M(int64(numPoints)), - mProcessorRefusedMetricPoints.M(0), - mProcessorDroppedMetricPoints.M(0), + obsmetrics.ProcessorAcceptedMetricPoints.M(int64(numPoints)), + obsmetrics.ProcessorRefusedMetricPoints.M(0), + obsmetrics.ProcessorDroppedMetricPoints.M(0), ) } } @@ -195,9 +138,9 @@ func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedMetricPoints.M(0), - mProcessorRefusedMetricPoints.M(int64(numPoints)), - mProcessorDroppedMetricPoints.M(0), + obsmetrics.ProcessorAcceptedMetricPoints.M(0), + obsmetrics.ProcessorRefusedMetricPoints.M(int64(numPoints)), + obsmetrics.ProcessorDroppedMetricPoints.M(0), ) } } @@ -208,9 +151,9 @@ func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedMetricPoints.M(0), - mProcessorRefusedMetricPoints.M(0), - mProcessorDroppedMetricPoints.M(int64(numPoints)), + obsmetrics.ProcessorAcceptedMetricPoints.M(0), + obsmetrics.ProcessorRefusedMetricPoints.M(0), + obsmetrics.ProcessorDroppedMetricPoints.M(int64(numPoints)), ) } } @@ -221,9 +164,9 @@ func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedLogRecords.M(int64(numRecords)), - mProcessorRefusedLogRecords.M(0), - mProcessorDroppedLogRecords.M(0), + obsmetrics.ProcessorAcceptedLogRecords.M(int64(numRecords)), + obsmetrics.ProcessorRefusedLogRecords.M(0), + obsmetrics.ProcessorDroppedLogRecords.M(0), ) } } @@ -234,9 +177,9 @@ func (por *Processor) LogsRefused(ctx context.Context, numRecords int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedLogRecords.M(0), - mProcessorRefusedLogRecords.M(int64(numRecords)), - mProcessorDroppedMetricPoints.M(0), + obsmetrics.ProcessorAcceptedLogRecords.M(0), + obsmetrics.ProcessorRefusedLogRecords.M(int64(numRecords)), + obsmetrics.ProcessorDroppedMetricPoints.M(0), ) } } @@ -247,9 +190,9 @@ func (por *Processor) LogsDropped(ctx context.Context, numRecords int) { stats.RecordWithTags( ctx, por.mutators, - mProcessorAcceptedLogRecords.M(0), - mProcessorRefusedLogRecords.M(0), - mProcessorDroppedLogRecords.M(int64(numRecords)), + obsmetrics.ProcessorAcceptedLogRecords.M(0), + obsmetrics.ProcessorRefusedLogRecords.M(0), + obsmetrics.ProcessorDroppedLogRecords.M(int64(numRecords)), ) } } diff --git a/obsreport/obsreport_receiver.go b/obsreport/obsreport_receiver.go index 4cf9eb3d080..7467a9cd5a8 100644 --- a/obsreport/obsreport_receiver.go +++ b/obsreport/obsreport_receiver.go @@ -23,72 +23,8 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" -) - -const ( - // ReceiverKey used to identify receivers in metrics and traces. - ReceiverKey = "receiver" - // TransportKey used to identify the transport used to received the data. - TransportKey = "transport" - // FormatKey used to identify the format of the data received. - FormatKey = "format" - - // AcceptedSpansKey used to identify spans accepted by the Collector. - AcceptedSpansKey = "accepted_spans" - // RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector. - RefusedSpansKey = "refused_spans" - - // AcceptedMetricPointsKey used to identify metric points accepted by the Collector. - AcceptedMetricPointsKey = "accepted_metric_points" - // RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the - // Collector. - RefusedMetricPointsKey = "refused_metric_points" - - // AcceptedLogRecordsKey used to identify log records accepted by the Collector. - AcceptedLogRecordsKey = "accepted_log_records" - // RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the - // Collector. - RefusedLogRecordsKey = "refused_log_records" -) - -var ( - tagKeyReceiver, _ = tag.NewKey(ReceiverKey) - tagKeyTransport, _ = tag.NewKey(TransportKey) - - receiverPrefix = ReceiverKey + nameSep - receiveTraceDataOperationSuffix = nameSep + "TraceDataReceived" - receiverMetricsOperationSuffix = nameSep + "MetricsReceived" - receiverLogsOperationSuffix = nameSep + "LogsReceived" - - // Receiver metrics. Any count of data items below is in the original format - // that they were received, reasoning: reconciliation is easier if measurements - // on clients and receiver are expected to be the same. Translation issues - // that result in a different number of elements should be reported in a - // separate way. - mReceiverAcceptedSpans = stats.Int64( - receiverPrefix+AcceptedSpansKey, - "Number of spans successfully pushed into the pipeline.", - stats.UnitDimensionless) - mReceiverRefusedSpans = stats.Int64( - receiverPrefix+RefusedSpansKey, - "Number of spans that could not be pushed into the pipeline.", - stats.UnitDimensionless) - mReceiverAcceptedMetricPoints = stats.Int64( - receiverPrefix+AcceptedMetricPointsKey, - "Number of metric points successfully pushed into the pipeline.", - stats.UnitDimensionless) - mReceiverRefusedMetricPoints = stats.Int64( - receiverPrefix+RefusedMetricPointsKey, - "Number of metric points that could not be pushed into the pipeline.", - stats.UnitDimensionless) - mReceiverAcceptedLogRecords = stats.Int64( - receiverPrefix+AcceptedLogRecordsKey, - "Number of log records successfully pushed into the pipeline.", - stats.UnitDimensionless) - mReceiverRefusedLogRecords = stats.Int64( - receiverPrefix+RefusedLogRecordsKey, - "Number of log records that could not be pushed into the pipeline.", - stats.UnitDimensionless) + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) // StartReceiveOptions has the options related to starting a receive operation. @@ -172,7 +108,7 @@ func (rec *Receiver) StartTraceDataReceiveOp( ) context.Context { return rec.traceReceiveOp( operationCtx, - receiveTraceDataOperationSuffix, + obsmetrics.ReceiveTraceDataOperationSuffix, opt...) } @@ -188,7 +124,7 @@ func StartTraceDataReceiveOp( rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport}) return rec.traceReceiveOp( operationCtx, - receiveTraceDataOperationSuffix, + obsmetrics.ReceiveTraceDataOperationSuffix, opt...) } @@ -236,7 +172,7 @@ func (rec *Receiver) StartLogsReceiveOp( ) context.Context { return rec.traceReceiveOp( operationCtx, - receiverLogsOperationSuffix, + obsmetrics.ReceiverLogsOperationSuffix, opt...) } @@ -252,7 +188,7 @@ func StartLogsReceiveOp( rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport}) return rec.traceReceiveOp( operationCtx, - receiverLogsOperationSuffix, + obsmetrics.ReceiverLogsOperationSuffix, opt...) } @@ -300,7 +236,7 @@ func (rec *Receiver) StartMetricsReceiveOp( ) context.Context { return rec.traceReceiveOp( operationCtx, - receiverMetricsOperationSuffix, + obsmetrics.ReceiverMetricsOperationSuffix, opt...) } @@ -316,7 +252,7 @@ func StartMetricsReceiveOp( rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport}) return rec.traceReceiveOp( operationCtx, - receiverMetricsOperationSuffix, + obsmetrics.ReceiverMetricsOperationSuffix, opt...) } @@ -365,8 +301,8 @@ func ReceiverContext( transport string, ) context.Context { ctx, _ = tag.New(ctx, - tag.Upsert(tagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), - tag.Upsert(tagKeyTransport, transport, tag.WithTTL(tag.TTLNoPropagation))) + tag.Upsert(obsmetrics.TagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), + tag.Upsert(obsmetrics.TagKeyTransport, transport, tag.WithTTL(tag.TTLNoPropagation))) return ctx } @@ -385,7 +321,7 @@ func (rec *Receiver) traceReceiveOp( var ctx context.Context var span *trace.Span - spanName := receiverPrefix + rec.receiverID.String() + operationSuffix + spanName := obsmetrics.ReceiverPrefix + rec.receiverID.String() + operationSuffix if !opts.LongLivedCtx { ctx, span = trace.StartSpan(receiverCtx, spanName) } else { @@ -401,7 +337,7 @@ func (rec *Receiver) traceReceiveOp( } if rec.transport != "" { - span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport)) + span.AddAttributes(trace.StringAttribute(obsmetrics.TransportKey, rec.transport)) } return ctx } @@ -423,18 +359,18 @@ func (rec *Receiver) endReceiveOp( span := trace.FromContext(receiverCtx) - if gLevel != configtelemetry.LevelNone { + if obsreportconfig.Level != configtelemetry.LevelNone { var acceptedMeasure, refusedMeasure *stats.Int64Measure switch dataType { case config.TracesDataType: - acceptedMeasure = mReceiverAcceptedSpans - refusedMeasure = mReceiverRefusedSpans + acceptedMeasure = obsmetrics.ReceiverAcceptedSpans + refusedMeasure = obsmetrics.ReceiverRefusedSpans case config.MetricsDataType: - acceptedMeasure = mReceiverAcceptedMetricPoints - refusedMeasure = mReceiverRefusedMetricPoints + acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints + refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints case config.LogsDataType: - acceptedMeasure = mReceiverAcceptedLogRecords - refusedMeasure = mReceiverRefusedLogRecords + acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords + refusedMeasure = obsmetrics.ReceiverRefusedLogRecords } stats.Record( @@ -448,19 +384,19 @@ func (rec *Receiver) endReceiveOp( var acceptedItemsKey, refusedItemsKey string switch dataType { case config.TracesDataType: - acceptedItemsKey = AcceptedSpansKey - refusedItemsKey = RefusedSpansKey + acceptedItemsKey = obsmetrics.AcceptedSpansKey + refusedItemsKey = obsmetrics.RefusedSpansKey case config.MetricsDataType: - acceptedItemsKey = AcceptedMetricPointsKey - refusedItemsKey = RefusedMetricPointsKey + acceptedItemsKey = obsmetrics.AcceptedMetricPointsKey + refusedItemsKey = obsmetrics.RefusedMetricPointsKey case config.LogsDataType: - acceptedItemsKey = AcceptedLogRecordsKey - refusedItemsKey = RefusedLogRecordsKey + acceptedItemsKey = obsmetrics.AcceptedLogRecordsKey + refusedItemsKey = obsmetrics.RefusedLogRecordsKey } span.AddAttributes( trace.StringAttribute( - FormatKey, format), + obsmetrics.FormatKey, format), trace.Int64Attribute( acceptedItemsKey, int64(numAccepted)), trace.Int64Attribute( diff --git a/obsreport/obsreport_scraper.go b/obsreport/obsreport_scraper.go index d9d334eda23..362b5ea312e 100644 --- a/obsreport/obsreport_scraper.go +++ b/obsreport/obsreport_scraper.go @@ -23,39 +23,11 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/receiver/scrapererror" ) -const ( - // ScraperKey used to identify scrapers in metrics and traces. - ScraperKey = "scraper" - - // ScrapedMetricPointsKey used to identify metric points scraped by the - // Collector. - ScrapedMetricPointsKey = "scraped_metric_points" - // ErroredMetricPointsKey used to identify metric points errored (i.e. - // unable to be scraped) by the Collector. - ErroredMetricPointsKey = "errored_metric_points" -) - -const ( - scraperPrefix = ScraperKey + nameSep - scraperMetricsOperationSuffix = nameSep + "MetricsScraped" -) - -var ( - tagKeyScraper, _ = tag.NewKey(ScraperKey) - - mScraperScrapedMetricPoints = stats.Int64( - scraperPrefix+ScrapedMetricPointsKey, - "Number of metric points successfully scraped.", - stats.UnitDimensionless) - mScraperErroredMetricPoints = stats.Int64( - scraperPrefix+ErroredMetricPointsKey, - "Number of metric points that were unable to be scraped.", - stats.UnitDimensionless) -) - // ScraperContext adds the keys used when recording observability metrics to // the given context returning the newly created context. This context should // be used in related calls to the obsreport functions so metrics are properly @@ -67,8 +39,8 @@ func ScraperContext( ) context.Context { ctx, _ = tag.New( ctx, - tag.Upsert(tagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), - tag.Upsert(tagKeyScraper, scraper.String(), tag.WithTTL(tag.TTLNoPropagation))) + tag.Upsert(obsmetrics.TagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), + tag.Upsert(obsmetrics.TagKeyScraper, scraper.String(), tag.WithTTL(tag.TTLNoPropagation))) return ctx } @@ -81,7 +53,7 @@ func StartMetricsScrapeOp( receiverID config.ComponentID, scraper config.ComponentID, ) context.Context { - spanName := scraperPrefix + receiverID.String() + nameSep + scraper.String() + scraperMetricsOperationSuffix + spanName := obsmetrics.ScraperPrefix + receiverID.String() + obsmetrics.NameSep + scraper.String() + obsmetrics.ScraperMetricsOperationSuffix ctx, _ := trace.StartSpan(scraperCtx, spanName) return ctx } @@ -105,19 +77,19 @@ func EndMetricsScrapeOp( span := trace.FromContext(scraperCtx) - if gLevel != configtelemetry.LevelNone { + if obsreportconfig.Level != configtelemetry.LevelNone { stats.Record( scraperCtx, - mScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), - mScraperErroredMetricPoints.M(int64(numErroredMetrics))) + obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), + obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics))) } // end span according to errors if span.IsRecordingEvents() { span.AddAttributes( - trace.StringAttribute(FormatKey, string(config.MetricsDataType)), - trace.Int64Attribute(ScrapedMetricPointsKey, int64(numScrapedMetrics)), - trace.Int64Attribute(ErroredMetricPointsKey, int64(numErroredMetrics)), + trace.StringAttribute(obsmetrics.FormatKey, string(config.MetricsDataType)), + trace.Int64Attribute(obsmetrics.ScrapedMetricPointsKey, int64(numScrapedMetrics)), + trace.Int64Attribute(obsmetrics.ErroredMetricPointsKey, int64(numErroredMetrics)), ) span.SetStatus(errToStatus(err)) diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 6a28f7599f0..4acbce8ff18 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -12,9 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// obsreport_test instead of just obsreport to avoid dependency cycle between -// obsreport_test and obsreporttest -package obsreport_test +package obsreport import ( "context" @@ -30,7 +28,8 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/receiver/scrapererror" ) @@ -55,40 +54,6 @@ type receiveTestParams struct { err error } -func TestConfigure(t *testing.T) { - tests := []struct { - name string - level configtelemetry.Level - wantViews []*view.View - }{ - { - name: "none", - level: configtelemetry.LevelNone, - }, - { - name: "basic", - level: configtelemetry.LevelBasic, - wantViews: obsreport.AllViews(), - }, - { - name: "normal", - level: configtelemetry.LevelNormal, - wantViews: obsreport.AllViews(), - }, - { - name: "detailed", - level: configtelemetry.LevelDetailed, - wantViews: obsreport.AllViews(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gotViews := obsreport.Configure(tt.level) - assert.Equal(t, tt.wantViews, gotViews) - }) - } -} - func TestReceiveTraceDataOp(t *testing.T) { doneFn, err := obsreporttest.SetupRecordedMetricsTest() require.NoError(t, err) @@ -102,14 +67,14 @@ func TestReceiveTraceDataOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport) + receiverCtx := ReceiverContext(parentCtx, receiver, transport) params := []receiveTestParams{ {transport, errFake}, {"", nil}, } rcvdSpans := []int{13, 42} for i, param := range params { - rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) + rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) ctx := rec.StartTraceDataReceiveOp(receiverCtx) assert.NotNil(t, ctx) @@ -129,22 +94,22 @@ func TestReceiveTraceDataOp(t *testing.T) { switch params[i].err { case nil: acceptedSpans += rcvdSpans[i] - assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsreport.AcceptedSpansKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedSpansKey]) + assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsmetrics.AcceptedSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedSpansKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: refusedSpans += rcvdSpans[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedSpansKey]) - assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsreport.RefusedSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedSpansKey]) + assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsmetrics.RefusedSpansKey]) assert.Equal(t, params[i].err.Error(), span.Status.Message) default: t.Fatalf("unexpected param: %v", params[i]) } switch params[i].transport { case "": - assert.NotContains(t, span.Attributes, obsreport.TransportKey) + assert.NotContains(t, span.Attributes, obsmetrics.TransportKey) default: - assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey]) + assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey]) } } obsreporttest.CheckReceiverTraces(t, receiver, transport, int64(acceptedSpans), int64(refusedSpans)) @@ -163,14 +128,14 @@ func TestReceiveLogsOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport) + receiverCtx := ReceiverContext(parentCtx, receiver, transport) params := []receiveTestParams{ {transport, errFake}, {"", nil}, } rcvdLogRecords := []int{13, 42} for i, param := range params { - rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) + rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) ctx := rec.StartLogsReceiveOp(receiverCtx) assert.NotNil(t, ctx) @@ -190,22 +155,22 @@ func TestReceiveLogsOp(t *testing.T) { switch params[i].err { case nil: acceptedLogRecords += rcvdLogRecords[i] - assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.AcceptedLogRecordsKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedLogRecordsKey]) + assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsmetrics.AcceptedLogRecordsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedLogRecordsKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: refusedLogRecords += rcvdLogRecords[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedLogRecordsKey]) - assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.RefusedLogRecordsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedLogRecordsKey]) + assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsmetrics.RefusedLogRecordsKey]) assert.Equal(t, params[i].err.Error(), span.Status.Message) default: t.Fatalf("unexpected param: %v", params[i]) } switch params[i].transport { case "": - assert.NotContains(t, span.Attributes, obsreport.TransportKey) + assert.NotContains(t, span.Attributes, obsmetrics.TransportKey) default: - assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey]) + assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey]) } } obsreporttest.CheckReceiverLogs(t, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords)) @@ -224,14 +189,14 @@ func TestReceiveMetricsOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport) + receiverCtx := ReceiverContext(parentCtx, receiver, transport) params := []receiveTestParams{ {transport, errFake}, {"", nil}, } rcvdMetricPts := []int{23, 29} for i, param := range params { - rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) + rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport}) ctx := rec.StartMetricsReceiveOp(receiverCtx) assert.NotNil(t, ctx) @@ -251,22 +216,22 @@ func TestReceiveMetricsOp(t *testing.T) { switch params[i].err { case nil: acceptedMetricPoints += rcvdMetricPts[i] - assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsreport.AcceptedMetricPointsKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedMetricPointsKey]) + assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsmetrics.AcceptedMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedMetricPointsKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: refusedMetricPoints += rcvdMetricPts[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedMetricPointsKey]) - assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsreport.RefusedMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedMetricPointsKey]) + assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsmetrics.RefusedMetricPointsKey]) assert.Equal(t, params[i].err.Error(), span.Status.Message) default: t.Fatalf("unexpected param: %v", params[i]) } switch params[i].transport { case "": - assert.NotContains(t, span.Attributes, obsreport.TransportKey) + assert.NotContains(t, span.Attributes, obsmetrics.TransportKey) default: - assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey]) + assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey]) } } @@ -286,14 +251,14 @@ func TestScrapeMetricsDataOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - receiverCtx := obsreport.ScraperContext(parentCtx, receiver, scraper) + receiverCtx := ScraperContext(parentCtx, receiver, scraper) errParams := []error{partialErrFake, errFake, nil} scrapedMetricPts := []int{23, 29, 15} for i, err := range errParams { - ctx := obsreport.StartMetricsScrapeOp(receiverCtx, receiver, scraper) + ctx := StartMetricsScrapeOp(receiverCtx, receiver, scraper) assert.NotNil(t, ctx) - obsreport.EndMetricsScrapeOp( + EndMetricsScrapeOp( ctx, scrapedMetricPts[i], err) @@ -308,19 +273,19 @@ func TestScrapeMetricsDataOp(t *testing.T) { switch errParams[i] { case nil: scrapedMetricPoints += scrapedMetricPts[i] - assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ScrapedMetricPointsKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.ErroredMetricPointsKey]) + assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ScrapedMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.ErroredMetricPointsKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: erroredMetricPoints += scrapedMetricPts[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.ScrapedMetricPointsKey]) - assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ErroredMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.ScrapedMetricPointsKey]) + assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ErroredMetricPointsKey]) assert.Equal(t, errParams[i].Error(), span.Status.Message) case partialErrFake: scrapedMetricPoints += scrapedMetricPts[i] erroredMetricPoints++ - assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ScrapedMetricPointsKey]) - assert.Equal(t, int64(1), span.Attributes[obsreport.ErroredMetricPointsKey]) + assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ScrapedMetricPointsKey]) + assert.Equal(t, int64(1), span.Attributes[obsmetrics.ErroredMetricPointsKey]) assert.Equal(t, errParams[i].Error(), span.Status.Message) default: t.Fatalf("unexpected err param: %v", errParams[i]) @@ -343,7 +308,7 @@ func TestExportTraceDataOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) errs := []error{nil, errFake} numExportedSpans := []int{22, 14} for i, err := range errs { @@ -361,13 +326,13 @@ func TestExportTraceDataOp(t *testing.T) { switch errs[i] { case nil: sentSpans += numExportedSpans[i] - assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsreport.SentSpansKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendSpansKey]) + assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsmetrics.SentSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendSpansKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: failedToSendSpans += numExportedSpans[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.SentSpansKey]) - assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsreport.FailedToSendSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentSpansKey]) + assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsmetrics.FailedToSendSpansKey]) assert.Equal(t, errs[i].Error(), span.Status.Message) default: t.Fatalf("unexpected error: %v", errs[i]) @@ -390,7 +355,7 @@ func TestExportMetricsOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) errs := []error{nil, errFake} toSendMetricPoints := []int{17, 23} @@ -410,13 +375,13 @@ func TestExportMetricsOp(t *testing.T) { switch errs[i] { case nil: sentMetricPoints += toSendMetricPoints[i] - assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsreport.SentMetricPointsKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendMetricPointsKey]) + assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsmetrics.SentMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendMetricPointsKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: failedToSendMetricPoints += toSendMetricPoints[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.SentMetricPointsKey]) - assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsreport.FailedToSendMetricPointsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentMetricPointsKey]) + assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsmetrics.FailedToSendMetricPointsKey]) assert.Equal(t, errs[i].Error(), span.Status.Message) default: t.Fatalf("unexpected error: %v", errs[i]) @@ -439,7 +404,7 @@ func TestExportLogsOp(t *testing.T) { t.Name(), trace.WithSampler(trace.AlwaysSample())) defer parentSpan.End() - obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) + obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter}) errs := []error{nil, errFake} toSendLogRecords := []int{17, 23} for i, err := range errs { @@ -458,13 +423,13 @@ func TestExportLogsOp(t *testing.T) { switch errs[i] { case nil: sentLogRecords += toSendLogRecords[i] - assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsreport.SentLogRecordsKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendLogRecordsKey]) + assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsmetrics.SentLogRecordsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendLogRecordsKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: failedToSendLogRecords += toSendLogRecords[i] - assert.Equal(t, int64(0), span.Attributes[obsreport.SentLogRecordsKey]) - assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsreport.FailedToSendLogRecordsKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentLogRecordsKey]) + assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsmetrics.FailedToSendLogRecordsKey]) assert.Equal(t, errs[i].Error(), span.Status.Message) default: t.Fatalf("unexpected error: %v", errs[i]) @@ -491,7 +456,7 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { parentCtx, parentSpan := trace.StartSpan(context.Background(), t.Name()) defer parentSpan.End() - longLivedCtx := obsreport.ReceiverContext(parentCtx, receiver, transport) + longLivedCtx := ReceiverContext(parentCtx, receiver, transport) ops := []struct { numSpans int err error @@ -502,10 +467,10 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { for _, op := range ops { // Use a new context on each operation to simulate distinct operations // under the same long lived context. - rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport}) + rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport}) ctx := rec.StartTraceDataReceiveOp( longLivedCtx, - obsreport.WithLongLivedCtx()) + WithLongLivedCtx()) assert.NotNil(t, ctx) rec.EndTraceDataReceiveOp( @@ -526,15 +491,15 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { assert.Equal(t, parentSpan.SpanContext().TraceID, link.TraceID) assert.Equal(t, parentSpan.SpanContext().SpanID, link.SpanID) assert.Equal(t, "receiver/"+receiver.String()+"/TraceDataReceived", span.Name) - assert.Equal(t, transport, span.Attributes[obsreport.TransportKey]) + assert.Equal(t, transport, span.Attributes[obsmetrics.TransportKey]) switch ops[i].err { case nil: - assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsreport.AcceptedSpansKey]) - assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedSpansKey]) + assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsmetrics.AcceptedSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedSpansKey]) assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status) case errFake: - assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedSpansKey]) - assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsreport.RefusedSpansKey]) + assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedSpansKey]) + assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsmetrics.RefusedSpansKey]) assert.Equal(t, ops[i].err.Error(), span.Status.Message) default: t.Fatalf("unexpected error: %v", ops[i].err) @@ -551,7 +516,7 @@ func TestProcessorTraceData(t *testing.T) { const refusedSpans = 19 const droppedSpans = 13 - obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) + obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) obsrep.TracesAccepted(context.Background(), acceptedSpans) obsrep.TracesRefused(context.Background(), refusedSpans) obsrep.TracesDropped(context.Background(), droppedSpans) @@ -568,7 +533,7 @@ func TestProcessorMetricsData(t *testing.T) { const refusedPoints = 11 const droppedPoints = 17 - obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) + obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) obsrep.MetricsAccepted(context.Background(), acceptedPoints) obsrep.MetricsRefused(context.Background(), refusedPoints) obsrep.MetricsDropped(context.Background(), droppedPoints) @@ -623,9 +588,9 @@ func TestProcessorMetricViews(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - obsreport.Configure(tt.level) - got := obsreport.ProcessorMetricViews("test_type", legacyViews) - assert.Equal(t, tt.want, got) + obsreportconfig.Configure(tt.level) + got := ProcessorMetricViews("test_type", &obsreportconfig.ObsMetrics{Views: legacyViews}) + assert.Equal(t, tt.want, got.Views) }) } } @@ -639,7 +604,7 @@ func TestProcessorLogRecords(t *testing.T) { const refusedRecords = 11 const droppedRecords = 17 - obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) + obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor}) obsrep.LogsAccepted(context.Background(), acceptedRecords) obsrep.LogsRefused(context.Background(), refusedRecords) obsrep.LogsDropped(context.Background(), droppedRecords) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index d7b5d701077..e0ca299cd0d 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -25,7 +25,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/internal/obsreportconfig" ) var ( @@ -45,7 +45,8 @@ var ( // SetupRecordedMetricsTest does setup the testing environment to check the metrics recorded by receivers, producers or exporters. // The returned function should be deferred. func SetupRecordedMetricsTest() (func(), error) { - views := obsreport.Configure(configtelemetry.LevelNormal) + obsMetrics := obsreportconfig.Configure(configtelemetry.LevelNormal) + views := obsMetrics.Views err := view.Register(views...) if err != nil { return nil, err diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 6dbeee02569..96ae457ec5d 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -19,11 +19,13 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "go.opentelemetry.io/collector/internal/obsreportconfig" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/obsreport" ) var ( - processorTagKey = tag.MustNewKey(obsreport.ProcessorKey) + processorTagKey = tag.MustNewKey(obsmetrics.ProcessorKey) statBatchSizeTriggerSend = stats.Int64("batch_size_trigger_send", "Number of times the batch was sent due to a size trigger", stats.UnitDimensionless) statTimeoutTriggerSend = stats.Int64("timeout_trigger_send", "Number of times the batch was sent due to a timeout trigger", stats.UnitDimensionless) statBatchSendSize = stats.Int64("batch_send_size", "Number of units in the batch", stats.UnitDimensionless) @@ -68,12 +70,14 @@ func MetricViews() []*view.View { 1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000), } - legacyViews := []*view.View{ - countBatchSizeTriggerSendView, - countTimeoutTriggerSendView, - distributionBatchSendSizeView, - distributionBatchSendSizeBytesView, + legacyViews := &obsreportconfig.ObsMetrics{ + Views: []*view.View{ + countBatchSizeTriggerSendView, + countTimeoutTriggerSendView, + distributionBatchSendSizeView, + distributionBatchSendSizeBytesView, + }, } - return obsreport.ProcessorMetricViews(typeStr, legacyViews) + return obsreport.ProcessorMetricViews(typeStr, legacyViews).Views } diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go index b8151b100d9..449e0645612 100644 --- a/processor/processorhelper/processor.go +++ b/processor/processorhelper/processor.go @@ -23,7 +23,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerhelper" - "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) // ErrSkipProcessingData is a sentinel value to indicate when traces or metrics should intentionally be dropped @@ -79,6 +79,6 @@ func fromOptions(options []Option) *baseSettings { func spanAttributes(id config.ComponentID) []trace.Attribute { return []trace.Attribute{ - trace.StringAttribute(obsreport.ProcessorKey, id.String()), + trace.StringAttribute(obsmetrics.ProcessorKey, id.String()), } } diff --git a/service/telemetry.go b/service/telemetry.go index 8ac6c91913b..cd785f6d15e 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/exporter/jaegerexporter" "go.opentelemetry.io/collector/internal/collector/telemetry" - "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/processor/batchprocessor" "go.opentelemetry.io/collector/receiver/kafkareceiver" telemetry2 "go.opentelemetry.io/collector/service/internal/telemetry" @@ -61,10 +61,11 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u } var views []*view.View + obsMetrics := obsreportconfig.Configure(level) views = append(views, batchprocessor.MetricViews()...) views = append(views, jaegerexporter.MetricViews()...) views = append(views, kafkareceiver.MetricViews()...) - views = append(views, obsreport.Configure(level)...) + views = append(views, obsMetrics.Views...) views = append(views, processMetricsViews.Views()...) tel.views = views