From afdd09e072e6644fae38a5241b95796f880d40fa Mon Sep 17 00:00:00 2001 From: alexgreenbank Date: Fri, 26 Jul 2024 16:26:43 +0100 Subject: [PATCH] [chore] [receiver/datadog] Add support for Service Checks Signed-off-by: alexgreenbank --- .../translator/service_check_translator.go | 48 +++ .../service_check_translator_test.go | 325 ++++++++++++++++++ receiver/datadogreceiver/receiver.go | 32 +- receiver/datadogreceiver/receiver_test.go | 60 ++++ 4 files changed, 462 insertions(+), 3 deletions(-) create mode 100644 receiver/datadogreceiver/internal/translator/service_check_translator.go create mode 100644 receiver/datadogreceiver/internal/translator/service_check_translator_test.go diff --git a/receiver/datadogreceiver/internal/translator/service_check_translator.go b/receiver/datadogreceiver/internal/translator/service_check_translator.go new file mode 100644 index 000000000000..f3f0fa12d3fb --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/service_check_translator.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" + +import ( + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type ServiceCheck struct { + Check string `json:"check"` + HostName string `json:"host_name"` + Status datadogV1.ServiceCheckStatus `json:"status"` + Timestamp int64 `json:"timestamp,omitempty"` + Tags []string `json:"tags,omitempty"` +} + +// More information on Datadog service checks: https://docs.datadoghq.com/api/latest/service-checks/ +func (mt *MetricsTranslator) TranslateServices(services []ServiceCheck) pmetric.Metrics { + bt := newBatcher() + bt.Metrics = pmetric.NewMetrics() + + for _, service := range services { + metricProperties := parseSeriesProperties("service_check", "service_check", service.Tags, service.HostName, mt.buildInfo.Version, mt.stringPool) + metric, metricID := bt.Lookup(metricProperties) // TODO(alexg): proper name + + dps := metric.Gauge().DataPoints() + dps.EnsureCapacity(1) + + dp := dps.AppendEmpty() + dp.SetTimestamp(pcommon.Timestamp(service.Timestamp * 1_000_000_000)) // OTel uses nanoseconds, while Datadog uses seconds + metricProperties.dpAttrs.CopyTo(dp.Attributes()) + dp.SetIntValue(int64(service.Status)) + + // TODO(alexg): Do this stream thing for service check metrics? + stream := identity.OfStream(metricID, dp) + ts, ok := mt.streamHasTimestamp(stream) + if ok { + dp.SetStartTimestamp(ts) + } + mt.updateLastTsForStream(stream, dp.Timestamp()) + } + return bt.Metrics +} diff --git a/receiver/datadogreceiver/internal/translator/service_check_translator_test.go b/receiver/datadogreceiver/internal/translator/service_check_translator_test.go new file mode 100644 index 000000000000..ad5b29411194 --- /dev/null +++ b/receiver/datadogreceiver/internal/translator/service_check_translator_test.go @@ -0,0 +1,325 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translator + +import ( + "encoding/json" + "testing" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +var ( + testTimestamp = int64(1700000000) +) + +func TestHandleStructureParsing(t *testing.T) { + tests := []struct { + name string + checkRunPayload []byte + expectedServices []ServiceCheck + }{ + { + name: "happy", + checkRunPayload: []byte(`[ + { + "check": "datadog.agent.check_status", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "check:container" + ] + }, + { + "check": "app.working", + "host_name": "hosta", + "timestamp": 1700000000, + "status": 0, + "message": "", + "tags": null + }, + { + "check": "env.test", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "datadog.agent.check_status", + HostName: "hosta", + Status: 0, + Tags: []string{"check:container"}, + }, + { + Check: "app.working", + HostName: "hosta", + Status: 0, + Timestamp: 1700000000, + }, + { + Check: "env.test", + HostName: "hosta", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "happy no tags", + checkRunPayload: []byte(`[ + { + "check": "app.working", + "host_name": "hosta", + "timestamp": 1700000000, + "status": 0, + "message": "", + "tags": null + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "app.working", + HostName: "hosta", + Status: 0, + Timestamp: 1700000000, + }, + }, + }, + { + name: "happy no timestamp", + checkRunPayload: []byte(`[ + { + "check": "env.test", + "host_name": "hosta", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "env.test", + HostName: "hosta", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "empty", + checkRunPayload: []byte(`[]`), + expectedServices: []ServiceCheck{}, + }, + { + name: "happy no hostname", + checkRunPayload: []byte(`[ + { + "check": "env.test", + "status": 0, + "message": "", + "tags": [ + "env:argle", "foo:bargle" + ] + } + ]`), + expectedServices: []ServiceCheck{ + { + Check: "env.test", + Status: 0, + Tags: []string{"env:argle", "foo:bargle"}, + }, + }, + }, + { + name: "empty", + checkRunPayload: []byte(`[]`), + expectedServices: []ServiceCheck{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var services []ServiceCheck + err := json.Unmarshal(tt.checkRunPayload, &services) + require.NoError(t, err, "Failed to unmarshal service payload JSON") + assert.Equal(t, tt.expectedServices, services, "Parsed series does not match expected series") + }) + } +} + +func TestTranslateCheckRun(t *testing.T) { + tests := []struct { + name string + services []ServiceCheck + expect func(t *testing.T, result pmetric.Metrics) + }{ + { + name: "OK status, with TS, no tags, no hostname", + services: []ServiceCheck{ + { + Check: "app.working", + Timestamp: 1700000000, + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{}, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{}, "", newStringPool()) + requireResourceMetrics(t, result, expectedResourceAttrs, 1) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScopeMetrics(t, result, 1, 1) + + requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "service_check", 1) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedDpAttrs, 1700000000, 0) + }, + }, + { + name: "OK status, no TS", + services: []ServiceCheck{ + { + Check: "app.working", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{"env:tag1", "version:tag2"}, + }, + }, + expect: func(t *testing.T, result pmetric.Metrics) { + expectedResourceAttrs, expectedScopeAttrs, expectedDpAttrs := tagsToAttributes([]string{"env:tag1", "version:tag2"}, "foo", newStringPool()) + requireResourceMetrics(t, result, expectedResourceAttrs, 1) + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScopeMetrics(t, result, 1, 1) + + requireScope(t, result, expectedScopeAttrs, "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metric := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + requireGauge(t, metric, "service_check", 1) + + dp := metric.Gauge().DataPoints().At(0) + requireDp(t, dp, expectedDpAttrs, 0, 0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + result := mt.TranslateServices(tt.services) + + tt.expect(t, result) + }) + } +} + +func TestTranslateCheckRunStatuses(t *testing.T) { + tests := []struct { + name string + services []ServiceCheck + expectedStatus int64 + }{ + { + name: "OK status, no TS", + services: []ServiceCheck{ + { + Check: "app.working", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_OK, + Tags: []string{"env:tag1", "version:tag2"}, + }, + }, + expectedStatus: 0, + }, + { + name: "Warning status", + services: []ServiceCheck{ + { + Check: "app.warning", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_WARNING, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 1, + }, + { + name: "Critical status", + services: []ServiceCheck{ + { + Check: "app.critical", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_CRITICAL, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 2, + }, + { + name: "Unknown status", + services: []ServiceCheck{ + { + Check: "app.unknown", + HostName: "foo", + Status: datadogV1.SERVICECHECKSTATUS_UNKNOWN, + Tags: []string{"env:tag1", "version:tag2"}, + Timestamp: testTimestamp, + }, + }, + expectedStatus: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := createMetricsTranslator() + mt.buildInfo = component.BuildInfo{ + Command: "otelcol", + Description: "OpenTelemetry Collector", + Version: "latest", + } + result := mt.TranslateServices(tt.services) + + require.Equal(t, 1, result.MetricCount()) + require.Equal(t, 1, result.DataPointCount()) + + requireScopeMetrics(t, result, 1, 1) + + requireScope(t, result, pcommon.NewMap(), "otelcol/datadogreceiver", component.NewDefaultBuildInfo().Version) + + metrics := result.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + assert.Equal(t, tt.expectedStatus, metric.Gauge().DataPoints().At(0).IntValue()) + } + }) + } +} diff --git a/receiver/datadogreceiver/receiver.go b/receiver/datadogreceiver/receiver.go index a00b7edd54eb..a93a515a60c0 100644 --- a/receiver/datadogreceiver/receiver.go +++ b/receiver/datadogreceiver/receiver.go @@ -199,9 +199,35 @@ func (ddr *datadogReceiver) handleCheckRun(w http.ResponseWriter, req *http.Requ ddr.tReceiver.EndMetricsOp(obsCtx, "datadog", *metricsCount, err) }(&metricsCount) - err = fmt.Errorf("service checks endpoint not implemented") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - ddr.params.Logger.Warn("metrics consumer errored out", zap.Error(err)) + buf := translator.GetBuffer() + defer translator.PutBuffer(buf) + if _, err = io.Copy(buf, req.Body); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error(err.Error()) + return + } + + var services []translator.ServiceCheck + + err = json.Unmarshal(buf.Bytes(), &services) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + ddr.params.Logger.Error(err.Error()) + return + } + + metrics := ddr.metricsTranslator.TranslateServices(services) + metricsCount = metrics.DataPointCount() + + err = ddr.nextMetricsConsumer.ConsumeMetrics(obsCtx, metrics) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + ddr.params.Logger.Error("metrics consumer errored out", zap.Error(err)) + return + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("OK")) } // handleSketches handles sketches, the underlying data structure of distributions https://docs.datadoghq.com/metrics/distributions/ diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index f39377d1f07d..9e9e29994acd 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -168,3 +168,63 @@ func TestDatadogMetricsV1_EndToEnd(t *testing.T) { expectedEnvironment, _ := metric.Sum().DataPoints().At(0).Attributes().Get("environment") assert.Equal(t, "test", expectedEnvironment.AsString()) } + +func TestDatadogServices_EndToEnd(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopCreateSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + servicesPayload := []byte(`[ + { + "check": "app.working", + "host_name": "hosta", + "status": 2, + "tags": ["environment:test"] + } + ]`) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/v1/check_run", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(servicesPayload)), + ) + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.Equal(t, string(body), "OK", "Expected response to be 'OK', got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) + dps := metric.Gauge().DataPoints() + assert.Equal(t, 1, dps.Len()) + dp := dps.At(0) + assert.Equal(t, int64(2), dp.IntValue()) + assert.Equal(t, 1, dp.Attributes().Len()) + environment, _ := dp.Attributes().Get("environment") + assert.Equal(t, "test", environment.AsString()) + hostName, _ := got.ResourceMetrics().At(0).Resource().Attributes().Get("host.name") + assert.Equal(t, "hosta", hostName.AsString()) +}