diff --git a/.chloggen/feature_addBytesLimit.yaml b/.chloggen/feature_addBytesLimit.yaml
new file mode 100755
index 000000000000..d09c56e9fbf4
--- /dev/null
+++ b/.chloggen/feature_addBytesLimit.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza/operator/transformer/recombine
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: add a new "max_log_size" config parameter to limit the maximum byte size of the combined field
+
+# One or more tracking issues related to the change
+issues: [17387]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/pkg/stanza/docs/operators/recombine.md b/pkg/stanza/docs/operators/recombine.md
index 1e1a73ea9bcd..9f85a79eea9d 100644
--- a/pkg/stanza/docs/operators/recombine.md
+++ b/pkg/stanza/docs/operators/recombine.md
@@ -18,6 +18,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
 | `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
 | `source_identifier`  | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
 | `max_sources`        | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
+| `max_log_size`       | 0    | The maximum byte size of the combined field. Once the size exceeds this limit, all entries received so far for the source are combined and flushed. A value of `0` means no limit. |
 
 Exactly one of `is_first_entry` and `is_last_entry` must be specified.
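For illustration beyond the table row above, a minimal recombine config using the new parameter could look like the following sketch (values are hypothetical; the `is_first_entry` expression is borrowed from the tests later in this diff, not part of the patch):

```yaml
- type: recombine
  combine_field: body
  is_first_entry: body == 'start'
  # Flush once the combined body exceeds 256000 bytes; the default of 0
  # leaves the combined size unlimited.
  max_log_size: 256kb
```

`max_log_size` accepts human-readable byte sizes via `helper.ByteSize`, as the `256kb` testdata entry at the end of this diff shows.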
diff --git a/pkg/stanza/operator/transformer/recombine/config_test.go b/pkg/stanza/operator/transformer/recombine/config_test.go
index 6ea78f32fe64..01b6cd7e5e82 100644
--- a/pkg/stanza/operator/transformer/recombine/config_test.go
+++ b/pkg/stanza/operator/transformer/recombine/config_test.go
@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
 )
 
@@ -85,6 +86,15 @@ func TestUnmarshal(t *testing.T) {
 				return cfg
 			}(),
 		},
+		{
+			Name:      "custom_max_log_size",
+			ExpectErr: false,
+			Expect: func() *Config {
+				cfg := NewConfig()
+				cfg.MaxLogSize = helper.ByteSize(256000)
+				return cfg
+			}(),
+		},
 		},
 	}.Run(t)
 }
diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go
index c08eeb5a8357..5ad0aa162f20 100644
--- a/pkg/stanza/operator/transformer/recombine/recombine.go
+++ b/pkg/stanza/operator/transformer/recombine/recombine.go
@@ -60,15 +60,16 @@ func NewConfigWithID(operatorID string) *Config {
 // Config is the configuration of a recombine operator
 type Config struct {
 	helper.TransformerConfig `mapstructure:",squash"`
-	IsFirstEntry             string        `mapstructure:"is_first_entry"`
-	IsLastEntry              string        `mapstructure:"is_last_entry"`
-	MaxBatchSize             int           `mapstructure:"max_batch_size"`
-	CombineField             entry.Field   `mapstructure:"combine_field"`
-	CombineWith              string        `mapstructure:"combine_with"`
-	SourceIdentifier         entry.Field   `mapstructure:"source_identifier"`
-	OverwriteWith            string        `mapstructure:"overwrite_with"`
-	ForceFlushTimeout        time.Duration `mapstructure:"force_flush_period"`
-	MaxSources               int           `mapstructure:"max_sources"`
+	IsFirstEntry             string          `mapstructure:"is_first_entry"`
+	IsLastEntry              string          `mapstructure:"is_last_entry"`
+	MaxBatchSize             int             `mapstructure:"max_batch_size"`
+	CombineField             entry.Field     `mapstructure:"combine_field"`
+	CombineWith              string          `mapstructure:"combine_with"`
+	SourceIdentifier         entry.Field     `mapstructure:"source_identifier"`
+	OverwriteWith            string          `mapstructure:"overwrite_with"`
+	ForceFlushTimeout        time.Duration   `mapstructure:"force_flush_period"`
+	MaxSources               int             `mapstructure:"max_sources"`
+	MaxLogSize               helper.ByteSize `mapstructure:"max_log_size,omitempty"`
 }
 
 // Build creates a new Transformer from a config
@@ -123,13 +124,14 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
 		maxBatchSize:        c.MaxBatchSize,
 		maxSources:          c.MaxSources,
 		overwriteWithOldest: overwriteWithOldest,
-		batchMap:            make(map[string][]*entry.Entry),
+		batchMap:            make(map[string]*sourceBatch),
 		combineField:        c.CombineField,
 		combineWith:         c.CombineWith,
 		forceFlushTimeout:   c.ForceFlushTimeout,
 		ticker:              time.NewTicker(c.ForceFlushTimeout),
 		chClose:             make(chan struct{}),
 		sourceIdentifier:    c.SourceIdentifier,
+		maxLogSize:          int64(c.MaxLogSize),
 	}, nil
 }
 
@@ -150,7 +152,14 @@ type Transformer struct {
 	sourceIdentifier entry.Field
 
 	sync.Mutex
-	batchMap map[string][]*entry.Entry
+	batchMap   map[string]*sourceBatch
+	maxLogSize int64
+}
+
+// sourceBatch contains the status info of a batch
+type sourceBatch struct {
+	entries    []*entry.Entry
+	recombined strings.Builder
 }
 
 func (r *Transformer) Start(_ operator.Persister) error {
@@ -165,7 +174,8 @@ func (r *Transformer) flushLoop() {
 		case <-r.ticker.C:
 			r.Lock()
 			timeNow := time.Now()
-			for source, entries := range r.batchMap {
+			for source, batch := range r.batchMap {
+				entries := batch.entries
 				lastEntryTs := entries[len(entries)-1].ObservedTimestamp
 				timeSinceLastEntry := timeNow.Sub(lastEntryTs)
 				if timeSinceLastEntry < r.forceFlushTimeout {
@@ -246,7 +256,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
 	case matches && r.matchIndicatesLast():
 		fallthrough
 	// When matching on first entry, never batch partial first. Just emit immediately
-	case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0:
+	case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil:
 		r.addToBatch(ctx, e, s)
 		return r.flushSource(s)
 	}
@@ -267,21 +277,41 @@ func (r *Transformer) matchIndicatesLast() bool {
 
 // addToBatch adds the current entry to the current batch of entries that will be combined
 func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source string) {
-	if _, ok := r.batchMap[source]; !ok {
-		r.batchMap[source] = []*entry.Entry{e}
+	batch, ok := r.batchMap[source]
+	if !ok {
+		batch = &sourceBatch{
+			entries:    []*entry.Entry{e},
+			recombined: strings.Builder{},
+		}
+		r.batchMap[source] = batch
 		if len(r.batchMap) >= r.maxSources {
 			r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
 			r.flushUncombined(context.Background())
+			return
 		}
+	} else {
+		batch.entries = append(batch.entries, e)
+	}
+
+	// Append the entry's combine_field value to the batch's combined
+	// string, separated by combine_with
+	var s string
+	err := e.Read(r.combineField, &s)
+	if err != nil {
+		r.Errorf("entry does not contain the combine_field")
 		return
 	}
+	if batch.recombined.Len() > 0 {
+		batch.recombined.WriteString(r.combineWith)
+	}
+	batch.recombined.WriteString(s)
 
-	r.batchMap[source] = append(r.batchMap[source], e)
-	if len(r.batchMap[source]) >= r.maxBatchSize {
+	if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
 		if err := r.flushSource(source); err != nil {
 			r.Errorf("there was error flushing combined logs %s", err)
 		}
 	}
+
 }
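The essence of the change above is that recombination now happens incrementally: each entry's `combine_field` is appended to a per-source `strings.Builder` as it arrives, and the batch is flushed as soon as either the byte limit or the batch-size limit trips. A standalone sketch of that pattern (simplified, hypothetical names; the real operator keys batches by `source_identifier` and flushes through the pipeline):

```go
package main

import (
	"fmt"
	"strings"
)

// miniBatch mirrors sourceBatch in spirit: the collected parts plus the
// incrementally built combined string.
type miniBatch struct {
	parts      []string
	recombined strings.Builder
}

// add appends one part and reports whether the batch should flush:
// either the combined size exceeded maxLogSize (when non-zero) or the
// part count reached maxBatchSize. The size check runs *after*
// appending, so the combined log may overshoot the limit by up to one
// part plus separator.
func (b *miniBatch) add(part, sep string, maxLogSize int64, maxBatchSize int) bool {
	b.parts = append(b.parts, part)
	if b.recombined.Len() > 0 {
		b.recombined.WriteString(sep)
	}
	b.recombined.WriteString(part)
	return (maxLogSize > 0 && int64(b.recombined.Len()) > maxLogSize) ||
		len(b.parts) >= maxBatchSize
}

func main() {
	b := &miniBatch{}
	for _, p := range []string{"file1", "file1"} {
		if b.add(p, "\n", 5, 1000) {
			// Prints: flushed "file1\nfile1" (11 bytes)
			fmt.Printf("flushed %q (%d bytes)\n", b.recombined.String(), b.recombined.Len())
			b = &miniBatch{}
		}
	}
}
```

That overshoot is why `TestMaxLogSizeForLastEntry` later in this diff expects `"file1\nfile1"` (11 bytes) as one combined log despite a `max_log_size` of 5.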
 
 // flushUncombined flushes all the logs in the batch individually to the
@@ -289,25 +319,26 @@ func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source strin
 // or at shutdown to avoid dropping the logs.
 func (r *Transformer) flushUncombined(ctx context.Context) {
 	for source := range r.batchMap {
-		for _, entry := range r.batchMap[source] {
+		for _, entry := range r.batchMap[source].entries {
 			r.Write(ctx, entry)
 		}
 	}
-	r.batchMap = make(map[string][]*entry.Entry)
+	r.batchMap = make(map[string]*sourceBatch)
 	r.ticker.Reset(r.forceFlushTimeout)
 }
 
 // flushSource combines the entries currently in the batch into a single entry,
 // then forwards them to the next operator in the pipeline
 func (r *Transformer) flushSource(source string) error {
+	batch := r.batchMap[source]
 	// Skip flushing a combined log if the batch is empty
-	if len(r.batchMap[source]) == 0 {
+	if batch == nil {
 		return nil
 	}
 
 	// Choose which entry we want to keep the rest of the fields from
 	var base *entry.Entry
-	entries := r.batchMap[source]
+	entries := batch.entries
 
 	if r.overwriteWithOldest {
 		base = entries[0]
@@ -315,25 +346,8 @@ func (r *Transformer) flushSource(source string) error {
 		base = entries[len(entries)-1]
 	}
 
-	// Combine the combineField of each entry in the batch,
-	// separated by newlines
-	var recombined strings.Builder
-	for i, e := range entries {
-		var s string
-		err := e.Read(r.combineField, &s)
-		if err != nil {
-			r.Errorf("entry does not contain the combine_field, so is being dropped")
-			continue
-		}
-
-		recombined.WriteString(s)
-		if i != len(entries)-1 {
-			recombined.WriteString(r.combineWith)
-		}
-	}
-
 	// Set the recombined field on the entry
-	err := base.Set(r.combineField, recombined.String())
+	err := base.Set(r.combineField, batch.recombined.String())
 	if err != nil {
 		return err
 	}
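Because the combined string is now maintained as entries arrive, `flushSource` above no longer loops over the batch to rebuild it; it only picks the base entry (oldest or newest, per `overwrite_with`) and overwrites its combine field. A self-contained sketch of that reduced flush step (hypothetical types standing in for `*entry.Entry`):

```go
package main

import (
	"fmt"
	"strings"
)

// fakeEntry stands in for *entry.Entry: just a body field here.
type fakeEntry struct{ body string }

// flushBatch mirrors the reworked flushSource: choose the base entry,
// then set its body to the prebuilt combined string in one step.
func flushBatch(entries []*fakeEntry, recombined *strings.Builder, overwriteWithOldest bool) *fakeEntry {
	if len(entries) == 0 {
		return nil // skip flushing an empty batch
	}
	base := entries[len(entries)-1]
	if overwriteWithOldest {
		base = entries[0]
	}
	base.body = recombined.String()
	return base
}

func main() {
	var sb strings.Builder
	sb.WriteString("start\ncontent1")
	out := flushBatch([]*fakeEntry{{body: "start"}, {body: "content1"}}, &sb, true)
	fmt.Println(out.body) // "start\ncontent1"; other fields are kept from the base entry
}
```

The tests that follow exercise exactly this pairing of incremental accumulation and simplified flush.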
map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}), + }, + []*entry.Entry{ + entryWithBodyAttr(t1, "start\ncontent1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "content2", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}), + }, + }, } for _, tc := range cases { diff --git a/pkg/stanza/operator/transformer/recombine/testdata/config.yaml b/pkg/stanza/operator/transformer/recombine/testdata/config.yaml index 7622feb8cbcb..11e337ad987f 100644 --- a/pkg/stanza/operator/transformer/recombine/testdata/config.yaml +++ b/pkg/stanza/operator/transformer/recombine/testdata/config.yaml @@ -16,5 +16,8 @@ combine_with_tab: custom_id: type: recombine id: merge-split-lines +custom_max_log_size: + type: recombine + max_log_size: 256kb default: type: recombine