From 467393638ce8fd98833ffbbeac7ce65a34ff7ffe Mon Sep 17 00:00:00 2001 From: huyan0 Date: Thu, 3 Sep 2020 08:57:30 -0500 Subject: [PATCH] change error type to permanent error implement retry check a different way change error behavior increase coverage fix segfault add descriptive error code add descriptive error code --- .../prometheusremotewriteexporter/exporter.go | 30 +++++++++++++++---- .../exporter_test.go | 9 ++++++ go.sum | 5 ++-- 3 files changed, 36 insertions(+), 8 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 2e5eed44746..9698dea39ea 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" @@ -88,7 +89,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int tsMap := map[string]*prompb.TimeSeries{} dropped := 0 var errs []error - + permanent := false resourceMetrics := pdata.MetricsToOtlp(md) for _, resourceMetric := range resourceMetrics { if resourceMetric == nil { @@ -102,6 +103,8 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int // TODO: decide if instrumentation library information should be exported as labels for _, metric := range instrumentationMetrics.Metrics { if metric == nil { + permanent = true + errs = append(errs, errors.New("encountered nil metric")) dropped++ continue } @@ -109,6 +112,7 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if ok := validateMetrics(metric); !ok { dropped++ errs = append(errs, errors.New("invalid temporality and type combination")) + permanent = true continue } // handle individual metric based on type @@ -117,11 +121,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if err := prwe.handleScalarMetric(tsMap, metric); err != nil { dropped++ errs = append(errs, err) + permanent = true } case *otlp.Metric_DoubleHistogram, *otlp.Metric_IntHistogram: if err := prwe.handleHistogramMetric(tsMap, metric); err != nil { dropped++ errs = append(errs, err) + permanent = true } default: dropped++ @@ -134,9 +140,13 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int if err := prwe.export(ctx, tsMap); err != nil { dropped = pdatautil.MetricCount(md) errs = append(errs, err) + permanent = permanent || consumererror.IsPermanent(err) } if dropped != 0 { + if permanent { + return dropped, consumererror.Permanent(componenterror.CombineErrors(errs)) + } return dropped, componenterror.CombineErrors(errs) } @@ -212,13 +222,13 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti //Calls the helper function to convert the TsMap to the desired format req, err := wrapTimeSeries(tsMap) if err != nil { - return err + return consumererror.Permanent(err) } //Uses proto.Marshal to convert the WriteRequest into bytes array data, err := proto.Marshal(req) if err != nil { - return err + return consumererror.Permanent(err) } buf := make([]byte, len(data), cap(data)) compressedData := snappy.Encode(buf, data) @@ -226,7 +236,7 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti //Create the HTTP POST request to send to the endpoint httpReq, err := http.NewRequest("POST", prwe.endpointURL.String(), bytes.NewReader(compressedData)) if err != nil { - return err + return consumererror.Permanent(err) } // Add necessary headers specified by: @@ -242,9 +252,13 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti httpResp, err := prwe.client.Do(httpReq) if err != nil { - return err + return consumererror.Permanent(err) } + // 2xx status code is considered a success + // 5xx errors are recoverable and the exporter should retry + // Reference for different behavior according to status code: + // https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186 if httpResp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256)) line := "" @@ -252,7 +266,11 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti line = scanner.Text() } errMsg := "server returned HTTP status " + httpResp.Status + ": " + line - return errors.New(errMsg) + if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 { + return errors.New(errMsg) + } + return consumererror.Permanent(errors.New(errMsg)) + } return nil } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index cbff23b489e..807f721b34e 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -564,6 +564,15 @@ func Test_PushMetrics(t *testing.T) { 0, false, }, + { + "5xx_case", + &unmatchedBoundBucketDoubleHistBatch, + checkFunc, + 5, + http.StatusServiceUnavailable, + 1, + true, + }, { "nilDataPointDoubleGauge_case", &nilDataPointDoubleGaugeBatch, diff --git a/go.sum b/go.sum index ecbc2cbb3c9..0178cfcc0c8 100644 --- a/go.sum +++ b/go.sum @@ -539,10 +539,10 @@ github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19y github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= -github.com/jaegertracing/jaeger v1.19.2 h1:JX1ty1wlkk3JENyfXNMRAxGClwErTyzEKbQAFktYpOc= -github.com/jaegertracing/jaeger v1.19.2/go.mod h1:2GVHuF9OIfRw2N6ZMoEgRGL+GJxvDLVtALDWxOINqDk= github.com/jaegertracing/jaeger v1.18.2-0.20200707061226-97d2319ff2be h1:w1jgaUpX6HFiqWXYnjAQjiuCwXZPxC/D2JdO9okVxjM= github.com/jaegertracing/jaeger v1.18.2-0.20200707061226-97d2319ff2be/go.mod h1:yoD2o9xdj6o4XHjlJOZMYI+1QeqhrcKqz7/mXFrQzDE= +github.com/jaegertracing/jaeger v1.19.2 h1:JX1ty1wlkk3JENyfXNMRAxGClwErTyzEKbQAFktYpOc= +github.com/jaegertracing/jaeger v1.19.2/go.mod h1:2GVHuF9OIfRw2N6ZMoEgRGL+GJxvDLVtALDWxOINqDk= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= @@ -651,6 +651,7 @@ github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7 github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=