Skip to content

Commit

Permalink
intake api implemeneted (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
naman-jain-15 authored May 17, 2024
1 parent a783dcc commit 3a15d63
Show file tree
Hide file tree
Showing 3 changed files with 724 additions and 10 deletions.
101 changes: 95 additions & 6 deletions receiver/datadogmetricreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver"
"io"
"net/http"
"strings"

metricsV2 "github.com/DataDog/agent-payload/v5/gogen"
processv1 "github.com/DataDog/agent-payload/v5/process"
Expand Down Expand Up @@ -95,6 +97,34 @@ type MetaDataPayload struct {
UUID string `json:"uuid"`
}

type IntakePayload struct {
GohaiPayload string `json:"gohai"`
Meta *Meta `json:"meta"`
ContainerMeta map[string]string `json:"container-meta,omitempty"`
}

type Meta struct {
SocketHostname string `json:"socket-hostname"`
Timezones []string `json:"timezones"`
SocketFqdn string `json:"socket-fqdn"`
EC2Hostname string `json:"ec2-hostname"`
Hostname string `json:"hostname"`
HostAliases []string `json:"host_aliases"`
InstanceID string `json:"instance-id"`
AgentHostname string `json:"agent-hostname,omitempty"`
ClusterName string `json:"cluster-name,omitempty"`
}

type GoHaiData struct {
FileSystem []FileInfo `json:"filesystem"`
}

type FileInfo struct {
KbSize string `json:"kb_size"`
MountedOn string `json:"mounted_on"`
Name string `json:"name"`
}

func newdatadogmetricreceiver(config *Config, nextConsumer consumer.Metrics, params receiver.CreateSettings) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
Expand All @@ -120,11 +150,12 @@ func (ddr *datadogmetricreceiver) Start(_ context.Context, host component.Host)
ddmux := http.NewServeMux()
ddmux.HandleFunc("/api/v2/series", ddr.handleV2Series)
ddmux.HandleFunc("/api/v1/metadata", ddr.handleMetaData)
ddmux.HandleFunc("/intake", ddr.handleIntake)
ddmux.HandleFunc("/intake/", ddr.handleIntake)
ddmux.HandleFunc("/api/v1/validate", ddr.handleValidate)
ddmux.HandleFunc("/api/v1/series", ddr.handleV2Series)
ddmux.HandleFunc("/api/v1/collector", ddr.handleCollector)
ddmux.HandleFunc("/api/v1/check_run", ddr.handleCheckRun)
ddmux.HandleFunc("/api/v1/connections", ddr.handleConnections)

var err error
ddr.server, err = ddr.config.HTTPServerSettings.ToServer(
Expand Down Expand Up @@ -243,8 +274,60 @@ func (ddr *datadogmetricreceiver) handleV2Series(w http.ResponseWriter, req *htt
}

func (ddr *datadogmetricreceiver) handleIntake(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{}`)
origin := req.Header.Get("Origin")
key := req.Header.Get(datadogAPIKeyHeader)

body, ok := readAndCloseBody(w, req)
if !ok {
http.Error(w, "error in reading request body", http.StatusBadRequest)
return
}
var otlpReq pmetricotlp.ExportRequest

var err error
var intake IntakePayload
if err = json.Unmarshal(body, &intake); err != nil {
fmt.Println("error unmarshalling intake payload:", err)
http.Error(w, "error in unmarshaling json", http.StatusBadRequest)
return
}

// Unmarshal Gohai FileDatapayload from IntakePayload
var gohai GoHaiData
if err = json.Unmarshal([]byte(intake.GohaiPayload), &gohai); err != nil {
http.Error(w, "error in unmarshaling json", http.StatusBadRequest)
return
}

if intake.Meta.Hostname == "" {
http.Error(w, "HostName not found", http.StatusBadRequest)
return
}

hostname := intake.Meta.Hostname

otlpReq, err = getOtlpExportReqFromDatadogIntakeData(origin, key, gohai, struct {
hostname string
containerInfo map[string]string
milliseconds int64
}{
hostname: hostname,
containerInfo: intake.ContainerMeta,
milliseconds: (time.Now().UnixNano() / int64(time.Millisecond)) * 1000000,
})

if err != nil {
http.Error(w, "error in metadata getOtlpExportReqFromDatadogV1MetaData", http.StatusBadRequest)
return
}
obsCtx := ddr.tReceiver.StartLogsOp(req.Context())
errs := ddr.nextConsumer.ConsumeMetrics(obsCtx, otlpReq.Metrics())
if errs != nil {
http.Error(w, "Logs consumer errored out", http.StatusInternalServerError)
ddr.params.Logger.Error("Logs consumer errored out")
} else {
_, _ = w.Write([]byte("OK"))
}
}

func (ddr *datadogmetricreceiver) handleCheckRun(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -289,6 +372,12 @@ func (ddr *datadogmetricreceiver) handleMetaData(w http.ResponseWriter, req *htt
}
}

func (ddr *datadogmetricreceiver) handleConnections(w http.ResponseWriter, req *http.Request) {
// TODO Implement translation flow if any connection related info required in future
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"valid":true}`)
}

func (ddr *datadogmetricreceiver) handleCollector(w http.ResponseWriter, req *http.Request) {
origin := req.Header.Get("Origin")
key := req.Header.Get(datadogAPIKeyHeader)
Expand Down
106 changes: 102 additions & 4 deletions receiver/datadogmetricreceiver/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ package datadogmetricreceiver // import "github.com/open-telemetry/opentelemetry
import (
"errors"
"fmt"
"log"
"math"
"reflect"
"strconv"
"strings"
"time"

metricsV2 "github.com/DataDog/agent-payload/v5/gogen"
processv1 "github.com/DataDog/agent-payload/v5/process"
metricsV1 "github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"math"
"reflect"
"strings"
"time"
)

type commonResourceAttributes struct {
Expand Down Expand Up @@ -452,3 +455,98 @@ func getOtlpExportReqFromDatadogProcessesData(origin string, key string,
}
return pmetricotlp.NewExportRequestFromMetrics(metrics), nil
}

func convertSize(sizeInKB float64) string {
units := []string{"K", "M", "G"}
unitIndex := 0

size := sizeInKB
for size >= 1024 && unitIndex < len(units)-1 {
size /= 1024
unitIndex++
}

return fmt.Sprintf("%.2f%s", size, units[unitIndex])
}

func getOtlpExportReqFromDatadogIntakeData(origin string, key string,
ddReq GoHaiData, input struct {
hostname string
containerInfo map[string]string
milliseconds int64
}) (pmetricotlp.ExportRequest, error) {
// assumption is that host is same for all the metrics in a given request

if len(ddReq.FileSystem) == 0 {
log.Println("no metadata found so skipping")
return pmetricotlp.ExportRequest{}, ErrNoMetricsInPayload
}

metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics()
rm := resourceMetrics.AppendEmpty()
resourceAttributes := rm.Resource().Attributes()

// assumption is that host is same for all the metrics in a given request
var metricHost string
metricHost = input.hostname

commonResourceAttributes := commonResourceAttributes{
origin: origin,
ApiKey: key,
mwSource: "datadog",
host: metricHost,
}
setMetricResourceAttributes(resourceAttributes, commonResourceAttributes)

scopeMetrics := rm.ScopeMetrics().AppendEmpty()
instrumentationScope := scopeMetrics.Scope()
instrumentationScope.SetName("mw")
instrumentationScope.SetVersion("v0.0.1")

for _, fileData := range ddReq.FileSystem {

scopeMetric := scopeMetrics.Metrics().AppendEmpty()
scopeMetric.SetName("system.intake.metadata")
//scopeMetric.SetUnit(s.GetUnit())

floatVal, err := strconv.ParseFloat(fileData.KbSize, 64)
if err != nil {
log.Println("error converting string to float64")
return pmetricotlp.ExportRequest{}, err
}

metricAttributes := pcommon.NewMap()
str := fileData.Name + " mounted on " + fileData.MountedOn + " " + convertSize(floatVal)
metricAttributes.PutStr("FILESYSTEM", str)

if docker_swarm, ok := input.containerInfo["docker_swarm"]; ok {
metricAttributes.PutStr("docker_swarm", docker_swarm)
}

if docker_version, ok := input.containerInfo["docker_version"]; ok {
metricAttributes.PutStr("docker_version", docker_version)
}

if kubelet_version, ok := input.containerInfo["kubelet_version"]; ok {
metricAttributes.PutStr("kubelet_version", kubelet_version)
}

// current time in millis
// currentTime := time.Now()
// milliseconds := (currentTime.UnixNano() / int64(time.Millisecond)) * 1000000

var dataPoints pmetric.NumberDataPointSlice
gauge := scopeMetric.SetEmptyGauge()
dataPoints = gauge.DataPoints()

dp := dataPoints.AppendEmpty()
dp.SetTimestamp(pcommon.Timestamp(input.milliseconds))

dp.SetDoubleValue(1.0) // setting a dummy value for this metric as only resource attribute needed
attributeMap := dp.Attributes()
metricAttributes.CopyTo(attributeMap)
}

return pmetricotlp.NewExportRequestFromMetrics(metrics), nil
}
Loading

0 comments on commit 3a15d63

Please sign in to comment.