Skip to content

Commit

Permalink
[receiver/signalfx] Accept otlp metrics (#31008)
Browse files Browse the repository at this point in the history
**Description:** Accept OTLP payloads on /v2/datapoint api of the SignalFx receiver

**Link to tracking Issue:** #26298
  • Loading branch information
jinja2 authored Feb 9, 2024
1 parent 8ccadc3 commit c0ce352
Show file tree
Hide file tree
Showing 3 changed files with 353 additions and 73 deletions.
27 changes: 27 additions & 0 deletions .chloggen/signalfx-recv-otlp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/signalfx

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Accept otlp protobuf requests when content-type is "application/x-protobuf;format=otlp"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31052]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
130 changes: 86 additions & 44 deletions receiver/signalfxreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
Expand All @@ -32,9 +34,11 @@ import (
const (
defaultServerTimeout = 20 * time.Second

responseOK = "OK"
responseInvalidMethod = "Only \"POST\" method is supported"
responseInvalidContentType = "\"Content-Type\" must be \"application/x-protobuf\""
responseOK = "OK"
responseInvalidMethod = "Only \"POST\" method is supported"
responseEventsInvalidContentType = "\"Content-Type\" must be \"application/x-protobuf\""

responseInvalidContentType = "\"Content-Type\" must be either \"application/x-protobuf\" or \"application/x-protobuf;format=otlp\""
responseInvalidEncoding = "\"Content-Encoding\" must be \"gzip\" or empty"
responseErrGzipReader = "Error on gzip body"
responseErrReadBody = "Failed to read message body"
Expand All @@ -45,22 +49,24 @@ const (

// Centralizing some HTTP and related string constants.
protobufContentType = "application/x-protobuf"
otlpProtobufContentType = "application/x-protobuf;format=otlp"
gzipEncoding = "gzip"
httpContentTypeHeader = "Content-Type"
httpContentEncodingHeader = "Content-Encoding"
)

var (
okRespBody = initJSONResponse(responseOK)
invalidMethodRespBody = initJSONResponse(responseInvalidMethod)
invalidContentRespBody = initJSONResponse(responseInvalidContentType)
invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding)
errGzipReaderRespBody = initJSONResponse(responseErrGzipReader)
errReadBodyRespBody = initJSONResponse(responseErrReadBody)
errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody)
errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer)
errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured)
errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured)
okRespBody = initJSONResponse(responseOK)
invalidMethodRespBody = initJSONResponse(responseInvalidMethod)
invalidContentRespBody = initJSONResponse(responseInvalidContentType)
invalidEventsContentRespBody = initJSONResponse(responseEventsInvalidContentType)
invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding)
errGzipReaderRespBody = initJSONResponse(responseErrGzipReader)
errReadBodyRespBody = initJSONResponse(responseErrReadBody)
errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody)
errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer)
errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured)
errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured)

translator = &signalfx.ToTranslator{}
)
Expand Down Expand Up @@ -166,16 +172,6 @@ func (r *sfxReceiver) Shutdown(context.Context) error {
}

func (r *sfxReceiver) readBody(ctx context.Context, resp http.ResponseWriter, req *http.Request) ([]byte, bool) {
if req.Method != http.MethodPost {
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil)
return nil, false
}

if req.Header.Get(httpContentTypeHeader) != protobufContentType {
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil)
return nil, false
}

encoding := req.Header.Get(httpContentEncodingHeader)
if encoding != "" && encoding != gzipEncoding {
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, nil)
Expand Down Expand Up @@ -221,40 +217,64 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
return
}

body, ok := r.readBody(ctx, resp, req)
if !ok {
if req.Method != http.MethodPost {
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil)
return
}

msg := &sfxpb.DataPointUploadMessage{}
if err := msg.Unmarshal(body); err != nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
otlpFormat := false
switch req.Header.Get(httpContentTypeHeader) {
case protobufContentType:
case otlpProtobufContentType:
otlpFormat = true
default:
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil)
return
}

if len(msg.Datapoints) == 0 {
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, nil)
_, _ = resp.Write(okRespBody)
body, ok := r.readBody(ctx, resp, req)
if !ok {
return
}

md, err := translator.ToMetrics(msg.Datapoints)
if err != nil {
r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err))
}
r.settings.Logger.Debug("Handling metrics data")

if r.config.AccessTokenPassthrough {
if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rm := md.ResourceMetrics().At(i)
res := rm.Resource()
res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
}
var md pmetric.Metrics

if otlpFormat {
r.settings.Logger.Debug("Received request is in OTLP format")
otlpreq := pmetricotlp.NewExportRequest()
if err := otlpreq.UnmarshalProto(body); err != nil {
r.settings.Logger.Debug("OTLP data unmarshalling failed", zap.Error(err))
r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
return
}
md = otlpreq.Metrics()
} else {
msg := &sfxpb.DataPointUploadMessage{}
err := msg.Unmarshal(body)
if err != nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
return
}

md, err = translator.ToMetrics(msg.Datapoints)
if err != nil {
r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err))
}
}

dataPointCount := md.DataPointCount()
if dataPointCount == 0 {
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, nil)
_, _ = resp.Write(okRespBody)
return
}

err = r.metricsConsumer.ConsumeMetrics(ctx, md)
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(msg.Datapoints), err)
r.addAccessTokenLabel(md, req)

err := r.metricsConsumer.ConsumeMetrics(ctx, md)
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), dataPointCount, err)

r.writeResponse(ctx, resp, err)
}
Expand All @@ -267,6 +287,16 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
return
}

if req.Method != http.MethodPost {
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil)
return
}

if req.Header.Get(httpContentTypeHeader) != protobufContentType {
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEventsContentRespBody, nil)
return
}

body, ok := r.readBody(ctx, resp, req)
if !ok {
return
Expand Down Expand Up @@ -336,6 +366,18 @@ func (r *sfxReceiver) failRequest(
)
}

func (r *sfxReceiver) addAccessTokenLabel(md pmetric.Metrics, req *http.Request) {
if r.config.AccessTokenPassthrough {
if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
for i := 0; i < md.ResourceMetrics().Len(); i++ {
rm := md.ResourceMetrics().At(i)
res := rm.Resource()
res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken)
}
}
}
}

func initJSONResponse(s string) []byte {
respBody, err := json.Marshal(s)
if err != nil {
Expand Down
Loading

0 comments on commit c0ce352

Please sign in to comment.