From 9e4d50f5fa364ee10b322136a47b687e6d47bdff Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Mon, 12 Jun 2023 20:28:17 -0700 Subject: [PATCH] remove opencensus from carbonreceiver and wavefrontreceiver (#23248) Update carbonreceiver and wavefrontreceiver to use pdata directly instead of OpenCensus --- .chloggen/use-pdata-with-carbonreceiver.yaml | 20 ++ receiver/carbonreceiver/go.mod | 16 +- receiver/carbonreceiver/go.sum | 8 - receiver/carbonreceiver/protocol/parser.go | 38 +- .../protocol/path_parser_helper.go | 69 ++-- .../protocol/plaintext_parser.go | 14 +- .../protocol/plaintext_parser_test.go | 200 ++++++----- .../carbonreceiver/protocol/regex_parser.go | 20 +- .../protocol/regex_parser_test.go | 69 ++-- receiver/carbonreceiver/receiver_test.go | 10 +- .../carbonreceiver/transport/server_test.go | 6 +- .../carbonreceiver/transport/tcp_server.go | 11 +- .../carbonreceiver/transport/udp_server.go | 13 +- receiver/wavefrontreceiver/go.mod | 8 +- receiver/wavefrontreceiver/receiver_test.go | 106 +++--- .../wavefrontreceiver/wavefront_parser.go | 95 ++--- .../wavefront_parser_test.go | 340 +++++++++--------- 17 files changed, 476 insertions(+), 567 deletions(-) create mode 100644 .chloggen/use-pdata-with-carbonreceiver.yaml diff --git a/.chloggen/use-pdata-with-carbonreceiver.yaml b/.chloggen/use-pdata-with-carbonreceiver.yaml new file mode 100644 index 000000000000..7b009e72f880 --- /dev/null +++ b/.chloggen/use-pdata-with-carbonreceiver.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: carbonreceiver, wavefrontreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove use of opencensus model in carbonreceiver and wavefrontreceiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [20759, 20761] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/carbonreceiver/go.mod b/receiver/carbonreceiver/go.mod index 30b3cd8f0ce6..b09a407eb4b9 100644 --- a/receiver/carbonreceiver/go.mod +++ b/receiver/carbonreceiver/go.mod @@ -3,18 +3,16 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbon go 1.19 require ( - github.com/census-instrumentation/opencensus-proto v0.4.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.79.0 github.com/stretchr/testify v1.8.4 go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.79.0 go.opentelemetry.io/collector/component v0.79.0 go.opentelemetry.io/collector/confmap v0.79.0 go.opentelemetry.io/collector/consumer v0.79.0 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 go.opentelemetry.io/collector/receiver v0.79.0 go.uber.org/zap v1.24.0 - google.golang.org/protobuf v1.30.0 ) require ( @@ -37,7 +35,6 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.15.1 // indirect github.com/prometheus/client_model v0.4.0 // indirect @@ -46,8 +43,6 @@ require ( github.com/prometheus/statsd_exporter v0.22.7 // indirect go.opentelemetry.io/collector/exporter v0.79.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect - go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 // indirect - go.opentelemetry.io/collector/semconv v0.79.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect @@ -61,22 +56,15 @@ require ( golang.org/x/text v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/grpc v1.55.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus => ../../pkg/translator/opencensus - replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common -replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal - retract ( v0.76.2 v0.76.1 v0.65.0 ) - -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest - -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil diff --git a/receiver/carbonreceiver/go.sum b/receiver/carbonreceiver/go.sum index 224dabbf2eb3..d50910d29567 100644 --- a/receiver/carbonreceiver/go.sum +++ b/receiver/carbonreceiver/go.sum @@ -63,8 +63,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= -github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -181,9 +179,7 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ= github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -396,8 +392,6 @@ go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 h1:R+cfEUMyLn9Q1QknyQ4QU77pbf go.opentelemetry.io/collector/pdata v1.0.0-rcv0012/go.mod h1:rEAKFqc1L03lidKtra/2/dJtI0Hp+JsQxuPEIkj/2Vg= go.opentelemetry.io/collector/receiver v0.79.0 h1:Ag4hciAYklQWDpKbnmqhfh9zJlUskWvThpCpphp12b4= go.opentelemetry.io/collector/receiver v0.79.0/go.mod h1:+/xe0VoYl6Mli+KQTZWBR2apqFsbioAAqu7abzKDskI= -go.opentelemetry.io/collector/semconv v0.79.0 h1:74pzP4c7xWk9Eihs14kEQvE4m4hHgXrQ/YbWkdn1bVY= -go.opentelemetry.io/collector/semconv v0.79.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1KUxj3KoPINZdtDaDj3IA= @@ -693,8 +687,6 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20230525234025-438c736192d0 h1:x1vNwUhVOcsYoKyEGCZBH694SBmmBjA2EfauFVEI2+M= -google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= diff --git a/receiver/carbonreceiver/protocol/parser.go b/receiver/carbonreceiver/protocol/parser.go index a878e663fd7f..092b2c74cdb5 100644 --- a/receiver/carbonreceiver/protocol/parser.go +++ b/receiver/carbonreceiver/protocol/parser.go @@ -4,8 +4,7 @@ package protocol // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" import ( - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - "google.golang.org/protobuf/types/known/timestamppb" + "go.opentelemetry.io/collector/pdata/pmetric" ) // Parser abstracts the type of parsing being done by the receiver. @@ -24,38 +23,5 @@ type Parser interface { // // The is the Unix time text of when the measurement was // made. - Parse(line string) (*metricspb.Metric, error) -} - -// Below a few helper functions useful to different parsers. -func buildMetricForSinglePoint( - metricName string, - metricType metricspb.MetricDescriptor_Type, - labelKeys []*metricspb.LabelKey, - labelValues []*metricspb.LabelValue, - point *metricspb.Point, -) *metricspb.Metric { - return &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: metricName, - Type: metricType, - LabelKeys: labelKeys, - }, - Timeseries: []*metricspb.TimeSeries{ - { - // TODO: StartTimestamp can be set if each cumulative time series are - // tracked but right now it is not clear if it brings benefits. - // Perhaps as an option so cost is "pay for play". - LabelValues: labelValues, - Points: []*metricspb.Point{point}, - }, - }, - } -} - -func convertUnixSec(sec int64) *timestamppb.Timestamp { - ts := ×tamppb.Timestamp{ - Seconds: sec, - } - return ts + Parse(line string) (pmetric.Metric, error) } diff --git a/receiver/carbonreceiver/protocol/path_parser_helper.go b/receiver/carbonreceiver/protocol/path_parser_helper.go index 6b09979ca22c..2b508d5d512a 100644 --- a/receiver/carbonreceiver/protocol/path_parser_helper.go +++ b/receiver/carbonreceiver/protocol/path_parser_helper.go @@ -8,8 +8,10 @@ import ( "fmt" "strconv" "strings" + "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" ) // PathParser implements the code needed to handle only the part of @@ -35,10 +37,8 @@ type PathParser interface { type ParsedPath struct { // MetricName extracted/generated by the parser. MetricName string - // LabelKeys extracted/generated by the parser. - LabelKeys []*metricspb.LabelKey - // LabelValues extracted/generated by the parser. - LabelValues []*metricspb.LabelValue + // Attributes extracted/generated by the parser. + Attributes pcommon.Map // MetricType instructs the helper to generate the metric as the specified // TargetMetricType. MetricType TargetMetricType @@ -88,10 +88,10 @@ func NewParser(pathParser PathParser) (Parser, error) { // // The is the Unix time text of when the measurement was // made. -func (pph *PathParserHelper) Parse(line string) (*metricspb.Metric, error) { +func (pph *PathParserHelper) Parse(line string) (pmetric.Metric, error) { parts := strings.SplitN(line, " ", 4) if len(parts) != 3 { - return nil, fmt.Errorf("invalid carbon metric [%s]", line) + return pmetric.Metric{}, fmt.Errorf("invalid carbon metric [%s]", line) } path := parts[0] @@ -101,44 +101,39 @@ func (pph *PathParserHelper) Parse(line string) (*metricspb.Metric, error) { parsedPath := ParsedPath{} err := pph.pathParser.ParsePath(path, &parsedPath) if err != nil { - return nil, fmt.Errorf("invalid carbon metric [%s]: %w", line, err) + return pmetric.Metric{}, fmt.Errorf("invalid carbon metric [%s]: %w", line, err) } unixTime, err := strconv.ParseInt(timestampStr, 10, 64) if err != nil { - return nil, fmt.Errorf("invalid carbon metric time [%s]: %w", line, err) + return pmetric.Metric{}, fmt.Errorf("invalid carbon metric time [%s]: %w", line, err) } - var metricType metricspb.MetricDescriptor_Type - point := metricspb.Point{ - Timestamp: convertUnixSec(unixTime), - } - intVal, err := strconv.ParseInt(valueStr, 10, 64) - if err == nil { - if parsedPath.MetricType == CumulativeMetricType { - metricType = metricspb.MetricDescriptor_CUMULATIVE_INT64 - } else { - metricType = metricspb.MetricDescriptor_GAUGE_INT64 - } - point.Value = &metricspb.Point_Int64Value{Int64Value: intVal} - } else { - dblVal, err := strconv.ParseFloat(valueStr, 64) + intVal, errIsFloat := strconv.ParseInt(valueStr, 10, 64) + var dblVal float64 + if errIsFloat != nil { + dblVal, err = strconv.ParseFloat(valueStr, 64) if err != nil { - return nil, fmt.Errorf("invalid carbon metric value [%s]: %w", line, err) - } - if parsedPath.MetricType == CumulativeMetricType { - metricType = metricspb.MetricDescriptor_CUMULATIVE_DOUBLE - } else { - metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE + return pmetric.Metric{}, fmt.Errorf("invalid carbon metric value [%s]: %w", line, err) } - point.Value = &metricspb.Point_DoubleValue{DoubleValue: dblVal} } - metric := buildMetricForSinglePoint( - parsedPath.MetricName, - metricType, - parsedPath.LabelKeys, - parsedPath.LabelValues, - &point) - return metric, nil + m := pmetric.NewMetric() + m.SetName(parsedPath.MetricName) + var dp pmetric.NumberDataPoint + if parsedPath.MetricType == CumulativeMetricType { + sum := m.SetEmptySum() + sum.SetIsMonotonic(true) + dp = sum.DataPoints().AppendEmpty() + } else { + dp = m.SetEmptyGauge().DataPoints().AppendEmpty() + } + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(unixTime, 0))) + if errIsFloat != nil { + dp.SetDoubleValue(dblVal) + } else { + dp.SetIntValue(intVal) + } + parsedPath.Attributes.CopyTo(dp.Attributes()) + return m, nil } diff --git a/receiver/carbonreceiver/protocol/plaintext_parser.go b/receiver/carbonreceiver/protocol/plaintext_parser.go index 8781f41c45c3..15e1bd05b81d 100644 --- a/receiver/carbonreceiver/protocol/plaintext_parser.go +++ b/receiver/carbonreceiver/protocol/plaintext_parser.go @@ -7,7 +7,7 @@ import ( "fmt" "strings" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "go.opentelemetry.io/collector/pdata/pcommon" ) // PlaintextConfig holds the configuration for the plaintext parser. @@ -44,6 +44,7 @@ func (p *PlaintextPathParser) ParsePath(path string, parsedPath *ParsedPath) err } parsedPath.MetricName = parts[0] + parsedPath.Attributes = pcommon.NewMap() if len(parts) == 1 { // No tags, no more work here. return nil @@ -55,8 +56,6 @@ func (p *PlaintextPathParser) ParsePath(path string, parsedPath *ParsedPath) err } tags := strings.Split(parts[1], ";") - keys := make([]*metricspb.LabelKey, 0, len(tags)) - values := make([]*metricspb.LabelValue, 0, len(tags)) for _, tag := range tags { idx := strings.IndexByte(tag, '=') if idx < 1 { @@ -64,17 +63,10 @@ func (p *PlaintextPathParser) ParsePath(path string, parsedPath *ParsedPath) err } key := tag[:idx] - keys = append(keys, &metricspb.LabelKey{Key: key}) - value := tag[idx+1:] // If value is empty, ie.: tag == "k=", this will return "". - values = append(values, &metricspb.LabelValue{ - Value: value, - HasValue: true, - }) + parsedPath.Attributes.PutStr(key, value) } - parsedPath.LabelKeys = keys - parsedPath.LabelValues = values return nil } diff --git a/receiver/carbonreceiver/protocol/plaintext_parser_test.go b/receiver/carbonreceiver/protocol/plaintext_parser_test.go index 4aeb224b621a..2c2f9d52bc15 100644 --- a/receiver/carbonreceiver/protocol/plaintext_parser_test.go +++ b/receiver/carbonreceiver/protocol/plaintext_parser_test.go @@ -5,11 +5,12 @@ package protocol import ( "testing" + "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/timestamppb" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" ) func Test_plaintextParser_Parse(t *testing.T) { @@ -17,59 +18,53 @@ func Test_plaintextParser_Parse(t *testing.T) { require.NoError(t, err) tests := []struct { line string - want *metricspb.Metric + want pmetric.Metric wantErr bool }{ { line: "tst.int 1 1582230020", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( + GaugeMetricType, "tst.int", - nil, - nil, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + pcommon.NewMap(), + 1582230020, + 1, ), }, { line: "tst.dbl 3.14 1582230020", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_DOUBLE, + want: buildDoubleMetric( + GaugeMetricType, "tst.dbl", nil, - nil, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_DoubleValue{DoubleValue: 3.14}, - }, + 1582230020, + 3.14, ), }, { line: "tst.int.3tags;k0=v_0;k1=v_1;k2=v_2 128 1582230020", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( + GaugeMetricType, "tst.int.3tags", - []string{"k0", "k1", "k2"}, - []string{"v_0", "v_1", "v_2"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 128}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "v_0") + m.PutStr("k1", "v_1") + m.PutStr("k2", "v_2") + return m + }(), + 1582230020, + 128, ), }, { line: "tst.int.1tag;k0=v_0 1.23 1582230020", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_DOUBLE, + want: buildDoubleMetric( + GaugeMetricType, "tst.int.1tag", - []string{"k0"}, - []string{"v_0"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_DoubleValue{DoubleValue: 1.23}, - }, + map[string]any{"k0": "v_0"}, + 1582230020, + 1.23, ), }, { @@ -101,12 +96,11 @@ func Test_plaintextParser_Parse(t *testing.T) { func TestPlaintextParser_parsePath(t *testing.T) { tests := []struct { - name string - path string - wantName string - wantKeys []*metricspb.LabelKey - wantValues []*metricspb.LabelValue - wantErr bool + name string + path string + wantName string + wantAttributes pcommon.Map + wantErr bool }{ { name: "empty_path", @@ -114,45 +108,50 @@ func TestPlaintextParser_parsePath(t *testing.T) { wantErr: true, }, { - name: "no_tags_but_delim", - path: "no.tags;", - wantName: "no.tags", + name: "no_tags_but_delim", + path: "no.tags;", + wantName: "no.tags", + wantAttributes: pcommon.NewMap(), }, { - name: "void_tags", - path: "void.tags;;;", - wantName: "void.tags", - wantKeys: []*metricspb.LabelKey{}, - wantValues: []*metricspb.LabelValue{}, - wantErr: true, + name: "void_tags", + path: "void.tags;;;", + wantName: "void.tags", + wantAttributes: pcommon.NewMap(), + wantErr: true, }, { - name: "invalid_tag", - path: "invalid.tag;k0=v0;k1_v1", - wantName: "invalid.tag", - wantKeys: []*metricspb.LabelKey{{Key: "k0"}}, - wantValues: []*metricspb.LabelValue{{Value: "v0", HasValue: true}}, - wantErr: true, + name: "invalid_tag", + path: "invalid.tag;k0=v0;k1_v1", + wantName: "invalid.tag", + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "v0") + return m + }(), + wantErr: true, }, { name: "empty_tag_value_middle", path: "empty.tag.value.middle;k0=;k1=v1", wantName: "empty.tag.value.middle", - wantKeys: []*metricspb.LabelKey{{Key: "k0"}, {Key: "k1"}}, - wantValues: []*metricspb.LabelValue{ - {Value: "", HasValue: true}, - {Value: "v1", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "") + m.PutStr("k1", "v1") + return m + }(), }, { name: "empty_tag_value_end", path: "empty.tag.value.end;k0=v0;k1=", wantName: "empty.tag.value.end", - wantKeys: []*metricspb.LabelKey{{Key: "k0"}, {Key: "k1"}}, - wantValues: []*metricspb.LabelValue{ - {Value: "v0", HasValue: true}, - {Value: "", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "v0") + m.PutStr("k1", "") + return m + }(), }, } for _, tt := range tests { @@ -165,42 +164,55 @@ func TestPlaintextParser_parsePath(t *testing.T) { } else { assert.NoError(t, err) assert.Equal(t, tt.wantName, got.MetricName) - assert.Equal(t, tt.wantKeys, got.LabelKeys) - assert.Equal(t, tt.wantValues, got.LabelValues) + assert.Equal(t, tt.wantAttributes, got.Attributes) assert.Equal(t, DefaultMetricType, got.MetricType) } }) } } -func buildMetric( - typ metricspb.MetricDescriptor_Type, +func buildIntMetric( + typ TargetMetricType, name string, - keys []string, - values []string, - point *metricspb.Point, -) *metricspb.Metric { - var labelKeys []*metricspb.LabelKey - if len(keys) > 0 { - labelKeys = make([]*metricspb.LabelKey, 0, len(keys)) - for _, key := range keys { - labelKeys = append(labelKeys, &metricspb.LabelKey{Key: key}) - } + attributes pcommon.Map, + timestamp int64, + value int64, +) pmetric.Metric { + m := pmetric.NewMetric() + m.SetName(name) + var dp pmetric.NumberDataPoint + if typ == CumulativeMetricType { + sum := m.SetEmptySum() + sum.SetIsMonotonic(true) + dp = sum.DataPoints().AppendEmpty() + } else { + dp = m.SetEmptyGauge().DataPoints().AppendEmpty() } - var labelValues []*metricspb.LabelValue - if len(values) > 0 { - labelValues = make([]*metricspb.LabelValue, 0, len(values)) - for _, value := range values { - labelValues = append(labelValues, &metricspb.LabelValue{ - Value: value, - HasValue: true, - }) - } + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(timestamp, 0))) + attributes.CopyTo(dp.Attributes()) + dp.SetIntValue(value) + return m +} + +func buildDoubleMetric( + typ TargetMetricType, + name string, + attributes map[string]any, + timestamp int64, + value float64, +) pmetric.Metric { + m := pmetric.NewMetric() + m.SetName(name) + var dp pmetric.NumberDataPoint + if typ == CumulativeMetricType { + sum := m.SetEmptySum() + sum.SetIsMonotonic(true) + dp = sum.DataPoints().AppendEmpty() + } else { + dp = m.SetEmptyGauge().DataPoints().AppendEmpty() } - return buildMetricForSinglePoint( - name, - typ, - labelKeys, - labelValues, - point) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(timestamp, 0))) + _ = dp.Attributes().FromRaw(attributes) + dp.SetDoubleValue(value) + return m } diff --git a/receiver/carbonreceiver/protocol/regex_parser.go b/receiver/carbonreceiver/protocol/regex_parser.go index b1d0f5add048..0cb93a910352 100644 --- a/receiver/carbonreceiver/protocol/regex_parser.go +++ b/receiver/carbonreceiver/protocol/regex_parser.go @@ -10,7 +10,7 @@ import ( "sort" "strings" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "go.opentelemetry.io/collector/pdata/pcommon" ) const ( @@ -165,27 +165,18 @@ func (rpp *regexPathParser) ParsePath(path string, parsedPath *ParsedPath) error ms := rule.compRegexp.FindStringSubmatch(path) nms := rule.compRegexp.SubexpNames() // regexp pre-computes this slice. metricNameLookup := map[string]string{} + attributes := pcommon.NewMap() - keys := make([]*metricspb.LabelKey, 0, len(nms)+len(rule.Labels)) - values := make([]*metricspb.LabelValue, 0, len(nms)+len(rule.Labels)) for i := 1; i < len(ms); i++ { if strings.HasPrefix(nms[i], metricNameCapturePrefix) { metricNameLookup[nms[i]] = ms[i] } else { - keys = append(keys, &metricspb.LabelKey{Key: nms[i][len(keyCapturePrefix):]}) - values = append(values, &metricspb.LabelValue{ - Value: ms[i], - HasValue: true, - }) + attributes.PutStr(nms[i][len(keyCapturePrefix):], ms[i]) } } for k, v := range rule.Labels { - keys = append(keys, &metricspb.LabelKey{Key: k}) - values = append(values, &metricspb.LabelValue{ - Value: v, - HasValue: true, - }) + attributes.PutStr(k, v) } var actualMetricName string @@ -206,8 +197,7 @@ func (rpp *regexPathParser) ParsePath(path string, parsedPath *ParsedPath) error } parsedPath.MetricName = actualMetricName - parsedPath.LabelKeys = keys - parsedPath.LabelValues = values + parsedPath.Attributes = attributes parsedPath.MetricType = TargetMetricType(rule.MetricType) return nil } diff --git a/receiver/carbonreceiver/protocol/regex_parser_test.go b/receiver/carbonreceiver/protocol/regex_parser_test.go index ee04662ea2e2..ef5f63eba547 100644 --- a/receiver/carbonreceiver/protocol/regex_parser_test.go +++ b/receiver/carbonreceiver/protocol/regex_parser_test.go @@ -6,9 +6,9 @@ package protocol import ( "testing" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" ) func TestRegexParserConfigBuildParser(t *testing.T) { @@ -112,57 +112,50 @@ func Test_regexParser_parsePath(t *testing.T) { name string path string wantName string - wantKeys []*metricspb.LabelKey - wantValues []*metricspb.LabelValue + wantAttributes pcommon.Map wantMetricType TargetMetricType wantErr bool }{ { - name: "no_rule_match", - path: "service_name.host01.rpc.duration.seconds", - wantName: "service_name.host01.rpc.duration.seconds", + name: "no_rule_match", + path: "service_name.host01.rpc.duration.seconds", + wantName: "service_name.host01.rpc.duration.seconds", + wantAttributes: pcommon.NewMap(), }, { name: "match_rule0", path: "service_name.host00.cpu.seconds", wantName: "cpu_seconds", - wantKeys: []*metricspb.LabelKey{ - {Key: "svc"}, - {Key: "host"}, - {Key: "k"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "service_name", HasValue: true}, - {Value: "host00", HasValue: true}, - {Value: "v", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("svc", "service_name") + m.PutStr("host", "host00") + m.PutStr("k", "v") + return m + }(), }, { name: "match_rule1", path: "service_name.host01.rpc.count", wantName: "rpc", - wantKeys: []*metricspb.LabelKey{ - {Key: "svc"}, - {Key: "host"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "service_name", HasValue: true}, - {Value: "host01", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("svc", "service_name") + m.PutStr("host", "host01") + return m + }(), wantMetricType: CumulativeMetricType, }, { name: "match_rule2", path: "svc_02.host02.avg.duration", wantName: "avgduration", - wantKeys: []*metricspb.LabelKey{ - {Key: "svc"}, - {Key: "host"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "svc_02", HasValue: true}, - {Value: "host02", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("svc", "svc_02") + m.PutStr("host", "host02") + return m + }(), wantMetricType: GaugeMetricType, }, } @@ -177,8 +170,7 @@ func Test_regexParser_parsePath(t *testing.T) { } assert.Equal(t, tt.wantName, got.MetricName) - assert.Equal(t, tt.wantKeys, got.LabelKeys) - assert.Equal(t, tt.wantValues, got.LabelValues) + assert.Equal(t, tt.wantAttributes, got.Attributes) assert.Equal(t, tt.wantMetricType, got.MetricType) }) } @@ -186,8 +178,7 @@ func Test_regexParser_parsePath(t *testing.T) { var res struct { name string - keys []*metricspb.LabelKey - values []*metricspb.LabelValue + attributes pcommon.Map metricType TargetMetricType err error } @@ -226,8 +217,7 @@ func Benchmark_regexPathParser_ParsePath(b *testing.B) { got := ParsedPath{} err := rp.ParsePath(tests[0], &got) res.name = got.MetricName - res.keys = got.LabelKeys - res.values = got.LabelValues + res.attributes = got.Attributes res.metricType = got.MetricType res.err = err @@ -238,8 +228,7 @@ func Benchmark_regexPathParser_ParsePath(b *testing.B) { } res.name = got.MetricName - res.keys = got.LabelKeys - res.values = got.LabelValues + res.attributes = got.Attributes res.metricType = got.MetricType res.err = err } diff --git a/receiver/carbonreceiver/receiver_test.go b/receiver/carbonreceiver/receiver_test.go index 867f3d3c68b7..88d78bd0cac3 100644 --- a/receiver/carbonreceiver/receiver_test.go +++ b/receiver/carbonreceiver/receiver_test.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client" @@ -231,11 +230,10 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) { mdd := sink.AllMetrics() require.Len(t, mdd, 1) - _, _, metrics := internaldata.ResourceMetricsToOC(mdd[0].ResourceMetrics().At(0)) - require.Len(t, metrics, 1) - assert.Equal(t, carbonMetric.Name, metrics[0].GetMetricDescriptor().GetName()) - tss := metrics[0].GetTimeseries() - require.Equal(t, 1, len(tss)) + require.Equal(t, 1, mdd[0].MetricCount()) + m := mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, carbonMetric.Name, m.Name()) + require.Equal(t, 1, m.Gauge().DataPoints().Len()) }) } } diff --git a/receiver/carbonreceiver/transport/server_test.go b/receiver/carbonreceiver/transport/server_test.go index a87d73dd8f73..f864e0555a3d 100644 --- a/receiver/carbonreceiver/transport/server_test.go +++ b/receiver/carbonreceiver/transport/server_test.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client" ) @@ -92,9 +91,8 @@ func Test_Server_ListenAndServe(t *testing.T) { mdd := mc.AllMetrics() require.Len(t, mdd, 1) - _, _, metrics := internaldata.ResourceMetricsToOC(mdd[0].ResourceMetrics().At(0)) - require.Len(t, metrics, 1) - assert.Equal(t, "test.metric", metrics[0].GetMetricDescriptor().GetName()) + require.Equal(t, 1, mdd[0].MetricCount()) + assert.Equal(t, "test.metric", mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Name()) }) } } diff --git a/receiver/carbonreceiver/transport/tcp_server.go b/receiver/carbonreceiver/transport/tcp_server.go index 36756668dc05..54e341521f48 100644 --- a/receiver/carbonreceiver/transport/tcp_server.go +++ b/receiver/carbonreceiver/transport/tcp_server.go @@ -14,11 +14,10 @@ import ( "sync" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "go.opencensus.io/trace" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" - internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" ) @@ -168,14 +167,16 @@ func (t *tcpServer) handleConnection( line := strings.TrimSpace(string(bytes)) if line != "" { numReceivedMetricPoints++ - var metric *metricspb.Metric + var metric pmetric.Metric metric, err = p.Parse(line) if err != nil { t.reporter.OnTranslationError(ctx, err) continue } - - err = nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(nil, nil, []*metricspb.Metric{metric})) + metrics := pmetric.NewMetrics() + newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.MoveTo(newMetric) + err = nextConsumer.ConsumeMetrics(ctx, metrics) t.reporter.OnMetricsProcessed(ctx, numReceivedMetricPoints, err) if err != nil { // The protocol doesn't account for returning errors. diff --git a/receiver/carbonreceiver/transport/udp_server.go b/receiver/carbonreceiver/transport/udp_server.go index ff4254ae643e..fc58b1b1c659 100644 --- a/receiver/carbonreceiver/transport/udp_server.go +++ b/receiver/carbonreceiver/transport/udp_server.go @@ -12,10 +12,9 @@ import ( "strings" "sync" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" - internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" ) @@ -92,7 +91,9 @@ func (u *udpServer) handlePacket( ) { ctx := u.reporter.OnDataReceived(context.Background()) var numReceivedMetricPoints int - var metrics []*metricspb.Metric + metrics := pmetric.NewMetrics() + sm := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty() + buf := bytes.NewBuffer(data) for { bytes, err := buf.ReadBytes((byte)('\n')) @@ -110,11 +111,11 @@ func (u *udpServer) handlePacket( u.reporter.OnTranslationError(ctx, err) continue } - - metrics = append(metrics, metric) + newMetric := sm.Metrics().AppendEmpty() + metric.MoveTo(newMetric) } } - err := nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(nil, nil, metrics)) + err := nextConsumer.ConsumeMetrics(ctx, metrics) u.reporter.OnMetricsProcessed(ctx, numReceivedMetricPoints, err) } diff --git a/receiver/wavefrontreceiver/go.mod b/receiver/wavefrontreceiver/go.mod index 36b1694344a5..40ae94440ffe 100644 --- a/receiver/wavefrontreceiver/go.mod +++ b/receiver/wavefrontreceiver/go.mod @@ -3,9 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefr go 1.19 require ( - github.com/census-instrumentation/opencensus-proto v0.4.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver v0.79.0 github.com/stretchr/testify v1.8.4 @@ -13,11 +11,12 @@ require ( go.opentelemetry.io/collector/component v0.79.0 go.opentelemetry.io/collector/confmap v0.79.0 go.opentelemetry.io/collector/consumer v0.79.0 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 go.opentelemetry.io/collector/receiver v0.79.0 - google.golang.org/protobuf v1.30.0 ) require ( + github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect @@ -30,11 +29,11 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.79.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/exporter v0.79.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect - go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 // indirect go.opentelemetry.io/collector/semconv v0.79.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect @@ -47,6 +46,7 @@ require ( golang.org/x/text v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/grpc v1.55.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/wavefrontreceiver/receiver_test.go b/receiver/wavefrontreceiver/receiver_test.go index 869ef36faafe..592b76526646 100644 --- a/receiver/wavefrontreceiver/receiver_test.go +++ b/receiver/wavefrontreceiver/receiver_test.go @@ -11,16 +11,15 @@ import ( "testing" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" - internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus" ) func Test_wavefrontreceiver_EndToEnd(t *testing.T) { @@ -44,73 +43,73 @@ func Test_wavefrontreceiver_EndToEnd(t *testing.T) { tests := []struct { name string msg string - want []*metricspb.Metric + want []pmetric.Metric }{ { name: "single.line", msg: "single.metric 1 1582231120 source=e2e\n", - want: []*metricspb.Metric{ - buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: []pmetric.Metric{ + buildIntMetric( "single.metric", - []string{"source"}, - []string{"e2e"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582231120}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "e2e") + return m + }(), + 1582231120, + 1, ), }, }, { name: "single.line.no.newline", msg: "single.metric 1 1582231120 source=e2e", - want: []*metricspb.Metric{ - buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: []pmetric.Metric{ + buildIntMetric( "single.metric", - []string{"source"}, - []string{"e2e"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582231120}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "e2e") + return m + }(), + 1582231120, + 1, ), }, }, { name: "multiple.lines", msg: "m0 0 1582231120 source=s0\nm1 1 1582231121 source=s1\nm2 2 1582231122 source=s2\n", - want: []*metricspb.Metric{ - buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: []pmetric.Metric{ + buildIntMetric( "m0", - []string{"source"}, - []string{"s0"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582231120}, - Value: &metricspb.Point_Int64Value{Int64Value: 0}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "s0") + return m + }(), + 1582231120, + 0, ), - buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + buildIntMetric( "m1", - []string{"source"}, - []string{"s1"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582231121}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "s1") + return m + }(), + 1582231121, + 1, ), - buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + buildIntMetric( "m2", - []string{"source"}, - []string{"s2"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582231122}, - Value: &metricspb.Point_Int64Value{Int64Value: 2}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "s2") + return m + }(), + 1582231122, + 2, ), }, }, @@ -133,14 +132,19 @@ func Test_wavefrontreceiver_EndToEnd(t *testing.T) { }, 10*time.Second, 5*time.Millisecond) metrics := sink.AllMetrics() - var gotOldMetrics []*metricspb.Metric + var gotMetrics []pmetric.Metric for _, md := range metrics { for i := 0; i < md.ResourceMetrics().Len(); i++ { - _, _, metrics := internaldata.ResourceMetricsToOC(md.ResourceMetrics().At(i)) - gotOldMetrics = append(gotOldMetrics, metrics...) + rm := md.ResourceMetrics().At(i) + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + for k := 0; k < sm.Metrics().Len(); k++ { + gotMetrics = append(gotMetrics, sm.Metrics().At(k)) + } + } } } - assert.Equal(t, tt.want, gotOldMetrics) + assert.Equal(t, tt.want, gotMetrics) sink.Reset() } } diff --git a/receiver/wavefrontreceiver/wavefront_parser.go b/receiver/wavefrontreceiver/wavefront_parser.go index 646452ffa24f..1e2db7dfda03 100644 --- a/receiver/wavefrontreceiver/wavefront_parser.go +++ b/receiver/wavefrontreceiver/wavefront_parser.go @@ -9,8 +9,8 @@ import ( "strings" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - "google.golang.org/protobuf/types/known/timestamppb" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver" @@ -47,91 +47,74 @@ func (wp *WavefrontParser) BuildParser() (protocol.Parser, error) { // " [] source= [pointTags]" // // Detailed description of each element is available on the link above. -func (wp *WavefrontParser) Parse(line string) (*metricspb.Metric, error) { +func (wp *WavefrontParser) Parse(line string) (pmetric.Metric, error) { parts := strings.SplitN(line, " ", 3) if len(parts) < 3 { - return nil, fmt.Errorf("invalid wavefront metric [%s]", line) + return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]", line) } metricName := unDoubleQuote(parts[0]) if metricName == "" { - return nil, fmt.Errorf("empty name for wavefront metric [%s]", line) + return pmetric.Metric{}, fmt.Errorf("empty name for wavefront metric [%s]", line) } valueStr := parts[1] rest := parts[2] - var metricType metricspb.MetricDescriptor_Type - var point metricspb.Point - if intVal, err := strconv.ParseInt(valueStr, 10, 64); err == nil { - metricType = metricspb.MetricDescriptor_GAUGE_INT64 - point.Value = &metricspb.Point_Int64Value{Int64Value: intVal} - } else { - dblVal, err := strconv.ParseFloat(valueStr, 64) - if err != nil { - return nil, fmt.Errorf("invalid wavefront metric value [%s]: %w", line, err) - } - metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE - point.Value = &metricspb.Point_DoubleValue{DoubleValue: dblVal} - } - parts = strings.SplitN(rest, " ", 2) timestampStr := parts[0] var tags string if len(parts) == 2 { tags = parts[1] } - var ts timestamppb.Timestamp + var ts time.Time if unixTime, err := strconv.ParseInt(timestampStr, 10, 64); err == nil { - ts.Seconds = unixTime + ts = time.Unix(unixTime, 0) } else { // Timestamp can be omitted so it is only correct if the string was a tag. if strings.IndexByte(timestampStr, '=') == -1 { - return nil, fmt.Errorf( + return pmetric.Metric{}, fmt.Errorf( "invalid timestamp for wavefront metric [%s]", line) } // Assume timestamp was omitted, get current time and adjust index. - ts.Seconds = time.Now().Unix() + ts = time.Now() tags = rest } - point.Timestamp = &ts - var labelKeys []*metricspb.LabelKey - var labelValues []*metricspb.LabelValue + attributes := pcommon.NewMap() if tags != "" { // to need for special treatment for source, treat it as a normal tag since // tags are separated by space and are optionally double-quoted. - var err error - labelKeys, labelValues, err = buildLabels(tags) - if err != nil { - return nil, fmt.Errorf("invalid wavefront metric [%s]: %w", line, err) + + if err := buildLabels(attributes, tags); err != nil { + return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric [%s]: %w", line, err) } } if wp.ExtractCollectdTags { - metricName, labelKeys, labelValues = wp.injectCollectDLabels(metricName, labelKeys, labelValues) + metricName = wp.injectCollectDLabels(metricName, attributes) } - - metric := &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: metricName, - Type: metricType, - LabelKeys: labelKeys, - }, - Timeseries: []*metricspb.TimeSeries{ - { - LabelValues: labelValues, - Points: []*metricspb.Point{&point}, - }, - }, + metric := pmetric.NewMetric() + metric.SetName(metricName) + dp := metric.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(ts)) + attributes.CopyTo(dp.Attributes()) + if intVal, err := strconv.ParseInt(valueStr, 10, 64); err == nil { + dp.SetIntValue(intVal) + } else { + dblVal, err := strconv.ParseFloat(valueStr, 64) + if err != nil { + return pmetric.Metric{}, fmt.Errorf("invalid wavefront metric value [%s]: %w", line, err) + } + dp.SetDoubleValue(dblVal) } + return metric, nil } func (wp *WavefrontParser) injectCollectDLabels( metricName string, - labelKeys []*metricspb.LabelKey, - labelValues []*metricspb.LabelValue, -) (string, []*metricspb.LabelKey, []*metricspb.LabelValue) { + attributes pcommon.Map, +) string { // This comes from SignalFx Gateway code that has the capability to // remove CollectD tags from the name of the metric. var toAddDims map[string]string @@ -147,24 +130,20 @@ func (wp *WavefrontParser) injectCollectDLabels( } for k, v := range toAddDims { - labelKeys = append(labelKeys, &metricspb.LabelKey{Key: k}) - labelValues = append(labelValues, &metricspb.LabelValue{ - Value: v, - HasValue: true, - }) + attributes.PutStr(k, v) } } - return metricName, labelKeys, labelValues + return metricName } -func buildLabels(tags string) (keys []*metricspb.LabelKey, values []*metricspb.LabelValue, err error) { +func buildLabels(attributes pcommon.Map, tags string) (err error) { if tags == "" { return } for { parts := strings.SplitN(tags, "=", 2) if len(parts) != 2 { - return nil, nil, fmt.Errorf("failed to break key for [%s]", tags) + return fmt.Errorf("failed to break key for [%s]", tags) } key := parts[0] @@ -203,11 +182,7 @@ func buildLabels(tags string) (keys []*metricspb.LabelKey, values []*metricspb.L value = rest[:i] tagLen += i } - - keys = append(keys, &metricspb.LabelKey{Key: key}) - values = append(values, &metricspb.LabelValue{ - Value: value, - HasValue: true}) + attributes.PutStr(key, value) tags = strings.TrimLeft(tags[tagLen:], " ") if tags == "" { diff --git a/receiver/wavefrontreceiver/wavefront_parser_test.go b/receiver/wavefrontreceiver/wavefront_parser_test.go index ebe1091c41c4..20aca6cc4b1a 100644 --- a/receiver/wavefrontreceiver/wavefront_parser_test.go +++ b/receiver/wavefrontreceiver/wavefront_parser_test.go @@ -8,97 +8,84 @@ import ( "testing" "time" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/types/known/timestamppb" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" ) func Test_buildLabels(t *testing.T) { tests := []struct { - name string - tags string - wantKeys []*metricspb.LabelKey - wantValues []*metricspb.LabelValue - wantErr bool + name string + tags string + wantAttributes pcommon.Map + wantErr bool }{ { - name: "empty_tags", + name: "empty_tags", + wantAttributes: pcommon.NewMap(), }, { - name: "only_source", - tags: "source=test", - wantKeys: []*metricspb.LabelKey{{Key: "source"}}, - wantValues: []*metricspb.LabelValue{ - {Value: "test", HasValue: true}, - }, + name: "only_source", + tags: "source=test", + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "test") + return m + }(), }, { name: "no_quotes", tags: "source=tst k0=v0 k1=v1", - wantKeys: []*metricspb.LabelKey{ - {Key: "source"}, - {Key: "k0"}, - {Key: "k1"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "tst", HasValue: true}, - {Value: "v0", HasValue: true}, - {Value: "v1", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + m.PutStr("k0", "v0") + m.PutStr("k1", "v1") + return m + }(), }, { name: "end_with_quotes", tags: "source=\"tst escape\\\" tst\" x=\"tst spc\"", - wantKeys: []*metricspb.LabelKey{ - {Key: "source"}, - {Key: "x"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "tst escape\" tst", HasValue: true}, - {Value: "tst spc", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst escape\" tst") + m.PutStr("x", "tst spc") + return m + }(), }, { name: "multiple_escapes", tags: "source=\"tst\\\"\\ntst\\\"\" bgn=\"\nb\" mid=\"tst\\nspc\" end=\"e\n\"", - wantKeys: []*metricspb.LabelKey{ - {Key: "source"}, - {Key: "bgn"}, - {Key: "mid"}, - {Key: "end"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "tst\"\ntst\"", HasValue: true}, - {Value: "\nb", HasValue: true}, - {Value: "tst\nspc", HasValue: true}, - {Value: "e\n", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst\"\ntst\"") + m.PutStr("bgn", "\nb") + m.PutStr("mid", "tst\nspc") + m.PutStr("end", "e\n") + return m + }(), }, { name: "missing_tagValue", tags: "k0=0 k1= k2=2", - wantKeys: []*metricspb.LabelKey{ - {Key: "k0"}, - {Key: "k1"}, - {Key: "k2"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "0", HasValue: true}, - {Value: "", HasValue: true}, - {Value: "2", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "0") + m.PutStr("k1", "") + m.PutStr("k2", "2") + return m + }(), }, { name: "empty_tagValue", tags: "k0=0 k1=\"\"", - wantKeys: []*metricspb.LabelKey{ - {Key: "k0"}, - {Key: "k1"}, - }, - wantValues: []*metricspb.LabelValue{ - {Value: "0", HasValue: true}, - {Value: "", HasValue: true}, - }, + wantAttributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "0") + m.PutStr("k1", "") + return m + }(), }, { name: "no_tag", @@ -108,9 +95,11 @@ func Test_buildLabels(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotKeys, gotValues, err := buildLabels(tt.tags) - assert.Equal(t, tt.wantKeys, gotKeys) - assert.Equal(t, tt.wantValues, gotValues) + gotAttributes := pcommon.NewMap() + err := buildLabels(gotAttributes, tt.tags) + if !tt.wantErr { + assert.Equal(t, tt.wantAttributes, gotAttributes) + } assert.Equal(t, tt.wantErr, err != nil) }) } @@ -121,127 +110,133 @@ func Test_wavefrontParser_Parse(t *testing.T) { line string extractCollectDTags bool missingTimestamp bool - want *metricspb.Metric + want pmetric.Metric wantErr bool }{ { line: "no.tags 1 1582230020", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "no.tags", - nil, - nil, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + pcommon.NewMap(), + 1582230020, + 1, ), }, { line: "\"/and,\" 1 1582230020 source=tst", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "/and,", - []string{"source"}, - []string{"tst"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + return m + }(), + 1582230020, + 1, ), }, { line: "tst.int 1 1582230020 source=tst", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "tst.int", - []string{"source"}, - []string{"tst"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + return m + }(), + 1582230020, + 1, ), }, { line: "tst.dbl 3.14 source=tst k0=v0", missingTimestamp: true, - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_DOUBLE, + want: buildDoubleMetric( "tst.dbl", - []string{"source", "k0"}, - []string{"tst", "v0"}, - &metricspb.Point{ - Value: &metricspb.Point_DoubleValue{DoubleValue: 3.14}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + m.PutStr("k0", "v0") + return m + }(), + 0, + 3.14, ), }, { line: "tst.int.3tags 128 1582230020 k0=v_0 k1=v_1 k2=v_2", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "tst.int.3tags", - []string{"k0", "k1", "k2"}, - []string{"v_0", "v_1", "v_2"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 128}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "v_0") + m.PutStr("k1", "v_1") + m.PutStr("k2", "v_2") + return m + }(), + 1582230020, + 128, ), }, { line: "tst.int.1tag 1.23 1582230020 k0=v_0", - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_DOUBLE, + want: buildDoubleMetric( "tst.int.1tag", - []string{"k0"}, - []string{"v_0"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_DoubleValue{DoubleValue: 1.23}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("k0", "v_0") + return m + }(), + 1582230020, + 1.23, ), }, { line: "collectd.[cdk=cdv].tags 1 source=tst k0=v0", missingTimestamp: true, extractCollectDTags: true, - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "collectd.tags", - []string{"source", "k0", "cdk"}, - []string{"tst", "v0", "cdv"}, - &metricspb.Point{ - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + m.PutStr("k0", "v0") + m.PutStr("cdk", "cdv") + return m + }(), + 0, + 1, ), }, { line: "mult.[cdk0=cdv0].collectd.[cdk1=cdv1].groups 1 1582230020 source=tst", extractCollectDTags: true, - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "mult.collectd.groups", - []string{"source", "cdk0", "cdk1"}, - []string{"tst", "cdv0", "cdv1"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + m.PutStr("cdk0", "cdv0") + m.PutStr("cdk1", "cdv1") + return m + }(), + 1582230020, + 1, ), }, { line: "collectd.last[cdk0=cdv0] 1 1582230020 source=tst", extractCollectDTags: true, - want: buildMetric( - metricspb.MetricDescriptor_GAUGE_INT64, + want: buildIntMetric( "collectd.last", - []string{"source", "cdk0"}, - []string{"tst", "cdv0"}, - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{Seconds: 1582230020}, - Value: &metricspb.Point_Int64Value{Int64Value: 1}, - }, + func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("source", "tst") + m.PutStr("cdk0", "cdv0") + return m + }(), + 1582230020, + 1, ), }, { @@ -274,11 +269,11 @@ func Test_wavefrontParser_Parse(t *testing.T) { // The timestamp was actually generated by the parser. // Assert that it is within a certain range around now. unixNow := time.Now().Unix() - ts := got.Timeseries[0].Points[0].Timestamp - assert.LessOrEqual(t, ts.GetSeconds(), time.Now().Unix()) - assert.LessOrEqual(t, math.Abs(float64(ts.GetSeconds()-unixNow)), 2.0) + ts := got.Gauge().DataPoints().At(0).Timestamp().AsTime() + assert.LessOrEqual(t, ts, time.Now()) + assert.LessOrEqual(t, math.Abs(float64(ts.Unix()-unixNow)), 2.0) // Copy returned timestamp so asserts below can succeed. - tt.want.Timeseries[0].Points[0].Timestamp = ts + tt.want.Gauge().DataPoints().At(0).SetTimestamp(pcommon.NewTimestampFromTime(ts)) } assert.Equal(t, tt.want, got) assert.Equal(t, tt.wantErr, err != nil) @@ -286,41 +281,34 @@ func Test_wavefrontParser_Parse(t *testing.T) { } } -func buildMetric( - typ metricspb.MetricDescriptor_Type, +func buildDoubleMetric( name string, - keys []string, - values []string, - point *metricspb.Point, -) *metricspb.Metric { - var labelKeys []*metricspb.LabelKey - if len(keys) > 0 { - labelKeys = make([]*metricspb.LabelKey, 0, len(keys)) - for _, key := range keys { - labelKeys = append(labelKeys, &metricspb.LabelKey{Key: key}) - } - } - var labelValues []*metricspb.LabelValue - if len(values) > 0 { - labelValues = make([]*metricspb.LabelValue, 0, len(values)) - for _, value := range values { - labelValues = append(labelValues, &metricspb.LabelValue{ - Value: value, - HasValue: true, - }) - } - } - return &metricspb.Metric{ - MetricDescriptor: &metricspb.MetricDescriptor{ - Name: name, - Type: typ, - LabelKeys: labelKeys, - }, - Timeseries: []*metricspb.TimeSeries{ - { - LabelValues: labelValues, - Points: []*metricspb.Point{point}, - }, - }, - } + attributes pcommon.Map, + ts int64, + dblVal float64, +) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(name) + g := metric.SetEmptyGauge() + dp := g.DataPoints().AppendEmpty() + dp.SetDoubleValue(dblVal) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(ts, 0))) + attributes.CopyTo(dp.Attributes()) + return metric +} + +func buildIntMetric( + name string, + attributes pcommon.Map, + ts int64, + intVal int64, +) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(name) + g := metric.SetEmptyGauge() + dp := g.DataPoints().AppendEmpty() + dp.SetIntValue(intVal) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(ts, 0))) + attributes.CopyTo(dp.Attributes()) + return metric }