Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
recombine: add timeout configuration option (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek authored Dec 15, 2021
1 parent edfd778 commit 7b3dec9
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 17 deletions.
23 changes: 12 additions & 11 deletions docs/operators/recombine.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ The `recombine` operator combines consecutive logs into single logs based on sim

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `recombine` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
| `is_first_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will recombined. |
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `recombine` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
| `is_first_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will be recombined. |
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which buffered entries will be flushed, aborting the wait for further entries to combine with. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

Expand Down
43 changes: 37 additions & 6 deletions operator/builtin/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
MaxBatchSize: 1000,
CombineWith: "\n",
OverwriteWith: "oldest",
ForceFlushTimeout: 5 * time.Second,
}
}

// RecombineOperatorConfig is the configuration of a recombine operator
type RecombineOperatorConfig struct {
helper.TransformerConfig `yaml:",inline"`
IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"`
IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"`
IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
}

// Build creates a new RecombineOperator from a config
Expand Down Expand Up @@ -108,6 +110,9 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
batch: make([]*entry.Entry, 0, c.MaxBatchSize),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
}

return []operator.Operator{recombine}, nil
Expand All @@ -123,22 +128,46 @@ type RecombineOperator struct {
overwriteWithOldest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}

sync.Mutex
batch []*entry.Entry
}

// Start launches the background goroutine that periodically force-flushes
// the pending batch. The operator keeps no persisted state, so the
// Persister argument is ignored.
func (r *RecombineOperator) Start(_ operator.Persister) error {
	go r.flushLoop()
	return nil
}

// flushLoop runs until chClose is closed, force-flushing the current batch
// each time the ticker fires so that a partial batch is not held forever
// waiting for a terminating entry.
func (r *RecombineOperator) flushLoop() {
	// Stop the ticker on any exit path so it stops firing once the
	// operator shuts down.
	defer r.ticker.Stop()
	for {
		select {
		case <-r.chClose:
			return
		case <-r.ticker.C:
			r.Lock()
			err := r.flushCombined()
			if err != nil {
				r.Errorw("Failed flushing", "error", err)
			}
			r.Unlock()
		}
	}
}

// Stop flushes any entries still buffered in the batch as individual,
// uncombined entries and shuts down the background flush goroutine.
// It is safe to call Stop more than once.
func (r *RecombineOperator) Stop() error {
	r.Lock()
	defer r.Unlock()

	// Bound the final flush so Stop cannot hang on a slow downstream
	// operator.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r.flushUncombined(ctx)

	// Close chClose at most once: an unconditional close would panic if
	// Stop were called a second time. The check-then-close below cannot
	// race with another Stop because Stop holds the operator lock.
	select {
	case <-r.chClose:
		// Already closed by a previous Stop.
	default:
		close(r.chClose)
	}

	return nil
}

Expand Down Expand Up @@ -217,6 +246,7 @@ func (r *RecombineOperator) flushUncombined(ctx context.Context) {
r.Write(ctx, entry)
}
r.batch = r.batch[:0]
r.ticker.Reset(r.forceFlushTimeout)
}

// flushCombined combines the entries currently in the batch into a single entry,
Expand Down Expand Up @@ -260,5 +290,6 @@ func (r *RecombineOperator) flushCombined() error {

r.Write(context.Background(), base)
r.batch = r.batch[:0]
r.ticker.Reset(r.forceFlushTimeout)
return nil
}
75 changes: 75 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,78 @@ func TestRecombineOperator(t *testing.T) {
}
})
}

// BenchmarkRecombine measures the cost of pushing entries through the
// recombine operator and force-flushing the accumulated batch.
func BenchmarkRecombine(b *testing.B) {
	cfg := NewRecombineOperatorConfig("")
	cfg.CombineField = entry.NewBodyField()
	cfg.IsFirstEntry = "false"
	cfg.OutputIDs = []string{"fake"}
	ops, err := cfg.Build(testutil.NewBuildContext(b))
	require.NoError(b, err)
	op := ops[0].(*RecombineOperator)

	fake := testutil.NewFakeOutput(b)
	require.NoError(b, op.SetOutputs([]operator.Operator{fake}))
	op.Start(nil)

	// Drain the fake output so writes never block the operator.
	go func() {
		for range fake.Received {
		}
	}()

	e := entry.New()
	e.Timestamp = time.Now()
	e.Body = "body"

	ctx := context.Background()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Batch five entries per iteration, then flush them uncombined.
		for j := 0; j < 5; j++ {
			require.NoError(b, op.Process(ctx, e))
		}
		op.flushUncombined(ctx)
	}
}

// TestTimeout verifies that a buffered entry is eventually flushed by the
// force-flush ticker instead of waiting forever for a matching entry.
func TestTimeout(t *testing.T) {
	t.Parallel()

	cfg := NewRecombineOperatorConfig("")
	cfg.CombineField = entry.NewBodyField()
	cfg.IsFirstEntry = "false"
	cfg.OutputIDs = []string{"fake"}
	cfg.ForceFlushTimeout = 70 * time.Millisecond
	ops, err := cfg.Build(testutil.NewBuildContext(t))
	require.NoError(t, err)
	op := ops[0].(*RecombineOperator)

	fake := testutil.NewFakeOutput(t)
	require.NoError(t, op.SetOutputs([]operator.Operator{fake}))
	op.Start(nil)

	e := entry.New()
	e.Timestamp = time.Now()
	e.Body = "body"

	ctx := context.Background()
	require.NoError(t, op.Process(ctx, e))

	// Before the 70ms force-flush period elapses, nothing should arrive.
	select {
	case <-fake.Received:
		t.Fatal("We shouldn't receive an entry before timeout")
	case <-time.After(50 * time.Millisecond):
	}

	// Shortly after the period elapses, the ticker should flush the entry.
	select {
	case <-fake.Received:
	case <-time.After(5 * time.Second):
		t.Fatal("The entry should be flushed by now")
	}

	require.NoError(t, op.Stop())
}

0 comments on commit 7b3dec9

Please sign in to comment.