From d1c9d7a5d7608d981850afc0f3c6bb8c09c587e2 Mon Sep 17 00:00:00 2001
From: Rock Baek
Date: Thu, 20 Jan 2022 06:29:04 -0800
Subject: [PATCH] Source identifier (#341)

Enhance the recombine operator with the ability to group entries
according to a `source_identifier` field.
---
 docs/operators/recombine.md                   |  2 +
 go.mod                                        |  3 +-
 go.sum                                        |  3 +-
 .../transformer/recombine/recombine.go       | 95 ++++++++++++++-----
 .../transformer/recombine/recombine_test.go  | 91 ++++++++++++++++++
 5 files changed, 165 insertions(+), 29 deletions(-)

diff --git a/docs/operators/recombine.md b/docs/operators/recombine.md
index 6560ef90..56910729 100644
--- a/docs/operators/recombine.md
+++ b/docs/operators/recombine.md
@@ -16,6 +16,8 @@ The `recombine` operator combines consecutive logs into single logs based on sim
 | `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
 | `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
 | `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |
+| `source_identifier` | `$attributes["file.path"]` | The [field](/docs/types/field.md) used to separate one source of logs from others when combining them. |
+| `max_sources` | 1000 | The maximum number of unique sources that may be tracked concurrently for separate recombination. |
 
 Exactly one of `is_first_entry` and `is_last_entry` must be specified.
 
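As a usage sketch for the two new options, the snippet below configures the operator from Go via the constructor and exported fields added in this patch. The module import paths are inferred from the file paths in this diff, and the attribute name and first-line expression are illustrative only, not part of the change:

```go
package main

import (
	"fmt"

	// Import paths assumed from this repository's layout.
	"github.com/open-telemetry/opentelemetry-log-collection/entry"
	"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/recombine"
)

func main() {
	// Defaults set by NewRecombineOperatorConfig below: max_batch_size=1000,
	// max_sources=1000, combine_with="\n", source_identifier=$attributes["file.path"].
	cfg := recombine.NewRecombineOperatorConfig("multiline")
	cfg.CombineField = entry.NewBodyField()
	cfg.IsFirstEntry = `$body matches "^\\d{4}-"` // hypothetical first-line expression

	// Group by a custom attribute instead of the default file.path:
	cfg.SourceIdentifier = entry.NewAttributeField("container.id") // illustrative field name
	cfg.MaxSources = 500

	fmt.Println(cfg.MaxSources, cfg.MaxBatchSize)
}
```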
diff --git a/go.mod b/go.mod
index 666eb80e..81ae0fd4 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,7 @@ require (
 )
 
 require (
-	github.com/benbjohnson/clock v1.1.0 // indirect
+	github.com/benbjohnson/clock v1.2.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/evanphx/json-patch v4.9.0+incompatible // indirect
 	github.com/go-logr/logr v0.4.0 // indirect
@@ -34,6 +34,7 @@
 	github.com/google/go-cmp v0.5.6 // indirect
 	github.com/google/gofuzz v1.1.0 // indirect
 	github.com/googleapis/gnostic v0.4.1 // indirect
+	github.com/kr/pretty v0.3.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
diff --git a/go.sum b/go.sum
index c1f274e9..1a8882d1 100644
--- a/go.sum
+++ b/go.sum
@@ -77,7 +77,6 @@
 github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72H
 github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+IkPchKS7p7c2YPKwHmBOk=
 github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
 github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
-github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/benbjohnson/clock v1.2.0 h1:9Re3G2TWxkE06LdMWMpcY6KV81GLXMGiYpPYUPkFAws=
 github.com/benbjohnson/clock v1.2.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@@ -317,7 +316,6 @@
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
-github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
 github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -432,6 +430,7 @@
 github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShE
 github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
diff --git a/operator/builtin/transformer/recombine/recombine.go b/operator/builtin/transformer/recombine/recombine.go
index ddbd7721..938a9f3f 100644
--- a/operator/builtin/transformer/recombine/recombine.go
+++ b/operator/builtin/transformer/recombine/recombine.go
@@ -38,9 +38,11 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
 	return &RecombineOperatorConfig{
 		TransformerConfig: helper.NewTransformerConfig(operatorID, "recombine"),
 		MaxBatchSize:      1000,
+		MaxSources:        1000,
 		CombineWith:       "\n",
 		OverwriteWith:     "oldest",
 		ForceFlushTimeout: 5 * time.Second,
+		SourceIdentifier:  entry.NewAttributeField("file.path"),
 	}
 }
 
@@ -52,8 +54,10 @@ type RecombineOperatorConfig struct {
 	MaxBatchSize      int           `json:"max_batch_size" yaml:"max_batch_size"`
 	CombineField      entry.Field   `json:"combine_field" yaml:"combine_field"`
 	CombineWith       string        `json:"combine_with" yaml:"combine_with"`
+	SourceIdentifier  entry.Field   `json:"source_identifier" yaml:"source_identifier"`
 	OverwriteWith     string        `json:"overwrite_with" yaml:"overwrite_with"`
 	ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
+	MaxSources        int           `json:"max_sources" yaml:"max_sources"`
 }
 
 // Build creates a new RecombineOperator from a config
@@ -106,13 +110,15 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
 		matchFirstLine:      matchesFirst,
 		prog:                prog,
 		maxBatchSize:        c.MaxBatchSize,
+		maxSources:          c.MaxSources,
 		overwriteWithOldest: overwriteWithOldest,
-		batch:               make([]*entry.Entry, 0, c.MaxBatchSize),
+		batchMap:            make(map[string][]*entry.Entry),
 		combineField:        c.CombineField,
 		combineWith:         c.CombineWith,
 		forceFlushTimeout:   c.ForceFlushTimeout,
 		ticker:              time.NewTicker(c.ForceFlushTimeout),
 		chClose:             make(chan struct{}),
+		sourceIdentifier:    c.SourceIdentifier,
 	}
 
 	return []operator.Operator{recombine}, nil
@@ -125,15 +131,17 @@ type RecombineOperator struct {
 	matchFirstLine      bool
 	prog                *vm.Program
 	maxBatchSize        int
+	maxSources          int
 	overwriteWithOldest bool
 	combineField        entry.Field
 	combineWith         string
 	ticker              *time.Ticker
 	forceFlushTimeout   time.Duration
 	chClose             chan struct{}
+	sourceIdentifier    entry.Field
 
 	sync.Mutex
-	batch []*entry.Entry
+	batchMap map[string][]*entry.Entry
 }
 
 func (r *RecombineOperator) Start(_ operator.Persister) error {
@@ -147,9 +155,19 @@ func (r *RecombineOperator) flushLoop() {
 	for {
 		select {
 		case <-r.ticker.C:
 			r.Lock()
-			if err := r.flushCombined(); err != nil {
-				r.Errorw("Failed flushing", "error", err)
+			timeNow := time.Now()
+			for source, entries := range r.batchMap {
+				lastEntryTs := entries[len(entries)-1].Timestamp
+				timeSinceLastEntry := timeNow.Sub(lastEntryTs)
+				if timeSinceLastEntry < r.forceFlushTimeout {
+					continue
+				}
+				if err := r.flushSource(source); err != nil {
+					r.Errorf("there was an error flushing combined logs: %s", err)
+				}
 			}
+
+			r.ticker.Reset(r.forceFlushTimeout)
 			r.Unlock()
 		case <-r.chClose:
 			r.ticker.Stop()
@@ -171,6 +189,8 @@
 	return nil
 }
 
+const DefaultSourceIdentifier = "DefaultSourceIdentifier"
+
 func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
 	// Lock the recombine operator because process can't run concurrently
 	r.Lock()
@@ -190,24 +210,34 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
 	// this is guaranteed to be a boolean because of expr.AsBool
 	matches := m.(bool)
 
+	var s string
+	err = e.Read(r.sourceIdentifier, &s)
+	if err != nil {
+		r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
+		s = DefaultSourceIdentifier
+	}
+
+	if s == "" {
+		s = DefaultSourceIdentifier
+	}
 	// This is the first entry in the next batch
 	if matches && r.matchIndicatesFirst() {
 		// Flush the existing batch
-		err := r.flushCombined()
+		err := r.flushSource(s)
 		if err != nil {
 			return err
 		}
 
 		// Add the current log to the new batch
-		r.addToBatch(ctx, e)
+		r.addToBatch(ctx, e, s)
 		return nil
 	}
 
 	// This is the last entry in a complete batch
 	if matches && r.matchIndicatesLast() {
-		r.addToBatch(ctx, e)
-		err := r.flushCombined()
+		r.addToBatch(ctx, e, s)
+		err := r.flushSource(s)
 		if err != nil {
 			return err
 		}
@@ -216,7 +246,7 @@
 	// This is neither the first entry of a new log,
 	// nor the last entry of a log, so just add it to the batch
-	r.addToBatch(ctx, e)
+	r.addToBatch(ctx, e, s)
 	return nil
 }
 
@@ -229,46 +259,59 @@ func (r *RecombineOperator) matchIndicatesLast() bool {
 }
 
 // addToBatch adds the current entry to the current batch of entries that will be combined
-func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry) {
-	if len(r.batch) >= r.maxBatchSize {
-		r.Error("Batch size exceeds max batch size. Flushing logs that have not been recombined")
-		r.flushUncombined(context.Background())
+func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry, source string) {
+	if _, ok := r.batchMap[source]; !ok {
+		r.batchMap[source] = []*entry.Entry{e}
+		if len(r.batchMap) >= r.maxSources {
+			r.Error("Number of batched sources exceeds max_sources. Flushing all batched logs. Consider increasing the max_sources parameter")
+			r.flushUncombined(context.Background())
+		}
+		return
 	}
-	r.batch = append(r.batch, e)
+	r.batchMap[source] = append(r.batchMap[source], e)
+	if len(r.batchMap[source]) >= r.maxBatchSize {
+		if err := r.flushSource(source); err != nil {
+			r.Errorf("there was an error flushing combined logs: %s", err)
+		}
+	}
 }
 
 // flushUncombined flushes all the logs in the batch individually to the
 // next output in the pipeline. This is only used when there is an error
 // or at shutdown to avoid dropping the logs.
 func (r *RecombineOperator) flushUncombined(ctx context.Context) {
-	for _, entry := range r.batch {
-		r.Write(ctx, entry)
+	for source := range r.batchMap {
+		for _, entry := range r.batchMap[source] {
+			r.Write(ctx, entry)
+		}
 	}
-	r.batch = r.batch[:0]
+	r.batchMap = make(map[string][]*entry.Entry)
 	r.ticker.Reset(r.forceFlushTimeout)
 }
 
-// flushCombined combines the entries currently in the batch into a single entry,
+// flushSource combines the entries currently in the batch into a single entry,
 // then forwards them to the next operator in the pipeline
-func (r *RecombineOperator) flushCombined() error {
+func (r *RecombineOperator) flushSource(source string) error {
 	// Skip flushing a combined log if the batch is empty
-	if len(r.batch) == 0 {
+	if len(r.batchMap[source]) == 0 {
 		return nil
 	}
 
 	// Choose which entry we want to keep the rest of the fields from
 	var base *entry.Entry
+	entries := r.batchMap[source]
+
 	if r.overwriteWithOldest {
-		base = r.batch[0]
+		base = entries[0]
 	} else {
-		base = r.batch[len(r.batch)-1]
+		base = entries[len(entries)-1]
 	}
 
 	// Combine the combineField of each entry in the batch,
 	// separated by newlines
 	var recombined strings.Builder
-	for i, e := range r.batch {
+	for i, e := range entries {
 		var s string
 		err := e.Read(r.combineField, &s)
 		if err != nil {
@@ -277,7 +320,7 @@
 		}
 
 		recombined.WriteString(s)
-		if i != len(r.batch)-1 && len(r.combineWith) > 0 {
+		if i != len(entries)-1 {
 			recombined.WriteString(r.combineWith)
 		}
 	}
@@ -289,7 +332,7 @@
 	}
 
 	r.Write(context.Background(), base)
-	r.batch = r.batch[:0]
-	r.ticker.Reset(r.forceFlushTimeout)
+
+	delete(r.batchMap, source)
 	return nil
 }
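Before turning to the tests, here is a self-contained sketch of the source-key derivation that `Process` performs above, using the entry API as it appears in this diff. `entry.New` is assumed to be the package's entry constructor (the tests' `entryWithBody` helper implies one), and the attribute value is illustrative:

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-log-collection/entry" // path assumed
)

func main() {
	e := entry.New()                                // assumed constructor for a blank entry
	e.AddAttribute("file.path", "/var/log/app.log") // illustrative source attribute

	// Mirror the fallback in Process: entries with a missing or empty
	// identifier all share one bucket keyed by DefaultSourceIdentifier.
	var source string
	if err := e.Read(entry.NewAttributeField("file.path"), &source); err != nil || source == "" {
		source = "DefaultSourceIdentifier"
	}
	fmt.Println("batch key:", source) // batch key: /var/log/app.log
}
```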
diff --git a/operator/builtin/transformer/recombine/recombine_test.go b/operator/builtin/transformer/recombine/recombine_test.go
index 6250dfef..02aaeb28 100644
--- a/operator/builtin/transformer/recombine/recombine_test.go
+++ b/operator/builtin/transformer/recombine/recombine_test.go
@@ -37,6 +37,14 @@ func TestRecombineOperator(t *testing.T) {
 		return e
 	}
 
+	entryWithBodyAttr := func(ts time.Time, body interface{}, attrs map[string]string) *entry.Entry {
+		e := entryWithBody(ts, body)
+		for k, v := range attrs {
+			e.AddAttribute(k, v)
+		}
+		return e
+	}
+
 	cases := []struct {
 		name   string
 		config *RecombineOperatorConfig
@@ -141,6 +149,89 @@
 			},
 			[]*entry.Entry{entryWithBody(t1, "test1test2")},
 		},
+		{
+			"TestDefaultSourceIdentifier",
+			func() *RecombineOperatorConfig {
+				cfg := NewRecombineOperatorConfig("")
+				cfg.CombineField = entry.NewBodyField()
+				cfg.IsLastEntry = "$body == 'end'"
+				cfg.OutputIDs = []string{"fake"}
+				return cfg
+			}(),
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t1, "file2", map[string]string{"file.path": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
+			},
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1\nend", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t1, "file2\nend", map[string]string{"file.path": "file2"}),
+			},
+		},
+		{
+			"TestCustomSourceIdentifier",
+			func() *RecombineOperatorConfig {
+				cfg := NewRecombineOperatorConfig("")
+				cfg.CombineField = entry.NewBodyField()
+				cfg.IsLastEntry = "$body == 'end'"
+				cfg.OutputIDs = []string{"fake"}
+				cfg.SourceIdentifier = entry.NewAttributeField("custom_source")
+				return cfg
+			}(),
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1", map[string]string{"custom_source": "file1"}),
+				entryWithBodyAttr(t1, "file2", map[string]string{"custom_source": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file1"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file2"}),
+			},
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1\nend", map[string]string{"custom_source": "file1"}),
+				entryWithBodyAttr(t1, "file2\nend", map[string]string{"custom_source": "file2"}),
+			},
+		},
+		{
+			"TestMaxSources",
+			func() *RecombineOperatorConfig {
+				cfg := NewRecombineOperatorConfig("")
+				cfg.CombineField = entry.NewBodyField()
+				cfg.IsLastEntry = "$body == 'end'"
+				cfg.OutputIDs = []string{"fake"}
+				cfg.MaxSources = 1
+				return cfg
+			}(),
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
+			},
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
+			},
+		},
+		{
+			"TestMaxBatchSize",
+			func() *RecombineOperatorConfig {
+				cfg := NewRecombineOperatorConfig("")
+				cfg.CombineField = entry.NewBodyField()
+				cfg.IsLastEntry = "$body == 'end'"
+				cfg.OutputIDs = []string{"fake"}
+				cfg.MaxBatchSize = 2
+				return cfg
+			}(),
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1_event1", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t1, "file2_event1", map[string]string{"file.path": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t2, "file2_event2", map[string]string{"file.path": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
+			},
+			[]*entry.Entry{
+				entryWithBodyAttr(t1, "file1_event1\nend", map[string]string{"file.path": "file1"}),
+				entryWithBodyAttr(t1, "file2_event1\nfile2_event2", map[string]string{"file.path": "file2"}),
+				entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
+			},
+		},
 	}
 
 	for _, tc := range cases {
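A final note on the `flushLoop` rewrite above: force flushing is now decided per source, from the age of that source's newest entry, rather than from a single global timer. A minimal standalone sketch of that check, with illustrative source names and timestamps:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	forceFlushTimeout := 5 * time.Second
	// Newest-entry timestamps per source, as tracked via batchMap in the operator.
	lastEntry := map[string]time.Time{
		"file1.log": time.Now().Add(-7 * time.Second), // quiet source: will be flushed
		"file2.log": time.Now().Add(-1 * time.Second), // active source: keeps batching
	}
	for source, ts := range lastEntry {
		// Same condition as flushLoop: skip sources newer than the timeout.
		if time.Since(ts) >= forceFlushTimeout {
			fmt.Printf("flush %s\n", source)
		}
	}
}
```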