Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
feat(recombine): add combine_with option
Browse files Browse the repository at this point in the history
Fixes #314
  • Loading branch information
andrzej-stencel committed Nov 29, 2021
1 parent 2645572 commit 94e3bed
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
5 changes: 3 additions & 2 deletions docs/operators/recombine.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ The `recombine` operator combines consecutive logs into single logs based on sim
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
| `is_first_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An [expression](/docs/types/expression.md) that returns true if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will be recombined with newlines. |
| `combine_field` | required | The [field](/docs/types/field.md) from all the entries that will be recombined. |
| `combine_with` | `\n` | The string that is inserted between the combined entries. It may also be an empty string, in which case the entries are concatenated directly. |
| `max_batch_size` | 1000 | The maximum number of consecutive entries that will be combined into a single entry. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined with newlines. |
| `overwrite_with` | `oldest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |

Exactly one of `is_first_entry` and `is_last_entry` must be specified.

Expand Down
8 changes: 6 additions & 2 deletions operator/builtin/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewRecombineOperatorConfig(operatorID string) *RecombineOperatorConfig {
return &RecombineOperatorConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "recombine"),
MaxBatchSize: 1000,
CombineWith: "\n",
OverwriteWith: "oldest",
}
}
Expand All @@ -49,6 +50,7 @@ type RecombineOperatorConfig struct {
IsLastEntry string `json:"is_last_entry" yaml:"is_last_entry"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
CombineField entry.Field `json:"combine_field" yaml:"combine_field"`
CombineWith string `json:"combine_with" yaml:"combine_with"`
OverwriteWith string `json:"overwrite_with" yaml:"overwrite_with"`
}

Expand Down Expand Up @@ -105,6 +107,7 @@ func (c *RecombineOperatorConfig) Build(bc operator.BuildContext) ([]operator.Op
overwriteWithOldest: overwriteWithOldest,
batch: make([]*entry.Entry, 0, c.MaxBatchSize),
combineField: c.CombineField,
combineWith: c.CombineWith,
}

return []operator.Operator{recombine}, nil
Expand All @@ -119,6 +122,7 @@ type RecombineOperator struct {
maxBatchSize int
overwriteWithOldest bool
combineField entry.Field
combineWith string

sync.Mutex
batch []*entry.Entry
Expand Down Expand Up @@ -243,8 +247,8 @@ func (r *RecombineOperator) flushCombined() error {
}

recombined.WriteString(s)
if i != len(r.batch)-1 {
recombined.WriteByte('\n')
if i != len(r.batch)-1 && len(r.combineWith) > 0 {
recombined.WriteString(r.combineWith)
}
}

Expand Down
16 changes: 16 additions & 0 deletions operator/builtin/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ func TestRecombineOperator(t *testing.T) {
entryWithBody(t2, "test1\ntest2"),
},
},
{
"CombineWithEmptyString",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.CombineWith = ""
cfg.IsLastEntry = "$body == 'test2'"
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
[]*entry.Entry{
entryWithBody(t1, "test1"),
entryWithBody(t1, "test2"),
},
[]*entry.Entry{entryWithBody(t1, "test1test2")},
},
}

for _, tc := range cases {
Expand Down

0 comments on commit 94e3bed

Please sign in to comment.