Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return permanent errors in prometheus remote write exporter #30

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
"go.opentelemetry.io/collector/internal/version"
Expand Down Expand Up @@ -88,7 +89,6 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
tsMap := map[string]*prompb.TimeSeries{}
dropped := 0
var errs []error

resourceMetrics := pdata.MetricsToOtlp(md)
for _, resourceMetric := range resourceMetrics {
if resourceMetric == nil {
Expand All @@ -102,21 +102,22 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
// TODO: decide if instrumentation library information should be exported as labels
for _, metric := range instrumentationMetrics.Metrics {
if metric == nil {
errs = append(errs, consumererror.Permanent(errors.New("encountered nil metric")))
dropped++
continue
}
// check for valid type and temporality combination and for matching data field and type
if ok := validateMetrics(metric); !ok {
dropped++
errs = append(errs, errors.New("invalid temporality and type combination"))
errs = append(errs, consumererror.Permanent(errors.New("invalid temporality and type combination")))
continue
}
// handle individual metric based on type
switch metric.Data.(type) {
case *otlp.Metric_DoubleSum, *otlp.Metric_IntSum, *otlp.Metric_DoubleGauge, *otlp.Metric_IntGauge:
if err := prwe.handleScalarMetric(tsMap, metric); err != nil {
dropped++
errs = append(errs, err)
errs = append(errs, consumererror.Permanent(err))
}
case *otlp.Metric_DoubleHistogram, *otlp.Metric_IntHistogram:
if err := prwe.handleHistogramMetric(tsMap, metric); err != nil {
Expand All @@ -125,15 +126,15 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int
}
default:
dropped++
errs = append(errs, errors.New("unsupported metric type"))
errs = append(errs, consumererror.Permanent(errors.New("unsupported metric type")))
}
}
}
}

if err := prwe.export(ctx, tsMap); err != nil {
dropped = md.MetricCount()
errs = append(errs, err)
errs = append(errs, consumererror.Permanent(err))
}

if dropped != 0 {
Expand Down Expand Up @@ -212,21 +213,21 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti
// Calls the helper function to convert the TsMap to the desired format
req, err := wrapTimeSeries(tsMap)
if err != nil {
return err
return consumererror.Permanent(err)
}

// Uses proto.Marshal to convert the WriteRequest into a byte array
data, err := proto.Marshal(req)
if err != nil {
return err
return consumererror.Permanent(err)
}
buf := make([]byte, len(data), cap(data))
compressedData := snappy.Encode(buf, data)

// Create the HTTP POST request to send to the endpoint
httpReq, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return err
return consumererror.Permanent(err)
}

// Add necessary headers specified by:
Expand All @@ -238,17 +239,25 @@ func (prwe *PrwExporter) export(ctx context.Context, tsMap map[string]*prompb.Ti

httpResp, err := prwe.client.Do(httpReq)
if err != nil {
return err
return consumererror.Permanent(err)
}

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
// Reference for different behavior according to status code:
// https://github.com/prometheus/prometheus/pull/2552/files#diff-ae8db9d16d8057358e49d694522e7186
if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 256))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
errMsg := "server returned HTTP status " + httpResp.Status + ": " + line
return errors.New(errMsg)
if httpResp.StatusCode >= 500 && httpResp.StatusCode < 600 {
return errors.New(errMsg)
}
return consumererror.Permanent(errors.New(errMsg))

}
return nil
}
9 changes: 9 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,15 @@ func Test_PushMetrics(t *testing.T) {
0,
false,
},
{
"5xx_case",
&unmatchedBoundBucketDoubleHistBatch,
checkFunc,
5,
http.StatusServiceUnavailable,
1,
true,
},
{
"nilDataPointDoubleGauge_case",
&nilDataPointDoubleGaugeBatch,
Expand Down