Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
recombine: add timeout configuration option
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Dec 2, 2021
1 parent d86516e commit aded0ff
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/operators/recombine.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can be an empty string as well. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `timeout` | `5s` | Flush timeout after which entries will be flushed aborting the wait for their sub parts to be merged with. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

Expand Down
39 changes: 33 additions & 6 deletions operator/builtin/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
MaxBatchSize: 1000,
CombineWith: "\n",
OverwriteWith: "oldest",
Timeout: 5 * time.Second,
}
}

// RecombineOperatorConfig is the configuration of a recombine operator
type RecombineOperatorConfig struct {
helper.TransformerConfig `yaml:",inline"`
IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"`
IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
IsFirstEntry string `json:"is_first_entry" yaml:"is_first_entry"`
IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
}

// Build creates a new RecombineOperator from a config
Expand Down Expand Up @@ -108,6 +110,9 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
batch: make([]*entry.Entry, 0, c.MaxBatchSize),
combineField: c.CombineField,
combineWith: c.CombineWith,
timeout: c.Timeout,
ticker: time.NewTicker(c.Timeout),
chClose: make(chan struct{}),
}

return []operator.Operator{recombine}, nil
Expand All @@ -123,12 +128,29 @@ type RecombineOperator struct {
overwriteWithOldest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
timeout time.Duration
chClose chan struct{}

sync.Mutex
batch []*entry.Entry
}

func (r *RecombineOperator) Start(_ operator.Persister) error {
go func() {
for {
select {
case <-r.ticker.C:
if err := r.flushCombined(); err != nil {
r.Errorw("Failed flushing", "error", err)
}
case <-r.chClose:
r.ticker.Stop()
return
}
}
}()

return nil
}

Expand All @@ -139,6 +161,9 @@ func (r *RecombineOperator) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r.flushUncombined(ctx)

close(r.chClose)

return nil
}

Expand Down Expand Up @@ -217,6 +242,7 @@ func (r *RecombineOperator) flushUncombined(ctx context.Context) {
r.Write(ctx, entry)
}
r.batch = r.batch[:0]
r.ticker.Reset(r.timeout)
}

// flushCombined combines the entries currently in the batch into a single entry,
Expand Down Expand Up @@ -260,5 +286,6 @@ func (r *RecombineOperator) flushCombined() error {

r.Write(context.Background(), base)
r.batch = r.batch[:0]
r.ticker.Reset(r.timeout)
return nil
}
75 changes: 75 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,78 @@ func TestRecombineOperator(t *testing.T) {
}
})
}

func BenchmarkRecombine(b *testing.B) {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "false"
cfg.OutputIDs = []string{"fake"}
ops, err := cfg.Build(testutil.NewBuildContext(b))
require.NoError(b, err)
recombine := ops[0].(*RecombineOperator)

fake := testutil.NewFakeOutput(b)
require.NoError(b, recombine.SetOutputs([]operator.Operator{fake}))
recombine.Start(nil)

go func() {
for {
<-fake.Received
}
}()

e := entry.New()
e.Timestamp = time.Now()
e.Body = "body"

ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
require.NoError(b, recombine.Process(ctx, e))
require.NoError(b, recombine.Process(ctx, e))
require.NoError(b, recombine.Process(ctx, e))
require.NoError(b, recombine.Process(ctx, e))
require.NoError(b, recombine.Process(ctx, e))
recombine.flushUncombined(ctx)
}
}

func TestTimeout(t *testing.T) {
t.Parallel()

cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "false"
cfg.OutputIDs = []string{"fake"}
cfg.Timeout = 70 * time.Millisecond
ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
recombine := ops[0].(*RecombineOperator)

fake := testutil.NewFakeOutput(t)
require.NoError(t, recombine.SetOutputs([]operator.Operator{fake}))
recombine.Start(nil)

e := entry.New()
e.Timestamp = time.Now()
e.Body = "body"

ctx := context.Background()

require.NoError(t, recombine.Process(ctx, e))
select {
case <-fake.Received:
t.Logf("We shouldn't receive an entry before timeout")
t.FailNow()
case <-time.After(50 * time.Millisecond):
}

select {
case <-fake.Received:
case <-time.After(5 * time.Second):
t.Logf("The entry should be flushed by now")
t.FailNow()
}

require.NoError(t, recombine.Stop())
}

0 comments on commit aded0ff

Please sign in to comment.