[pkg/stanza/operator/transformer/recombine] Add the max_log_size config to recombine operator (#17387) #18089

Merged: 1 commit, Feb 2, 2023
16 changes: 16 additions & 0 deletions .chloggen/feature_addBytesLimit.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/operator/transformer/recombine

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add a new "max_log_size" config parameter to limit the byte size of the combined field

# One or more tracking issues related to the change
issues: [17387]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions pkg/stanza/docs/operators/recombine.md
@@ -18,6 +18,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
| `source_identifier` | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
| `max_log_size` | 0 | The maximum byte size of the combined field. Once the size exceeds this limit, all entries received so far for the source are combined and flushed. A value of 0 means no limit. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

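To make the new option concrete, here is a minimal sketch in Go that builds a recombine config with `max_log_size` set, mirroring the config tests further down in this diff; the expression and the 256000-byte value are illustrative choices, not defaults (in YAML this corresponds to `max_log_size: 256kb`, as in the testdata file at the end of the diff).

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"
)

func main() {
	// Build a recombine config in code; the same settings map to the YAML keys
	// is_first_entry, combine_field, and max_log_size.
	cfg := recombine.NewConfig()
	cfg.IsFirstEntry = "body == 'start'"     // start a new combined log when the body is "start"
	cfg.CombineField = entry.NewBodyField()  // combine the bodies of the batched entries
	cfg.MaxLogSize = helper.ByteSize(256000) // flush once the combined field exceeds this size; 0 means no limit

	fmt.Printf("max_log_size = %d bytes\n", cfg.MaxLogSize)
}
```

Both limits coexist: a batch is also flushed once `max_batch_size` entries accumulate, whichever threshold is hit first.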
10 changes: 10 additions & 0 deletions pkg/stanza/operator/transformer/recombine/config_test.go
@@ -18,6 +18,7 @@ import (
"path/filepath"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
)

@@ -85,6 +86,15 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "custom_max_log_size",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.MaxLogSize = helper.ByteSize(256000)
return cfg
}(),
},
},
}.Run(t)
}
92 changes: 53 additions & 39 deletions pkg/stanza/operator/transformer/recombine/recombine.go
Expand Up @@ -60,15 +60,16 @@ func NewConfigWithID(operatorID string) *Config {
// Config is the configuration of a recombine operator
type Config struct {
helper.TransformerConfig `mapstructure:",squash"`
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
OverwriteWith string `mapstructure:"overwrite_with"`
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"`
MaxSources int `mapstructure:"max_sources"`
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
OverwriteWith string `mapstructure:"overwrite_with"`
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"`
MaxSources int `mapstructure:"max_sources"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
}

// Build creates a new Transformer from a config
@@ -123,13 +124,14 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithOldest: overwriteWithOldest,
batchMap: make(map[string][]*entry.Entry),
batchMap: make(map[string]*sourceBatch),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
maxLogSize: int64(c.MaxLogSize),
}, nil
}

@@ -150,7 +152,14 @@ type Transformer struct {
sourceIdentifier entry.Field

sync.Mutex
batchMap map[string][]*entry.Entry
batchMap map[string]*sourceBatch
maxLogSize int64
}

// sourceBatch contains the entries of a batch and its incrementally combined output
type sourceBatch struct {
entries []*entry.Entry
recombined strings.Builder
}

func (r *Transformer) Start(_ operator.Persister) error {
@@ -165,7 +174,8 @@ func (r *Transformer) flushLoop() {
case <-r.ticker.C:
r.Lock()
timeNow := time.Now()
for source, entries := range r.batchMap {
for source, batch := range r.batchMap {
entries := batch.entries
lastEntryTs := entries[len(entries)-1].ObservedTimestamp
timeSinceLastEntry := timeNow.Sub(lastEntryTs)
if timeSinceLastEntry < r.forceFlushTimeout {
@@ -246,7 +256,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
case matches && r.matchIndicatesLast():
fallthrough
// When matching on first entry, never batch partial first. Just emit immediately
case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0:
case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil:
r.addToBatch(ctx, e, s)
return r.flushSource(s)
}
@@ -267,73 +277,77 @@ func (r *Transformer) matchIndicatesLast() bool {

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source string) {
if _, ok := r.batchMap[source]; !ok {
r.batchMap[source] = []*entry.Entry{e}
batch, ok := r.batchMap[source]
if !ok {
batch = &sourceBatch{
entries: []*entry.Entry{e},
recombined: strings.Builder{},
}
r.batchMap[source] = batch
if len(r.batchMap) >= r.maxSources {
r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
r.flushUncombined(context.Background())
return
}
} else {
batch.entries = append(batch.entries, e)
}

// Append the combineField of the current entry to the batch's combined
// output, separated by the combine_with value
var s string
err := e.Read(r.combineField, &s)
if err != nil {
r.Errorf("entry does not contain the combine_field")
return
}
if batch.recombined.Len() > 0 {
batch.recombined.WriteString(r.combineWith)
}
batch.recombined.WriteString(s)

r.batchMap[source] = append(r.batchMap[source], e)
if len(r.batchMap[source]) >= r.maxBatchSize {
if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}

}

// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *Transformer) flushUncombined(ctx context.Context) {
for source := range r.batchMap {
for _, entry := range r.batchMap[source] {
for _, entry := range r.batchMap[source].entries {
r.Write(ctx, entry)
}
}
r.batchMap = make(map[string][]*entry.Entry)
r.batchMap = make(map[string]*sourceBatch)
r.ticker.Reset(r.forceFlushTimeout)
}

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *Transformer) flushSource(source string) error {
batch := r.batchMap[source]
// Skip flushing a combined log if the batch is empty
if len(r.batchMap[source]) == 0 {
if batch == nil {
return nil
}

// Choose which entry we want to keep the rest of the fields from
var base *entry.Entry
entries := r.batchMap[source]
entries := batch.entries

if r.overwriteWithOldest {
base = entries[0]
} else {
base = entries[len(entries)-1]
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var recombined strings.Builder
for i, e := range entries {
var s string
err := e.Read(r.combineField, &s)
if err != nil {
r.Errorf("entry does not contain the combine_field, so is being dropped")
continue
}

recombined.WriteString(s)
if i != len(entries)-1 {
recombined.WriteString(r.combineWith)
}
}

// Set the recombined field on the entry
err := base.Set(r.combineField, recombined.String())
err := base.Set(r.combineField, batch.recombined.String())
if err != nil {
return err
}
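The key behavioral change above is that `addToBatch` now maintains a per-source `strings.Builder` and checks the accumulated size on every append. The sketch below is a simplified, hypothetical restatement of that flush condition (the helper name `shouldFlush` is not part of the PR), useful for reasoning about when a batch is emitted.

```go
package main

import "fmt"

// shouldFlush restates the check added to addToBatch: a source's batch is
// flushed when the combined output exceeds maxLogSize (if the limit is
// non-zero) or when the entry count reaches maxBatchSize. The separators
// written via combine_with count toward the size, and the check runs after
// the current entry has been appended, so the flushed log can exceed the
// limit by up to one entry plus a separator.
func shouldFlush(recombinedLen, numEntries int, maxLogSize int64, maxBatchSize int) bool {
	return (maxLogSize > 0 && int64(recombinedLen) > maxLogSize) || numEntries >= maxBatchSize
}

func main() {
	// With max_log_size = 5 and the default "\n" separator, two "file1"
	// entries combine to "file1\nfile1" (11 bytes), which triggers a flush.
	fmt.Println(shouldFlush(len("file1\nfile1"), 2, 5, 1000)) // true
}
```

This matches the `TestMaxLogSizeForLastEntry` case below, where the two `file1` entries are flushed as one log before the `end` line arrives.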
44 changes: 44 additions & 0 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
@@ -23,6 +23,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

@@ -285,6 +286,49 @@ func TestTransformer(t *testing.T) {
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
},
{
"TestMaxLogSizeForLastEntry",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxLogSize = helper.ByteSize(5)
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "end", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "end", map[string]string{"file.path": "file1"}),
},
},
{
"TestMaxLogSizeForFirstEntry",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxLogSize = helper.ByteSize(12)
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "start\ncontent1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
},
},
}

for _, tc := range cases {
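For the first-entry case above, the expected outputs follow directly from the byte arithmetic; below is a small, hypothetical check (not part of the PR) using the 12-byte limit and the default `\n` separator.

```go
package main

import "fmt"

func main() {
	// Combined-body sizes in TestMaxLogSizeForFirstEntry (max_log_size: 12).
	fmt.Println(len("start"))           // 5  -> under the limit, keep batching
	fmt.Println(len("start\ncontent1")) // 14 -> over the limit, flush "start\ncontent1"
	// "content2" does not match is_first_entry and no batch is open, so it is
	// emitted on its own; the next "start" opens a new batch, which is flushed
	// as "start" when the following "start" arrives.
	fmt.Println(len("content2")) // 8
}
```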
@@ -16,5 +16,8 @@ combine_with_tab:
custom_id:
type: recombine
id: merge-split-lines
custom_max_log_size:
type: recombine
max_log_size: 256kb
default:
type: recombine