From 5852d09fb7b519e18610b99fc5a9da8cddbbf790 Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Tue, 27 Jun 2023 00:58:57 -0400 Subject: [PATCH] Handle partial success responses in the OTLP HTTP exporter (#6970) Handle partial success messages returned from OTLP HTTP backends. Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/6686 --------- Co-authored-by: Evan Bradley Co-authored-by: Dmitrii Anoshin --- .chloggen/otlphttp-partial-success.yaml | 16 ++ exporter/otlphttpexporter/otlp.go | 121 ++++++-- exporter/otlphttpexporter/otlp_test.go | 355 +++++++++++++++++++----- 3 files changed, 397 insertions(+), 95 deletions(-) create mode 100755 .chloggen/otlphttp-partial-success.yaml diff --git a/.chloggen/otlphttp-partial-success.yaml b/.chloggen/otlphttp-partial-success.yaml new file mode 100755 index 00000000000..78635f3b421 --- /dev/null +++ b/.chloggen/otlphttp-partial-success.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Treat partial success responses as errors + +# One or more tracking issues or pull requests related to the change +issues: [6686] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 3ced7ef4cec..9e2e92a34d5 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -90,7 +90,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.tracesURL, request) + return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler) } func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { @@ -99,7 +99,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro if err != nil { return consumererror.NewPermanent(err) } - return e.export(ctx, e.metricsURL, request) + return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler) } func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { @@ -109,10 +109,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return consumererror.NewPermanent(err) } - return e.export(ctx, e.logsURL, request) + return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte) error { +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -133,11 +133,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e }() if resp.StatusCode >= 200 && resp.StatusCode <= 299 { - // Request is successful. - return nil + return handlePartialSuccessResponse(resp, partialSuccessHandler) } - respStatus := readResponse(resp) + respStatus := readResponseStatus(resp) // Format the error message. Use the status if it is present in the response. var formattedErr error @@ -188,29 +187,111 @@ func isRetryableStatusCode(code int) bool { } } +func readResponseBody(resp *http.Response) ([]byte, error) { + if resp.ContentLength == 0 { + return nil, nil + } + + maxRead := resp.ContentLength + + // if maxRead == -1, the ContentLength header has not been sent, so read up to + // the maximum permitted body size. If it is larger than the permitted body + // size, still try to read from the body in case the value is an error. If the + // body is larger than the maximum size, proto unmarshaling will likely fail. + if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { + maxRead = maxHTTPResponseReadBytes + } + protoBytes := make([]byte, maxRead) + n, err := io.ReadFull(resp.Body, protoBytes) + + // No bytes read and an EOF error indicates there is no body to read. + if n == 0 && (err == nil || errors.Is(err, io.EOF)) { + return nil, nil + } + + // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header + // wasn't set, since we will try to read past the length of the body. If this + // is the case, the body will still have the full message in it, so we want to + // ignore the error and parse the message. + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + return nil, err + } + + return protoBytes[:n], nil +} + // Read the response and decode the status.Status from the body. // Returns nil if the response is empty or cannot be decoded. -func readResponse(resp *http.Response) *status.Status { +func readResponseStatus(resp *http.Response) *status.Status { var respStatus *status.Status if resp.StatusCode >= 400 && resp.StatusCode <= 599 { // Request failed. Read the body. OTLP spec says: // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." - maxRead := resp.ContentLength - if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { - maxRead = maxHTTPResponseReadBytes + respBytes, err := readResponseBody(resp) + + if err != nil { + return nil } - respBytes := make([]byte, maxRead) - n, err := io.ReadFull(resp.Body, respBytes) - if err == nil && n > 0 { - // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures - respStatus = &status.Status{} - err = proto.Unmarshal(respBytes, respStatus) - if err != nil { - respStatus = nil - } + + // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures + respStatus = &status.Status{} + err = proto.Unmarshal(respBytes, respStatus) + if err != nil { + return nil } } return respStatus } + +func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { + bodyBytes, err := readResponseBody(resp) + + if err != nil { + return err + } + + return partialSuccessHandler(bodyBytes) +} + +type partialSuccessHandler func(protoBytes []byte) error + +func tracesPartialSuccessHandler(protoBytes []byte) error { + exportResponse := ptraceotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans())) + } + return nil +} + +func metricsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := pmetricotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints())) + } + return nil +} + +func logsPartialSuccessHandler(protoBytes []byte) error { + exportResponse := plogotlp.NewExportResponse() + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return err + } + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { + return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords())) + } + return nil +} diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 255968d1beb..64d9223e3f5 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -12,9 +12,9 @@ import ( "errors" "fmt" "io" - "net" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -37,7 +37,9 @@ import ( "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -371,14 +373,15 @@ func startAndCleanup(t *testing.T, cmp component.Component) { } func TestErrorResponses(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - errMsgPrefix := fmt.Sprintf("error exporting items, request to http://%s/v1/traces responded with HTTP Status Code ", addr) + errMsgPrefix := func(srv *httptest.Server) string { + return fmt.Sprintf("error exporting items, request to %s/v1/traces responded with HTTP Status Code ", srv.URL) + } tests := []struct { name string responseStatus int responseBody *status.Status - err error + err func(srv *httptest.Server) error isPermErr bool headers map[string]string }{ @@ -428,9 +431,11 @@ func TestErrorResponses(t *testing.T) { name: "419", responseStatus: http.StatusTooManyRequests, responseBody: status.New(codes.InvalidArgument, "Quota exceeded"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"429, Message=Quota exceeded, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"429, Message=Quota exceeded, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "500", @@ -442,41 +447,58 @@ func TestErrorResponses(t *testing.T) { name: "502", responseStatus: http.StatusBadGateway, responseBody: status.New(codes.InvalidArgument, "Bad gateway"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"502, Message=Bad gateway, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"502, Message=Bad gateway, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "503", responseStatus: http.StatusServiceUnavailable, responseBody: status.New(codes.InvalidArgument, "Server overloaded"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"), + time.Duration(0)*time.Second) + }, }, { name: "503-Retry-After", responseStatus: http.StatusServiceUnavailable, responseBody: status.New(codes.InvalidArgument, "Server overloaded"), headers: map[string]string{"Retry-After": "30"}, - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"), - time.Duration(30)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503, Message=Server overloaded, Details=[]"), + time.Duration(30)*time.Second) + }, }, { name: "504", responseStatus: http.StatusGatewayTimeout, responseBody: status.New(codes.InvalidArgument, "Gateway timeout"), - err: exporterhelper.NewThrottleRetry( - errors.New(errMsgPrefix+"504, Message=Gateway timeout, Details=[]"), - time.Duration(0)*time.Second), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"504, Message=Gateway timeout, Details=[]"), + time.Duration(0)*time.Second) + }, + }, + { + name: "Bad response payload", + responseStatus: http.StatusServiceUnavailable, + responseBody: status.New(codes.InvalidArgument, strings.Repeat("a", maxHTTPResponseReadBytes+1)), + err: func(srv *httptest.Server) error { + return exporterhelper.NewThrottleRetry( + errors.New(errMsgPrefix(srv)+"503"), + time.Duration(0)*time.Second) + }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { for k, v := range test.headers { writer.Header().Add(k, v) } @@ -488,18 +510,10 @@ func TestErrorResponses(t *testing.T) { require.NoError(t, err) } }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) - require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ - TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), // Create without QueueSettings and RetrySettings so that ConsumeTraces // returns the errors that we want to check immediately. } @@ -521,16 +535,23 @@ func TestErrorResponses(t *testing.T) { if test.isPermErr { assert.True(t, consumererror.IsPermanent(err)) } else { - assert.EqualValues(t, test.err, err) + assert.EqualValues(t, test.err(srv), err) } - - srv.Close() }) } } +func TestErrorResponseInvalidResponseBody(t *testing.T) { + resp := &http.Response{ + StatusCode: 400, + Body: io.NopCloser(badReader{}), + ContentLength: 100, + } + status := readResponseStatus(resp) + assert.Nil(t, status) +} + func TestUserAgent(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) set := exportertest.NewNopCreateSettings() set.BuildInfo.Description = "Collector" set.BuildInfo.Version = "1.2.3test" @@ -559,23 +580,14 @@ func TestUserAgent(t *testing.T) { t.Run("traces", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) - require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ - TracesEndpoint: fmt.Sprintf("http://%s/v1/traces", addr), + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -594,8 +606,6 @@ func TestUserAgent(t *testing.T) { traces := ptrace.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) require.NoError(t, err) - - srv.Close() }) } }) @@ -603,23 +613,14 @@ func TestUserAgent(t *testing.T) { t.Run("metrics", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) - require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ - MetricsEndpoint: fmt.Sprintf("http://%s/v1/metrics", addr), + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -638,8 +639,6 @@ func TestUserAgent(t *testing.T) { metrics := pmetric.NewMetrics() err = exp.ConsumeMetrics(context.Background(), metrics) require.NoError(t, err) - - srv.Close() }) } }) @@ -647,23 +646,14 @@ func TestUserAgent(t *testing.T) { t.Run("logs", func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { assert.Contains(t, request.Header.Get("user-agent"), test.expectedUA) writer.WriteHeader(200) }) - srv := http.Server{ - Addr: addr, - Handler: mux, - } - ln, err := net.Listen("tcp", addr) - require.NoError(t, err) - go func() { - _ = srv.Serve(ln) - }() + defer srv.Close() cfg := &Config{ - LogsEndpoint: fmt.Sprintf("http://%s/v1/logs", addr), + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), HTTPClientSettings: confighttp.HTTPClientSettings{ Headers: test.headers, }, @@ -688,3 +678,218 @@ func TestUserAgent(t *testing.T) { } }) } + +func TestPartialSuccess_traces(t *testing.T) { + srv := createBackend("/v1/traces", func(writer http.ResponseWriter, request *http.Request) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + defer srv.Close() + + cfg := &Config{ + TracesEndpoint: fmt.Sprintf("%s/v1/traces", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + traces := ptrace.NewTraces() + err = exp.ConsumeTraces(context.Background(), traces) + require.Error(t, err) +} + +func TestPartialSuccess_metrics(t *testing.T) { + srv := createBackend("/v1/metrics", func(writer http.ResponseWriter, request *http.Request) { + response := pmetricotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedDataPoints(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + defer srv.Close() + + cfg := &Config{ + MetricsEndpoint: fmt.Sprintf("%s/v1/metrics", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + metrics := pmetric.NewMetrics() + err = exp.ConsumeMetrics(context.Background(), metrics) + require.Error(t, err) +} + +func TestPartialSuccess_logs(t *testing.T) { + srv := createBackend("/v1/logs", func(writer http.ResponseWriter, request *http.Request) { + response := plogotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedLogRecords(1) + bytes, err := response.MarshalProto() + require.NoError(t, err) + _, err = writer.Write(bytes) + require.NoError(t, err) + }) + defer srv.Close() + + cfg := &Config{ + LogsEndpoint: fmt.Sprintf("%s/v1/logs", srv.URL), + HTTPClientSettings: confighttp.HTTPClientSettings{}, + } + exp, err := createLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate data + logs := plog.NewLogs() + err = exp.ConsumeLogs(context.Background(), logs) + require.Error(t, err) +} + +func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.True(t, consumererror.IsPermanent(err)) +} + +func TestPartialResponse_missingHeaderAndBody(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte{})), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Nil(t, err) +} + +func TestPartialResponse_nonErrUnexpectedEOFError(t *testing.T) { + resp := &http.Response{ + // `-1` indicates a missing Content-Length header in the Go http standard library + ContentLength: -1, + Body: io.NopCloser(badReader{}), + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 3, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccess_longContentLengthHeader(t *testing.T) { + response := ptraceotlp.NewExportResponse() + partial := response.PartialSuccess() + partial.SetErrorMessage("hello") + partial.SetRejectedSpans(1) + data, err := response.MarshalProto() + require.NoError(t, err) + resp := &http.Response{ + ContentLength: 4096, + Body: io.NopCloser(bytes.NewReader(data)), + } + err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccessInvalidResponseBody(t *testing.T) { + resp := &http.Response{ + Body: io.NopCloser(badReader{}), + ContentLength: 100, + } + err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) + assert.Error(t, err) +} + +func TestPartialSuccessInvalidBody(t *testing.T) { + invalidBodyCases := []struct { + telemetryType string + handler partialSuccessHandler + }{ + { + telemetryType: "traces", + handler: tracesPartialSuccessHandler, + }, + { + telemetryType: "metrics", + handler: metricsPartialSuccessHandler, + }, + { + telemetryType: "logs", + handler: logsPartialSuccessHandler, + }, + } + for _, tt := range invalidBodyCases { + t.Run("Invalid response body_"+tt.telemetryType, func(t *testing.T) { + err := tt.handler([]byte{1}) + assert.Error(t, err) + }) + } +} + +func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc(endpoint, handler) + + srv := httptest.NewServer(mux) + + return srv +} + +type badReader struct{} + +func (b badReader) Read([]byte) (int, error) { + return 0, errors.New("Bad read") +}