Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/routing] Supports matching the statement only once. #28888

Merged
merged 10 commits into from
Dec 14, 2023
27 changes: 27 additions & 0 deletions .chloggen/feat_routing_match_once.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: routingconnector supports matching the statement only once

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 15 additions & 0 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ The following settings are available:
- `table (required)`: the routing table for this connector.
- `table.statement (required)`: the routing condition provided as the [OTTL] statement.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
- `table.order (optional)`: Order will affect the priority of the statement, when the smaller the Order, the higher the priority of the statement.
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `ignore` and `propagate`. If `ignored` is used and a statement's condition has an error then the payload will be routed to the default pipelines. If not supplied, `propagate` is used.
- `match_once (optional)`: determines whether the connector matches multiple statements. Valid values are `true` and `false`. If `true` is used, then the payload will be routed to the pipeline with the lowest order. If not supplied, `false` is used.
djaglowski marked this conversation as resolved.
Show resolved Hide resolved

Example:

Expand All @@ -55,12 +57,25 @@ connectors:
routing:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: false
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp")
pipelines: [traces/jaeger-ecorp]

routing/match_once:
default_pipelines: [traces/jaeger]
error_mode: ignore
match_once: true
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
order: 1
- statement: route() where attributes["X-Tenant"] == ".*acme"
pipelines: [traces/jaeger-ecorp]
order: 2

service:
pipelines:
traces/in:
Expand Down
10 changes: 10 additions & 0 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type Config struct {
// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`

// MatchOnce determines whether the connector matches multiple statements.
// Valid values are `true` and `false`.
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
// Optional.
MatchOnce bool `mapstructure:"match_once"`
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}

// Validate checks if the processor configuration is valid.
Expand Down Expand Up @@ -75,4 +80,9 @@ type RoutingTableItem struct {
// The routing processor will fail upon the first failure from these pipelines.
// Optional.
Pipelines []component.ID `mapstructure:"pipelines"`

// Order will affect the priority of the statement, when the smaller the Order, the higher
// the priority of the statement
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
// Optional.
Order int `mapstructure:"order"`
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 4 additions & 1 deletion connector/routingconnector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
rtx := ottlresource.NewTransformContext(rlogs.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rlogs)
if c.config.MatchOnce {
break
}
}

}
Expand Down
155 changes: 155 additions & 0 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,161 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) {
})
}

func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logs0 := component.NewIDWithName(component.DataTypeLogs, "0")
logs1 := component.NewIDWithName(component.DataTypeLogs, "1")

cfg := &Config{
DefaultPipelines: []component.ID{logsDefault},
Table: []RoutingTableItem{
{
Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`,
Pipelines: []component.ID{logs0},
Order: 1,
},
{
Statement: `route() where IsMatch(attributes["X-Tenant"], "_acme") == true`,
Pipelines: []component.ID{logs1},
Order: 2,
},
{
Statement: `route() where attributes["X-Tenant"] == "ecorp"`,
Pipelines: []component.ID{logsDefault, logs0},
Order: 3,
},
},
MatchOnce: true,
}

var defaultSink, sink0, sink1 consumertest.LogsSink

router := connectortest.NewLogsRouter(
connectortest.WithLogsSink(logsDefault, &defaultSink),
connectortest.WithLogsSink(logs0, &sink0),
connectortest.WithLogsSink(logs1, &sink1),
)

resetSinks := func() {
defaultSink.Reset()
sink0.Reset()
sink1.Reset()
}

factory := NewFactory()
conn, err := factory.CreateLogsToLogs(
context.Background(),
connectortest.NewNopCreateSettings(),
cfg,
router.(consumer.Logs),
)

require.NoError(t, err)
require.NotNil(t, conn)
require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
assert.NoError(t, conn.Shutdown(context.Background()))
}()

t.Run("logs matched by no expressions", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()
rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 0)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched one expression", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "xacme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)
})

t.Run("logs matched by two expressions, but sinks to one", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "x_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 0)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 2)
})

t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "_acme")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

rl = l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "something-else")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0)
attr, ok := rlog.Resource().Attributes().Get("X-Tenant")
assert.True(t, ok, "routing attribute must exists")
assert.Equal(t, attr.AsString(), "something-else")
})

t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) {
resetSinks()

l := plog.NewLogs()

rl := l.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("X-Tenant", "ecorp")
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()

require.NoError(t, conn.ConsumeLogs(context.Background(), l))

assert.Len(t, defaultSink.AllLogs(), 1)
assert.Len(t, sink0.AllLogs(), 1)
assert.Len(t, sink1.AllLogs(), 0)

assert.Equal(t, defaultSink.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 1)
assert.Equal(t, defaultSink.AllLogs(), sink0.AllLogs())
})
}

func TestLogsResourceAttributeDroppedByOTTL(t *testing.T) {
logsDefault := component.NewIDWithName(component.DataTypeLogs, "default")
logsOther := component.NewIDWithName(component.DataTypeLogs, "other")
Expand Down
5 changes: 4 additions & 1 deletion connector/routingconnector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
rtx := ottlresource.NewTransformContext(rmetrics.Resource())

noRoutesMatch := true
for _, route := range c.router.routes {
for _, route := range c.router.routeSlice {
_, isMatch, err := route.statement.Execute(ctx, rtx)
if err != nil {
if c.config.ErrorMode == ottl.PropagateError {
Expand All @@ -84,6 +84,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
if isMatch {
noRoutesMatch = false
c.group(groups, route.consumer, rmetrics)
if c.config.MatchOnce {
break
}
}

}
Expand Down
Loading