Skip to content

Commit

Permalink
feat: unroll processor (#2021)
Browse files Browse the repository at this point in the history
* unroll processor spike

* Minimize copying and complexity (#2022)

* minimal readme.md

* make fmt

* remove some of the otel-community specific things

* add-licenses

* Feat/unroll processor tests (#2024)

Add golden test cases

* don't update go version

* some more test cases

* remove some otel stuff and add some more tests

* make add-licenses

* reset go version update

* update configuration parameter documentation

* address feedback and add another test case encapsulating empty and byte slice

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
schmikei and djaglowski authored Dec 3, 2024
1 parent 3b25dea commit 97f5f27
Show file tree
Hide file tree
Showing 25 changed files with 1,311 additions and 0 deletions.
2 changes: 2 additions & 0 deletions factories/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/observiq/bindplane-agent/processor/samplingprocessor"
"github.com/observiq/bindplane-agent/processor/spancountprocessor"
"github.com/observiq/bindplane-agent/processor/throughputmeasurementprocessor"
"github.com/observiq/bindplane-agent/processor/unrollprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor"
Expand Down Expand Up @@ -87,4 +88,5 @@ var defaultProcessors = []processor.Factory{
throughputmeasurementprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
transformprocessor.NewFactory(),
unrollprocessor.NewFactory(),
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/observiq/bindplane-agent/processor/samplingprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/spancountprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/throughputmeasurementprocessor v1.66.0
github.com/observiq/bindplane-agent/processor/unrollprocessor v1.66.0
github.com/observiq/bindplane-agent/receiver/awss3rehydrationreceiver v1.66.0
github.com/observiq/bindplane-agent/receiver/azureblobrehydrationreceiver v1.66.0
github.com/observiq/bindplane-agent/receiver/httpreceiver v1.66.0
Expand Down Expand Up @@ -853,6 +854,8 @@ replace github.com/observiq/bindplane-agent/processor/datapointcountprocessor =>

replace github.com/observiq/bindplane-agent/processor/lookupprocessor => ./processor/lookupprocessor

replace github.com/observiq/bindplane-agent/processor/unrollprocessor => ./processor/unrollprocessor

replace github.com/observiq/bindplane-agent/expr => ./expr

replace github.com/observiq/bindplane-agent/counter => ./counter
Expand Down
124 changes: 124 additions & 0 deletions processor/unrollprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Unroll Processor

This is an experimental processor that will take a log record with slice bodies and expand each element of the slice into its own log record within the slice.

## Important Note

This is an experimental processor and is expected that this functionality would eventually be moved to an [OTTL function](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl).

## Supported pipelines

- Logs


## How it works

1. The user configures the `unroll` processor in their desired logs pipeline
2. Logs that go into this pipeline with a pcommon.Slice body will have each element of that body be expanded into its own log record


## Configuration
| Field | Type | Default | Description |
| --------- | ------ | ------- | ---------------------------------------------------------------------------------------------------------- |
| field | string | body | note: body is currently the only available value for unrolling; making this configuration currently static |
| recursive | bool | false | whether to recursively unroll body slices of slices |


### Example configuration

```yaml
unroll:
recursive: false
```
## How To
### Split a log record into multiple via a delimiter: ","
The following configuration utilizes the [transformprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) to first split the original string body and then the unroll processor can create multiple events
```yaml
receivers:
filelog:
include: [ ./test.txt ]
start_at: beginning
processors:
transform:
log_statements:
- context: log
statements:
- set(body, Split(body, ","))
unroll:
exporters:
file:
path: ./test/output.json
service:
pipelines:
logs:
receivers: [filelog]
processors: [transform, unroll]
exporters: [file]
```
<details>
<summary> Sample Data </summary>
```txt
1,2,3
```

```json
{
"resourceLogs": [
{
"resource": {},
"scopeLogs": [
{
"scope": {},
"logRecords": [
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "1" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
},
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "2" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
},
{
"observedTimeUnixNano": "1733240156591852000",
"body": { "stringValue": "3" },
"attributes": [
{
"key": "log.file.name",
"value": { "stringValue": "test.txt" }
},
],
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}
```
</details>
43 changes: 43 additions & 0 deletions processor/unrollprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package unrollprocessor contains the logic to unroll logs from a slice in the body field.
package unrollprocessor

import (
"errors"
)

// Config is the configuration for the unroll processor.
type Config struct {
Field UnrollField `mapstructure:"field"`
Recursive bool `mapstructure:"recursive"`
}

// UnrollField is the field to unroll.
type UnrollField string

const (
// UnrollFieldBody is the only supported field for unrolling logs.
UnrollFieldBody UnrollField = "body"
)

// Validate checks the configuration for any issues.
func (c *Config) Validate() error {
if c.Field != UnrollFieldBody {
return errors.New("only unrolling logs from a body slice is currently supported")
}

return nil
}
53 changes: 53 additions & 0 deletions processor/unrollprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidate(t *testing.T) {

testCases := []struct {
desc string
cfg *Config
expectedErr string
}{
{
desc: "valid config",
cfg: createDefaultConfig().(*Config),
},
{
desc: "config without body field",
cfg: &Config{
Field: "attributes",
},
expectedErr: "only unrolling logs from a body slice is currently supported",
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := tc.cfg.Validate()
if tc.expectedErr != "" {
require.ErrorContains(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}
})
}
}
70 changes: 70 additions & 0 deletions processor/unrollprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor // import "github.com/observiq/bindplane-agent/processor/unrollprocessor"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
)

var processorCapabilities = consumer.Capabilities{MutatesData: true}

// componentType is the value of the "type" key in configuration.
var componentType = component.MustNewType("unroll")

const (
stability = component.StabilityLevelAlpha
)

// NewFactory returns a new factory for the Transform processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
componentType,
createDefaultConfig,
processor.WithLogs(createLogsProcessor, stability),
)
}

func createDefaultConfig() component.Config {
return &Config{
Field: UnrollFieldBody,
}
}

func createLogsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
oCfg := cfg.(*Config)

proc, err := newUnrollProcessor(oCfg)
if err != nil {
return nil, fmt.Errorf("invalid config for \"unroll\" processor %w", err)
}
return processorhelper.NewLogs(
ctx,
set,
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithCapabilities(processorCapabilities))
}
47 changes: 47 additions & 0 deletions processor/unrollprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package unrollprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestNewFactory(t *testing.T) {
factory := NewFactory()
require.Equal(t, componentType, factory.Type())

expectedCfg := &Config{
Field: UnrollFieldBody,
}

cfg, ok := factory.CreateDefaultConfig().(*Config)
require.True(t, ok)
require.Equal(t, expectedCfg, cfg)
}

func TestBadFactory(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Field = "invalid"

_, err := factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, &consumertest.LogsSink{})
require.Error(t, err)
require.ErrorContains(t, err, "invalid config for \"unroll\" processor")
}
Loading

0 comments on commit 97f5f27

Please sign in to comment.