Skip to content

Commit

Permalink
Add env() support to expressions
Browse files Browse the repository at this point in the history
To provide support for using environment variables for labeling,
   tagging, and routing entries, this commit adds the `env()` function
   to expressions.

An example expression: `env("FOO")`.

In addition, this commit fixes an issue with the algorithm for parsing
   expressions from `ExprString`s where there are paranthesis nested inside
   the expression. This issue was made clear by trying to use the `env()`
   function inside an `ExprString`.

This commit also adds the `GetExprEnv` and `PutExprEnv` functions which
   provide a way to re-use allocated environment maps. This keeps us from
   having to allocate a map for every entry that is rendered as an
   expression, which should help improve performance of plugins that
   make heavy use of expressions.
  • Loading branch information
camdencheek committed Jul 8, 2020
1 parent 388c90a commit b723fa5
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 58 deletions.
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
## **Please check that the PR fulfills these requirements**
- [ ] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
- [ ] Add a changelog entry (for non-trivial bug fixes / features)
- [ ] CI passes
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Support for reading from environment in expressions
- Ability to point to labels with fields


## [0.9.0] - 2020-07-07

Initial Open Source Release
Initial Open Source Release
19 changes: 17 additions & 2 deletions docs/types/expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ being processed.

For reference documentation of the expression language, see [here](https://github.com/antonmedv/expr/blob/master/docs/Language-Definition.md).

In most cases, the record of the entry being processed can be accessed with the `$` variable in the expression. See the examples below for syntax.
Available to the expressions are a few special variables:
- `$record` contains the entry's record
- `$labels` contains the entry's labels
- `$tags` contains the entry's tags
- `$timestamp` contains the entry's timestamp
- `env()` is a function that allows you to read environment variables

## Examples

### Add a label from an environment variable

```yaml
- id: add_stack_label
type: metadata
output: my_receiver
labels:
stack: 'EXPR(env("STACK"))'
```
### Map severity values to standard values
```yaml
Expand All @@ -19,5 +34,5 @@ In most cases, the record of the entry being processed can be accessed with the
ops:
- add:
field: severity
value_expr: '$.raw_severity in ["critical", "super_critical"] ? "error" : $.raw_severity'
value_expr: '$record.raw_severity in ["critical", "super_critical"] ? "error" : $record.raw_severity'
```
18 changes: 4 additions & 14 deletions plugin/builtin/transformer/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,8 @@ type labeler struct {
}

func (l *labeler) Label(e *entry.Entry) error {
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$timestamp": e.Timestamp,
"$tags": e.Tags,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

for k, v := range l.labels {
rendered, err := v.Render(env)
Expand Down Expand Up @@ -115,13 +110,8 @@ type tagger struct {
}

func (t *tagger) Tag(e *entry.Entry) error {
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$timestamp": e.Timestamp,
"$tags": e.Tags,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

for _, v := range t.tags {
rendered, err := v.Render(env)
Expand Down
36 changes: 36 additions & 0 deletions plugin/builtin/transformer/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformer

import (
"context"
"os"
"testing"
"time"

Expand All @@ -14,6 +15,9 @@ import (
)

func TestMetadata(t *testing.T) {
os.Setenv("TEST_METADATA_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_METADATA_PLUGIN_ENV")

baseConfig := func() *MetadataPluginConfig {
return &MetadataPluginConfig{
TransformerConfig: helper.TransformerConfig{
Expand Down Expand Up @@ -98,6 +102,38 @@ func TestMetadata(t *testing.T) {
return e
}(),
},
{
"AddLabelEnv",
func() *MetadataPluginConfig {
cfg := baseConfig()
cfg.Labels = map[string]helper.ExprStringConfig{
"label1": `EXPR(env("TEST_METADATA_PLUGIN_ENV"))`,
}
return cfg
}(),
entry.New(),
func() *entry.Entry {
e := entry.New()
e.Labels = map[string]string{
"label1": "foo",
}
return e
}(),
},
{
"AddTagEnv",
func() *MetadataPluginConfig {
cfg := baseConfig()
cfg.Tags = []helper.ExprStringConfig{`EXPR(env("TEST_METADATA_PLUGIN_ENV"))`}
return cfg
}(),
entry.New(),
func() *entry.Entry {
e := entry.New()
e.Tags = []string{"foo"}
return e
}(),
},
}

for _, tc := range cases {
Expand Down
12 changes: 4 additions & 8 deletions plugin/builtin/transformer/restructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (o Op) MarshalYAML() (interface{}, error) {

/******
Add
******/
*******/

type OpAdd struct {
Field entry.Field `json:"field" yaml:"field"`
Expand All @@ -205,13 +205,9 @@ func (op *OpAdd) Apply(e *entry.Entry) error {
return err
}
case op.program != nil:
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$tags": e.Tags,
"$timestamp": e.Timestamp,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

result, err := vm.Run(op.program, env)
if err != nil {
return fmt.Errorf("evaluate value_expr: %s", err)
Expand Down
25 changes: 25 additions & 0 deletions plugin/builtin/transformer/restructure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transformer
import (
"context"
"encoding/json"
"os"
"testing"
"time"

Expand Down Expand Up @@ -36,6 +37,9 @@ func NewFakeRestructurePlugin() (*RestructurePlugin, *testutil.Plugin) {
}

func TestRestructurePlugin(t *testing.T) {
os.Setenv("TEST_RESTRUCTURE_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_RESTRUCTURE_PLUGIN_ENV")

newTestEntry := func() *entry.Entry {
e := entry.New()
e.Timestamp = time.Unix(1586632809, 0)
Expand Down Expand Up @@ -97,6 +101,27 @@ func TestRestructurePlugin(t *testing.T) {
return e
}(),
},
{
name: "AddValueExprEnv",
ops: []Op{
{
&OpAdd{
Field: entry.NewRecordField("new"),
program: func() *vm.Program {
vm, err := expr.Compile(`env("TEST_RESTRUCTURE_PLUGIN_ENV")`)
require.NoError(t, err)
return vm
}(),
},
},
},
input: newTestEntry(),
output: func() *entry.Entry {
e := newTestEntry()
e.Record.(map[string]interface{})["new"] = "foo"
return e
}(),
},
{
name: "Remove",
ops: []Op{
Expand Down
9 changes: 2 additions & 7 deletions plugin/builtin/transformer/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,8 @@ func (p *RouterPlugin) CanProcess() bool {
}

func (p *RouterPlugin) Process(ctx context.Context, entry *entry.Entry) error {
env := map[string]interface{}{
"$": entry.Record,
"$record": entry.Record,
"$labels": entry.Labels,
"$timestamp": entry.Timestamp,
"$tags": entry.Tags,
}
env := helper.GetExprEnv(entry)
defer helper.PutExprEnv(env)

for _, route := range p.routes {
matches, err := vm.Run(route.Expression, env)
Expand Down
23 changes: 23 additions & 0 deletions plugin/builtin/transformer/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformer

import (
"context"
"os"
"testing"

"github.com/observiq/carbon/entry"
Expand All @@ -13,6 +14,9 @@ import (
)

func TestRouterPlugin(t *testing.T) {
os.Setenv("TEST_ROUTER_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_ROUTER_PLUGIN_ENV")

basicConfig := func() *RouterPluginConfig {
return &RouterPluginConfig{
BasicConfig: helper.BasicConfig{
Expand Down Expand Up @@ -69,6 +73,25 @@ func TestRouterPlugin(t *testing.T) {
},
map[string]int{"output2": 1},
},
{
"MatchEnv",
&entry.Entry{
Record: map[string]interface{}{
"message": "test_message",
},
},
[]*RouterPluginRouteConfig{
{
`env("TEST_ROUTER_PLUGIN_ENV") == "foo"`,
[]string{"output1"},
},
{
`true`,
[]string{"output2"},
},
},
map[string]int{"output1": 1},
},
}

for _, tc := range cases {
Expand Down
95 changes: 72 additions & 23 deletions plugin/helper/expr_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,73 @@ package helper

import (
"fmt"
"os"
"strings"
"sync"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/errors"
)

type ExprStringConfig string

const (
exprStartTag = "EXPR("
exprEndTag = ")"
exprStartToken = "EXPR("
exprEndToken = ")"
)

func (e ExprStringConfig) Build() (*ExprString, error) {
s := string(e)
begin := 0
rangeStart := 0

subStrings := make([]string, 0)
subExprStrings := make([]string, 0)
subStrings := make([]string, 0, 4)
subExprStrings := make([]string, 0, 4)

LOOP:
for {
indexStart := strings.Index(s[begin:], exprStartTag)
indexEnd := strings.Index(s[begin:], exprEndTag)
switch {
case indexStart == -1 || indexEnd == -1:
fallthrough
case indexStart > indexEnd:
// if we don't have a "{{" followed by a "}}" in the remainder
// of the string, treat the remainder as a string literal
subStrings = append(subStrings, s[begin:])
break LOOP
default:
// make indexes relative to whole string again
indexStart += begin
indexEnd += begin
rangeEnd := len(s)

// Find the first instance of the start token
indexStart := strings.Index(s[rangeStart:rangeEnd], exprStartToken)
if indexStart == -1 {
// Start token does not exist in the remainder of the string,
// so treat the rest as a string literal
subStrings = append(subStrings, s[rangeStart:])
break
} else {
indexStart = rangeStart + indexStart
}

// Restrict our end token search range to the next instance of the start token
nextIndexStart := strings.Index(s[indexStart+len(exprStartToken):], exprStartToken)
if nextIndexStart == -1 {
rangeEnd = len(s)
} else {
rangeEnd = indexStart + len(exprStartToken) + nextIndexStart
}

// Greedily search for the last end token in the search range
indexEnd := strings.LastIndex(s[indexStart:rangeEnd], exprEndToken)
if indexEnd == -1 {
// End token does not exist before the next start token
// or end of expression string, so treat the remainder of the string
// as a string literal
subStrings = append(subStrings, s[rangeStart:])
break
} else {
indexEnd = indexStart + indexEnd
}

// Unscope the indexes and add the partitioned strings
subStrings = append(subStrings, s[rangeStart:indexStart])
subExprStrings = append(subExprStrings, s[indexStart+len(exprStartToken):indexEnd])

// Reset the starting range and finish if it reaches the end of the string
rangeStart = indexEnd + len(exprEndToken)
if rangeStart > len(s) {
break
}
subStrings = append(subStrings, s[begin:indexStart])
subExprStrings = append(subExprStrings, s[indexStart+len(exprStartTag):indexEnd])
begin = indexEnd + len(exprEndTag)
}

subExprs := make([]*vm.Program, 0, len(subExprStrings))
Expand Down Expand Up @@ -85,3 +111,26 @@ func (e *ExprString) Render(env map[string]interface{}) (string, error) {

return b.String(), nil
}

var envPool = sync.Pool{
New: func() interface{} {
return map[string]interface{}{
"env": os.Getenv,
}
},
}

func GetExprEnv(e *entry.Entry) map[string]interface{} {
env := envPool.Get().(map[string]interface{})
env["$"] = e.Record
env["$record"] = e.Record
env["$labels"] = e.Labels
env["$timestamp"] = e.Timestamp
env["$tags"] = e.Tags

return env
}

func PutExprEnv(e map[string]interface{}) {
envPool.Put(e)
}
Loading

0 comments on commit b723fa5

Please sign in to comment.