From a3b1e3779253150394a17ee618519448461a210b Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Tue, 24 Sep 2024 13:58:48 -0400 Subject: [PATCH 1/5] Add metric expression, statement, condition to expr package. Use metric condition in sampling processor --- expr/ottl.go | 15 ++++ expr/ottl_condition.go | 14 +++ expr/ottl_expression.go | 16 ++++ processor/samplingprocessor/factory.go | 2 +- processor/samplingprocessor/go.mod | 2 + processor/samplingprocessor/processor.go | 85 +++---------------- processor/samplingprocessor/processor_test.go | 4 +- 7 files changed, 63 insertions(+), 75 deletions(-) diff --git a/expr/ottl.go b/expr/ottl.go index 3b1f4163b..29c95ffec 100644 --- a/expr/ottl.go +++ b/expr/ottl.go @@ -23,6 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" ) @@ -42,6 +43,20 @@ func NewOTTLSpanStatement(statementStr string, set component.TelemetrySettings) return statement, nil } +// NewOTTLMetricStatement parses the given statement into an ottl.Statement for a metric transform context. +func NewOTTLMetricStatement(statementStr string, set component.TelemetrySettings) (*ottl.Statement[ottlmetric.TransformContext], error) { + parser, err := ottlmetric.NewParser(functions[ottlmetric.TransformContext](), set) + if err != nil { + return nil, err + } + statement, err := parser.ParseStatement(statementStr) + if err != nil { + return nil, err + } + + return statement, nil +} + // NewOTTLDatapointStatement parses the given statement into an ottl.Statement for a datapoint transform context. func NewOTTLDatapointStatement(statementStr string, set component.TelemetrySettings) (*ottl.Statement[ottldatapoint.TransformContext], error) { parser, err := ottldatapoint.NewParser(functions[ottldatapoint.TransformContext](), set) diff --git a/expr/ottl_condition.go b/expr/ottl_condition.go index 29869d6a5..52be8abc5 100644 --- a/expr/ottl_condition.go +++ b/expr/ottl_condition.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "go.opentelemetry.io/collector/component" ) @@ -48,6 +49,19 @@ func NewOTTLSpanCondition(condition string, set component.TelemetrySettings) (*O }, nil } +// NewOTTLMetricCondition creates a new OTTLCondition for a metric with the given condition. +func NewOTTLMetricCondition(condition string, set component.TelemetrySettings) (*OTTLCondition[ottlmetric.TransformContext], error) { + statementStr := "noop() where " + condition + statement, err := NewOTTLMetricStatement(statementStr, set) + if err != nil { + return nil, err + } + + return &OTTLCondition[ottlmetric.TransformContext]{ + statement: statement, + }, nil +} + // NewOTTLDatapointCondition creates a new OTTLCondition for a datapoint with the given condition. func NewOTTLDatapointCondition(condition string, set component.TelemetrySettings) (*OTTLCondition[ottldatapoint.TransformContext], error) { statementStr := "noop() where " + condition diff --git a/expr/ottl_expression.go b/expr/ottl_expression.go index a21476d39..43c83de86 100644 --- a/expr/ottl_expression.go +++ b/expr/ottl_expression.go @@ -21,6 +21,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "go.opentelemetry.io/collector/component" ) @@ -51,6 +52,21 @@ func NewOTTLSpanExpression(expression string, set component.TelemetrySettings) ( }, nil } +// NewOTTLMetricExpression creates a new expression for metrics. +// The expression is wrapped in an editor function, so only Converter functions and target expressions can be used. +func NewOTTLMetricExpression(expression string, set component.TelemetrySettings) (*OTTLExpression[ottlmetric.TransformContext], error) { + // Wrap the expression in the "value" function, since the ottl grammar expects a function first. + statementStr := fmt.Sprintf("value(%s)", expression) + statement, err := NewOTTLMetricStatement(statementStr, set) + if err != nil { + return nil, err + } + + return &OTTLExpression[ottlmetric.TransformContext]{ + statement: statement, + }, nil +} + // NewOTTLDatapointExpression creates a new expression for datapoints. // The expression is wrapped in an editor function, so only Converter functions and target expressions can be used. func NewOTTLDatapointExpression(expression string, set component.TelemetrySettings) (*OTTLExpression[ottldatapoint.TransformContext], error) { diff --git a/processor/samplingprocessor/factory.go b/processor/samplingprocessor/factory.go index 3c32ed88c..2d905b8e8 100644 --- a/processor/samplingprocessor/factory.go +++ b/processor/samplingprocessor/factory.go @@ -96,7 +96,7 @@ func createMetricsProcessor( nextConsumer consumer.Metrics, ) (processor.Metrics, error) { oCfg := cfg.(*Config) - condition, err := expr.NewOTTLDatapointCondition(oCfg.Condition, set.TelemetrySettings) + condition, err := expr.NewOTTLMetricCondition(oCfg.Condition, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid condition: %w", err) } diff --git a/processor/samplingprocessor/go.mod b/processor/samplingprocessor/go.mod index 41f541d53..0e8b00f3e 100644 --- a/processor/samplingprocessor/go.mod +++ b/processor/samplingprocessor/go.mod @@ -50,3 +50,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/observiq/bindplane-agent/expr => ../../expr diff --git a/processor/samplingprocessor/processor.go b/processor/samplingprocessor/processor.go index 4b0b71a0d..1360eb6ce 100644 --- a/processor/samplingprocessor/processor.go +++ b/processor/samplingprocessor/processor.go @@ -19,8 +19,8 @@ import ( "math/rand" "github.com/observiq/bindplane-agent/expr" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -37,7 +37,7 @@ type logsSamplingProcessor struct { type metricsSamplingProcessor struct { logger *zap.Logger dropCutOffRatio float64 - condition *expr.OTTLCondition[ottldatapoint.TransformContext] + condition *expr.OTTLCondition[ottlmetric.TransformContext] } type tracesSamplingProcessor struct { @@ -54,7 +54,7 @@ func newLogsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.O } } -func newMetricsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.OTTLCondition[ottldatapoint.TransformContext]) *metricsSamplingProcessor { +func newMetricsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.OTTLCondition[ottlmetric.TransformContext]) *metricsSamplingProcessor { return &metricsSamplingProcessor{ logger: logger, dropCutOffRatio: cfg.DropRatio, @@ -133,77 +133,18 @@ func (sp *metricsSamplingProcessor) processMetrics(ctx context.Context, md pmetr for i := 0; i < md.ResourceMetrics().Len(); i++ { for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); j++ { md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics().RemoveIf(func(metric pmetric.Metric) bool { - anyMatch := false - eachDatapoint(metric, func(dp any) { - datapointCtx := ottldatapoint.NewTransformContext( - dp, - metric, - md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(), - md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(), - md.ResourceMetrics().At(i).Resource(), - md.ResourceMetrics().At(i).ScopeMetrics().At(j), - md.ResourceMetrics().At(i), - ) - curDatapointMatched, err := sp.condition.Match(ctx, datapointCtx) - anyMatch = anyMatch || (err == nil && curDatapointMatched) - }) - - // if no datapoints matched, check if metric matches without a datapoint - if !anyMatch { - metricCtx := ottldatapoint.NewTransformContext( - nil, - metric, - md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(), - md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(), - md.ResourceMetrics().At(i).Resource(), - md.ResourceMetrics().At(i).ScopeMetrics().At(j), - md.ResourceMetrics().At(i), - ) - metricMatch, err := sp.condition.Match(ctx, metricCtx) - anyMatch = err == nil && metricMatch - } - - return anyMatch && sampleFunc(sp.dropCutOffRatio) + metricCtx := ottlmetric.NewTransformContext( + metric, + md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(), + md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(), + md.ResourceMetrics().At(i).Resource(), + md.ResourceMetrics().At(i).ScopeMetrics().At(j), + md.ResourceMetrics().At(i), + ) + match, err := sp.condition.Match(ctx, metricCtx) + return err == nil && match && sampleFunc(sp.dropCutOffRatio) }) } } return md, nil } - -// eachDatapoint calls the callback function f with each datapoint in the metric -func eachDatapoint(metric pmetric.Metric, f func(dp any)) { - switch metric.Type() { - case pmetric.MetricTypeGauge: - dps := metric.Gauge().DataPoints() - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - f(dp) - } - case pmetric.MetricTypeSum: - dps := metric.Sum().DataPoints() - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - f(dp) - } - case pmetric.MetricTypeHistogram: - dps := metric.Histogram().DataPoints() - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - f(dp) - } - case pmetric.MetricTypeExponentialHistogram: - dps := metric.ExponentialHistogram().DataPoints() - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - f(dp) - } - case pmetric.MetricTypeSummary: - dps := metric.Summary().DataPoints() - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - f(dp) - } - default: - // skip anything else - } -} diff --git a/processor/samplingprocessor/processor_test.go b/processor/samplingprocessor/processor_test.go index d6006b0f9..d85b64460 100644 --- a/processor/samplingprocessor/processor_test.go +++ b/processor/samplingprocessor/processor_test.go @@ -248,7 +248,7 @@ func Test_processMetrics(t *testing.T) { }, { desc: "multiple metrics condition", - condition: `(metric.name == "m1")`, + condition: `(name == "m1")`, dropRatio: 1.0, input: multipleMetrics, expected: multipleMetricsResult, @@ -262,7 +262,7 @@ func Test_processMetrics(t *testing.T) { Condition: tc.condition, } - ottlCondition, err := expr.NewOTTLDatapointCondition(cfg.Condition, component.TelemetrySettings{Logger: zap.NewNop()}) + ottlCondition, err := expr.NewOTTLMetricCondition(cfg.Condition, component.TelemetrySettings{Logger: zap.NewNop()}) require.NoError(t, err) processor := newMetricsSamplingProcessor(zap.NewNop(), cfg, ottlCondition) From 719402c00e10799f41d3eb07fb4f865cea436793 Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Tue, 24 Sep 2024 14:30:24 -0400 Subject: [PATCH 2/5] update modules v1.61.0 --- go.mod | 2 +- processor/samplingprocessor/go.mod | 2 +- processor/samplingprocessor/go.sum | 2 -- receiver/awss3rehydrationreceiver/go.mod | 2 +- receiver/azureblobrehydrationreceiver/go.mod | 2 +- 5 files changed, 4 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 319818e1c..44996ce05 100644 --- a/go.mod +++ b/go.mod @@ -346,7 +346,7 @@ require ( github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/observiq/bindplane-agent/counter v1.61.0 // indirect github.com/observiq/bindplane-agent/expr v1.61.0 // indirect - github.com/observiq/bindplane-agent/internal/rehydration v1.60.0 // indirect + github.com/observiq/bindplane-agent/internal/rehydration v1.61.0 // indirect github.com/okta/okta-sdk-golang/v2 v2.20.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlemanagedprometheusexporter v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.109.0 // indirect diff --git a/processor/samplingprocessor/go.mod b/processor/samplingprocessor/go.mod index 0e8b00f3e..cdc8085c3 100644 --- a/processor/samplingprocessor/go.mod +++ b/processor/samplingprocessor/go.mod @@ -3,7 +3,7 @@ module github.com/observiq/bindplane-agent/processor/samplingprocessor go 1.22.6 require ( - github.com/observiq/bindplane-agent/expr v1.60.0 + github.com/observiq/bindplane-agent/expr v1.61.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 diff --git a/processor/samplingprocessor/go.sum b/processor/samplingprocessor/go.sum index 29fc4038b..11b8665c8 100644 --- a/processor/samplingprocessor/go.sum +++ b/processor/samplingprocessor/go.sum @@ -55,8 +55,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/observiq/bindplane-agent/expr v1.60.0 h1:KAzOTgFtwSNJa90gg4TrZzQ2v4h7C+FvgEFwiRXak7U= -github.com/observiq/bindplane-agent/expr v1.60.0/go.mod h1:0YAhrdXeOAjgZJiIJfWh+Pcj4iIAolxZ7EHbqC2g3mM= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 h1:4VBRgtyh3hHSgAVGgs4bvNwJd0oUGyxVA3eQO2ujNsA= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0/go.mod h1:9MGQCqxdCNBhdD+7QBZ6hH9HipXe5CajMafVKglD5f0= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 h1:k7uHhrznH4dYvzbaCRz5VgFyHzhd1NGow1s6504r6tA= diff --git a/receiver/awss3rehydrationreceiver/go.mod b/receiver/awss3rehydrationreceiver/go.mod index 9bd1058d9..5187336b6 100644 --- a/receiver/awss3rehydrationreceiver/go.mod +++ b/receiver/awss3rehydrationreceiver/go.mod @@ -7,7 +7,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.11 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15 github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1 - github.com/observiq/bindplane-agent/internal/rehydration v1.60.0 + github.com/observiq/bindplane-agent/internal/rehydration v1.61.0 github.com/observiq/bindplane-agent/internal/testutils v1.61.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 diff --git a/receiver/azureblobrehydrationreceiver/go.mod b/receiver/azureblobrehydrationreceiver/go.mod index fa4e7815d..db17866e7 100644 --- a/receiver/azureblobrehydrationreceiver/go.mod +++ b/receiver/azureblobrehydrationreceiver/go.mod @@ -4,7 +4,7 @@ go 1.22.6 require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 - github.com/observiq/bindplane-agent/internal/rehydration v1.60.0 + github.com/observiq/bindplane-agent/internal/rehydration v1.61.0 github.com/observiq/bindplane-agent/internal/testutils v1.61.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 From f57f17a25feae7ebde5a68a8732ee8462363842e Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Wed, 25 Sep 2024 08:18:32 -0400 Subject: [PATCH 3/5] optimize base cases w/ condition --- processor/samplingprocessor/config.go | 4 +++ processor/samplingprocessor/processor.go | 29 +++++++++++++++++-- processor/samplingprocessor/processor_test.go | 19 ++---------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/processor/samplingprocessor/config.go b/processor/samplingprocessor/config.go index bf5283404..76a068143 100644 --- a/processor/samplingprocessor/config.go +++ b/processor/samplingprocessor/config.go @@ -36,5 +36,9 @@ func (cfg Config) Validate() error { return errInvalidDropRatio } + if cfg.Condition == "" { + cfg.Condition = "true" + } + return nil } diff --git a/processor/samplingprocessor/processor.go b/processor/samplingprocessor/processor.go index 1360eb6ce..d35ea5a5b 100644 --- a/processor/samplingprocessor/processor.go +++ b/processor/samplingprocessor/processor.go @@ -31,18 +31,21 @@ import ( type logsSamplingProcessor struct { logger *zap.Logger dropCutOffRatio float64 + conditionString string condition *expr.OTTLCondition[ottllog.TransformContext] } type metricsSamplingProcessor struct { logger *zap.Logger dropCutOffRatio float64 + conditionString string condition *expr.OTTLCondition[ottlmetric.TransformContext] } type tracesSamplingProcessor struct { logger *zap.Logger dropCutOffRatio float64 + conditionString string condition *expr.OTTLCondition[ottlspan.TransformContext] } @@ -51,6 +54,7 @@ func newLogsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.O logger: logger, dropCutOffRatio: cfg.DropRatio, condition: condition, + conditionString: cfg.Condition, } } @@ -59,6 +63,7 @@ func newMetricsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *exp logger: logger, dropCutOffRatio: cfg.DropRatio, condition: condition, + conditionString: cfg.Condition, } } @@ -67,6 +72,7 @@ func newTracesSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr logger: logger, dropCutOffRatio: cfg.DropRatio, condition: condition, + conditionString: cfg.Condition, } } @@ -76,8 +82,13 @@ func sampleFunc(dropCutOffRatio float64) bool { } func (sp *tracesSamplingProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + // Drop everything + if sp.dropCutOffRatio == 1.0 && sp.conditionString == "true" { + return ptrace.NewTraces(), nil + } + // Drop nothing - if sp.dropCutOffRatio == 0.0 { + if sp.dropCutOffRatio == 0.0 || sp.conditionString == "false" { return td, nil } @@ -101,10 +112,16 @@ func (sp *tracesSamplingProcessor) processTraces(ctx context.Context, td ptrace. } func (sp *logsSamplingProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + // Drop everything + if sp.dropCutOffRatio == 1.0 && sp.conditionString == "true" { + return plog.NewLogs(), nil + } + // Drop nothing - if sp.dropCutOffRatio == 0.0 { + if sp.dropCutOffRatio == 0.0 || sp.conditionString == "false" { return ld, nil } + // Drop based on ratio and condition for i := 0; i < ld.ResourceLogs().Len(); i++ { for j := 0; j < ld.ResourceLogs().At(i).ScopeLogs().Len(); j++ { @@ -125,10 +142,16 @@ func (sp *logsSamplingProcessor) processLogs(ctx context.Context, ld plog.Logs) } func (sp *metricsSamplingProcessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + // Drop everything + if sp.dropCutOffRatio == 1.0 && sp.conditionString == "true" { + return pmetric.NewMetrics(), nil + } + // Drop nothing - if sp.dropCutOffRatio == 0.0 { + if sp.dropCutOffRatio == 0.0 || sp.conditionString == "false" { return md, nil } + // Drop based on ratio and condition for i := 0; i < md.ResourceMetrics().Len(); i++ { for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); j++ { diff --git a/processor/samplingprocessor/processor_test.go b/processor/samplingprocessor/processor_test.go index d85b64460..8b93fe83e 100644 --- a/processor/samplingprocessor/processor_test.go +++ b/processor/samplingprocessor/processor_test.go @@ -31,10 +31,6 @@ func Test_processTraces(t *testing.T) { td := ptrace.NewTraces() td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() - emptySpan := ptrace.NewTraces() - emptySpan.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() - emptySpan.ResourceSpans().At(0).ScopeSpans().At(0).Spans().RemoveIf(func(_ ptrace.Span) bool { return true }) - multipleSpansInput := ptrace.NewTraces() multipleSpansInput.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() multipleSpansInput.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() @@ -57,7 +53,7 @@ func Test_processTraces(t *testing.T) { condition: "true", dropRatio: 1.0, input: td, - expected: emptySpan, + expected: ptrace.NewTraces(), }, { desc: "Never Drop true", @@ -110,9 +106,6 @@ func Test_processTraces(t *testing.T) { func Test_processLogs(t *testing.T) { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - emptyLogRecords := plog.NewLogs() - emptyLogRecords.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - emptyLogRecords.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().RemoveIf(func(_ plog.LogRecord) bool { return true }) multipleRecordsInput := plog.NewLogs() multipleRecordsInput.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() @@ -139,7 +132,7 @@ func Test_processLogs(t *testing.T) { dropRatio: 1.0, condition: "true", input: ld, - expected: emptyLogRecords, + expected: plog.NewLogs(), }, { desc: "Never Drop true", @@ -195,12 +188,6 @@ func Test_processMetrics(t *testing.T) { metric.SetEmptyGauge() metric.Gauge().DataPoints().AppendEmpty() - emptyMetrics := pmetric.NewMetrics() - em := emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - em.SetEmptyGauge() - em.Gauge().DataPoints().AppendEmpty() - emptyMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().RemoveIf(func(_ pmetric.Metric) bool { return true }) - multipleMetrics := pmetric.NewMetrics() m1 := multipleMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() m2 := multipleMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() @@ -223,7 +210,7 @@ func Test_processMetrics(t *testing.T) { condition: "true", dropRatio: 1.0, input: md, - expected: emptyMetrics, + expected: pmetric.NewMetrics(), }, { desc: "Never Drop true", From a2d210bc2467e198f91256e4c8c7e44f40a9d5af Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Wed, 25 Sep 2024 08:53:24 -0400 Subject: [PATCH 4/5] optimize true condition --- processor/samplingprocessor/processor.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/processor/samplingprocessor/processor.go b/processor/samplingprocessor/processor.go index d35ea5a5b..7f6efdae4 100644 --- a/processor/samplingprocessor/processor.go +++ b/processor/samplingprocessor/processor.go @@ -96,14 +96,18 @@ func (sp *tracesSamplingProcessor) processTraces(ctx context.Context, td ptrace. for i := 0; i < td.ResourceSpans().Len(); i++ { for j := 0; j < td.ResourceSpans().At(i).ScopeSpans().Len(); j++ { td.ResourceSpans().At(i).ScopeSpans().At(j).Spans().RemoveIf(func(span ptrace.Span) bool { - logCtx := ottlspan.NewTransformContext( + if sp.conditionString == "true" { + return sampleFunc(sp.dropCutOffRatio) + } + + spanCtx := ottlspan.NewTransformContext( span, td.ResourceSpans().At(i).ScopeSpans().At(j).Scope(), td.ResourceSpans().At(i).Resource(), td.ResourceSpans().At(i).ScopeSpans().At(j), td.ResourceSpans().At(i), ) - match, err := sp.condition.Match(ctx, logCtx) + match, err := sp.condition.Match(ctx, spanCtx) return err == nil && match && sampleFunc(sp.dropCutOffRatio) }) } @@ -126,6 +130,10 @@ func (sp *logsSamplingProcessor) processLogs(ctx context.Context, ld plog.Logs) for i := 0; i < ld.ResourceLogs().Len(); i++ { for j := 0; j < ld.ResourceLogs().At(i).ScopeLogs().Len(); j++ { ld.ResourceLogs().At(i).ScopeLogs().At(j).LogRecords().RemoveIf(func(logRecord plog.LogRecord) bool { + if sp.conditionString == "true" { + return sampleFunc(sp.dropCutOffRatio) + } + logCtx := ottllog.NewTransformContext( logRecord, ld.ResourceLogs().At(i).ScopeLogs().At(j).Scope(), @@ -156,6 +164,10 @@ func (sp *metricsSamplingProcessor) processMetrics(ctx context.Context, md pmetr for i := 0; i < md.ResourceMetrics().Len(); i++ { for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); j++ { md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics().RemoveIf(func(metric pmetric.Metric) bool { + if sp.conditionString == "true" { + return sampleFunc(sp.dropCutOffRatio) + } + metricCtx := ottlmetric.NewTransformContext( metric, md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(), From a97dbf78537da6acde4e8aa9b03e6f56ef61736a Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Wed, 25 Sep 2024 09:22:06 -0400 Subject: [PATCH 5/5] rm empty condition case --- processor/samplingprocessor/config.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/processor/samplingprocessor/config.go b/processor/samplingprocessor/config.go index 76a068143..bf5283404 100644 --- a/processor/samplingprocessor/config.go +++ b/processor/samplingprocessor/config.go @@ -36,9 +36,5 @@ func (cfg Config) Validate() error { return errInvalidDropRatio } - if cfg.Condition == "" { - cfg.Condition = "true" - } - return nil }