Add marshal processor #1830

Merged: 52 commits, Sep 13, 2024
Commits
1f2ed7c
add factory
kuiperda Aug 29, 2024
1a10e81
add config
kuiperda Aug 29, 2024
54ca50f
add testdata
kuiperda Aug 29, 2024
52d4611
processor test and other files
kuiperda Aug 30, 2024
7a10944
processor.go mvp
kuiperda Aug 30, 2024
73ed4f1
rename module
kuiperda Aug 30, 2024
848a3d0
add processors.go
kuiperda Aug 30, 2024
82523e9
add readme
kuiperda Aug 30, 2024
d299e20
update go.mod replace
kuiperda Aug 30, 2024
7a88686
run make fmt
kuiperda Aug 30, 2024
2bde71b
lint
kuiperda Aug 30, 2024
ab59b04
lint 2.0
kuiperda Aug 30, 2024
fc7a952
Skip processing unless body is a map
kuiperda Aug 30, 2024
065acfa
convert map to kv directly
kuiperda Aug 30, 2024
77e6fe0
do not allow configuring xml
kuiperda Aug 30, 2024
3fb0c7d
Add metadata.yaml
kuiperda Aug 30, 2024
18a3755
Update go.mod and rest of metadata
kuiperda Aug 30, 2024
821c1e2
fmt and lint
kuiperda Aug 30, 2024
b78c599
fix factory_test
kuiperda Aug 30, 2024
bb0a533
log instead of erroring when log type is not map
kuiperda Aug 30, 2024
bac7f5a
use golden and comparelogs
kuiperda Aug 30, 2024
d56c6c1
run go mod tidy at top leve;
kuiperda Aug 30, 2024
2faebda
add kv separators to config
kuiperda Sep 4, 2024
3a2804d
rename testdata fileds
kuiperda Sep 4, 2024
bc075c9
cover cases for KV marshaling
kuiperda Sep 5, 2024
7398f3d
rename separator and pair separator properly
kuiperda Sep 5, 2024
8ccf304
update readme with kv separators
kuiperda Sep 5, 2024
f428cd8
go mod tidy + go generate + make fmt
kuiperda Sep 5, 2024
ab612b2
handle nested maps for kv
kuiperda Sep 5, 2024
8200ee4
handle deeply nested logs for kv that also contain problematic charac…
kuiperda Sep 5, 2024
760dbb0
go mod tidy + go generate + make fmt
kuiperda Sep 5, 2024
1f2b1df
include fuller pipeline for config examples
kuiperda Sep 6, 2024
add2d0d
rename headers
kuiperda Sep 6, 2024
6ff9a88
remove XML references in readme
kuiperda Sep 6, 2024
12eefe5
set default config parameters in config
kuiperda Sep 6, 2024
b60b99a
update go mod
kuiperda Sep 6, 2024
42bf818
add config defaults to factory test
kuiperda Sep 6, 2024
59feb3e
convert string in newMarshalProcessor
kuiperda Sep 6, 2024
3a2d8ec
use backticks
kuiperda Sep 6, 2024
f84209c
do not repeatedly fmt.Sprintf(v)
kuiperda Sep 9, 2024
2389bbf
use recursive function
kuiperda Sep 9, 2024
8ea6964
add map kv separators in config options
kuiperda Sep 9, 2024
6d40c53
add processor tests for mapkvseparators
kuiperda Sep 9, 2024
6b05c3f
allow specifying map kv separators
kuiperda Sep 9, 2024
3990520
go generate + make fmt
kuiperda Sep 10, 2024
ae0136e
lint
kuiperda Sep 10, 2024
5b008ba
sort by keys
kuiperda Sep 10, 2024
4c7ccf3
update readme
kuiperda Sep 12, 2024
b9dc73a
add test to confirm behavior of nested slices with KV marshaling
kuiperda Sep 12, 2024
d0eb663
add array expectation to readme
kuiperda Sep 12, 2024
f211fd8
fix case of config options
kuiperda Sep 12, 2024
51179ed
Merge branch 'release/v1.60.0' into add-marshal-processor
kuiperda Sep 13, 2024
1 change: 1 addition & 0 deletions docs/processors.md
@@ -16,6 +16,7 @@ Below is a list of supported processors with links to their documentation pages.
| Log DeDuplication Processor | [logdeduplicationprocessor](../processor/logdeduplicationprocessor/README.md) |
| Logs Transform Processor | [logstransform](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.108.0/processor/logstransformprocessor/README.md) |
| Lookup Processor | [lookupprocessor](../processor/lookupprocessor/README.md) |
| Marshal Processor | [marshalprocessor](../processor/marshalprocessor/README.md) |
| Mask Processor | [maskprocessor](../processor/maskprocessor/README.md) |
| Memory Limiter Processor | [memorylimiterprocessor](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.108.0/processor/memorylimiterprocessor/README.md) |
| Metric Extract Processor | [metricextract](../processor/metricextractprocessor/README.md) |
2 changes: 2 additions & 0 deletions factories/processors.go
@@ -20,6 +20,7 @@ import (
	"github.com/observiq/bindplane-agent/processor/logcountprocessor"
	"github.com/observiq/bindplane-agent/processor/logdeduplicationprocessor"
	"github.com/observiq/bindplane-agent/processor/lookupprocessor"
	"github.com/observiq/bindplane-agent/processor/marshalprocessor"
	"github.com/observiq/bindplane-agent/processor/maskprocessor"
	"github.com/observiq/bindplane-agent/processor/metricextractprocessor"
	"github.com/observiq/bindplane-agent/processor/metricstatsprocessor"
@@ -66,6 +67,7 @@ var defaultProcessors = []processor.Factory{
	logdeduplicationprocessor.NewFactory(),
	logstransformprocessor.NewFactory(),
	lookupprocessor.NewFactory(),
	marshalprocessor.NewFactory(),
	maskprocessor.NewFactory(),
	memorylimiterprocessor.NewFactory(),
	metricextractprocessor.NewFactory(),
3 changes: 3 additions & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
	github.com/observiq/bindplane-agent/processor/logcountprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/logdeduplicationprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/lookupprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/marshalprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/maskprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/metricextractprocessor v1.60.0
	github.com/observiq/bindplane-agent/processor/metricstatsprocessor v1.60.0
@@ -797,6 +798,8 @@ replace github.com/observiq/bindplane-agent/processor/throughputmeasurementproce

replace github.com/observiq/bindplane-agent/processor/samplingprocessor => ./processor/samplingprocessor

replace github.com/observiq/bindplane-agent/processor/marshalprocessor => ./processor/marshalprocessor

replace github.com/observiq/bindplane-agent/processor/maskprocessor => ./processor/maskprocessor

replace github.com/observiq/bindplane-agent/processor/logcountprocessor => ./processor/logcountprocessor
192 changes: 192 additions & 0 deletions processor/marshalprocessor/README.md
@@ -0,0 +1,192 @@
# Marshal Processor

This processor is used to marshal parsed logs into JSON or KV format.

This processor is intended to be wrapped into the Marshal processor in Bindplane.

NOTE: XML support is in progress and not yet available.

## Supported pipelines

- Logs

## How It Works

1. This processor expects its input to contain a parsed log body. It will marshal the body fields only; if additional fields from the log are desired, they must first be moved to the body.

2. The body can be marshaled to string-encoded JSON or KV.

   - For KV:
     - Fields will be converted to "key1=value1 key2=value2 key3=value3..." if no separators are configured
     - The parsed fields should be flattened first so that every key is at the top level
     - If fields are not flattened, the nested fields will be converted to "nested=[k1=v1,k2=v2]..." if no map separators are configured
     - If any key or value contains characters that conflict with the separators, it will be wrapped in `"` and any `"` inside it will be escaped
     - Arrays will simply be stringified

3. The output of this processor will be the same as the input, but with a modified log body. Any body incompatible with the marshal type will be unchanged.
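
The KV rules above can be sketched in a few lines of Go. This is an illustration only, not the processor's actual code: `kvOptions`, `escape`, and `marshalMap` are hypothetical names, the body is assumed to be a plain `map[string]any`, and keys are assumed to be emitted in sorted order (as the example output further down suggests).

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// kvOptions is a hypothetical holder for the four separators in the config table.
type kvOptions struct {
	kvSep, pairSep, mapKVSep, mapPairSep rune
}

// escape wraps s in double quotes, escaping inner quotes, when s contains
// one of the given separators. Otherwise s is returned unchanged.
func escape(s string, seps ...rune) string {
	if !strings.ContainsAny(s, string(seps)) {
		return s
	}
	return `"` + strings.ReplaceAll(s, `"`, `\"`) + `"`
}

// marshalMap renders m as key/value pairs in sorted key order. Nested maps are
// rendered recursively inside [ ] using the map separators.
func marshalMap(m map[string]any, opts kvOptions, nested bool) string {
	kvSep, pairSep := opts.kvSep, opts.pairSep
	if nested {
		kvSep, pairSep = opts.mapKVSep, opts.mapPairSep
	}

	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		var val string
		switch v := m[k].(type) {
		case map[string]any:
			val = "[" + marshalMap(v, opts, true) + "]"
		default:
			val = escape(fmt.Sprint(v), kvSep, pairSep)
		}
		pairs = append(pairs, escape(k, kvSep, pairSep)+string(kvSep)+val)
	}
	return strings.Join(pairs, string(pairSep))
}

func main() {
	opts := kvOptions{kvSep: '=', pairSep: ' ', mapKVSep: '=', mapPairSep: ','}
	body := map[string]any{
		"severity": 155,
		"name":     "test",
		"nested":   map[string]any{"n2": 2, "n1": 1},
		"msg":      "hello world",
	}
	// prints: msg="hello world" name=test nested=[n1=1,n2=2] severity=155
	fmt.Println(marshalMap(body, opts, false))
}
```

The value `hello world` is quoted because it contains the default pair separator (a space); flattening the body beforehand avoids the bracketed `nested=[...]` form entirely.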

## Configuration

| Field                 | Type   | Default | Description                                         |
| --------------------- | ------ | ------- | --------------------------------------------------- |
| marshal_to            | string | "JSON"  | The format to marshal into. Can be JSON or KV       |
| kv_separator          | rune   | "="     | The separator between key and value                 |
| kv_pair_separator     | rune   | " "     | The separator between KV pairs                      |
| map_kv_separator      | rune   | "="     | The separator between key and value in nested maps  |
| map_kv_pair_separator | rune   | ","     | The separator between nested KV pairs               |

## Example Config for JSON

```yaml
receivers:
  otlp:
processors:
  transform:
  marshal:
    marshal_to: "JSON"
exporters:
  chronicle:
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [transform, marshal]
      exporters: [chronicle]
```

## Example Config for KV

```yaml
receivers:
  otlp:
processors:
  transform:
  marshal:
    marshal_to: "KV"
    kv_separator: ","
    kv_pair_separator: ":"
exporters:
  chronicle:
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [transform, marshal]
      exporters: [chronicle]
```

## Example Output

The parsed body will be replaced by a marshaled body. All other fields are untouched.

### 1. Nested parsed body to JSON

In the example below, "bindplane-otel-attributes" represents attributes that have been moved to the body.

#### Parsed body

```
"body": {
  "kvlistValue": {
    "values": [
      { "key": "severity", "value": { "doubleValue": 155 } },
      {
        "key": "nested",
        "value": {
          "kvlistValue": {
            "values": [
              { "key": "n2", "value": { "doubleValue": 2 } },
              { "key": "n1", "value": { "doubleValue": 1 } }
            ]
          }
        }
      },
      { "key": "name", "value": { "stringValue": "test" } },
      {
        "key": "bindplane-otel-attributes",
        "value": {
          "kvlistValue": {
            "values": [
              {
                "key": "baba",
                "value": { "stringValue": "you" }
              },
              {
                "key": "host",
                "value": { "stringValue": "myhost" }
              }
            ]
          }
        }
      }
    ]
  }
},

```

#### JSON output

```
"body": {
  "stringValue": {
    {
      "bindplane-otel-attributes":
      {
        "baba":"you",
        "host":"myhost"
      },
      "name":"test",
      "nested":
      {
        "n1":1,
        "n2":2
      },
      "severity":155
    }
  }
}
```
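
The alphabetical key order in the JSON output above is what Go's standard `encoding/json` produces for maps, since it sorts map keys when encoding. The sketch below reproduces the example with plain Go maps. It is a simplified stand-in, not the processor's code: `marshalBody` is a hypothetical helper, and a real log body would be a collector value rather than a `map[string]any`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalBody returns the JSON encoding of a map body as a string.
// A body that is not a map is reported with ok=false so the caller can
// leave it unchanged, mirroring the "incompatible body" rule in How It Works.
func marshalBody(body any) (out string, ok bool) {
	m, isMap := body.(map[string]any)
	if !isMap {
		return "", false
	}
	b, err := json.Marshal(m) // encoding/json emits map keys in sorted order
	if err != nil {
		return "", false
	}
	return string(b), true
}

func main() {
	body := map[string]any{
		"severity": 155,
		"nested":   map[string]any{"n2": 2, "n1": 1},
		"name":     "test",
		"bindplane-otel-attributes": map[string]any{
			"baba": "you",
			"host": "myhost",
		},
	}
	if s, ok := marshalBody(body); ok {
		fmt.Println(s)
	}

	// A plain string body is not a map, so it is left alone.
	_, ok := marshalBody("already a string")
	fmt.Println(ok) // prints: false
}
```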

### 2. Flattened parsed body to KV with default separators

In the example below, flattening has already been done on the "nested" field and the "bindplane-otel-attributes" field.

#### Parsed flattened body

```
"body": {
  "kvlistValue": {
    "values": [
      { "key": "severity", "value": { "doubleValue": 155 } },
      {
        "key": "nested-n1",
        "value": { "doubleValue": 1 }
      },
      {
        "key": "nested-n2",
        "value": { "doubleValue": 2 }
      },
      { "key": "name", "value": { "stringValue": "test" } },
      {
        "key": "bindplane-otel-attributes-baba",
        "value": { "stringValue": "you" }
      },
      {
        "key": "bindplane-otel-attributes-host",
        "value": { "stringValue": "myhost" }
      }
    ]
  }
}
```

#### KV output

```
"body": {
  "stringValue": {
    bindplane-otel-attributes-baba=you bindplane-otel-attributes-host=myhost name=test nested-n1=1 nested-n2=2 severity=155
  }
}
```
82 changes: 82 additions & 0 deletions processor/marshalprocessor/config.go
@@ -0,0 +1,82 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package marshalprocessor provides a processor that marshals logs to a specified format.
package marshalprocessor

import (
	"errors"
	"strings"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/multierr"
)

var errInvalidMarshalTo = errors.New("marshal_to must be JSON or KV")
var errXMLNotSupported = errors.New("XML not yet supported")
var errKVSeparatorsEqual = errors.New("kv_separator and kv_pair_separator must be different")
var errMapKVSeparatorsEqual = errors.New("map_kv_separator and map_kv_pair_separator must be different")

const (
	defaultMarshalTo          = "JSON"
	defaultKVSeparator        = '='
	defaultKVPairSeparator    = ' '
	defaultMapKVSeparator     = '='
	defaultMapKVPairSeparator = ','
)

// Config is the configuration for the processor
type Config struct {
	MarshalTo          string `mapstructure:"marshal_to"` // MarshalTo is either JSON or KV
	KVSeparator        rune   `mapstructure:"kv_separator"`
	KVPairSeparator    rune   `mapstructure:"kv_pair_separator"`
	MapKVSeparator     rune   `mapstructure:"map_kv_separator"`
	MapKVPairSeparator rune   `mapstructure:"map_kv_pair_separator"`
}

// Validate validates the processor configuration
func (cfg Config) Validate() error {
	var errs error

	// Validate MarshalTo choice
	switch strings.ToUpper(cfg.MarshalTo) {
	case "JSON":
	case "XML":
		errs = multierr.Append(errs, errXMLNotSupported)
	case "KV":
		// Validate KV separators, which must be different from each other
		if cfg.KVSeparator == cfg.KVPairSeparator && cfg.KVSeparator != 0 {
			errs = multierr.Append(errs, errKVSeparatorsEqual)
		}
		// Validate Map KV separators, which must be different from each other (but can match KV separators)
		if cfg.MapKVSeparator == cfg.MapKVPairSeparator && cfg.MapKVSeparator != 0 {
			errs = multierr.Append(errs, errMapKVSeparatorsEqual)
		}
	default:
		errs = multierr.Append(errs, errInvalidMarshalTo)
	}

	return errs
}

// createDefaultConfig returns the default config for the processor.
func createDefaultConfig() component.Config {
	return &Config{
		MarshalTo:          defaultMarshalTo,
		KVSeparator:        defaultKVSeparator,
		KVPairSeparator:    defaultKVPairSeparator,
		MapKVSeparator:     defaultMapKVSeparator,
		MapKVPairSeparator: defaultMapKVPairSeparator,
	}
}