Skip to content

Commit

Permalink
[exporter/otlphttpexporter] Partial success HTTP response handling (#…
Browse files Browse the repository at this point in the history
…8270)

**Description:**
Fix the handling of the HTTP response to ignore responses not encoded as
protobuf

**Link to tracking Issue:**
Fixes #8263
  • Loading branch information
atoulme authored Aug 24, 2023
1 parent 4d36e23 commit cdbe8e7
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 9 deletions.
25 changes: 25 additions & 0 deletions .chloggen/handle-http-response-partial-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix the handling of the HTTP response to ignore responses not encoded as protobuf

# One or more tracking issues or pull requests related to the change
issues: [8263]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
23 changes: 17 additions & 6 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type baseExporter struct {
const (
headerRetryAfter = "Retry-After"
maxHTTPResponseReadBytes = 64 * 1024

protobufContentType = "application/x-protobuf"
)

// Create new exporter.
Expand Down Expand Up @@ -118,7 +120,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
if err != nil {
return consumererror.NewPermanent(err)
}
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Type", protobufContentType)
req.Header.Set("User-Agent", e.userAgent)

resp, err := e.client.Do(req)
Expand Down Expand Up @@ -252,12 +254,15 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par
return err
}

return partialSuccessHandler(bodyBytes)
return partialSuccessHandler(bodyBytes, resp.Header.Get("Content-Type"))
}

type partialSuccessHandler func(protoBytes []byte) error
type partialSuccessHandler func(bytes []byte, contentType string) error

func tracesPartialSuccessHandler(protoBytes []byte) error {
func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := ptraceotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
Expand All @@ -270,7 +275,10 @@ func tracesPartialSuccessHandler(protoBytes []byte) error {
return nil
}

func metricsPartialSuccessHandler(protoBytes []byte) error {
func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := pmetricotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
Expand All @@ -283,7 +291,10 @@ func metricsPartialSuccessHandler(protoBytes []byte) error {
return nil
}

func logsPartialSuccessHandler(protoBytes []byte) error {
func logsPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := plogotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
Expand Down
64 changes: 61 additions & 3 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ func TestPartialSuccess_traces(t *testing.T) {
partial.SetRejectedSpans(1)
bytes, err := response.MarshalProto()
require.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(bytes)
require.NoError(t, err)
})
Expand Down Expand Up @@ -720,6 +721,7 @@ func TestPartialSuccess_metrics(t *testing.T) {
partial.SetRejectedDataPoints(1)
bytes, err := response.MarshalProto()
require.NoError(t, err)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(bytes)
require.NoError(t, err)
})
Expand Down Expand Up @@ -751,9 +753,10 @@ func TestPartialSuccess_logs(t *testing.T) {
partial := response.PartialSuccess()
partial.SetErrorMessage("hello")
partial.SetRejectedLogRecords(1)
bytes, err := response.MarshalProto()
b, err := response.MarshalProto()
require.NoError(t, err)
_, err = writer.Write(bytes)
writer.Header().Set("Content-Type", "application/x-protobuf")
_, err = writer.Write(b)
require.NoError(t, err)
})
defer srv.Close()
Expand Down Expand Up @@ -789,6 +792,9 @@ func TestPartialResponse_missingHeaderButHasBody(t *testing.T) {
// `-1` indicates a missing Content-Length header in the Go http standard library
ContentLength: -1,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {"application/x-protobuf"},
},
}
err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler)
assert.True(t, consumererror.IsPermanent(err))
Expand All @@ -799,6 +805,9 @@ func TestPartialResponse_missingHeaderAndBody(t *testing.T) {
// `-1` indicates a missing Content-Length header in the Go http standard library
ContentLength: -1,
Body: io.NopCloser(bytes.NewReader([]byte{})),
Header: map[string][]string{
"Content-Type": {"application/x-protobuf"},
},
}
err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler)
assert.Nil(t, err)
Expand All @@ -824,6 +833,9 @@ func TestPartialSuccess_shortContentLengthHeader(t *testing.T) {
resp := &http.Response{
ContentLength: 3,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {"application/x-protobuf"},
},
}
err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler)
assert.Error(t, err)
Expand All @@ -839,6 +851,9 @@ func TestPartialSuccess_longContentLengthHeader(t *testing.T) {
resp := &http.Response{
ContentLength: 4096,
Body: io.NopCloser(bytes.NewReader(data)),
Header: map[string][]string{
"Content-Type": {"application/x-protobuf"},
},
}
err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler)
assert.Error(t, err)
Expand All @@ -848,6 +863,9 @@ func TestPartialSuccessInvalidResponseBody(t *testing.T) {
resp := &http.Response{
Body: io.NopCloser(badReader{}),
ContentLength: 100,
Header: map[string][]string{
"Content-Type": {protobufContentType},
},
}
err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler)
assert.Error(t, err)
Expand All @@ -873,12 +891,52 @@ func TestPartialSuccessInvalidBody(t *testing.T) {
}
for _, tt := range invalidBodyCases {
t.Run("Invalid response body_"+tt.telemetryType, func(t *testing.T) {
err := tt.handler([]byte{1})
err := tt.handler([]byte{1}, "application/x-protobuf")
assert.Error(t, err)
})
}
}

func TestPartialSuccessUnsupportedContentType(t *testing.T) {
unsupportedContentTypeCases := []struct {
contentType string
}{
{
contentType: "application/json",
},
{
contentType: "text/plain",
},
{
contentType: "application/octet-stream",
},
}
for _, telemetryType := range []string{"logs", "metrics", "traces"} {
for _, tt := range unsupportedContentTypeCases {
t.Run("Unsupported content type "+tt.contentType+" "+telemetryType, func(t *testing.T) {
var handler func(b []byte, contentType string) error
switch telemetryType {
case "logs":
handler = logsPartialSuccessHandler
case "metrics":
handler = metricsPartialSuccessHandler
case "traces":
handler = tracesPartialSuccessHandler
default:
panic(telemetryType)
}
exportResponse := ptraceotlp.NewExportResponse()
exportResponse.PartialSuccess().SetErrorMessage("foo")
exportResponse.PartialSuccess().SetRejectedSpans(42)
b, err := exportResponse.MarshalProto()
require.NoError(t, err)
err = handler(b, tt.contentType)
assert.NoError(t, err)
})
}
}
}

func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(endpoint, handler)
Expand Down

0 comments on commit cdbe8e7

Please sign in to comment.