From c0ce352e04e276654b1ac47aacefeeffe94e8197 Mon Sep 17 00:00:00 2001 From: Jina Jain Date: Thu, 8 Feb 2024 22:36:57 -0800 Subject: [PATCH] [receiver/signalfx] Accept otlp metrics (#31008) **Description:** Accept OTLP payloads on /v2/datapoint api of the SignalFx receiver **Link to tracking Issue:** #26298 --- .chloggen/signalfx-recv-otlp.yaml | 27 +++ receiver/signalfxreceiver/receiver.go | 130 ++++++---- receiver/signalfxreceiver/receiver_test.go | 269 ++++++++++++++++++--- 3 files changed, 353 insertions(+), 73 deletions(-) create mode 100755 .chloggen/signalfx-recv-otlp.yaml diff --git a/.chloggen/signalfx-recv-otlp.yaml b/.chloggen/signalfx-recv-otlp.yaml new file mode 100755 index 000000000000..c95e78a093ae --- /dev/null +++ b/.chloggen/signalfx-recv-otlp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/signalfx + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Accept otlp protobuf requests when content-type is "application/x-protobuf;format=otlp" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31052] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/signalfxreceiver/receiver.go b/receiver/signalfxreceiver/receiver.go index 049850fffc93..077614782af3 100644 --- a/receiver/signalfxreceiver/receiver.go +++ b/receiver/signalfxreceiver/receiver.go @@ -20,6 +20,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" @@ -32,9 +34,11 @@ import ( const ( defaultServerTimeout = 20 * time.Second - responseOK = "OK" - responseInvalidMethod = "Only \"POST\" method is supported" - responseInvalidContentType = "\"Content-Type\" must be \"application/x-protobuf\"" + responseOK = "OK" + responseInvalidMethod = "Only \"POST\" method is supported" + responseEventsInvalidContentType = "\"Content-Type\" must be \"application/x-protobuf\"" + + responseInvalidContentType = "\"Content-Type\" must be either \"application/x-protobuf\" or \"application/x-protobuf;format=otlp\"" responseInvalidEncoding = "\"Content-Encoding\" must be \"gzip\" or empty" responseErrGzipReader = "Error on gzip body" responseErrReadBody = "Failed to read message body" @@ -45,22 +49,24 @@ const ( // Centralizing some HTTP and related string constants. protobufContentType = "application/x-protobuf" + otlpProtobufContentType = "application/x-protobuf;format=otlp" gzipEncoding = "gzip" httpContentTypeHeader = "Content-Type" httpContentEncodingHeader = "Content-Encoding" ) var ( - okRespBody = initJSONResponse(responseOK) - invalidMethodRespBody = initJSONResponse(responseInvalidMethod) - invalidContentRespBody = initJSONResponse(responseInvalidContentType) - invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding) - errGzipReaderRespBody = initJSONResponse(responseErrGzipReader) - errReadBodyRespBody = initJSONResponse(responseErrReadBody) - errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody) - errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer) - errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured) - errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured) + okRespBody = initJSONResponse(responseOK) + invalidMethodRespBody = initJSONResponse(responseInvalidMethod) + invalidContentRespBody = initJSONResponse(responseInvalidContentType) + invalidEventsContentRespBody = initJSONResponse(responseEventsInvalidContentType) + invalidEncodingRespBody = initJSONResponse(responseInvalidEncoding) + errGzipReaderRespBody = initJSONResponse(responseErrGzipReader) + errReadBodyRespBody = initJSONResponse(responseErrReadBody) + errUnmarshalBodyRespBody = initJSONResponse(responseErrUnmarshalBody) + errNextConsumerRespBody = initJSONResponse(responseErrNextConsumer) + errLogsNotConfigured = initJSONResponse(responseErrLogsNotConfigured) + errMetricsNotConfigured = initJSONResponse(responseErrMetricsNotConfigured) translator = &signalfx.ToTranslator{} ) @@ -166,16 +172,6 @@ func (r *sfxReceiver) Shutdown(context.Context) error { } func (r *sfxReceiver) readBody(ctx context.Context, resp http.ResponseWriter, req *http.Request) ([]byte, bool) { - if req.Method != http.MethodPost { - r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil) - return nil, false - } - - if req.Header.Get(httpContentTypeHeader) != protobufContentType { - r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil) - return nil, false - } - encoding := req.Header.Get(httpContentEncodingHeader) if encoding != "" && encoding != gzipEncoding { r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, nil) @@ -221,40 +217,64 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req return } - body, ok := r.readBody(ctx, resp, req) - if !ok { + if req.Method != http.MethodPost { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil) return } - msg := &sfxpb.DataPointUploadMessage{} - if err := msg.Unmarshal(body); err != nil { - r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err) + otlpFormat := false + switch req.Header.Get(httpContentTypeHeader) { + case protobufContentType: + case otlpProtobufContentType: + otlpFormat = true + default: + r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidContentRespBody, nil) return } - if len(msg.Datapoints) == 0 { - r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, nil) - _, _ = resp.Write(okRespBody) + body, ok := r.readBody(ctx, resp, req) + if !ok { return } - md, err := translator.ToMetrics(msg.Datapoints) - if err != nil { - r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err)) - } + r.settings.Logger.Debug("Handling metrics data") - if r.config.AccessTokenPassthrough { - if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" { - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rm := md.ResourceMetrics().At(i) - res := rm.Resource() - res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken) - } + var md pmetric.Metrics + + if otlpFormat { + r.settings.Logger.Debug("Received request is in OTLP format") + otlpreq := pmetricotlp.NewExportRequest() + if err := otlpreq.UnmarshalProto(body); err != nil { + r.settings.Logger.Debug("OTLP data unmarshalling failed", zap.Error(err)) + r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err) + return } + md = otlpreq.Metrics() + } else { + msg := &sfxpb.DataPointUploadMessage{} + err := msg.Unmarshal(body) + if err != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err) + return + } + + md, err = translator.ToMetrics(msg.Datapoints) + if err != nil { + r.settings.Logger.Debug("SignalFx conversion error", zap.Error(err)) + } + } + + dataPointCount := md.DataPointCount() + if dataPointCount == 0 { + r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, nil) + _, _ = resp.Write(okRespBody) + return } - err = r.metricsConsumer.ConsumeMetrics(ctx, md) - r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(msg.Datapoints), err) + r.addAccessTokenLabel(md, req) + + err := r.metricsConsumer.ConsumeMetrics(ctx, md) + r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), dataPointCount, err) r.writeResponse(ctx, resp, err) } @@ -267,6 +287,16 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request return } + if req.Method != http.MethodPost { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil) + return + } + + if req.Header.Get(httpContentTypeHeader) != protobufContentType { + r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEventsContentRespBody, nil) + return + } + body, ok := r.readBody(ctx, resp, req) if !ok { return @@ -336,6 +366,18 @@ func (r *sfxReceiver) failRequest( ) } +func (r *sfxReceiver) addAccessTokenLabel(md pmetric.Metrics, req *http.Request) { + if r.config.AccessTokenPassthrough { + if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rm := md.ResourceMetrics().At(i) + res := rm.Resource() + res.Attributes().PutStr(splunk.SFxAccessTokenLabel, accessToken) + } + } + } +} + func initJSONResponse(s string) []byte { respBody, err := json.Marshal(s) if err != nil { diff --git a/receiver/signalfxreceiver/receiver_test.go b/receiver/signalfxreceiver/receiver_test.go index 081673a5d202..e3b2dcab2b64 100644 --- a/receiver/signalfxreceiver/receiver_test.go +++ b/receiver/signalfxreceiver/receiver_test.go @@ -185,6 +185,10 @@ func Test_sfxReceiver_handleReq(t *testing.T) { currentTime := time.Now().Unix() * 1e3 sFxMsg := buildSFxDatapointMsg(currentTime, 13, 3) + otlpContentHeader := "application/x-protobuf;format=otlp" + otlpMetrics := buildOtlpMetrics(5) + marshaler := &pmetric.ProtoMarshaler{} + tests := []struct { name string req *http.Request @@ -227,7 +231,7 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "incorrect_content_encoding", + name: "incorrect_content_encoding_sfx", req: func() *http.Request { req := httptest.NewRequest("POST", "http://localhost", nil) req.Header.Set("Content-Type", "application/x-protobuf") @@ -240,7 +244,20 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "fail_to_read_body", + name: "incorrect_content_encoding_otlp", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost", nil) + req.Header.Set("Content-Type", otlpContentHeader) + req.Header.Set("Content-Encoding", "superzipper") + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusUnsupportedMediaType, status) + assert.Equal(t, responseInvalidEncoding, body) + }, + }, + { + name: "fail_to_read_body_sfx", req: func() *http.Request { req := httptest.NewRequest("POST", "http://localhost", nil) req.Body = badReqBody{} @@ -253,7 +270,20 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "bad_data_in_body", + name: "fail_to_read_body_otlp", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost", nil) + req.Body = badReqBody{} + req.Header.Set("Content-Type", otlpContentHeader) + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, responseErrReadBody, body) + }, + }, + { + name: "bad_data_in_body_sfx", req: func() *http.Request { req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte{1, 2, 3, 4})) req.Header.Set("Content-Type", "application/x-protobuf") @@ -265,7 +295,19 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "empty_body", + name: "bad_data_in_body_otlp", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader([]byte{1, 2, 3, 4})) + req.Header.Set("Content-Type", otlpContentHeader) + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, responseErrUnmarshalBody, body) + }, + }, + { + name: "empty_body_sfx", req: func() *http.Request { req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(nil)) req.Header.Set("Content-Type", "application/x-protobuf") @@ -277,7 +319,19 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "msg_accepted", + name: "empty_body_otlp", + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(nil)) + req.Header.Set("Content-Type", otlpContentHeader) + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusOK, status) + assert.Equal(t, responseOK, body) + }, + }, + { + name: "msg_accepted_sfx", req: func() *http.Request { msgBytes, err := sFxMsg.Marshal() require.NoError(t, err) @@ -291,18 +345,26 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "msg_accepted_gzipped", + name: "msg_accepted_otlp", req: func() *http.Request { - msgBytes, err := sFxMsg.Marshal() + msgBytes, err := marshaler.MarshalMetrics(*otlpMetrics) require.NoError(t, err) - - var buf bytes.Buffer - gzipWriter := gzip.NewWriter(&buf) - _, err = gzipWriter.Write(msgBytes) + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) + req.Header.Set("Content-Type", otlpContentHeader) + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusOK, status) + assert.Equal(t, responseOK, body) + }, + }, + { + name: "msg_accepted_gzipped_sfx", + req: func() *http.Request { + msgBytes, err := sFxMsg.Marshal() require.NoError(t, err) - require.NoError(t, gzipWriter.Close()) - - req := httptest.NewRequest("POST", "http://localhost", &buf) + msgBytes = compressGzip(t, msgBytes) + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", "gzip") return req @@ -313,7 +375,23 @@ func Test_sfxReceiver_handleReq(t *testing.T) { }, }, { - name: "bad_gzipped_msg", + name: "msg_accepted_gzipped_otlp", + req: func() *http.Request { + msgBytes, err := marshaler.MarshalMetrics(*otlpMetrics) + require.NoError(t, err) + msgBytes = compressGzip(t, msgBytes) + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) + req.Header.Set("Content-Type", otlpContentHeader) + req.Header.Set("Content-Encoding", "gzip") + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusOK, status) + assert.Equal(t, responseOK, body) + }, + }, + { + name: "bad_gzipped_msg_sfx", req: func() *http.Request { msgBytes, err := sFxMsg.Marshal() require.NoError(t, err) @@ -328,6 +406,22 @@ func Test_sfxReceiver_handleReq(t *testing.T) { assert.Equal(t, responseErrGzipReader, body) }, }, + { + name: "bad_gzipped_msg_otlp", + req: func() *http.Request { + msgBytes, err := marshaler.MarshalMetrics(*otlpMetrics) + require.NoError(t, err) + + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) + req.Header.Set("Content-Type", otlpContentHeader) + req.Header.Set("Content-Encoding", "gzip") + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, responseErrGzipReader, body) + }, + }, } for _, tt := range tests { @@ -344,7 +438,6 @@ func Test_sfxReceiver_handleReq(t *testing.T) { resp := w.Result() respBytes, err := io.ReadAll(resp.Body) - defer resp.Body.Close() assert.NoError(t, err) var bodyStr string @@ -395,12 +488,12 @@ func Test_sfxReceiver_handleEventReq(t *testing.T) { name: "incorrect_content_type", req: func() *http.Request { req := httptest.NewRequest("POST", "http://localhost", nil) - req.Header.Set("Content-Type", "application/not-protobuf") + req.Header.Set("Content-Type", "application/x-protobuf;format=otlp") return req }(), assertResponse: func(t *testing.T, status int, body string) { assert.Equal(t, http.StatusUnsupportedMediaType, status) - assert.Equal(t, responseInvalidContentType, body) + assert.Equal(t, responseEventsInvalidContentType, body) }, }, { @@ -472,14 +565,8 @@ func Test_sfxReceiver_handleEventReq(t *testing.T) { req: func() *http.Request { msgBytes, err := sFxMsg.Marshal() require.NoError(t, err) - - var buf bytes.Buffer - gzipWriter := gzip.NewWriter(&buf) - _, err = gzipWriter.Write(msgBytes) - require.NoError(t, err) - require.NoError(t, gzipWriter.Close()) - - req := httptest.NewRequest("POST", "http://localhost", &buf) + msgBytes = compressGzip(t, msgBytes) + req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", "gzip") return req @@ -620,26 +707,55 @@ func Test_sfxReceiver_DatapointAccessTokenPassthrough(t *testing.T) { name string passthrough bool token string + otlp bool }{ { name: "No token provided and passthrough false", passthrough: false, token: "", + otlp: false, }, { name: "No token provided and passthrough true", passthrough: true, token: "", + otlp: false, }, { name: "token provided and passthrough false", passthrough: false, token: "myToken", + otlp: false, }, { name: "token provided and passthrough true", passthrough: true, token: "myToken", + otlp: false, + }, + { + name: "No token provided and passthrough false for OTLP payload", + passthrough: false, + token: "", + otlp: true, + }, + { + name: "No token provided and passthrough true for OTLP payload", + passthrough: true, + token: "", + otlp: true, + }, + { + name: "token provided and passthrough false for OTLP payload", + passthrough: false, + token: "myToken", + otlp: true, + }, + { + name: "token provided and passthrough true for OTLP payload", + passthrough: true, + token: "myToken", + otlp: true, }, } @@ -655,10 +771,21 @@ func Test_sfxReceiver_DatapointAccessTokenPassthrough(t *testing.T) { rcv.RegisterMetricsConsumer(sink) currentTime := time.Now().Unix() * 1e3 - sFxMsg := buildSFxDatapointMsg(currentTime, 13, 3) - msgBytes, _ := sFxMsg.Marshal() + + var msgBytes []byte + var contentHeader string + if tt.otlp { + marshaler := &pmetric.ProtoMarshaler{} + msgBytes, err = marshaler.MarshalMetrics(*buildOtlpMetrics(5)) + require.NoError(t, err) + contentHeader = otlpProtobufContentType + } else { + sFxMsg := buildSFxDatapointMsg(currentTime, 13, 3) + msgBytes, _ = sFxMsg.Marshal() + contentHeader = "application/x-protobuf" + } req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) - req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Type", contentHeader) if tt.token != "" { req.Header.Set("x-sf-token", tt.token) } @@ -850,3 +977,87 @@ func buildNDimensions(n uint) []*sfxpb.Dimension { } return d } + +func buildGauge(im pmetric.Metric) { + metricTime := pcommon.NewTimestampFromTime(time.Now()) + im.SetName("gauge_test") + im.SetDescription("") + im.SetUnit("1") + im.SetEmptyGauge() + idps := im.Gauge().DataPoints() + idp0 := idps.AppendEmpty() + addAttributes(2, idp0.Attributes()) + idp0.SetStartTimestamp(metricTime) + idp0.SetTimestamp(metricTime) + idp0.SetIntValue(123) + idp1 := idps.AppendEmpty() + addAttributes(1, idp1.Attributes()) + idp1.SetStartTimestamp(metricTime) + idp1.SetTimestamp(metricTime) + idp1.SetIntValue(456) +} + +func buildHistogram(im pmetric.Metric) { + now := time.Now() + startTime := pcommon.NewTimestampFromTime(now.Add(-10 * time.Second)) + endTime := pcommon.NewTimestampFromTime(now) + im.SetName("histogram_test") + im.SetDescription("") + im.SetUnit("1") + im.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + idps := im.Histogram().DataPoints() + + idp0 := idps.AppendEmpty() + addAttributes(1, idp0.Attributes()) + idp0.SetStartTimestamp(startTime) + idp0.SetTimestamp(endTime) + idp0.SetMin(1.0) + idp0.SetMax(2) + idp0.SetCount(5) + idp0.SetSum(7.0) + idp0.BucketCounts().FromRaw([]uint64{3, 2}) + idp0.ExplicitBounds().FromRaw([]float64{1, 2}) + + idp1 := idps.AppendEmpty() + addAttributes(1, idp1.Attributes()) + idp1.SetStartTimestamp(startTime) + idp1.SetTimestamp(endTime) + idp1.SetMin(0.3) + idp1.SetMax(3) + idp1.SetCount(10) + idp1.SetSum(17.5) +} + +func addAttributes(count int, dst pcommon.Map) { + for i := 0; i < count; i++ { + suffix := strconv.Itoa(i) + dst.PutStr("k"+suffix, "v"+suffix) + } +} + +func buildOtlpMetrics(metricsCount int) *pmetric.Metrics { + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().Resource().Attributes().PutStr("resource-attr", "resource-attr-val-1") + md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + ilm := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + ilm.EnsureCapacity(metricsCount) + + for i := 0; i < metricsCount; i++ { + switch i % 2 { + case 0: + buildGauge(ilm.AppendEmpty()) + case 1: + buildHistogram(ilm.AppendEmpty()) + } + } + return &md +} + +func compressGzip(t *testing.T, msgBytes []byte) []byte { + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(msgBytes) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + return buf.Bytes() +}