From cc938e461c4fe95a5c9c0af0abafec85f7277bb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 2 Dec 2021 18:48:53 +0100 Subject: [PATCH] recombine: add timeout configuration option --- docs/operators/recombine.md | 23 +++--- .../transformer/recombine/recombine.go | 43 +++++++++-- .../transformer/recombine/recombine_test.go | 75 +++++++++++++++++++ 3 files changed, 124 insertions(+), 17 deletions(-) diff --git a/docs/operators/recombine.md b/docs/operators/recombine.md index 6379495e..6560ef90 100644 --- a/docs/operators/recombine.md +++ b/docs/operators/recombine.md @@ -4,17 +4,18 @@ The `recombine` operator combines consecutive logs into single logs based on sim ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `recombine` | A unique identifier for the operator. | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | -| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). | -| `is_first_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. | -| `is_last_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. | -| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will recombined. | -| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. | -| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. | -| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `recombine` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). | +| `is_first_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. | +| `is_last_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. | +| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will recombined. | +| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. | +| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. | +| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. | +| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. | Exactly one of `is_first_entry` and `is_last_entry` must be specified. diff --git a/operator/builtin/transformer/recombine/recombine.go b/operator/builtin/transformer/recombine/recombine.go index 165d7fa8..ddbd7721 100644 --- a/operator/builtin/transformer/recombine/recombine.go +++ b/operator/builtin/transformer/recombine/recombine.go @@ -40,18 +40,20 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig { MaxBatchSize: 1000, CombineWith: "\n", OverwriteWith: "oldest", + ForceFlushTimeout: 5 * time.Second, } } // RecombineOperatorConfig is the configuration of a recombine operator type RecombineOperatorConfig struct { helper.TransformerConfig `yaml:",inline"` - IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"` - IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"` - MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"` - CombineField entry.Field `json:"combine_field" yaml:"combine_field"` - CombineWith string `json:"combine_with" yaml:"combine_with"` - OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"` + IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"` + IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"` + MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"` + CombineField entry.Field `json:"combine_field" yaml:"combine_field"` + CombineWith string `json:"combine_with" yaml:"combine_with"` + OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"` + ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"` } // Build creates a new RecombineOperator from a config @@ -108,6 +110,9 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op batch: make([]*entry.Entry, 0, c.MaxBatchSize), combineField: c.CombineField, combineWith: c.CombineWith, + forceFlushTimeout: c.ForceFlushTimeout, + ticker: time.NewTicker(c.ForceFlushTimeout), + chClose: make(chan struct{}), } return []operator.Operator{recombine}, nil @@ -123,15 +128,36 @@ type RecombineOperator struct { overwriteWithOldest bool combineField entry.Field combineWith string + ticker *time.Ticker + forceFlushTimeout time.Duration + chClose chan struct{} sync.Mutex batch []*entry.Entry } func (r *RecombineOperator) Start(_ operator.Persister) error { + go r.flushLoop() + return nil } +func (r *RecombineOperator) flushLoop() { + for { + select { + case <-r.ticker.C: + r.Lock() + if err := r.flushCombined(); err != nil { + r.Errorw("Failed flushing", "error", err) + } + r.Unlock() + case <-r.chClose: + r.ticker.Stop() + return + } + } +} + func (r *RecombineOperator) Stop() error { r.Lock() defer r.Unlock() @@ -139,6 +165,9 @@ func (r *RecombineOperator) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r.flushUncombined(ctx) + + close(r.chClose) + return nil } @@ -217,6 +246,7 @@ func (r *RecombineOperator) flushUncombined(ctx context.Context) { r.Write(ctx, entry) } r.batch = r.batch[:0] + r.ticker.Reset(r.forceFlushTimeout) } // flushCombined combines the entries currently in the batch into a single entry, @@ -260,5 +290,6 @@ func (r *RecombineOperator) flushCombined() error { r.Write(context.Background(), base) r.batch = r.batch[:0] + r.ticker.Reset(r.forceFlushTimeout) return nil } diff --git a/operator/builtin/transformer/recombine/recombine_test.go b/operator/builtin/transformer/recombine/recombine_test.go index 9a5e28ef..4e1d27db 100644 --- a/operator/builtin/transformer/recombine/recombine_test.go +++ b/operator/builtin/transformer/recombine/recombine_test.go @@ -203,3 +203,78 @@ func TestRecombineOperator(t *testing.T) { } }) } + +func BenchmarkRecombine(b *testing.B) { + cfg := NewRecombineOperatorConfig("") + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "false" + cfg.OutputIDs = []string{"fake"} + ops, err := cfg.Build(testutil.NewBuildContext(b)) + require.NoError(b, err) + recombine := ops[0].(*RecombineOperator) + + fake := testutil.NewFakeOutput(b) + require.NoError(b, recombine.SetOutputs([]operator.Operator{fake})) + recombine.Start(nil) + + go func() { + for { + <-fake.Received + } + }() + + e := entry.New() + e.Timestamp = time.Now() + e.Body = "body" + + ctx := context.Background() + b.ResetTimer() + for i := 0; i < b.N; i++ { + require.NoError(b, recombine.Process(ctx, e)) + require.NoError(b, recombine.Process(ctx, e)) + require.NoError(b, recombine.Process(ctx, e)) + require.NoError(b, recombine.Process(ctx, e)) + require.NoError(b, recombine.Process(ctx, e)) + recombine.flushUncombined(ctx) + } +} + +func TestTimeout(t *testing.T) { + t.Parallel() + + cfg := NewRecombineOperatorConfig("") + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "false" + cfg.OutputIDs = []string{"fake"} + cfg.Timeout = 70 * time.Millisecond + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + recombine := ops[0].(*RecombineOperator) + + fake := testutil.NewFakeOutput(t) + require.NoError(t, recombine.SetOutputs([]operator.Operator{fake})) + recombine.Start(nil) + + e := entry.New() + e.Timestamp = time.Now() + e.Body = "body" + + ctx := context.Background() + + require.NoError(t, recombine.Process(ctx, e)) + select { + case <-fake.Received: + t.Logf("We shouldn't receive an entry before timeout") + t.FailNow() + case <-time.After(50 * time.Millisecond): + } + + select { + case <-fake.Received: + case <-time.After(5 * time.Second): + t.Logf("The entry should be flushed by now") + t.FailNow() + } + + require.NoError(t, recombine.Stop()) +}