Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Fix: Don't drop batch in case of fai…
Browse files Browse the repository at this point in the history
…lure to translate metrics (#29729)

Don't drop a whole batch in case of failure to
translate from Otel to Prometheus. Instead, with this PR we are trying
to send to Prometheus all the metrics that were properly translated and
create a warning message for failures to translate.

This PR also adds supports to telemetry in this component so that it is
possible to inspect how the translation process is happening and
identify failed translations.

I opted to not include the number of time series that failed translation
because I don't want to make assumptions about how the `FromMetrics`
function works. Instead we are just publishing if there was any failure
during the translation process and the number of time series returned.

**Link to tracking Issue:** #15281

**Testing:** UTs were added to account for the case that you have mixed
metrics, with some succeeding the translation and some failing.

---------

Signed-off-by: Raphael Silva <[email protected]>
Co-authored-by: Anthony Mirabella <[email protected]>
Co-authored-by: bryan-aguilar <[email protected]>
Co-authored-by: Bryan Aguilar <[email protected]>
  • Loading branch information
4 people authored Mar 13, 2024
1 parent ae8fde2 commit e5fc693
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 35 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prw_failure_translate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29729]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
62 changes: 60 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,35 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata"
prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"
)

type prwTelemetry interface {
recordTranslationFailure(ctx context.Context)
recordTranslatedTimeSeries(ctx context.Context, numTS int)
}

type prwTelemetryOtel struct {
failedTranslations metric.Int64Counter
translatedTimeSeries metric.Int64Counter
otelAttrs []attribute.KeyValue
}

func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) {
p.failedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...))
}

func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) {
p.translatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand All @@ -45,6 +68,31 @@ type prwExporter struct {
retrySettings configretry.BackOffConfig
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
}

func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) {

meter := metadata.Meter(set.TelemetrySettings)
// TODO: create helper functions similar to the processor helper: BuildCustomMetricName
prefix := "exporter/" + metadata.Type.String() + "/"
failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations",
metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"),
metric.WithUnit("1"),
)

translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_time_series",
metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"),
metric.WithUnit("1"),
)

return &prwTelemetryOtel{
failedTranslations: failedTranslations,
translatedTimeSeries: translatedTimeSeries,
otelAttrs: []attribute.KeyValue{
attribute.String("exporter", set.ID.String()),
},
}, errors.Join(errFailedTranslation, errTranslatedMetrics)
}

// newPRWExporter initializes a new prwExporter instance and sets fields accordingly.
Expand All @@ -59,6 +107,11 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
return nil, errors.New("invalid endpoint")
}

prwTelemetry, err := newPRWTelemetry(set)
if err != nil {
return nil, err
}

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

prwe := &prwExporter{
Expand All @@ -79,6 +132,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
}

prwe.wal = newWAL(cfg.WAL, prwe.export)
Expand Down Expand Up @@ -128,15 +182,19 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
return prwe.handleExport(ctx, tsMap, m)
}
}

Expand Down
100 changes: 68 additions & 32 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return prwe.handleExport(context.Background(), testmap, nil)
}

type mockPRWTelemetry struct {
failedTranslations int
translatedTimeSeries int
}

func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) {
m.failedTranslations++
}

func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) {
m.translatedTimeSeries += numTs
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
// expected
func Test_PushMetrics(t *testing.T) {
Expand Down Expand Up @@ -420,6 +433,11 @@ func Test_PushMetrics(t *testing.T) {

emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary])

// partial success (or partial failure) cases

partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum],
validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge])

// staleNaN cases
staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram])
staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram])
Expand Down Expand Up @@ -457,20 +475,23 @@ func Test_PushMetrics(t *testing.T) {
}

tests := []struct {
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
name string
metrics pmetric.Metrics
reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool)
expectedTimeSeries int
httpResponseCode int
returnErr bool
isStaleMarker bool
skipForWAL bool
expectedFailedTranslations int
}{
{
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "invalid_type_case",
metrics: invalidTypeBatch,
httpResponseCode: http.StatusAccepted,
reqTestFunc: checkFunc,
expectedTimeSeries: 1, // the resource target metric.
expectedFailedTranslations: 1,
},
{
name: "intSum_case",
Expand Down Expand Up @@ -567,32 +588,40 @@ func Test_PushMetrics(t *testing.T) {
skipForWAL: true,
},
{
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyGauge_case",
metrics: emptyDoubleGaugeBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeSum_case",
metrics: emptyCumulativeSumBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptyCumulativeHistogram_case",
metrics: emptyCumulativeHistogramBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedFailedTranslations: 1,
},
{
name: "emptySummary_case",
metrics: emptySummaryBatch,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
returnErr: true,
name: "partialSuccess_case",
metrics: partialSuccess1,
reqTestFunc: checkFunc,
httpResponseCode: http.StatusAccepted,
expectedTimeSeries: 4,
expectedFailedTranslations: 1,
},
{
name: "staleNaNIntGauge_case",
Expand Down Expand Up @@ -668,6 +697,7 @@ func Test_PushMetrics(t *testing.T) {
}
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockTelemetry := &mockPRWTelemetry{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tt.reqTestFunc != nil {
tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker)
Expand Down Expand Up @@ -716,7 +746,10 @@ func Test_PushMetrics(t *testing.T) {
}
set := exportertest.NewNopCreateSettings()
set.BuildInfo = buildInfo

prwe, nErr := newPRWExporter(cfg, set)
prwe.telemetry = mockTelemetry

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -729,6 +762,9 @@ func Test_PushMetrics(t *testing.T) {
assert.Error(t, err)
return
}

assert.Equal(t, tt.expectedFailedTranslations, mockTelemetry.failedTranslations)
assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries)
assert.NoError(t, err)
})
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/collector/pdata v1.3.1-0.20240306115632-b2693620eff6
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.3.0
Expand Down Expand Up @@ -70,7 +71,6 @@ require (
go.opentelemetry.io/collector/receiver v0.96.1-0.20240306115632-b2693620eff6 // indirect
go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
Expand Down

0 comments on commit e5fc693

Please sign in to comment.