Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle partial success responses in the OTLP HTTP exporter #6970

Merged
merged 5 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/otlphttp-partial-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Treat partial success responses as errors

# One or more tracking issues or pull requests related to the change
issues: [6686]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
121 changes: 101 additions & 20 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.tracesURL, request)
return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler)
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
Expand All @@ -99,7 +99,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
if err != nil {
return consumererror.NewPermanent(err)
}
return e.export(ctx, e.metricsURL, request)
return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler)
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
Expand All @@ -109,10 +109,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.logsURL, request)
return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler)
}

func (e *baseExporter) export(ctx context.Context, url string, request []byte) error {
func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error {
e.logger.Debug("Preparing to make HTTP request", zap.String("url", url))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request))
if err != nil {
Expand All @@ -133,11 +133,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e
}()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// Request is successful.
return nil
return handlePartialSuccessResponse(resp, partialSuccessHandler)
}

respStatus := readResponse(resp)
respStatus := readResponseStatus(resp)

// Format the error message. Use the status if it is present in the response.
var formattedErr error
Expand Down Expand Up @@ -188,29 +187,111 @@ func isRetryableStatusCode(code int) bool {
}
}

// readResponseBody reads the body of resp into memory, reading at most
// maxHTTPResponseReadBytes bytes. It returns a nil slice and a nil error when
// there is nothing to read, and a nil slice with the read error when reading
// fails for any reason other than the body ending before the buffer is full.
func readResponseBody(resp *http.Response) ([]byte, error) {
	// A declared length of zero means the response carries no body.
	if resp.ContentLength == 0 {
		return nil, nil
	}

	// Size the buffer from the declared length, capped at the permitted
	// maximum. A length of -1 means no Content-Length header was sent, so the
	// cap itself is used. A body longer than the cap is still read (truncated)
	// in case it holds an error; decoding the truncated bytes will likely fail.
	limit := resp.ContentLength
	if limit == -1 || limit > maxHTTPResponseReadBytes {
		limit = maxHTTPResponseReadBytes
	}

	buf := make([]byte, limit)
	read, err := io.ReadFull(resp.Body, buf)

	switch {
	case read == 0 && (err == nil || errors.Is(err, io.EOF)):
		// Nothing was read and the stream simply ended: there is no body.
		return nil, nil
	case err != nil && !errors.Is(err, io.ErrUnexpectedEOF):
		// A genuine read failure.
		return nil, err
	default:
		// io.ReadFull reports io.ErrUnexpectedEOF when the body is shorter
		// than the buffer, which is expected whenever the length was unknown.
		// The bytes read so far are the complete message, so return them.
		return buf[:read], nil
	}
}

// Read the response and decode the status.Status from the body.
// Returns nil if the response is empty or cannot be decoded.
func readResponse(resp *http.Response) *status.Status {
func readResponseStatus(resp *http.Response) *status.Status {
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
var respStatus *status.Status
if resp.StatusCode >= 400 && resp.StatusCode <= 599 {
// Request failed. Read the body. OTLP spec says:
// "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a
// Protobuf-encoded Status message that describes the problem."
maxRead := resp.ContentLength
if maxRead == -1 || maxRead > maxHTTPResponseReadBytes {
maxRead = maxHTTPResponseReadBytes
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
respBytes, err := readResponseBody(resp)

if err != nil {
return nil
}
respBytes := make([]byte, maxRead)
n, err := io.ReadFull(resp.Body, respBytes)
if err == nil && n > 0 {
// Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures
respStatus = &status.Status{}
err = proto.Unmarshal(respBytes, respStatus)
if err != nil {
respStatus = nil
}

// Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures
respStatus = &status.Status{}
err = proto.Unmarshal(respBytes, respStatus)
if err != nil {
return nil
}
}

return respStatus
}

// handlePartialSuccessResponse reads the body of resp with readResponseBody
// and passes the resulting bytes to partialSuccessHandler, returning whatever
// the handler returns. If reading the body fails, the read error is returned
// and the handler is not invoked. export calls this for responses whose status
// code is in the 200-299 range.
func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error {
// NOTE(review): the next two lines are review-thread markers from the page
// this text was captured from, not Go source.
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
bodyBytes, err := readResponseBody(resp)

if err != nil {
return err
}

// bodyBytes may be nil here: readResponseBody returns (nil, nil) when the
// response has no body.
return partialSuccessHandler(bodyBytes)
}

// partialSuccessHandler inspects the raw body bytes of an export response and
// returns a non-nil error when the response should be treated as a failure.
// handlePartialSuccessResponse calls it with the bytes returned by
// readResponseBody.
type partialSuccessHandler func(protoBytes []byte) error

// tracesPartialSuccessHandler decodes protoBytes as a trace export response
// and checks its partial-success field. It returns the decoding error
// unchanged if unmarshaling fails, an error wrapped with
// consumererror.NewPermanent if the response has a non-empty error message or
// a non-zero rejected span count, and nil otherwise.
func tracesPartialSuccessHandler(protoBytes []byte) error {
exportResponse := ptraceotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return err
}
partialSuccess := exportResponse.PartialSuccess()
// The response is a full success only when the message is empty AND nothing
// was rejected; anything else is reported as a partial success.
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans()))
// NOTE(review): the next two lines are review-thread markers from the page
// this text was captured from, not Go source.
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// metricsPartialSuccessHandler decodes protoBytes as a metrics export response
// and checks its partial-success field. A decoding failure is returned as-is.
// A non-empty error message or a non-zero rejected data point count yields an
// error wrapped with consumererror.NewPermanent; otherwise the result is nil.
func metricsPartialSuccessHandler(protoBytes []byte) error {
	response := pmetricotlp.NewExportResponse()
	if err := response.UnmarshalProto(protoBytes); err != nil {
		return err
	}

	ps := response.PartialSuccess()
	message, rejected := ps.ErrorMessage(), ps.RejectedDataPoints()

	// Full success: no message and nothing rejected.
	if message == "" && rejected == 0 {
		return nil
	}
	return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", message, rejected))
}

// logsPartialSuccessHandler decodes protoBytes as a logs export response and
// checks its partial-success field. A decoding failure is returned as-is. A
// non-empty error message or a non-zero rejected log record count yields an
// error wrapped with consumererror.NewPermanent; otherwise the result is nil.
func logsPartialSuccessHandler(protoBytes []byte) error {
	response := plogotlp.NewExportResponse()
	if err := response.UnmarshalProto(protoBytes); err != nil {
		return err
	}

	ps := response.PartialSuccess()
	message, rejected := ps.ErrorMessage(), ps.RejectedLogRecords()

	// Full success: no message and nothing rejected.
	if message == "" && rejected == 0 {
		return nil
	}
	return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", message, rejected))
}
Loading