diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go index b77a8bebc..0599d2d2b 100644 --- a/cmd/stanza/init_common.go +++ b/cmd/stanza/init_common.go @@ -27,6 +27,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/transformer/hostmetadata" _ "github.com/observiq/stanza/operator/builtin/transformer/k8smetadata" _ "github.com/observiq/stanza/operator/builtin/transformer/metadata" + _ "github.com/observiq/stanza/operator/builtin/transformer/move" _ "github.com/observiq/stanza/operator/builtin/transformer/noop" _ "github.com/observiq/stanza/operator/builtin/transformer/ratelimit" _ "github.com/observiq/stanza/operator/builtin/transformer/recombine" diff --git a/docs/operators/move.md b/docs/operators/move.md new file mode 100644 index 000000000..fc82080b1 --- /dev/null +++ b/docs/operators/move.md @@ -0,0 +1,283 @@ +## `move` operator + +The `move` operator moves (or renames) a field from one location to another. + +It's configured by passing 'to' and 'from' fields. + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `move` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `from` | required | The [field](/docs/types/field.md) to move the value out of. +| `to` | required | The [field](/docs/types/field.md) to move the value into. +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | +| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | + +Example usage: + +Rename value +```yaml +- type: move + from: key1 + to: key3 +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key1": "val1", + "key2": "val2" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key3": "val1", + "key2": "val2" + } +} +``` + +
+
+ +Move a value from the record to resource + +```yaml +- type: move + from: uuid + to: $resoruce.uuid +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "uuid": "091edc50-d91a-460d-83cd-089a62937738" + } +} +``` + + + +```json +{ + "resource": { + "uuid": "091edc50-d91a-460d-83cd-089a62937738" + }, + "labels": { }, + "record": { } +} +``` + +
+ +
+ +Move a value from the record to labels + +```yaml +- type: move + from: ip + to: $labels.ip +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "ip": "8.8.8.8" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { + "ip": "8.8.8.8" + }, + "record": { } +} +``` + +
+ +
+ +Replace the record with an individual value nested within the record +```yaml +- type: move + from: log + to: $record +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "log": "The log line" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": "The log line" +} +``` + +
+ +
+ +Remove a layer from the record +```yaml +- type: move + from: wrapper + to: $record +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "wrapper": { + "key1": "val1", + "key2": "val2", + "key3": "val3" + } + } +} +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key1": "val1", + "key2": "val2", + "key3": "val3" + } +} +``` + +
+ +
+ +Merge a layer to the record +```yaml +- type: move + from: wrapper + to: $record +``` + + + + + + + +
Input Entry Output Entry
+ +```json +{ + "resource": { }, + "labels": { }, + "record": { + "wrapper": { + "key1": "val1", + "key2": "val2", + "key3": "val3" + }, + "key4": "val1", + "key5": "val2", + "key6": "val3" + } +} +``` + + + +```json +{ + "resource": { }, + "labels": { }, + "record": { + "key1": "val1", + "key2": "val2", + "key3": "val3", + "key4": "val1", + "key5": "val2", + "key6": "val3" + } +} +``` + +
+ diff --git a/operator/builtin/transformer/move/move.go b/operator/builtin/transformer/move/move.go new file mode 100644 index 000000000..1a333ff00 --- /dev/null +++ b/operator/builtin/transformer/move/move.go @@ -0,0 +1,69 @@ +package move + +import ( + "context" + "fmt" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" +) + +func init() { + operator.Register("move", func() operator.Builder { return NewMoveOperatorConfig("") }) +} + +// NewMoveOperatorConfig creates a new move operator config with default values +func NewMoveOperatorConfig(operatorID string) *MoveOperatorConfig { + return &MoveOperatorConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "move"), + } +} + +// MoveOperatorConfig is the configuration of a move operator +type MoveOperatorConfig struct { + helper.TransformerConfig `mapstructure:",squash" yaml:",inline"` + From entry.Field `mapstructure:"from" yaml:"from"` + To entry.Field `mapstructure:"to" yaml:"to"` +} + +// Build will build a Move operator from the supplied configuration +func (c MoveOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, err + } + + if c.To == entry.NewNilField() || c.From == entry.NewNilField() { + return nil, fmt.Errorf("move: missing to or from field") + } + + moveOperator := &MoveOperator{ + TransformerOperator: transformerOperator, + From: c.From, + To: c.To, + } + + return []operator.Operator{moveOperator}, nil +} + +// MoveOperator is an operator that moves a field's value to a new field +type MoveOperator struct { + helper.TransformerOperator + From entry.Field + To entry.Field +} + +// Process will process an entry with a move transformation. +func (p *MoveOperator) Process(ctx context.Context, entry *entry.Entry) error { + return p.ProcessWith(ctx, entry, p.Transform) +} + +// Transform will apply the move operation to an entry +func (p *MoveOperator) Transform(e *entry.Entry) error { + val, exist := p.From.Delete(e) + if !exist { + return fmt.Errorf("move: field does not exist") + } + return p.To.Set(e, val) +} diff --git a/operator/builtin/transformer/move/move_test.go b/operator/builtin/transformer/move/move_test.go new file mode 100644 index 000000000..aa2ac21df --- /dev/null +++ b/operator/builtin/transformer/move/move_test.go @@ -0,0 +1,377 @@ +package move + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/testutil" +) + +type processTestCase struct { + name string + expectErr bool + op *MoveOperatorConfig + input func() *entry.Entry + output func() *entry.Entry +} + +func TestMoveProcess(t *testing.T) { + newTestEntry := func() *entry.Entry { + e := entry.New() + e.Timestamp = time.Unix(1586632809, 0) + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + } + + cases := []processTestCase{ + { + "MoveRecordToRecord", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewRecordField("new") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "new": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + }, + }, + { + "MoveRecordToLabel", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewLabelField("new") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + e.Labels = map[string]string{"new": "val"} + return e + }, + }, + { + "MoveLabelToRecord", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewLabelField("new") + cfg.To = entry.NewRecordField("new") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{"new": "val"} + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "new": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + e.Labels = map[string]string{} + return e + }, + }, + { + "MoveLabelToResource", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewLabelField("new") + cfg.To = entry.NewResourceField("new") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Labels = map[string]string{"new": "val"} + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]string{"new": "val"} + e.Labels = map[string]string{} + return e + }, + }, + { + "MoveResourceToLabel", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewResourceField("new") + cfg.To = entry.NewLabelField("new") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]string{"new": "val"} + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Resource = map[string]string{} + e.Labels = map[string]string{"new": "val"} + return e + }, + }, + { + "MoveNest", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewRecordField("NewNested") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "NewNested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + }, + }, + { + "MoveFromNestedObj", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested", "nestedkey") + cfg.To = entry.NewRecordField("unnestedkey") + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{}, + "unnestedkey": "nestedval", + } + return e + }, + }, + { + "MoveToNestedObj", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("newnestedkey") + cfg.To = entry.NewRecordField("nested", "newnestedkey") + + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + "newnestedkey": "nestedval", + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + "newnestedkey": "nestedval", + }, + } + return e + }, + }, + { + "MoveDoubleNestedObj", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested", "nested2") + cfg.To = entry.NewRecordField("nested2") + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + "nested2": map[string]interface{}{ + "nestedkey": "nestedval", + }, + }, + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + "nested2": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + }, + }, + { + "MoveNestToResource", + true, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewResourceField("NewNested") + return cfg + }(), + newTestEntry, + nil, + }, + { + "MoveNestToLabel", + true, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewLabelField("NewNested") + + return cfg + }(), + newTestEntry, + nil, + }, + { + "ReplaceRecordObj", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("wrapper") + cfg.To = entry.NewRecordField() + return cfg + }(), + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "wrapper": map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + }, + } + return e + }, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nested": map[string]interface{}{ + "nestedkey": "nestedval", + }, + } + return e + }, + }, + { + "ReplaceRecordString", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("key") + cfg.To = entry.NewRecordField() + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = "val" + return e + }, + }, + { + "MergeObjToRecord", + false, + func() *MoveOperatorConfig { + cfg := defaultCfg() + cfg.From = entry.NewRecordField("nested") + cfg.To = entry.NewRecordField() + return cfg + }(), + newTestEntry, + func() *entry.Entry { + e := newTestEntry() + e.Record = map[string]interface{}{ + "key": "val", + "nestedkey": "nestedval", + } + return e + }, + }, + } + for _, tc := range cases { + t.Run("BuildandProcess/"+tc.name, func(t *testing.T) { + cfg := tc.op + cfg.OutputIDs = []string{"fake"} + cfg.OnError = "drop" + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + move := op.(*MoveOperator) + fake := testutil.NewFakeOutput(t) + move.SetOutputs([]operator.Operator{fake}) + val := tc.input() + err = move.Process(context.Background(), val) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + fake.ExpectEntry(t, tc.output()) + } + }) + } +} + +func defaultCfg() *MoveOperatorConfig { + return NewMoveOperatorConfig("move") +}