
Commit

[pkg/stanza/operator/transformer/recombine] Add the max_log_size config to recombine operator (#17387) (#18089)

"max_log_size" is added to the config of recombine operator. Once the total bytes size of the combined field exceeds the limit, all received entries of the source will be combined and flushed.

We've discussed this feature in #17387. I chose a "soft limit" (flush the whole batch once it exceeds the limit) rather than truncating the log to a fixed byte size, because a soft limit should be sufficient for users in this case.
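
For illustration, a recombine operator configured with the new option might look like the following minimal sketch (the match expression and the size value are illustrative, not defaults):

  - type: recombine
    combine_field: body
    is_last_entry: body == 'end'
    # Soft limit: once the combined body grows past roughly 256 KiB, the batch is
    # flushed even if the "end" line has not arrived yet.
    max_log_size: 256kb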
yutingcaicyt authored Feb 2, 2023
1 parent 6b07384 commit 579965c
Showing 6 changed files with 127 additions and 39 deletions.
16 changes: 16 additions & 0 deletions .chloggen/feature_addBytesLimit.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/operator/transformer/recombine

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add a new "max_log_size" config parameter that limits the byte size of the combined field

# One or more tracking issues related to the change
issues: [17387]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions pkg/stanza/docs/operators/recombine.md
@@ -18,6 +18,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
| `source_identifier` | `$attributes["file.path"]` | The [field](../types/field.md) to separate one source of logs from others when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources allowed concurrently to be tracked for combining separately. |
| `max_log_size` | 0 | The maximum byte size of the combined field. Once the size exceeds the limit, all entries received so far for the source are combined and flushed. A value of 0 means no limit. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.
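
As a hedged illustration of the documented options working together (the expression and the values below are examples, not recommendations):

  - type: recombine
    combine_field: body
    is_first_entry: body matches "^[^\\s]"   # assume a new log starts at a non-indented line
    force_flush_period: 10s                  # flush a stalled batch after 10 seconds
    max_log_size: 128kb                      # soft cap on the combined body size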

10 changes: 10 additions & 0 deletions pkg/stanza/operator/transformer/recombine/config_test.go
@@ -18,6 +18,7 @@ import (
"path/filepath"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
)

@@ -85,6 +86,15 @@ func TestUnmarshal(t *testing.T) {
return cfg
}(),
},
{
Name: "custom_max_log_size",
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.MaxLogSize = helper.ByteSize(256000)
return cfg
}(),
},
},
}.Run(t)
}
92 changes: 53 additions & 39 deletions pkg/stanza/operator/transformer/recombine/recombine.go
@@ -60,15 +60,16 @@ func NewConfigWithID(operatorID string) *Config {
// Config is the configuration of a recombine operator
type Config struct {
helper.TransformerConfig `mapstructure:",squash"`
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
OverwriteWith string `mapstructure:"overwrite_with"`
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"`
MaxSources int `mapstructure:"max_sources"`
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
OverwriteWith string `mapstructure:"overwrite_with"`
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"`
MaxSources int `mapstructure:"max_sources"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
}

// Build creates a new Transformer from a config
@@ -123,13 +124,14 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithOldest: overwriteWithOldest,
batchMap: make(map[string][]*entry.Entry),
batchMap: make(map[string]*sourceBatch),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
maxLogSize: int64(c.MaxLogSize),
}, nil
}

@@ -150,7 +152,14 @@ type Transformer struct {
sourceIdentifier entry.Field

sync.Mutex
batchMap map[string][]*entry.Entry
batchMap map[string]*sourceBatch
maxLogSize int64
}

// sourceBatch contains the status info of a batch
type sourceBatch struct {
entries []*entry.Entry
recombined strings.Builder
}

func (r *Transformer) Start(_ operator.Persister) error {
@@ -165,7 +174,8 @@ func (r *Transformer) flushLoop() {
case <-r.ticker.C:
r.Lock()
timeNow := time.Now()
for source, entries := range r.batchMap {
for source, batch := range r.batchMap {
entries := batch.entries
lastEntryTs := entries[len(entries)-1].ObservedTimestamp
timeSinceLastEntry := timeNow.Sub(lastEntryTs)
if timeSinceLastEntry < r.forceFlushTimeout {
@@ -246,7 +256,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error {
case matches && r.matchIndicatesLast():
fallthrough
// When matching on first entry, never batch partial first. Just emit immediately
case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0:
case !matches && r.matchIndicatesFirst() && r.batchMap[s] == nil:
r.addToBatch(ctx, e, s)
return r.flushSource(s)
}
@@ -267,73 +277,77 @@ func (r *Transformer) matchIndicatesLast() bool {

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *Transformer) addToBatch(_ context.Context, e *entry.Entry, source string) {
if _, ok := r.batchMap[source]; !ok {
r.batchMap[source] = []*entry.Entry{e}
batch, ok := r.batchMap[source]
if !ok {
batch = &sourceBatch{
entries: []*entry.Entry{e},
recombined: strings.Builder{},
}
r.batchMap[source] = batch
if len(r.batchMap) >= r.maxSources {
r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
r.flushUncombined(context.Background())
return
}
} else {
batch.entries = append(batch.entries, e)
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var s string
err := e.Read(r.combineField, &s)
if err != nil {
r.Errorf("entry does not contain the combine_field")
return
}
if batch.recombined.Len() > 0 {
batch.recombined.WriteString(r.combineWith)
}
batch.recombined.WriteString(s)

r.batchMap[source] = append(r.batchMap[source], e)
if len(r.batchMap[source]) >= r.maxBatchSize {
if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize {
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}

}

// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *Transformer) flushUncombined(ctx context.Context) {
for source := range r.batchMap {
for _, entry := range r.batchMap[source] {
for _, entry := range r.batchMap[source].entries {
r.Write(ctx, entry)
}
}
r.batchMap = make(map[string][]*entry.Entry)
r.batchMap = make(map[string]*sourceBatch)
r.ticker.Reset(r.forceFlushTimeout)
}

// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *Transformer) flushSource(source string) error {
batch := r.batchMap[source]
// Skip flushing a combined log if the batch is empty
if len(r.batchMap[source]) == 0 {
if batch == nil {
return nil
}

// Choose which entry we want to keep the rest of the fields from
var base *entry.Entry
entries := r.batchMap[source]
entries := batch.entries

if r.overwriteWithOldest {
base = entries[0]
} else {
base = entries[len(entries)-1]
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var recombined strings.Builder
for i, e := range entries {
var s string
err := e.Read(r.combineField, &s)
if err != nil {
r.Errorf("entry does not contain the combine_field, so is being dropped")
continue
}

recombined.WriteString(s)
if i != len(entries)-1 {
recombined.WriteString(r.combineWith)
}
}

// Set the recombined field on the entry
err := base.Set(r.combineField, recombined.String())
err := base.Set(r.combineField, batch.recombined.String())
if err != nil {
return err
}
44 changes: 44 additions & 0 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
@@ -23,6 +23,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

@@ -285,6 +286,49 @@ func TestTransformer(t *testing.T) {
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
},
{
"TestMaxLogSizeForLastEntry",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxLogSize = helper.ByteSize(5)
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "end", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "end", map[string]string{"file.path": "file1"}),
},
},
{
"TestMaxLogSizeForFirstEntry",
func() *Config {
cfg := NewConfig()
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "body == 'start'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxLogSize = helper.ByteSize(12)
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "start\ncontent1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "content2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "start", map[string]string{"file.path": "file1"}),
},
},
}

for _, tc := range cases {
@@ -16,5 +16,8 @@ combine_with_tab:
custom_id:
type: recombine
id: merge-split-lines
custom_max_log_size:
type: recombine
max_log_size: 256kb
default:
type: recombine
