Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add ottl metric condition to expr package for use in sampling processor #1878

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions expr/ottl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
)
Expand All @@ -42,6 +43,20 @@ func NewOTTLSpanStatement(statementStr string, set component.TelemetrySettings)
return statement, nil
}

// NewOTTLMetricStatement parses the given statement into an ottl.Statement for a metric transform context.
func NewOTTLMetricStatement(statementStr string, set component.TelemetrySettings) (*ottl.Statement[ottlmetric.TransformContext], error) {
parser, err := ottlmetric.NewParser(functions[ottlmetric.TransformContext](), set)
if err != nil {
return nil, err
}
statement, err := parser.ParseStatement(statementStr)
if err != nil {
return nil, err
}

return statement, nil
}

// NewOTTLDatapointStatement parses the given statement into an ottl.Statement for a datapoint transform context.
func NewOTTLDatapointStatement(statementStr string, set component.TelemetrySettings) (*ottl.Statement[ottldatapoint.TransformContext], error) {
parser, err := ottldatapoint.NewParser(functions[ottldatapoint.TransformContext](), set)
Expand Down
14 changes: 14 additions & 0 deletions expr/ottl_condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"go.opentelemetry.io/collector/component"
)
Expand Down Expand Up @@ -48,6 +49,19 @@ func NewOTTLSpanCondition(condition string, set component.TelemetrySettings) (*O
}, nil
}

// NewOTTLMetricCondition creates a new OTTLCondition for a metric with the given condition.
func NewOTTLMetricCondition(condition string, set component.TelemetrySettings) (*OTTLCondition[ottlmetric.TransformContext], error) {
statementStr := "noop() where " + condition
statement, err := NewOTTLMetricStatement(statementStr, set)
if err != nil {
return nil, err
}

return &OTTLCondition[ottlmetric.TransformContext]{
statement: statement,
}, nil
}

// NewOTTLDatapointCondition creates a new OTTLCondition for a datapoint with the given condition.
func NewOTTLDatapointCondition(condition string, set component.TelemetrySettings) (*OTTLCondition[ottldatapoint.TransformContext], error) {
statementStr := "noop() where " + condition
Expand Down
16 changes: 16 additions & 0 deletions expr/ottl_expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"go.opentelemetry.io/collector/component"
)
Expand Down Expand Up @@ -51,6 +52,21 @@ func NewOTTLSpanExpression(expression string, set component.TelemetrySettings) (
}, nil
}

// NewOTTLMetricExpression creates a new expression for metrics.
// The expression is wrapped in an editor function, so only Converter functions and target expressions can be used.
func NewOTTLMetricExpression(expression string, set component.TelemetrySettings) (*OTTLExpression[ottlmetric.TransformContext], error) {
// Wrap the expression in the "value" function, since the ottl grammar expects a function first.
statementStr := fmt.Sprintf("value(%s)", expression)
statement, err := NewOTTLMetricStatement(statementStr, set)
if err != nil {
return nil, err
}

return &OTTLExpression[ottlmetric.TransformContext]{
statement: statement,
}, nil
}

// NewOTTLDatapointExpression creates a new expression for datapoints.
// The expression is wrapped in an editor function, so only Converter functions and target expressions can be used.
func NewOTTLDatapointExpression(expression string, set component.TelemetrySettings) (*OTTLExpression[ottldatapoint.TransformContext], error) {
Expand Down
2 changes: 1 addition & 1 deletion processor/samplingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func createMetricsProcessor(
nextConsumer consumer.Metrics,
) (processor.Metrics, error) {
oCfg := cfg.(*Config)
condition, err := expr.NewOTTLDatapointCondition(oCfg.Condition, set.TelemetrySettings)
condition, err := expr.NewOTTLMetricCondition(oCfg.Condition, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid condition: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions processor/samplingprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/observiq/bindplane-agent/expr => ../../expr
BinaryFissionGames marked this conversation as resolved.
Show resolved Hide resolved
85 changes: 13 additions & 72 deletions processor/samplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"math/rand"

"github.com/observiq/bindplane-agent/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -37,7 +37,7 @@ type logsSamplingProcessor struct {
type metricsSamplingProcessor struct {
logger *zap.Logger
dropCutOffRatio float64
condition *expr.OTTLCondition[ottldatapoint.TransformContext]
condition *expr.OTTLCondition[ottlmetric.TransformContext]
}

type tracesSamplingProcessor struct {
Expand All @@ -54,7 +54,7 @@ func newLogsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.O
}
}

func newMetricsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.OTTLCondition[ottldatapoint.TransformContext]) *metricsSamplingProcessor {
func newMetricsSamplingProcessor(logger *zap.Logger, cfg *Config, condition *expr.OTTLCondition[ottlmetric.TransformContext]) *metricsSamplingProcessor {
return &metricsSamplingProcessor{
logger: logger,
dropCutOffRatio: cfg.DropRatio,
Expand Down Expand Up @@ -133,77 +133,18 @@ func (sp *metricsSamplingProcessor) processMetrics(ctx context.Context, md pmetr
for i := 0; i < md.ResourceMetrics().Len(); i++ {
for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); j++ {
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics().RemoveIf(func(metric pmetric.Metric) bool {
anyMatch := false
eachDatapoint(metric, func(dp any) {
datapointCtx := ottldatapoint.NewTransformContext(
dp,
metric,
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(),
md.ResourceMetrics().At(i).Resource(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j),
md.ResourceMetrics().At(i),
)
curDatapointMatched, err := sp.condition.Match(ctx, datapointCtx)
anyMatch = anyMatch || (err == nil && curDatapointMatched)
})

// if no datapoints matched, check if metric matches without a datapoint
if !anyMatch {
metricCtx := ottldatapoint.NewTransformContext(
nil,
metric,
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(),
md.ResourceMetrics().At(i).Resource(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j),
md.ResourceMetrics().At(i),
)
metricMatch, err := sp.condition.Match(ctx, metricCtx)
anyMatch = err == nil && metricMatch
}

return anyMatch && sampleFunc(sp.dropCutOffRatio)
metricCtx := ottlmetric.NewTransformContext(
metric,
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j).Scope(),
md.ResourceMetrics().At(i).Resource(),
md.ResourceMetrics().At(i).ScopeMetrics().At(j),
md.ResourceMetrics().At(i),
)
match, err := sp.condition.Match(ctx, metricCtx)
return err == nil && match && sampleFunc(sp.dropCutOffRatio)
})
}
}
return md, nil
}

// eachDatapoint calls the callback function f with each datapoint in the metric
func eachDatapoint(metric pmetric.Metric, f func(dp any)) {
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
f(dp)
}
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
f(dp)
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
f(dp)
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
f(dp)
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for i := 0; i < dps.Len(); i++ {
dp := dps.At(i)
f(dp)
}
default:
// skip anything else
}
}
4 changes: 2 additions & 2 deletions processor/samplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func Test_processMetrics(t *testing.T) {
},
{
desc: "multiple metrics condition",
condition: `(metric.name == "m1")`,
condition: `(name == "m1")`,
dropRatio: 1.0,
input: multipleMetrics,
expected: multipleMetricsResult,
Expand All @@ -262,7 +262,7 @@ func Test_processMetrics(t *testing.T) {
Condition: tc.condition,
}

ottlCondition, err := expr.NewOTTLDatapointCondition(cfg.Condition, component.TelemetrySettings{Logger: zap.NewNop()})
ottlCondition, err := expr.NewOTTLMetricCondition(cfg.Condition, component.TelemetrySettings{Logger: zap.NewNop()})
require.NoError(t, err)

processor := newMetricsSamplingProcessor(zap.NewNop(), cfg, ottlCondition)
Expand Down
Loading