Skip to content

Commit

Permalink
Handle partial success responses in the OTLP HTTP exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-bradley committed Jan 19, 2023
1 parent 7e8b1c4 commit 5ae21fb
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 50 deletions.
16 changes: 16 additions & 0 deletions .chloggen/otlphttp-partial-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Treat partial success responses as errors

# One or more tracking issues or pull requests related to the change
issues: [6686]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
90 changes: 86 additions & 4 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.tracesURL, request)
return e.export(ctx, e.tracesURL, request, tracesPartialSuccessHandler)
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
Expand All @@ -110,7 +110,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
if err != nil {
return consumererror.NewPermanent(err)
}
return e.export(ctx, e.metricsURL, request)
return e.export(ctx, e.metricsURL, request, metricsPartialSuccessHandler)
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
Expand All @@ -120,10 +120,10 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
return consumererror.NewPermanent(err)
}

return e.export(ctx, e.logsURL, request)
return e.export(ctx, e.logsURL, request, logsPartialSuccessHandler)
}

func (e *baseExporter) export(ctx context.Context, url string, request []byte) error {
func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error {
e.logger.Debug("Preparing to make HTTP request", zap.String("url", url))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request))
if err != nil {
Expand All @@ -144,6 +144,10 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e
}()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
if err := handlePartialSuccessResponse(resp, partialSuccessHandler); err != nil {
return err
}

// Request is successful.
return nil
}
Expand Down Expand Up @@ -235,3 +239,81 @@ func readResponse(resp *http.Response) *status.Status {

return respStatus
}

func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error {
if resp.ContentLength == 0 {
return nil
}

maxRead := resp.ContentLength
needsResize := false
if maxRead == -1 || maxRead > maxHTTPResponseReadBytes {
maxRead = maxHTTPResponseReadBytes
needsResize = true
}
protoBytes := make([]byte, maxRead)
n, err := io.ReadFull(resp.Body, protoBytes)

// No bytes read and an EOF error indicates there is no body to read.
if n == 0 && err == io.EOF {
return nil
}

// io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header
// wasn't set, since we will try to read past the length of the body. If this
// is the case, the body will still have the full message in it, so we want to
// ignore the error and parse the message.
if err != nil && err != io.ErrUnexpectedEOF {
return err
}

// The pdata unmarshaling methods check for the length of the slice
// when unmarshaling it, so we have to trim down the length to the
// actual size of the data.
if needsResize {
protoBytes = protoBytes[:n]
}

return partialSuccessHandler(protoBytes)
}

type partialSuccessHandler func(protoBytes []byte) error

func tracesPartialSuccessHandler(protoBytes []byte) error {
exportResponse := ptraceotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return err
}
partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedSpans()))
}
return nil
}

func metricsPartialSuccessHandler(protoBytes []byte) error {
exportResponse := pmetricotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return err
}
partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedDataPoints()))
}
return nil
}

func logsPartialSuccessHandler(protoBytes []byte) error {
exportResponse := plogotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return err
}
partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: %s (%d rejected)", partialSuccess.ErrorMessage(), partialSuccess.RejectedLogRecords()))
}
return nil
}
Loading

0 comments on commit 5ae21fb

Please sign in to comment.