Skip to content

Commit

Permalink
Add Flatten operator (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mrod1598 authored Apr 21, 2021
1 parent 30848ed commit f5ae001
Show file tree
Hide file tree
Showing 7 changed files with 649 additions and 0 deletions.
116 changes: 116 additions & 0 deletions docs/operators/flatten.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
## `flatten` operator

The `flatten` operator flattens a field by moving its children up to the same level as the field.
The operator only flattens a single level deep.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `flatten` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `field` | required | The [field](/docs/types/field.md) to be flattened. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |

Example usage:

<hr>
Flatten an object to the base of the body
<br>
<br>

```yaml
- type: flatten
field: key1
```
<table>
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"attributes": { },
"body": {
"key1": {
"nested1": "nestedval1",
"nested2": "nestedval2"
},
"key2": "val2"
}
}
```

</td>
<td>

```json
{
"resource": { },
"attributes": { },
"body": {
"nested1": "nestedval1",
"nested2": "nestedval2",
"key2": "val2"
}
}
```

</td>
</tr>
</table>

<hr>
Flatten an object within another object
<br>
<br>

```yaml
- type: flatten
field: wrapper.key1
```
<table>
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
<tr>
<td>
```json
{
"resource": { },
"attributes": { },
"body": {
"wrapper": {
"key1": {
"nested1": "nestedval1",
"nested2": "nestedval2"
},
"key2": "val2"
}
}
}
```

</td>
<td>

```json
{
"resource": { },
"attributes": { },
"body": {
"wrapper": {
"nested1": "nestedval1",
"nested2": "nestedval2",
"key2": "val2"
}
}
}
```

</td>
</tr>
</table>
132 changes: 132 additions & 0 deletions operator/builtin/transformer/flatten/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flatten

import (
"fmt"
"io/ioutil"
"path"
"testing"

"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

type configTestCase struct {
name string
expect *FlattenOperatorConfig
expectErr bool
}

// Test unmarshalling of values into config struct
func TestGoldenConfig(t *testing.T) {
cases := []configTestCase{
{
"flatten_one_level",
func() *FlattenOperatorConfig {
cfg := defaultCfg()
cfg.Field = entry.BodyField{
Keys: []string{"nested"},
}
return cfg
}(),
false,
},
{
"flatten_second_level",
func() *FlattenOperatorConfig {
cfg := defaultCfg()
cfg.Field = entry.BodyField{
Keys: []string{"nested", "secondlevel"},
}
return cfg
}(),
false,
},
{
"flatten_attributes",
func() *FlattenOperatorConfig {
cfg := defaultCfg()
cfg.Field = entry.BodyField{
Keys: []string{"$attributes", "errField"},
}
return cfg
}(),
false,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cfgFromYaml, yamlErr := configFromFileViaYaml(path.Join(".", "testdata", fmt.Sprintf("%s.yaml", tc.name)))
cfgFromMapstructure, mapErr := configFromFileViaMapstructure(path.Join(".", "testdata", fmt.Sprintf("%s.yaml", tc.name)))
if tc.expectErr {
t.Log(cfgFromYaml)
require.Error(t, mapErr)
require.Error(t, yamlErr)
} else {
require.NoError(t, yamlErr)
require.Equal(t, tc.expect, cfgFromYaml)
require.NoError(t, mapErr)
require.Equal(t, tc.expect, cfgFromMapstructure)
}
})
}
}

func configFromFileViaYaml(file string) (*FlattenOperatorConfig, error) {
bytes, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("could not find config file: %s", err)
}

config := defaultCfg()
if err := yaml.Unmarshal(bytes, config); err != nil {
return nil, fmt.Errorf("failed to read config file as yaml: %s", err)
}

return config, nil
}

func configFromFileViaMapstructure(file string) (*FlattenOperatorConfig, error) {
bytes, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("could not find config file: %s", err)
}

raw := map[string]interface{}{}

if err := yaml.Unmarshal(bytes, raw); err != nil {
return nil, fmt.Errorf("failed to read data from yaml: %s", err)
}

cfg := defaultCfg()
dc := &mapstructure.DecoderConfig{Result: cfg, DecodeHook: helper.JSONUnmarshalerHook()}
ms, err := mapstructure.NewDecoder(dc)
if err != nil {
return nil, err
}
err = ms.Decode(raw)
if err != nil {
return nil, err
}
return cfg, nil
}

func defaultCfg() *FlattenOperatorConfig {
return NewFlattenOperatorConfig("flatten")
}
101 changes: 101 additions & 0 deletions operator/builtin/transformer/flatten/flaltten.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flatten

import (
"context"
"fmt"
"strings"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/errors"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("flatten", func() operator.Builder { return NewFlattenOperatorConfig("") })
}

// NewFlattenOperatorConfig creates a new flatten operator config with default values
func NewFlattenOperatorConfig(operatorID string) *FlattenOperatorConfig {
return &FlattenOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "flatten"),
}
}

// FlattenOperatorConfig is the configuration of a flatten operator
type FlattenOperatorConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
Field entry.BodyField `mapstructure:"field" json:"field" yaml:"field"`
}

// Build will build a Flatten operator from the supplied configuration
func (c FlattenOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

if strings.Contains(c.Field.String(), "$attributes") || strings.Contains(c.Field.String(), "$resource") {
return nil, fmt.Errorf("flatten: field cannot be a resource or attribute")
}

flattenOp := &FlattenOperator{
TransformerOperator: transformerOperator,
Field: c.Field,
}

return []operator.Operator{flattenOp}, nil
}

// FlattenOperator flattens an object in the body field
type FlattenOperator struct {
helper.TransformerOperator
Field entry.BodyField
}

// Process will process an entry with a flatten transformation.
func (p *FlattenOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Transform)
}

// Transform will apply the flatten operation to an entry
func (p *FlattenOperator) Transform(entry *entry.Entry) error {
parent := p.Field.Parent()
val, ok := entry.Delete(p.Field)
if !ok {
// The field doesn't exist, so ignore it
return fmt.Errorf("apply flatten: field %s does not exist on body", p.Field)
}

valMap, ok := val.(map[string]interface{})
if !ok {
// The field we were asked to flatten was not a map, so put it back
err := entry.Set(p.Field, val)
if err != nil {
return errors.Wrap(err, "reset non-map field")
}
return fmt.Errorf("apply flatten: field %s is not a map", p.Field)
}

for k, v := range valMap {
err := entry.Set(parent.Child(k), v)
if err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit f5ae001

Please sign in to comment.