Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return consumererror.Permanent in the Prometheus remote write exporter #1734

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
var errs []error

permanent := false
resourceMetrics := pdata.MetricsToOtlp(md)
for _, resourceMetric := range resourceMetrics {
if resourceMetric == nil {
Expand All @@ -102,13 +103,16 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
// TODO: decide if instrumentation library information should be exported as labels
for _, metric := range instrumentationMetrics.Metrics {
if metric == nil {
permanent = true
errs = append(errs, errors.New("encountered nil metric"))
dropped++
continue
}
// check for valid type and temporality combination and for matching data field and type
if ok := validateMetrics(metric); !ok {
dropped++
errs = append(errs, errors.New("invalid temporality and type combination"))
permanent = true
continue
}
// handle individual metric based on type
Expand All @@ -117,11 +121,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
permanent = true
}
case *otlp.Metric_DoubleHistogram, *otlp.Metric_IntHistogram:
if err := prwe.handleHistogramMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
permanent = true
}
default:
dropped++
Expand All @@ -134,9 +140,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
if err := prwe.export(ctx, tsMap); err != nil {
dropped = pdatautil.MetricCount(md)
errs = append(errs, err)
permanent = permanent || consumererror.IsPermanent(err)
}

if dropped != 0 {
if permanent {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this hack here, can you consider to change componenterror.CombineErrors to do this? you just need to check the type and if any error is permanent return permanent?

Probably a separate PR :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. I was hesitant on doing that because i'm not sure if its good to introduce the idea of "conusmer" in component error. I have the code already haha. Will file a PR tomorrow then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Components already depend on consumer. The other way around is not expected

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs now to be removed, the CombineErrors handles this.

return dropped, consumererror.Permanent(componenterror.CombineErrors(errs))
}
return dropped, componenterror.CombineErrors(errs)
}

Expand Down Expand Up @@ -212,21 +222,21 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
//Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(tsMap)
if err != nil {
return err
return consumererror.Permanent(err)
}

//Uses proto.Marshal to convert the WriteRequest into bytes array
data, err := proto.Marshal(req)
if err != nil {
return err
return consumererror.Permanent(err)
}
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)

//Create the HTTP POST request to send to the endpoint
httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return err
return consumererror.Permanent(err)
}

// Add necessary headers specified by:
Expand All @@ -242,17 +252,25 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti

httpResp, err := prwe.client.Do(httpReq)
if err != nil {
return err
return consumererror.Permanent(err)
}

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
errMsg := "server returned HTTP status " + httpResp.Status + ": " + line
return errors.New(errMsg)
if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 {
return errors.New(errMsg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want permanent in this case not the other, can you add test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we should retry if its a 503 or other 500 errors. This is the Prometheus implementation I referenced.

I don't know how I can make the proto buf marshal fail or the construction of a http request return error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if you want to retry on 5xx add a comment to the code that this is expected based on link to prom implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. I have added comments from 259 - 262

}
return consumererror.Permanent(errors.New(errMsg))

}
return nil
}
9 changes: 9 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,15 @@ func Test_PushMetrics(t *testing.T) {
0,
false,
},
{
"5xx_case",
&unmatchedBoundBucketDoubleHistBatch,
checkFunc,
5,
http.StatusServiceUnavailable,
1,
true,
},
{
"nilDataPointDoubleGauge_case",
&nilDataPointDoubleGaugeBatch,
Expand Down