Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Source identifier (#341)
Browse files Browse the repository at this point in the history
Enhance recombine operator to have ability to group entries according to a `source_identifier` field.
  • Loading branch information
rockb1017 authored Jan 20, 2022
1 parent 0197a90 commit d1c9d7a
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 29 deletions.
2 changes: 2 additions & 0 deletions docs/operators/recombine.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which entries will be flushed, aborting the wait for their sub parts to be merged with them. |
| `source_identifier` | `$attributes["file.path"]` | The [field](/docs/types/field.md) to separate one source of logs from others when combining them. |
| `max_sources` | 1000 | The maximum number of unique sources that may be tracked concurrently for separate combining. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/benbjohnson/clock v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect
Expand All @@ -34,6 +34,7 @@ require (
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72H
github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+IkPchKS7p7c2YPKwHmBOk=
github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.2.0 h1:9Re3G2TWxkE06LdMWMpcY6KV81GLXMGiYpPYUPkFAws=
github.com/benbjohnson/clock v1.2.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -317,7 +316,6 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
Expand Down Expand Up @@ -432,6 +430,7 @@ github.com/rivo/tview v0.0.0-20200219210816-cd38d7432498/go.mod h1:6lkG1x+13OShE
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down
95 changes: 69 additions & 26 deletions operator/builtin/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
return &RecombineOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "recombine"),
MaxBatchSize: 1000,
MaxSources: 1000,
CombineWith: "\n",
OverwriteWith: "oldest",
ForceFlushTimeout: 5 * time.Second,
SourceIdentifier: entry.NewAttributeField("file.path"),
}
}

Expand All @@ -52,8 +54,10 @@ type RecombineOperatorConfig struct {
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
SourceIdentifier entry.Field `json:"source_identifier" yaml:"source_identifier"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
ForceFlushTimeout time.Duration `json:"force_flush_period" yaml:"force_flush_period"`
MaxSources int `json:"max_sources" yaml:"max_sources"`
}

// Build creates a new RecombineOperator from a config
Expand Down Expand Up @@ -106,13 +110,15 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithOldest: overwriteWithOldest,
batch: make([]*entry.Entry, 0, c.MaxBatchSize),
batchMap: make(map[string][]*entry.Entry),
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
}

return []operator.Operator{recombine}, nil
Expand All @@ -125,15 +131,17 @@ type RecombineOperator struct {
matchFirstLine bool
prog *vm.Program
maxBatchSize int
maxSources int
overwriteWithOldest bool
combineField entry.Field
combineWith string
ticker *time.Ticker
forceFlushTimeout time.Duration
chClose chan struct{}
sourceIdentifier entry.Field

sync.Mutex
batch []*entry.Entry
batchMap map[string][]*entry.Entry
}

func (r *RecombineOperator) Start(_ operator.Persister) error {
Expand All @@ -147,9 +155,19 @@ func (r *RecombineOperator) flushLoop() {
select {
case <-r.ticker.C:
r.Lock()
if err := r.flushCombined(); err != nil {
r.Errorw("Failed flushing", "error", err)
timeNow := time.Now()
for source, entries := range r.batchMap {
lastEntryTs := entries[len(entries)-1].Timestamp
timeSinceLastEntry := timeNow.Sub(lastEntryTs)
if timeSinceLastEntry < r.forceFlushTimeout {
continue
}
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}

r.ticker.Reset(r.forceFlushTimeout)
r.Unlock()
case <-r.chClose:
r.ticker.Stop()
Expand All @@ -171,6 +189,8 @@ func (r *RecombineOperator) Stop() error {
return nil
}

const DefaultSourceIdentifier = "DefaultSourceIdentifier"

func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// Lock the recombine operator because process can't run concurrently
r.Lock()
Expand All @@ -190,24 +210,34 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {

// this is guaranteed to be a boolean because of expr.AsBool
matches := m.(bool)
var s string
err = e.Read(r.sourceIdentifier, &s)
if err != nil {
r.Warn("entry does not contain the source_identifier, so it may be pooled with other sources")
s = DefaultSourceIdentifier
}

if s == "" {
s = DefaultSourceIdentifier
}

// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
// Flush the existing batch
err := r.flushCombined()
err := r.flushSource(s)
if err != nil {
return err
}

// Add the current log to the new batch
r.addToBatch(ctx, e)
r.addToBatch(ctx, e, s)
return nil
}

// This is the last entry in a complete batch
if matches && r.matchIndicatesLast() {
r.addToBatch(ctx, e)
err := r.flushCombined()
r.addToBatch(ctx, e, s)
err := r.flushSource(s)
if err != nil {
return err
}
Expand All @@ -216,7 +246,7 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {

// This is neither the first entry of a new log,
// nor the last entry of a log, so just add it to the batch
r.addToBatch(ctx, e)
r.addToBatch(ctx, e, s)
return nil
}

Expand All @@ -229,46 +259,59 @@ func (r *RecombineOperator) matchIndicatesLast() bool {
}

// addToBatch adds the current entry to the current batch of entries that will be combined
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry) {
if len(r.batch) >= r.maxBatchSize {
r.Error("Batch size exceeds max batch size. Flushing logs that have not been recombined")
r.flushUncombined(context.Background())
func (r *RecombineOperator) addToBatch(_ context.Context, e *entry.Entry, source string) {
if _, ok := r.batchMap[source]; !ok {
r.batchMap[source] = []*entry.Entry{e}
if len(r.batchMap) >= r.maxSources {
r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter")
r.flushUncombined(context.Background())
}
return
}

r.batch = append(r.batch, e)
r.batchMap[source] = append(r.batchMap[source], e)
if len(r.batchMap[source]) >= r.maxBatchSize {
if err := r.flushSource(source); err != nil {
r.Errorf("there was error flushing combined logs %s", err)
}
}
}

// flushUncombined flushes all the logs in the batch individually to the
// next output in the pipeline. This is only used when there is an error
// or at shutdown to avoid dropping the logs.
func (r *RecombineOperator) flushUncombined(ctx context.Context) {
for _, entry := range r.batch {
r.Write(ctx, entry)
for source := range r.batchMap {
for _, entry := range r.batchMap[source] {
r.Write(ctx, entry)
}
}
r.batch = r.batch[:0]
r.batchMap = make(map[string][]*entry.Entry)
r.ticker.Reset(r.forceFlushTimeout)
}

// flushCombined combines the entries currently in the batch into a single entry,
// flushSource combines the entries currently in the batch into a single entry,
// then forwards them to the next operator in the pipeline
func (r *RecombineOperator) flushCombined() error {
func (r *RecombineOperator) flushSource(source string) error {
// Skip flushing a combined log if the batch is empty
if len(r.batch) == 0 {
if len(r.batchMap[source]) == 0 {
return nil
}

// Choose which entry we want to keep the rest of the fields from
var base *entry.Entry
entries := r.batchMap[source]

if r.overwriteWithOldest {
base = r.batch[0]
base = entries[0]
} else {
base = r.batch[len(r.batch)-1]
base = entries[len(entries)-1]
}

// Combine the combineField of each entry in the batch,
// separated by newlines
var recombined strings.Builder
for i, e := range r.batch {
for i, e := range entries {
var s string
err := e.Read(r.combineField, &s)
if err != nil {
Expand All @@ -277,7 +320,7 @@ func (r *RecombineOperator) flushCombined() error {
}

recombined.WriteString(s)
if i != len(r.batch)-1 && len(r.combineWith) > 0 {
if i != len(entries)-1 {
recombined.WriteString(r.combineWith)
}
}
Expand All @@ -289,7 +332,7 @@ func (r *RecombineOperator) flushCombined() error {
}

r.Write(context.Background(), base)
r.batch = r.batch[:0]
r.ticker.Reset(r.forceFlushTimeout)

delete(r.batchMap, source)
return nil
}
91 changes: 91 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestRecombineOperator(t *testing.T) {
return e
}

entryWithBodyAttr := func(ts time.Time, body interface{}, Attr map[string]string) *entry.Entry {
e := entryWithBody(ts, body)
for k, v := range Attr {
e.AddAttribute(k, v)
}
return e
}

cases := []struct {
name string
config *RecombineOperatorConfig
Expand Down Expand Up @@ -141,6 +149,89 @@ func TestRecombineOperator(t *testing.T) {
},
[]*entry.Entry{entryWithBody(t1, "test1test2")},
},
{
"TestDefaultSourceIdentifier",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nend", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2\nend", map[string]string{"file.path": "file2"}),
},
},
{
"TestCustomSourceIdentifier",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.SourceIdentifier = entry.NewAttributeField("custom_source")
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t1, "file2", map[string]string{"custom_source": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"custom_source": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nend", map[string]string{"custom_source": "file1"}),
entryWithBodyAttr(t1, "file2\nend", map[string]string{"custom_source": "file2"}),
},
},
{
"TestMaxSources",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxSources = 1
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
},
},
{
"TestMaxBatchSize",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "$body == 'end'"
cfg.OutputIDs = []string{"fake"}
cfg.MaxBatchSize = 2
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1_event1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2_event1", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2_event2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1_event1\nend", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file2_event1\nfile2_event2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file2"}),
},
},
}

for _, tc := range cases {
Expand Down

0 comments on commit d1c9d7a

Please sign in to comment.