Skip to content

Commit

Permalink
Merge pull request #10 from observIQ/env-in-exprs
Browse files Browse the repository at this point in the history
Add env support to expressions
  • Loading branch information
camdencheek authored Jul 9, 2020
2 parents cecee58 + b723fa5 commit 9aa4d6f
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 58 deletions.
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
## **Please check that the PR fulfills these requirements**
- [ ] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)
- [ ] Add a changelog entry (for non-trivial bug fixes / features)
- [ ] CI passes
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Support for reading from environment in expressions
- Ability to point to labels with fields


## [0.9.0] - 2020-07-07

Initial Open Source Release
Initial Open Source Release
19 changes: 17 additions & 2 deletions docs/types/expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ being processed.

For reference documentation of the expression language, see [here](https://github.com/antonmedv/expr/blob/master/docs/Language-Definition.md).

In most cases, the record of the entry being processed can be accessed with the `$` variable in the expression. See the examples below for syntax.
Available to the expressions are a few special variables:
- `$record` contains the entry's record
- `$labels` contains the entry's labels
- `$tags` contains the entry's tags
- `$timestamp` contains the entry's timestamp
- `env()` is a function that allows you to read environment variables

## Examples

### Add a label from an environment variable

```yaml
- id: add_stack_label
type: metadata
output: my_receiver
labels:
stack: 'EXPR(env("STACK"))'
```
### Map severity values to standard values
```yaml
Expand All @@ -19,5 +34,5 @@ In most cases, the record of the entry being processed can be accessed with the
ops:
- add:
field: severity
value_expr: '$.raw_severity in ["critical", "super_critical"] ? "error" : $.raw_severity'
value_expr: '$record.raw_severity in ["critical", "super_critical"] ? "error" : $record.raw_severity'
```
18 changes: 4 additions & 14 deletions plugin/builtin/transformer/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,8 @@ type labeler struct {
}

func (l *labeler) Label(e *entry.Entry) error {
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$timestamp": e.Timestamp,
"$tags": e.Tags,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

for k, v := range l.labels {
rendered, err := v.Render(env)
Expand Down Expand Up @@ -115,13 +110,8 @@ type tagger struct {
}

func (t *tagger) Tag(e *entry.Entry) error {
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$timestamp": e.Timestamp,
"$tags": e.Tags,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

for _, v := range t.tags {
rendered, err := v.Render(env)
Expand Down
36 changes: 36 additions & 0 deletions plugin/builtin/transformer/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformer

import (
"context"
"os"
"testing"
"time"

Expand All @@ -14,6 +15,9 @@ import (
)

func TestMetadata(t *testing.T) {
os.Setenv("TEST_METADATA_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_METADATA_PLUGIN_ENV")

baseConfig := func() *MetadataPluginConfig {
return &MetadataPluginConfig{
TransformerConfig: helper.TransformerConfig{
Expand Down Expand Up @@ -98,6 +102,38 @@ func TestMetadata(t *testing.T) {
return e
}(),
},
{
"AddLabelEnv",
func() *MetadataPluginConfig {
cfg := baseConfig()
cfg.Labels = map[string]helper.ExprStringConfig{
"label1": `EXPR(env("TEST_METADATA_PLUGIN_ENV"))`,
}
return cfg
}(),
entry.New(),
func() *entry.Entry {
e := entry.New()
e.Labels = map[string]string{
"label1": "foo",
}
return e
}(),
},
{
"AddTagEnv",
func() *MetadataPluginConfig {
cfg := baseConfig()
cfg.Tags = []helper.ExprStringConfig{`EXPR(env("TEST_METADATA_PLUGIN_ENV"))`}
return cfg
}(),
entry.New(),
func() *entry.Entry {
e := entry.New()
e.Tags = []string{"foo"}
return e
}(),
},
}

for _, tc := range cases {
Expand Down
12 changes: 4 additions & 8 deletions plugin/builtin/transformer/restructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (o Op) MarshalYAML() (interface{}, error) {

/******
Add
******/
*******/

type OpAdd struct {
Field entry.Field `json:"field" yaml:"field"`
Expand All @@ -205,13 +205,9 @@ func (op *OpAdd) Apply(e *entry.Entry) error {
return err
}
case op.program != nil:
env := map[string]interface{}{
"$": e.Record,
"$record": e.Record,
"$labels": e.Labels,
"$tags": e.Tags,
"$timestamp": e.Timestamp,
}
env := helper.GetExprEnv(e)
defer helper.PutExprEnv(env)

result, err := vm.Run(op.program, env)
if err != nil {
return fmt.Errorf("evaluate value_expr: %s", err)
Expand Down
25 changes: 25 additions & 0 deletions plugin/builtin/transformer/restructure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transformer
import (
"context"
"encoding/json"
"os"
"testing"
"time"

Expand Down Expand Up @@ -36,6 +37,9 @@ func NewFakeRestructurePlugin() (*RestructurePlugin, *testutil.Plugin) {
}

func TestRestructurePlugin(t *testing.T) {
os.Setenv("TEST_RESTRUCTURE_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_RESTRUCTURE_PLUGIN_ENV")

newTestEntry := func() *entry.Entry {
e := entry.New()
e.Timestamp = time.Unix(1586632809, 0)
Expand Down Expand Up @@ -97,6 +101,27 @@ func TestRestructurePlugin(t *testing.T) {
return e
}(),
},
{
name: "AddValueExprEnv",
ops: []Op{
{
&OpAdd{
Field: entry.NewRecordField("new"),
program: func() *vm.Program {
vm, err := expr.Compile(`env("TEST_RESTRUCTURE_PLUGIN_ENV")`)
require.NoError(t, err)
return vm
}(),
},
},
},
input: newTestEntry(),
output: func() *entry.Entry {
e := newTestEntry()
e.Record.(map[string]interface{})["new"] = "foo"
return e
}(),
},
{
name: "Remove",
ops: []Op{
Expand Down
9 changes: 2 additions & 7 deletions plugin/builtin/transformer/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,8 @@ func (p *RouterPlugin) CanProcess() bool {
}

func (p *RouterPlugin) Process(ctx context.Context, entry *entry.Entry) error {
env := map[string]interface{}{
"$": entry.Record,
"$record": entry.Record,
"$labels": entry.Labels,
"$timestamp": entry.Timestamp,
"$tags": entry.Tags,
}
env := helper.GetExprEnv(entry)
defer helper.PutExprEnv(env)

for _, route := range p.routes {
matches, err := vm.Run(route.Expression, env)
Expand Down
23 changes: 23 additions & 0 deletions plugin/builtin/transformer/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformer

import (
"context"
"os"
"testing"

"github.com/observiq/carbon/entry"
Expand All @@ -13,6 +14,9 @@ import (
)

func TestRouterPlugin(t *testing.T) {
os.Setenv("TEST_ROUTER_PLUGIN_ENV", "foo")
defer os.Unsetenv("TEST_ROUTER_PLUGIN_ENV")

basicConfig := func() *RouterPluginConfig {
return &RouterPluginConfig{
BasicConfig: helper.BasicConfig{
Expand Down Expand Up @@ -69,6 +73,25 @@ func TestRouterPlugin(t *testing.T) {
},
map[string]int{"output2": 1},
},
{
"MatchEnv",
&entry.Entry{
Record: map[string]interface{}{
"message": "test_message",
},
},
[]*RouterPluginRouteConfig{
{
`env("TEST_ROUTER_PLUGIN_ENV") == "foo"`,
[]string{"output1"},
},
{
`true`,
[]string{"output2"},
},
},
map[string]int{"output1": 1},
},
}

for _, tc := range cases {
Expand Down
95 changes: 72 additions & 23 deletions plugin/helper/expr_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,73 @@ package helper

import (
"fmt"
"os"
"strings"
"sync"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/errors"
)

type ExprStringConfig string

const (
exprStartTag = "EXPR("
exprEndTag = ")"
exprStartToken = "EXPR("
exprEndToken = ")"
)

func (e ExprStringConfig) Build() (*ExprString, error) {
s := string(e)
begin := 0
rangeStart := 0

subStrings := make([]string, 0)
subExprStrings := make([]string, 0)
subStrings := make([]string, 0, 4)
subExprStrings := make([]string, 0, 4)

LOOP:
for {
indexStart := strings.Index(s[begin:], exprStartTag)
indexEnd := strings.Index(s[begin:], exprEndTag)
switch {
case indexStart == -1 || indexEnd == -1:
fallthrough
case indexStart > indexEnd:
// if we don't have a "{{" followed by a "}}" in the remainder
// of the string, treat the remainder as a string literal
subStrings = append(subStrings, s[begin:])
break LOOP
default:
// make indexes relative to whole string again
indexStart += begin
indexEnd += begin
rangeEnd := len(s)

// Find the first instance of the start token
indexStart := strings.Index(s[rangeStart:rangeEnd], exprStartToken)
if indexStart == -1 {
// Start token does not exist in the remainder of the string,
// so treat the rest as a string literal
subStrings = append(subStrings, s[rangeStart:])
break
} else {
indexStart = rangeStart + indexStart
}

// Restrict our end token search range to the next instance of the start token
nextIndexStart := strings.Index(s[indexStart+len(exprStartToken):], exprStartToken)
if nextIndexStart == -1 {
rangeEnd = len(s)
} else {
rangeEnd = indexStart + len(exprStartToken) + nextIndexStart
}

// Greedily search for the last end token in the search range
indexEnd := strings.LastIndex(s[indexStart:rangeEnd], exprEndToken)
if indexEnd == -1 {
// End token does not exist before the next start token
// or end of expression string, so treat the remainder of the string
// as a string literal
subStrings = append(subStrings, s[rangeStart:])
break
} else {
indexEnd = indexStart + indexEnd
}

// Unscope the indexes and add the partitioned strings
subStrings = append(subStrings, s[rangeStart:indexStart])
subExprStrings = append(subExprStrings, s[indexStart+len(exprStartToken):indexEnd])

// Reset the starting range and finish if it reaches the end of the string
rangeStart = indexEnd + len(exprEndToken)
if rangeStart > len(s) {
break
}
subStrings = append(subStrings, s[begin:indexStart])
subExprStrings = append(subExprStrings, s[indexStart+len(exprStartTag):indexEnd])
begin = indexEnd + len(exprEndTag)
}

subExprs := make([]*vm.Program, 0, len(subExprStrings))
Expand Down Expand Up @@ -85,3 +111,26 @@ func (e *ExprString) Render(env map[string]interface{}) (string, error) {

return b.String(), nil
}

var envPool = sync.Pool{
New: func() interface{} {
return map[string]interface{}{
"env": os.Getenv,
}
},
}

func GetExprEnv(e *entry.Entry) map[string]interface{} {
env := envPool.Get().(map[string]interface{})
env["$"] = e.Record
env["$record"] = e.Record
env["$labels"] = e.Labels
env["$timestamp"] = e.Timestamp
env["$tags"] = e.Tags

return env
}

func PutExprEnv(e map[string]interface{}) {
envPool.Put(e)
}
Loading

0 comments on commit 9aa4d6f

Please sign in to comment.