diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml new file mode 100644 index 000000000000..3157f8d8165e --- /dev/null +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse labels from Prometheus Remote Write requests into Resource and Metric Attributes. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35656] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 58198033f158..d2fd3a25422f 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -3,8 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet go 1.22.0 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 @@ -29,7 +31,6 @@ require ( github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/aws/aws-sdk-go v1.54.19 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dennwc/varint v1.0.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -56,6 +57,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -105,3 +107,9 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 61195af2fa3c..007d5a08199e 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -12,13 +12,16 @@ import ( "strings" "time" + "github.com/cespare/xxhash/v2" "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" promremote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" @@ -150,8 +153,113 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco } // translateV2 translates a v2 remote-write request into OTLP metrics. -// For now translateV2 is not implemented and returns an empty metrics. +// translate is not feature complete. // nolint -func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { - return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { + var ( + badRequestErrors error + otelMetrics = pmetric.NewMetrics() + labelsBuilder = labels.NewScratchBuilder(0) + stats = promremote.WriteResponseStats{} + // Prometheus Remote-Write can send multiple time series with the same labels in the same request. + // Instead of creating a whole new OTLP metric, we just append the new sample to the existing OTLP metric. + // This cache is called "intra" because in the future we'll have a "interRequestCache" to cache resourceAttributes + // between requests based on the metric "target_info". + intraRequestCache = make(map[uint64]pmetric.ResourceMetrics) + ) + + for _, ts := range req.Timeseries { + ls := ts.ToLabels(&labelsBuilder, req.Symbols) + + if !ls.Has(labels.MetricName) { + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels")) + continue + } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + continue + } + + var rm pmetric.ResourceMetrics + hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) + intraCacheEntry, ok := intraRequestCache[hashedLabels] + if ok { + // We found the same time series in the same request, so we should append to the same OTLP metric. + rm = intraCacheEntry + } else { + rm = otelMetrics.ResourceMetrics().AppendEmpty() + parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) + intraRequestCache[hashedLabels] = rm + } + + switch ts.Metadata.Type { + case writev2.Metadata_METRIC_TYPE_COUNTER: + addCounterDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_GAUGE: + addGaugeDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_SUMMARY: + addSummaryDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_HISTOGRAM: + addHistogramDatapoints(rm, ls, ts) + default: + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) + } + } + + return otelMetrics, stats, badRequestErrors +} + +// parseJobAndInstance turns the job and instance labels service resource attributes. +// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/ +func parseJobAndInstance(dest pcommon.Map, job, instance string) { + if instance != "" { + dest.PutStr("service.instance.id", instance) + } + if job != "" { + parts := strings.Split(job, "/") + if len(parts) == 2 { + dest.PutStr("service.namespace", parts[0]) + dest.PutStr("service.name", parts[1]) + return + } + dest.PutStr("service.name", job) + } +} + +func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) { + // TODO: Cache metric name+type+unit and look up cache before creating new empty metric. + // In OTel name+type+unit is the unique identifier of a metric and we should not create + // a new metric if it already exists. + + // TODO: Check if Scope is already present by comparing labels "otel_scope_name" and "otel_scope_version" + // with Scope.Name and Scope.Version. If it is present, we should append to the existing Scope. + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge() + addDatapoints(m.DataPoints(), ls, ts) +} + +func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +// addDatapoints adds the labels to the datapoints attributes. +// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp, +// Timestamp, Value, etc. +func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) { + attributes := datapoints.AppendEmpty().Attributes() + + for _, l := range ls { + if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace" + l.Name == labels.MetricName || // Becomes metric name + l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version + continue + } + attributes.PutStr(l.Name, l.Value) + } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 8c5c9e659cfc..c1e46602452d 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -14,13 +14,38 @@ import ( "github.com/golang/snappy" promconfig "github.com/prometheus/prometheus/config" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func setupServer(t *testing.T) { +var writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "job", "service-x/test", "instance", "107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, +} + +func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() factory := NewFactory() @@ -30,6 +55,13 @@ func setupServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, prwReceiver, "metrics receiver creation failed") + return prwReceiver.(*prometheusRemoteWriteReceiver) +} + +func setupServer(t *testing.T) { + t.Helper() + + prwReceiver := setupMetricsReceiver(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -98,3 +130,89 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { }) } } + +func TestTranslateV2(t *testing.T) { + prwReceiver := setupMetricsReceiver(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + for _, tc := range []struct { + name string + request *writev2.Request + expectError string + expectedMetrics pmetric.Metrics + expectedStats remote.WriteResponseStats + }{ + { + name: "missing metric name", + request: &writev2.Request{ + Symbols: []string{"", "foo", "bar"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: "missing metric name in labels", + }, + { + name: "duplicate label", + request: &writev2.Request{ + Symbols: []string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: `duplicate label "__name__" in labels`, + }, + { + name: "valid request", + request: writeV2RequestFixture, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rm1 := expected.ResourceMetrics().AppendEmpty() + rmAttributes1 := rm1.Resource().Attributes() + rmAttributes1.PutStr("service.namespace", "service-x") + rmAttributes1.PutStr("service.name", "test") + rmAttributes1.PutStr("service.instance.id", "107cn001") + sm1 := rm1.ScopeMetrics().AppendEmpty() + sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + sm1Attributes.PutStr("d", "e") + sm1Attributes.PutStr("foo", "bar") + // Since we don't check "scope_name" and "scope_version", we end up with duplicated scope metrics for repeated series. + // TODO: Properly handle scope metrics. + sm2 := rm1.ScopeMetrics().AppendEmpty() + sm2Attributes := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + sm2Attributes.PutStr("d", "e") + sm2Attributes.PutStr("foo", "bar") + + rm2 := expected.ResourceMetrics().AppendEmpty() + rmAttributes2 := rm2.Resource().Attributes() + rmAttributes2.PutStr("service.name", "foo") + rmAttributes2.PutStr("service.instance.id", "bar") + mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + mAttributes2.PutStr("d", "e") + mAttributes2.PutStr("foo", "bar") + + return expected + }(), + expectedStats: remote.WriteResponseStats{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) + if tc.expectError != "" { + assert.ErrorContains(t, err, tc.expectError) + return + } + + assert.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics)) + assert.Equal(t, tc.expectedStats, stats) + }) + } +}