diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 2e5eed44746..9698dea39ea 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" @@ -88,7 +89,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int tsMap := map[string]*prompb.TimeSeries{} dropped := 0 var errs []error - + permanent := false resourceMetrics := pdata.MetricsToOtlp(md) for _, resourceMetric := range resourceMetrics { if resourceMetric == nil { @@ -102,6 +103,8 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int // TODO: decide if instrumentation library information should be exported as labels for _, metric := range instrumentationMetrics.Metrics { if metric == nil { + permanent = true + errs = append(errs, errors.New("encountered nil metric")) dropped++ continue } @@ -109,6 +112,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if ok := validateMetrics(metric); !ok { dropped++ errs = append(errs, errors.New("invalid temporality and type combination")) + permanent = true continue } // handle individual metric based on type @@ -117,11 +121,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if err := prwe.handleScalarMetric(tsMap, metric); err != nil { dropped++ errs = append(errs, err) + permanent = true } case *otlp.Metric_DoubleHistogram, *otlp.Metric_IntHistogram: if err := prwe.handleHistogramMetric(tsMap, metric); err != nil { dropped++ errs = append(errs, err) + permanent = true } default: dropped++ @@ -134,9 +140,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if err := prwe.export(ctx, tsMap); err != nil { dropped = pdatautil.MetricCount(md) errs = append(errs, err) + permanent = permanent || consumererror.IsPermanent(err) } if dropped != 0 { + if permanent { + return dropped, consumererror.Permanent(componenterror.CombineErrors(errs)) + } return dropped, componenterror.CombineErrors(errs) } @@ -212,13 +222,13 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti //Calls the helper function to convert the TsMap to the desired format req, err := wrapTimeSeries(tsMap) if err != nil { - return err + return consumererror.Permanent(err) } //Uses proto.Marshal to convert the WriteRequest into bytes array data, err := proto.Marshal(req) if err != nil { - return err + return consumererror.Permanent(err) } buf := make([]byte, len(data), cap(data)) compressedData := snappy.Encode(buf, data) @@ -226,7 +236,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti //Create the HTTP POST request to send to the endpoint httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) if err != nil { - return err + return consumererror.Permanent(err) } // Add necessary headers specified by: @@ -242,9 +252,13 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti httpResp, err := prwe.client.Do(httpReq) if err != nil { - return err + return consumererror.Permanent(err) } + // 2xx status code is considered a success + // 5xx errors are recoverable and the exporter should retry + // Reference for different behavior according to status code: + // https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186 if httpResp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256)) line := "" @@ -252,7 +266,11 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti line = scanner.Text() } errMsg := "server returned HTTP status " + httpResp.Status + ": " + line - return errors.New(errMsg) + if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 { + return errors.New(errMsg) + } + return consumererror.Permanent(errors.New(errMsg)) + } return nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index cbff23b489e..807f721b34e 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -564,6 +564,15 @@ func Test_PushMetrics(t *testing.T) { 0, false, }, + { + "5xx_case", + &unmatchedBoundBucketDoubleHistBatch, + checkFunc, + 5, + http.StatusServiceUnavailable, + 1, + true, + }, { "nilDataPointDoubleGauge_case", &nilDataPointDoubleGaugeBatch,