From 70bc711c3ef460d43833b7bf97e885d0978d6c4d Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 22 Nov 2022 14:09:25 +0100 Subject: [PATCH] feat(pkg/stanza): add backpropagation of number of processed entries --- .chloggen/drosiek-investigation.yaml | 16 ++ pkg/stanza/adapter/emitter.go | 5 +- pkg/stanza/adapter/emitter_test.go | 11 +- pkg/stanza/adapter/integration_test.go | 6 +- pkg/stanza/adapter/mocks_test.go | 4 +- pkg/stanza/docs/operators/key_value_parser.md | 2 +- pkg/stanza/docs/operators/noop.md | 2 +- pkg/stanza/docs/operators/syslog_input.md | 4 +- pkg/stanza/docs/operators/udp_input.md | 2 +- pkg/stanza/docs/types/entry.md | 2 +- pkg/stanza/operator/helper/input.go | 4 +- pkg/stanza/operator/helper/input_test.go | 3 +- pkg/stanza/operator/helper/parser.go | 23 +- pkg/stanza/operator/helper/parser_test.go | 32 ++- pkg/stanza/operator/helper/transformer.go | 15 +- .../operator/helper/transformer_test.go | 18 +- pkg/stanza/operator/helper/writer.go | 14 +- pkg/stanza/operator/helper/writer_test.go | 54 ++++- .../operator/input/journald/journald_test.go | 2 +- pkg/stanza/operator/input/tcp/tcp_test.go | 8 +- pkg/stanza/operator/input/udp/udp_test.go | 6 +- pkg/stanza/operator/operator.go | 3 +- pkg/stanza/operator/output/drop/drop.go | 4 +- pkg/stanza/operator/output/drop/drop_test.go | 5 +- pkg/stanza/operator/output/file/file.go | 8 +- pkg/stanza/operator/output/stdout/stdout.go | 6 +- .../operator/output/stdout/stdout_test.go | 3 +- pkg/stanza/operator/parser/csv/csv.go | 6 +- pkg/stanza/operator/parser/csv/csv_test.go | 21 +- pkg/stanza/operator/parser/json/json.go | 2 +- pkg/stanza/operator/parser/json/json_test.go | 3 +- .../operator/parser/keyvalue/keyvalue.go | 2 +- .../operator/parser/keyvalue/keyvalue_test.go | 4 +- pkg/stanza/operator/parser/regex/regex.go | 2 +- .../operator/parser/regex/regex_test.go | 3 +- .../operator/parser/scope/scope_name.go | 2 +- .../operator/parser/scope/scope_name_test.go | 3 +- .../operator/parser/severity/severity.go | 2 +- .../operator/parser/severity/severity_test.go | 2 +- pkg/stanza/operator/parser/syslog/syslog.go | 2 +- .../operator/parser/syslog/syslog_test.go | 6 +- pkg/stanza/operator/parser/time/time.go | 2 +- pkg/stanza/operator/parser/time/time_test.go | 5 +- pkg/stanza/operator/parser/trace/trace.go | 2 +- .../operator/parser/trace/trace_test.go | 3 +- pkg/stanza/operator/parser/uri/uri.go | 2 +- pkg/stanza/operator/parser/uri/uri_test.go | 3 +- pkg/stanza/operator/transformer/add/add.go | 2 +- .../operator/transformer/add/add_test.go | 3 +- pkg/stanza/operator/transformer/copy/copy.go | 2 +- .../operator/transformer/copy/copy_test.go | 4 +- .../operator/transformer/filter/filter.go | 15 +- .../transformer/filter/filter_test.go | 21 +- .../operator/transformer/flatten/flatten.go | 2 +- .../transformer/flatten/flatten_test.go | 4 +- pkg/stanza/operator/transformer/move/move.go | 2 +- .../operator/transformer/move/move_test.go | 4 +- pkg/stanza/operator/transformer/noop/noop.go | 5 +- .../operator/transformer/noop/noop_test.go | 3 +- .../transformer/recombine/recombine.go | 10 +- .../transformer/recombine/recombine_test.go | 22 +- .../operator/transformer/remove/remove.go | 2 +- .../transformer/remove/remove_test.go | 4 +- .../operator/transformer/retain/retain.go | 2 +- .../transformer/retain/retain_test.go | 4 +- .../operator/transformer/router/router.go | 11 +- .../transformer/router/router_test.go | 7 +- pkg/stanza/pipeline_test.go | 217 ++++++++++++++++++ pkg/stanza/testutil/mocks.go | 4 +- pkg/stanza/testutil/operator.go | 15 +- processor/logstransformprocessor/processor.go | 2 +- 71 files changed, 525 insertions(+), 181 deletions(-) create mode 100755 .chloggen/drosiek-investigation.yaml create mode 100644 pkg/stanza/pipeline_test.go diff --git a/.chloggen/drosiek-investigation.yaml b/.chloggen/drosiek-investigation.yaml new file mode 100755 index 000000000000..6392e9dc1108 --- /dev/null +++ b/.chloggen/drosiek-investigation.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add backpropagation of number of emitted entries, so it can be handled properly in processors + +# One or more tracking issues related to the change +issues: [15378] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/stanza/adapter/emitter.go b/pkg/stanza/adapter/emitter.go index df3630a11148..744c0a951214 100644 --- a/pkg/stanza/adapter/emitter.go +++ b/pkg/stanza/adapter/emitter.go @@ -90,12 +90,13 @@ func (e *LogEmitter) OutChannel() <-chan []*entry.Entry { } // Process will emit an entry to the output channel -func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error { +func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) (int, error) { if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 { e.flush(ctx, oldBatch) } - return nil + // always returns 1 as the entry is going to be emitted now or later + return 1, nil } // appendEntry appends the entry to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch diff --git a/pkg/stanza/adapter/emitter_test.go b/pkg/stanza/adapter/emitter_test.go index 1de564512fd1..26d76fe1e2b4 100644 --- a/pkg/stanza/adapter/emitter_test.go +++ b/pkg/stanza/adapter/emitter_test.go @@ -37,7 +37,8 @@ func TestLogEmitter(t *testing.T) { in := entry.New() go func() { - require.NoError(t, emitter.Process(context.Background(), in)) + _, err := emitter.Process(context.Background(), in) + require.NoError(t, err) }() select { @@ -65,7 +66,8 @@ func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) { go func() { ctx := context.Background() for _, e := range entries { - require.NoError(t, emitter.Process(ctx, e)) + _, err := emitter.Process(ctx, e) + require.NoError(t, err) } }() @@ -94,8 +96,9 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) { entry := complexEntry() go func() { - ctx := context.Background() - require.NoError(t, emitter.Process(ctx, entry)) + processed, err := emitter.Process(context.Background(), entry) + require.NoError(t, err) + require.Equal(t, 1, processed) }() timeoutChan := time.After(timeout) diff --git a/pkg/stanza/adapter/integration_test.go b/pkg/stanza/adapter/integration_test.go index ff31d4c5f072..c7b1a68e5129 100644 --- a/pkg/stanza/adapter/integration_test.go +++ b/pkg/stanza/adapter/integration_test.go @@ -94,7 +94,7 @@ func BenchmarkEmitterToConsumer(b *testing.B) { go func() { ctx := context.Background() for _, e := range entries { - _ = logsReceiver.emitter.Process(ctx, e) + _, _ = logsReceiver.emitter.Process(ctx, e) } }() @@ -125,7 +125,9 @@ func TestEmitterToConsumer(t *testing.T) { go func() { ctx := context.Background() for _, e := range entries { - require.NoError(t, logsReceiver.emitter.Process(ctx, e)) + processed, err := logsReceiver.emitter.Process(ctx, e) + require.NoError(t, err) + require.Equal(t, processed, 1) } }() diff --git a/pkg/stanza/adapter/mocks_test.go b/pkg/stanza/adapter/mocks_test.go index 7e41bf43e296..c66784b0cd05 100644 --- a/pkg/stanza/adapter/mocks_test.go +++ b/pkg/stanza/adapter/mocks_test.go @@ -65,8 +65,8 @@ func (o *UnstartableOperator) Start(_ operator.Persister) error { } // Process will return nil -func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) error { - return nil +func (o *UnstartableOperator) Process(ctx context.Context, entry *entry.Entry) (int, error) { + return 0, nil } type mockLogsRejecter struct { diff --git a/pkg/stanza/docs/operators/key_value_parser.md b/pkg/stanza/docs/operators/key_value_parser.md index 70c45360b0d5..f2daf0500f49 100644 --- a/pkg/stanza/docs/operators/key_value_parser.md +++ b/pkg/stanza/docs/operators/key_value_parser.md @@ -8,7 +8,7 @@ The `key_value_parser` operator parses the string-type field selected by `parse_ | --- | --- | --- | | `id` | `key_value_parser` | A unique identifier for the operator. | | `delimiter` | `=` | The delimiter used for splitting a value into a key value pair. | -| `pair_delimiter` | | The delimiter used for seperating key value pairs, defaults to whitespace. | +| `pair_delimiter` | | The delimiter used for separating key value pairs, defaults to whitespace. | | `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | | `parse_from` | `body` | A [field](../types/field.md) that indicates the field to be parsed into key value pairs. | | `parse_to` | `attributes` | A [field](../types/field.md) that indicates the field to be parsed as into key value pairs. | diff --git a/pkg/stanza/docs/operators/noop.md b/pkg/stanza/docs/operators/noop.md index 40c1e9bd78fa..3034b85c3610 100644 --- a/pkg/stanza/docs/operators/noop.md +++ b/pkg/stanza/docs/operators/noop.md @@ -30,4 +30,4 @@ operators: #### Why is this necessary? -The last operator is always responsible for emitting logs from the receiver. In non-linear pipelines, it is sometimes necessary to explictly direct logs to the final operator. In many such cases, the final operator performs some work. However, if no more work is required, the `noop` operator can serve as a final operator. +The last operator is always responsible for emitting logs from the receiver. In non-linear pipelines, it is sometimes necessary to explicitly direct logs to the final operator. In many such cases, the final operator performs some work. However, if no more work is required, the `noop` operator can serve as a final operator. diff --git a/pkg/stanza/docs/operators/syslog_input.md b/pkg/stanza/docs/operators/syslog_input.md index 596bad91028c..014d2ba566ab 100644 --- a/pkg/stanza/docs/operators/syslog_input.md +++ b/pkg/stanza/docs/operators/syslog_input.md @@ -26,7 +26,7 @@ TCP Configuration: ```yaml - type: syslog_input tcp: - listen_adress: "0.0.0.0:54526" + listen_address: "0.0.0.0:54526" syslog: protocol: rfc5424 ``` @@ -36,7 +36,7 @@ UDP Configuration: ```yaml - type: syslog_input udp: - listen_adress: "0.0.0.0:54526" + listen_address: "0.0.0.0:54526" syslog: protocol: rfc3164 location: UTC diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 3468290f7486..30b84ad841b0 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -48,7 +48,7 @@ Configuration: ```yaml - type: udp_input - listen_adress: "0.0.0.0:54526" + listen_address: "0.0.0.0:54526" ``` Send a log: diff --git a/pkg/stanza/docs/types/entry.md b/pkg/stanza/docs/types/entry.md index b2f6881b7bfb..8067d973fbfa 100644 --- a/pkg/stanza/docs/types/entry.md +++ b/pkg/stanza/docs/types/entry.md @@ -36,4 +36,4 @@ Represented in `json` format, an entry may look like the following: } ``` -Throughout the documentation, `json` format is used to represent entries. Fields are typically ommitted unless relevant to the behavior being described. +Throughout the documentation, `json` format is used to represent entries. Fields are typically omitted unless relevant to the behavior being described. diff --git a/pkg/stanza/operator/helper/input.go b/pkg/stanza/operator/helper/input.go index 8c19a033caaa..da83ca061ab4 100644 --- a/pkg/stanza/operator/helper/input.go +++ b/pkg/stanza/operator/helper/input.go @@ -94,9 +94,9 @@ func (i *InputOperator) CanProcess() bool { } // Process will always return an error if called. -func (i *InputOperator) Process(ctx context.Context, entry *entry.Entry) error { +func (i *InputOperator) Process(ctx context.Context, entry *entry.Entry) (int, error) { i.Errorw("Operator received an entry, but can not process", zap.Any("entry", entry)) - return errors.NewError( + return 0, errors.NewError( "Operator can not process logs.", "Ensure that operator is not configured to receive logs from other operators", ) diff --git a/pkg/stanza/operator/helper/input_test.go b/pkg/stanza/operator/helper/input_test.go index 0e314af7a096..adb05e5926c4 100644 --- a/pkg/stanza/operator/helper/input_test.go +++ b/pkg/stanza/operator/helper/input_test.go @@ -90,8 +90,9 @@ func TestInputOperatorProcess(t *testing.T) { } entry := entry.New() ctx := context.Background() - err := input.Process(ctx, entry) + processed, err := input.Process(ctx, entry) require.Error(t, err) + require.Equal(t, 0, processed) require.Equal(t, err.Error(), "Operator can not process logs.") } diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index e41577032aa6..3ebb8329a79f 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -105,37 +105,38 @@ type ParserOperator struct { } // ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators. -func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error { +// Returns number of entries being processed by pipeline and error eventually. +func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) (int, error) { return p.ProcessWithCallback(ctx, entry, parse, nil) } -func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error { +func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) (int, error) { // Short circuit if the "if" condition does not match skip, err := p.Skip(ctx, entry) if err != nil { return p.HandleEntryError(ctx, entry, err) } if skip { - p.Write(ctx, entry) - return nil + return p.Write(ctx, entry), nil } - if err = p.ParseWith(ctx, entry, parse); err != nil { - return err + var processed int + if processed, err = p.ParseWith(ctx, entry, parse); err != nil { + return processed, err } if cb != nil { err = cb(entry) if err != nil { - return err + return 0, err } } - p.Write(ctx, entry) - return nil + return p.Write(ctx, entry), nil } // ParseWith will process an entry's field with a parser function. -func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error { +// Returns number of entries being processed by pipeline and error eventually. +func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) (int, error) { value, ok := entry.Get(p.ParseFrom) if !ok { err := errors.NewError( @@ -194,7 +195,7 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars if scopeNameParserErr != nil { return p.HandleEntryError(ctx, entry, errors.Wrap(scopeNameParserErr, "scope_name parser")) } - return nil + return 0, nil } // ParseFunction is function that parses a raw value. diff --git a/pkg/stanza/operator/helper/parser_test.go b/pkg/stanza/operator/helper/parser_test.go index 20bf795081d1..919b96ef4972 100644 --- a/pkg/stanza/operator/helper/parser_test.go +++ b/pkg/stanza/operator/helper/parser_test.go @@ -126,9 +126,10 @@ func TestParserMissingField(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.") + require.Equal(t, 0, processed) } func TestParserInvalidParseDrop(t *testing.T) { @@ -145,9 +146,10 @@ func TestParserInvalidParseDrop(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") + require.Equal(t, 0, processed) fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } @@ -165,10 +167,11 @@ func TestParserInvalidParseSend(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") fakeOut.ExpectEntry(t, testEntry) + require.Equal(t, 1, processed) fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } @@ -193,9 +196,10 @@ func TestParserInvalidTimeParseDrop(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + require.Equal(t, 0, processed) fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } @@ -220,9 +224,10 @@ func TestParserInvalidTimeParseSend(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + require.Equal(t, 1, processed) fakeOut.ExpectEntry(t, testEntry) fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } @@ -244,9 +249,10 @@ func TestParserInvalidSeverityParseDrop(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + require.Equal(t, 0, processed) fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } @@ -285,9 +291,10 @@ func TestParserInvalidTimeValidSeverityParse(t *testing.T) { err := testEntry.Set(entry.NewBodyField("severity"), "info") require.NoError(t, err) - err = parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + require.Equal(t, 0, processed) // But, this should have been set anyways require.Equal(t, entry.Info, testEntry.Severity) @@ -337,9 +344,10 @@ func TestParserValidTimeInvalidSeverityParse(t *testing.T) { err = testEntry.Set(entry.NewBodyField("timestamp"), sample) require.NoError(t, err) - err = parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + require.Equal(t, 0, processed) require.Equal(t, expected, testEntry.Timestamp) } @@ -347,7 +355,7 @@ func TestParserValidTimeInvalidSeverityParse(t *testing.T) { func TestParserOutput(t *testing.T) { output := &testutil.Operator{} output.On("ID").Return("test-output") - output.On("Process", mock.Anything, mock.Anything).Return(nil) + output.On("Process", mock.Anything, mock.Anything).Return(1, nil) parser := ParserOperator{ TransformerOperator: TransformerOperator{ @@ -369,9 +377,10 @@ func TestParserOutput(t *testing.T) { } ctx := context.Background() testEntry := entry.New() - err := parser.ProcessWith(ctx, testEntry, parse) + processed, err := parser.ProcessWith(ctx, testEntry, parse) require.NoError(t, err) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) + require.Equal(t, 1, processed) } func TestParserFields(t *testing.T) { @@ -642,10 +651,11 @@ func TestParserFields(t *testing.T) { require.NoError(t, err) e := tc.input() - err = parser.ProcessWith(context.Background(), e, parse) + processed, err := parser.ProcessWith(context.Background(), e, parse) require.NoError(t, err) require.Equal(t, tc.output(), e) + require.Equal(t, 0, processed) }) } } diff --git a/pkg/stanza/operator/helper/transformer.go b/pkg/stanza/operator/helper/transformer.go index 290d73555b6e..ac89e67f8af1 100644 --- a/pkg/stanza/operator/helper/transformer.go +++ b/pkg/stanza/operator/helper/transformer.go @@ -87,31 +87,30 @@ func (t *TransformerOperator) CanProcess() bool { } // ProcessWith will process an entry with a transform function. -func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) error { +// Returns number of entries being processed by pipeline and error eventually. +func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entry, transform TransformFunction) (int, error) { // Short circuit if the "if" condition does not match skip, err := t.Skip(ctx, entry) if err != nil { return t.HandleEntryError(ctx, entry, err) } if skip { - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry), nil } if err := transform(entry); err != nil { return t.HandleEntryError(ctx, entry, err) } - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry), nil } // HandleEntryError will handle an entry error using the on_error strategy. -func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error { +func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) (int, error) { t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry)) if t.OnError == SendOnError { - t.Write(ctx, entry) + return t.Write(ctx, entry), err } - return err + return 0, err } func (t *TransformerOperator) Skip(ctx context.Context, entry *entry.Entry) (bool, error) { diff --git a/pkg/stanza/operator/helper/transformer_test.go b/pkg/stanza/operator/helper/transformer_test.go index e895f8be637b..e109e47240ac 100644 --- a/pkg/stanza/operator/helper/transformer_test.go +++ b/pkg/stanza/operator/helper/transformer_test.go @@ -73,7 +73,7 @@ func TestTransformerOperatorCanProcess(t *testing.T) { func TestTransformerDropOnError(t *testing.T) { output := &testutil.Operator{} output.On("ID").Return("test-output") - output.On("Process", mock.Anything, mock.Anything).Return(nil) + output.On("Process", mock.Anything, mock.Anything).Return(1, nil) transformer := TransformerOperator{ OnError: DropOnError, WriterOperator: WriterOperator{ @@ -92,15 +92,16 @@ func TestTransformerDropOnError(t *testing.T) { return fmt.Errorf("Failure") } - err := transformer.ProcessWith(ctx, testEntry, transform) + processed, err := transformer.ProcessWith(ctx, testEntry, transform) require.Error(t, err) + require.Equal(t, 0, processed) output.AssertNotCalled(t, "Process", mock.Anything, mock.Anything) } func TestTransformerSendOnError(t *testing.T) { output := &testutil.Operator{} output.On("ID").Return("test-output") - output.On("Process", mock.Anything, mock.Anything).Return(nil) + output.On("Process", mock.Anything, mock.Anything).Return(1, nil) transformer := TransformerOperator{ OnError: SendOnError, WriterOperator: WriterOperator{ @@ -119,15 +120,16 @@ func TestTransformerSendOnError(t *testing.T) { return fmt.Errorf("Failure") } - err := transformer.ProcessWith(ctx, testEntry, transform) + processed, err := transformer.ProcessWith(ctx, testEntry, transform) require.Error(t, err) + require.Equal(t, 1, processed) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } func TestTransformerProcessWithValid(t *testing.T) { output := &testutil.Operator{} output.On("ID").Return("test-output") - output.On("Process", mock.Anything, mock.Anything).Return(nil) + output.On("Process", mock.Anything, mock.Anything).Return(1, nil) transformer := TransformerOperator{ OnError: SendOnError, WriterOperator: WriterOperator{ @@ -146,8 +148,9 @@ func TestTransformerProcessWithValid(t *testing.T) { return nil } - err := transformer.ProcessWith(ctx, testEntry, transform) + processed, err := transformer.ProcessWith(ctx, testEntry, transform) require.NoError(t, err) + require.Equal(t, 1, processed) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } @@ -211,7 +214,7 @@ func TestTransformerIf(t *testing.T) { e := entry.New() e.Body = tc.inputBody - err = transformer.ProcessWith(context.Background(), e, func(e *entry.Entry) error { + processed, err := transformer.ProcessWith(context.Background(), e, func(e *entry.Entry) error { e.Body = "parsed" return nil }) @@ -220,6 +223,7 @@ func TestTransformerIf(t *testing.T) { } else { require.NoError(t, err) } + require.Equal(t, 1, processed) fake.ExpectBody(t, tc.expected) }) diff --git a/pkg/stanza/operator/helper/writer.go b/pkg/stanza/operator/helper/writer.go index a2accb4c52e8..74894625d1aa 100644 --- a/pkg/stanza/operator/helper/writer.go +++ b/pkg/stanza/operator/helper/writer.go @@ -58,14 +58,20 @@ type WriterOperator struct { } // Write will write an entry to the outputs of the operator. -func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) { +// Returns number of entries being processed by pipeline +func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) int { + emitted := 0 for i, operator := range w.OutputOperators { if i == len(w.OutputOperators)-1 { - _ = operator.Process(ctx, e) - return + ok, _ := operator.Process(ctx, e) + emitted += ok + return emitted } - _ = operator.Process(ctx, e.Copy()) + ok, _ := operator.Process(ctx, e.Copy()) + + emitted += ok } + return emitted } // CanOutput always returns true for a writer operator. diff --git a/pkg/stanza/operator/helper/writer_test.go b/pkg/stanza/operator/helper/writer_test.go index c067dfd79fb3..3f84e6a0877e 100644 --- a/pkg/stanza/operator/helper/writer_test.go +++ b/pkg/stanza/operator/helper/writer_test.go @@ -51,9 +51,9 @@ func TestWriterConfigValidBuild(t *testing.T) { func TestWriterOperatorWrite(t *testing.T) { output1 := &testutil.Operator{} - output1.On("Process", mock.Anything, mock.Anything).Return(nil) + output1.On("Process", mock.Anything, mock.Anything).Return(1, nil) output2 := &testutil.Operator{} - output2.On("Process", mock.Anything, mock.Anything).Return(nil) + output2.On("Process", mock.Anything, mock.Anything).Return(1, nil) writer := WriterOperator{ OutputOperators: []operator.Operator{output1, output2}, } @@ -61,7 +61,9 @@ func TestWriterOperatorWrite(t *testing.T) { ctx := context.Background() testEntry := entry.New() - writer.Write(ctx, testEntry) + processed := writer.Write(ctx, testEntry) + + require.Equal(t, 2, processed) output1.AssertCalled(t, "Process", ctx, mock.Anything) output2.AssertCalled(t, "Process", ctx, mock.Anything) } @@ -73,9 +75,9 @@ func TestWriterOperatorCanOutput(t *testing.T) { func TestWriterOperatorOutputs(t *testing.T) { output1 := &testutil.Operator{} - output1.On("Process", mock.Anything, mock.Anything).Return(nil) + output1.On("Process", mock.Anything, mock.Anything).Return(1, nil) output2 := &testutil.Operator{} - output2.On("Process", mock.Anything, mock.Anything).Return(nil) + output2.On("Process", mock.Anything, mock.Anything).Return(1, nil) writer := WriterOperator{ OutputOperators: []operator.Operator{output1, output2}, } @@ -83,7 +85,9 @@ func TestWriterOperatorOutputs(t *testing.T) { ctx := context.Background() testEntry := entry.New() - writer.Write(ctx, testEntry) + processed := writer.Write(ctx, testEntry) + + require.Equal(t, 2, processed) output1.AssertCalled(t, "Process", ctx, mock.Anything) output2.AssertCalled(t, "Process", ctx, mock.Anything) } @@ -159,3 +163,41 @@ func TestUnmarshalWriterConfig(t *testing.T) { }, }.Run(t) } + +func TestWriterWithDroppingData(t *testing.T) { + output1 := &testutil.Operator{} + output1.On("Process", mock.Anything, mock.Anything).Return(0, nil) + output2 := &testutil.Operator{} + output2.On("Process", mock.Anything, mock.Anything).Return(3, nil) + writer := WriterOperator{ + OutputOperators: []operator.Operator{output1, output2}, + } + + ctx := context.Background() + testEntry := entry.New() + + processed := writer.Write(ctx, testEntry) + + require.Equal(t, 3, processed) + output1.AssertCalled(t, "Process", ctx, mock.Anything) + output2.AssertCalled(t, "Process", ctx, mock.Anything) +} + +func TestWriterWithMultipleEntries(t *testing.T) { + output1 := &testutil.Operator{} + output1.On("Process", mock.Anything, mock.Anything).Return(7, nil) + output2 := &testutil.Operator{} + output2.On("Process", mock.Anything, mock.Anything).Return(3, nil) + writer := WriterOperator{ + OutputOperators: []operator.Operator{output1, output2}, + } + + ctx := context.Background() + testEntry := entry.New() + + processed := writer.Write(ctx, testEntry) + + require.Equal(t, 10, processed) + output1.AssertCalled(t, "Process", ctx, mock.Anything) + output2.AssertCalled(t, "Process", ctx, mock.Anything) +} diff --git a/pkg/stanza/operator/input/journald/journald_test.go b/pkg/stanza/operator/input/journald/journald_test.go index 1f37dbcd7c05..c613f8d1ecd3 100644 --- a/pkg/stanza/operator/input/journald/journald_test.go +++ b/pkg/stanza/operator/input/journald/journald_test.go @@ -56,7 +56,7 @@ func TestInputJournald(t *testing.T) { received := make(chan *entry.Entry) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { received <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = op.SetOutputs([]operator.Operator{mockOutput}) require.NoError(t, err) diff --git a/pkg/stanza/operator/input/tcp/tcp_test.go b/pkg/stanza/operator/input/tcp/tcp_test.go index 64162a60aa04..e78833633534 100644 --- a/pkg/stanza/operator/input/tcp/tcp_test.go +++ b/pkg/stanza/operator/input/tcp/tcp_test.go @@ -99,7 +99,7 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) { entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) @@ -148,7 +148,7 @@ func tcpInputAttributesTest(input []byte, expected []string) func(t *testing.T) entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) @@ -234,7 +234,7 @@ func tlsInputTest(input []byte, expected []string) func(t *testing.T) { entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = tcpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) @@ -407,7 +407,7 @@ func TestFailToBind(t *testing.T) { entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = tcpInput.Start(testutil.NewMockPersister("test")) return tcpInput, err } diff --git a/pkg/stanza/operator/input/udp/udp_test.go b/pkg/stanza/operator/input/udp/udp_test.go index 0fcdb1b068be..1991234bffce 100644 --- a/pkg/stanza/operator/input/udp/udp_test.go +++ b/pkg/stanza/operator/input/udp/udp_test.go @@ -46,7 +46,7 @@ func udpInputTest(input []byte, expected []string) func(t *testing.T) { entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = udpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) @@ -97,7 +97,7 @@ func udpInputAttributesTest(input []byte, expected []string) func(t *testing.T) entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = udpInput.Start(testutil.NewMockPersister("test")) require.NoError(t, err) @@ -195,7 +195,7 @@ func TestFailToBind(t *testing.T) { entryChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { entryChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) err = udpInput.Start(testutil.NewMockPersister("test")) return udpInput, err diff --git a/pkg/stanza/operator/operator.go b/pkg/stanza/operator/operator.go index cf5a309e4557..65cacf419587 100644 --- a/pkg/stanza/operator/operator.go +++ b/pkg/stanza/operator/operator.go @@ -48,7 +48,8 @@ type Operator interface { // CanProcess indicates if the operator will process entries from other operators. CanProcess() bool // Process will process an entry from an operator. - Process(context.Context, *entry.Entry) error + // Returns number of entries being processed by pipeline and error eventually. + Process(context.Context, *entry.Entry) (int, error) // Logger returns the operator's logger Logger() *zap.SugaredLogger } diff --git a/pkg/stanza/operator/output/drop/drop.go b/pkg/stanza/operator/output/drop/drop.go index e5adbc5fc065..74aa013e9482 100644 --- a/pkg/stanza/operator/output/drop/drop.go +++ b/pkg/stanza/operator/output/drop/drop.go @@ -58,6 +58,6 @@ type Output struct { } // Process will drop the incoming entry. -func (p *Output) Process(ctx context.Context, entry *entry.Entry) error { - return nil +func (p *Output) Process(ctx context.Context, entry *entry.Entry) (int, error) { + return 0, nil } diff --git a/pkg/stanza/operator/output/drop/drop_test.go b/pkg/stanza/operator/output/drop/drop_test.go index a7f961a4a548..dc81faef4813 100644 --- a/pkg/stanza/operator/output/drop/drop_test.go +++ b/pkg/stanza/operator/output/drop/drop_test.go @@ -44,6 +44,7 @@ func TestProcess(t *testing.T) { require.NoError(t, err) entry := entry.New() - result := op.Process(context.Background(), entry) - require.Nil(t, result) + processed, err := op.Process(context.Background(), entry) + require.Nil(t, err) + require.Equal(t, 0, processed) } diff --git a/pkg/stanza/operator/output/file/file.go b/pkg/stanza/operator/output/file/file.go index bcb01380c39b..f774a633f373 100644 --- a/pkg/stanza/operator/output/file/file.go +++ b/pkg/stanza/operator/output/file/file.go @@ -110,21 +110,21 @@ func (fo *Output) Stop() error { } // Process will write an entry to the output file. -func (fo *Output) Process(ctx context.Context, entry *entry.Entry) error { +func (fo *Output) Process(ctx context.Context, entry *entry.Entry) (int, error) { fo.mux.Lock() defer fo.mux.Unlock() if fo.tmpl != nil { err := fo.tmpl.Execute(fo.file, entry) if err != nil { - return err + return 0, err } } else { err := fo.encoder.Encode(entry) if err != nil { - return err + return 0, err } } - return nil + return 0, nil } diff --git a/pkg/stanza/operator/output/stdout/stdout.go b/pkg/stanza/operator/output/stdout/stdout.go index 1b08f984c30e..1749b64ffa64 100644 --- a/pkg/stanza/operator/output/stdout/stdout.go +++ b/pkg/stanza/operator/output/stdout/stdout.go @@ -68,14 +68,14 @@ type Output struct { } // Process will log entries received. -func (o *Output) Process(ctx context.Context, entry *entry.Entry) error { +func (o *Output) Process(ctx context.Context, entry *entry.Entry) (int, error) { o.mux.Lock() err := o.encoder.Encode(entry) if err != nil { o.mux.Unlock() o.Errorf("Failed to process entry: %s, $s", err, entry.Body) - return err + return 0, err } o.mux.Unlock() - return nil + return 0, nil } diff --git a/pkg/stanza/operator/output/stdout/stdout_test.go b/pkg/stanza/operator/output/stdout/stdout_test.go index 96e1ac5156d0..cf37885cc782 100644 --- a/pkg/stanza/operator/output/stdout/stdout_test.go +++ b/pkg/stanza/operator/output/stdout/stdout_test.go @@ -51,8 +51,9 @@ func TestOperator(t *testing.T) { Timestamp: ts, Body: "test body", } - err = op.Process(context.Background(), e) + processed, err := op.Process(context.Background(), e) require.NoError(t, err) + require.Equal(t, 0, processed) marshalledOTS, err := json.Marshal(ots) require.NoError(t, err) diff --git a/pkg/stanza/operator/parser/csv/csv.go b/pkg/stanza/operator/parser/csv/csv.go index eec7a106ecbd..432097a1713d 100644 --- a/pkg/stanza/operator/parser/csv/csv.go +++ b/pkg/stanza/operator/parser/csv/csv.go @@ -115,7 +115,7 @@ type Parser struct { type parseFunc func(interface{}) (interface{}, error) // Process will parse an entry for csv. -func (r *Parser) Process(ctx context.Context, e *entry.Entry) error { +func (r *Parser) Process(ctx context.Context, e *entry.Entry) (int, error) { parse := r.parse // If we have a headerAttribute set we need to dynamically generate our parser function @@ -124,13 +124,13 @@ func (r *Parser) Process(ctx context.Context, e *entry.Entry) error { if !ok { err := fmt.Errorf("failed to read dynamic header attribute %s", r.headerAttribute) r.Error(err) - return err + return 0, err } headerString, ok := h.(string) if !ok { err := fmt.Errorf("header is expected to be a string but is %T", h) r.Error(err) - return err + return 0, err } headers := strings.Split(headerString, string([]rune{r.fieldDelimiter})) parse = generateParseFunc(headers, r.fieldDelimiter, r.lazyQuotes, r.ignoreQuotes) diff --git a/pkg/stanza/operator/parser/csv/csv_test.go b/pkg/stanza/operator/parser/csv/csv_test.go index 45e788aca570..fbe9a3c6d8ca 100644 --- a/pkg/stanza/operator/parser/csv/csv_test.go +++ b/pkg/stanza/operator/parser/csv/csv_test.go @@ -344,16 +344,7 @@ func TestParserCSV(t *testing.T) { Body: "stanza dev,1,400,555-555-5555", }, }, - []entry.Entry{ - { - Body: map[string]interface{}{ - "name": "stanza dev", - "age": "1", - "height": "400", - "number": "555-555-5555", - }, - }, - }, + []entry.Entry{}, false, true, }, @@ -737,11 +728,13 @@ func TestParserCSV(t *testing.T) { for i := range tc.inputEntries { inputEntry := tc.inputEntries[i] inputEntry.ObservedTimestamp = ots - err = op.Process(context.Background(), &inputEntry) + processed, err := op.Process(context.Background(), &inputEntry) if tc.expectProcessErr { require.Error(t, err) + require.Equal(t, len(tc.expectedEntries), processed) return } + require.Equal(t, 1, processed) require.NoError(t, err) expectedEntry := tc.expectedEntries[i] @@ -975,8 +968,9 @@ cc""",dddd,eeee`, entry := entry.New() entry.Body = tc.input - err = op.Process(context.Background(), entry) + processed, err := op.Process(context.Background(), entry) require.NoError(t, err) + require.Equal(t, 1, processed) fake.ExpectBody(t, tc.expected) fake.ExpectNoEntry(t, 100*time.Millisecond) }) @@ -997,8 +991,9 @@ func TestParserCSVInvalidJSONInput(t *testing.T) { entry := entry.New() entry.Body = "{\"name\": \"stanza\"}" - err = op.Process(context.Background(), entry) + processed, err := op.Process(context.Background(), entry) require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") + require.Equal(t, 1, processed) fake.ExpectBody(t, "{\"name\": \"stanza\"}") }) } diff --git a/pkg/stanza/operator/parser/json/json.go b/pkg/stanza/operator/parser/json/json.go index 276953c71d17..368a501f4712 100644 --- a/pkg/stanza/operator/parser/json/json.go +++ b/pkg/stanza/operator/parser/json/json.go @@ -69,7 +69,7 @@ type Parser struct { } // Process will parse an entry for JSON. -func (j *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (j *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return j.ParserOperator.ProcessWith(ctx, entry, j.parse) } diff --git a/pkg/stanza/operator/parser/json/json_test.go b/pkg/stanza/operator/parser/json/json_test.go index 0384a966415c..6310cc6c1ddc 100644 --- a/pkg/stanza/operator/parser/json/json_test.go +++ b/pkg/stanza/operator/parser/json/json_test.go @@ -164,8 +164,9 @@ func TestParser(t *testing.T) { tc.input.ObservedTimestamp = ots tc.expect.ObservedTimestamp = ots - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) require.NoError(t, err) + require.Equal(t, 1, processed) fake.ExpectEntry(t, tc.expect) }) } diff --git a/pkg/stanza/operator/parser/keyvalue/keyvalue.go b/pkg/stanza/operator/parser/keyvalue/keyvalue.go index 4f30ce2c640f..4bc601dee663 100644 --- a/pkg/stanza/operator/parser/keyvalue/keyvalue.go +++ b/pkg/stanza/operator/parser/keyvalue/keyvalue.go @@ -94,7 +94,7 @@ type Parser struct { } // Process will parse an entry for key value pairs. -func (kv *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (kv *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return kv.ParserOperator.ProcessWith(ctx, entry, kv.parse) } diff --git a/pkg/stanza/operator/parser/keyvalue/keyvalue_test.go b/pkg/stanza/operator/parser/keyvalue/keyvalue_test.go index 15de27094e33..803659534cd2 100644 --- a/pkg/stanza/operator/parser/keyvalue/keyvalue_test.go +++ b/pkg/stanza/operator/parser/keyvalue/keyvalue_test.go @@ -570,7 +570,9 @@ key=value`, tc.input.ObservedTimestamp = ots tc.expect.ObservedTimestamp = ots - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) + require.Equal(t, 1, processed) + if tc.expectError { require.Error(t, err) return diff --git a/pkg/stanza/operator/parser/regex/regex.go b/pkg/stanza/operator/parser/regex/regex.go index cdf335010a9a..8d2470056104 100644 --- a/pkg/stanza/operator/parser/regex/regex.go +++ b/pkg/stanza/operator/parser/regex/regex.go @@ -106,7 +106,7 @@ type Parser struct { } // Process will parse an entry for regex. -func (r *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (r *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return r.ParserOperator.ProcessWith(ctx, entry, r.parse) } diff --git a/pkg/stanza/operator/parser/regex/regex_test.go b/pkg/stanza/operator/parser/regex/regex_test.go index e576d31e3162..ea3655cfe8d8 100644 --- a/pkg/stanza/operator/parser/regex/regex_test.go +++ b/pkg/stanza/operator/parser/regex/regex_test.go @@ -153,8 +153,9 @@ func TestParserRegex(t *testing.T) { tc.input.ObservedTimestamp = ots tc.expected.ObservedTimestamp = ots - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) require.NoError(t, err) + require.Equal(t, 1, processed) fake.ExpectEntry(t, tc.expected) }) diff --git a/pkg/stanza/operator/parser/scope/scope_name.go b/pkg/stanza/operator/parser/scope/scope_name.go index c43fc4cc3132..bf0b3b412db7 100644 --- a/pkg/stanza/operator/parser/scope/scope_name.go +++ b/pkg/stanza/operator/parser/scope/scope_name.go @@ -69,6 +69,6 @@ type Parser struct { } // Process will parse logger name from an entry. -func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Parse) } diff --git a/pkg/stanza/operator/parser/scope/scope_name_test.go b/pkg/stanza/operator/parser/scope/scope_name_test.go index 5e5e555be065..cf017df1425d 100644 --- a/pkg/stanza/operator/parser/scope/scope_name_test.go +++ b/pkg/stanza/operator/parser/scope/scope_name_test.go @@ -106,13 +106,14 @@ func TestScopeNameParser(t *testing.T) { parser, err := tc.config.Build(testutil.Logger(t)) require.NoError(t, err) - err = parser.Process(context.Background(), tc.input) + processed, err := parser.Process(context.Background(), tc.input) if tc.expectErr { require.Error(t, err) } if tc.expected != nil { require.Equal(t, tc.expected, tc.input) } + require.Equal(t, 0, processed) }) } } diff --git a/pkg/stanza/operator/parser/severity/severity.go b/pkg/stanza/operator/parser/severity/severity.go index c54d3493d602..9f61fa7d2cb6 100644 --- a/pkg/stanza/operator/parser/severity/severity.go +++ b/pkg/stanza/operator/parser/severity/severity.go @@ -74,6 +74,6 @@ type Parser struct { } // Process will parse severity from an entry. -func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Parse) } diff --git a/pkg/stanza/operator/parser/severity/severity_test.go b/pkg/stanza/operator/parser/severity/severity_test.go index a2dbff533187..4ab6ac3e86f1 100644 --- a/pkg/stanza/operator/parser/severity/severity_test.go +++ b/pkg/stanza/operator/parser/severity/severity_test.go @@ -244,7 +244,7 @@ func runSeverityParseTest(cfg *Config, ent *entry.Entry, buildErr bool, parseErr resultChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { resultChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) severityParser := op.(*Parser) severityParser.OutputOperators = []operator.Operator{mockOutput} diff --git a/pkg/stanza/operator/parser/syslog/syslog.go b/pkg/stanza/operator/parser/syslog/syslog.go index 43af6747b2e9..b556cb270a84 100644 --- a/pkg/stanza/operator/parser/syslog/syslog.go +++ b/pkg/stanza/operator/parser/syslog/syslog.go @@ -160,7 +160,7 @@ type Parser struct { } // Process will parse an entry field as syslog. -func (s *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (s *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return s.ParserOperator.ProcessWithCallback(ctx, entry, s.parse, postprocess) } diff --git a/pkg/stanza/operator/parser/syslog/syslog_test.go b/pkg/stanza/operator/parser/syslog/syslog_test.go index bfe3615e3bab..4e9a23346f5a 100644 --- a/pkg/stanza/operator/parser/syslog/syslog_test.go +++ b/pkg/stanza/operator/parser/syslog/syslog_test.go @@ -48,8 +48,9 @@ func TestParser(t *testing.T) { newEntry := tc.Input ots := newEntry.ObservedTimestamp - err = op.Process(context.Background(), newEntry) + processed, err := op.Process(context.Background(), newEntry) require.NoError(t, err) + require.Equal(t, 1, processed) select { case e := <-fake.Received: @@ -77,9 +78,10 @@ func TestSyslogParseRFC5424_SDNameTooLong(t *testing.T) { newEntry := entry.New() newEntry.Body = body - err = op.Process(context.Background(), newEntry) + processed, err := op.Process(context.Background(), newEntry) require.Error(t, err) require.Contains(t, err.Error(), "expecting a structured data element id (from 1 to max 32 US-ASCII characters") + require.Equal(t, 1, processed) select { case e := <-fake.Received: diff --git a/pkg/stanza/operator/parser/time/time.go b/pkg/stanza/operator/parser/time/time.go index e88e70c5e81c..866310d9e513 100644 --- a/pkg/stanza/operator/parser/time/time.go +++ b/pkg/stanza/operator/parser/time/time.go @@ -73,6 +73,6 @@ type Parser struct { } // Process will parse time from an entry. -func (t *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (t *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return t.ProcessWith(ctx, entry, t.TimeParser.Parse) } diff --git a/pkg/stanza/operator/parser/time/time_test.go b/pkg/stanza/operator/parser/time/time_test.go index fbf4559b9c33..55ab8ef22489 100644 --- a/pkg/stanza/operator/parser/time/time_test.go +++ b/pkg/stanza/operator/parser/time/time_test.go @@ -140,9 +140,10 @@ func TestProcess(t *testing.T) { require.True(t, op.CanOutput(), "expected test operator CanOutput to return true") - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) require.NoError(t, err) require.Equal(t, tc.expect, tc.input) + require.Equal(t, 0, processed) }) } } @@ -523,7 +524,7 @@ func runLossyTimeParseTest(_ *testing.T, cfg *Config, ent *entry.Entry, buildErr resultChan := make(chan *entry.Entry, 1) mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { resultChan <- args.Get(1).(*entry.Entry) - }).Return(nil) + }).Return(1, nil) timeParser := op.(*Parser) timeParser.OutputOperators = []operator.Operator{mockOutput} diff --git a/pkg/stanza/operator/parser/trace/trace.go b/pkg/stanza/operator/parser/trace/trace.go index a7046c5925ea..531dda84b863 100644 --- a/pkg/stanza/operator/parser/trace/trace.go +++ b/pkg/stanza/operator/parser/trace/trace.go @@ -73,6 +73,6 @@ type Parser struct { } // Process will parse traces from an entry. -func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Parse) } diff --git a/pkg/stanza/operator/parser/trace/trace_test.go b/pkg/stanza/operator/parser/trace/trace_test.go index 550c722893d9..7728ce5cc1d1 100644 --- a/pkg/stanza/operator/parser/trace/trace_test.go +++ b/pkg/stanza/operator/parser/trace/trace_test.go @@ -166,9 +166,10 @@ func TestProcess(t *testing.T) { op, err := tc.op() require.NoError(t, err, "did not expect operator function to return an error, this is a bug with the test case") - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) require.NoError(t, err) require.Equal(t, tc.expect, tc.input) + require.Equal(t, 0, processed) }) } } diff --git a/pkg/stanza/operator/parser/uri/uri.go b/pkg/stanza/operator/parser/uri/uri.go index 94ac41b14e6a..3170484d81a0 100644 --- a/pkg/stanza/operator/parser/uri/uri.go +++ b/pkg/stanza/operator/parser/uri/uri.go @@ -68,7 +68,7 @@ type Parser struct { } // Process will parse an entry. -func (u *Parser) Process(ctx context.Context, entry *entry.Entry) error { +func (u *Parser) Process(ctx context.Context, entry *entry.Entry) (int, error) { return u.ParserOperator.ProcessWith(ctx, entry, u.parse) } diff --git a/pkg/stanza/operator/parser/uri/uri_test.go b/pkg/stanza/operator/parser/uri/uri_test.go index 6a75d74f50c3..48097c0236e4 100644 --- a/pkg/stanza/operator/parser/uri/uri_test.go +++ b/pkg/stanza/operator/parser/uri/uri_test.go @@ -165,9 +165,10 @@ func TestProcess(t *testing.T) { op, err := tc.op() require.NoError(t, err, "did not expect operator function to return an error, this is a bug with the test case") - err = op.Process(context.Background(), tc.input) + processed, err := op.Process(context.Background(), tc.input) require.NoError(t, err) require.Equal(t, tc.expect, tc.input) + require.Equal(t, 0, processed) }) } } diff --git a/pkg/stanza/operator/transformer/add/add.go b/pkg/stanza/operator/transformer/add/add.go index ff54a8c691af..a40e57ca76eb 100644 --- a/pkg/stanza/operator/transformer/add/add.go +++ b/pkg/stanza/operator/transformer/add/add.go @@ -91,7 +91,7 @@ type Transformer struct { } // Process will process an entry with a add transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/add/add_test.go b/pkg/stanza/operator/transformer/add/add_test.go index d37cd6d97ed4..caaf32e66a70 100644 --- a/pkg/stanza/operator/transformer/add/add_test.go +++ b/pkg/stanza/operator/transformer/add/add_test.go @@ -321,13 +321,14 @@ func TestProcessAndBuild(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, add.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = add.Process(context.Background(), val) + processed, err := add.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) } + require.Equal(t, 1, processed) }) } } diff --git a/pkg/stanza/operator/transformer/copy/copy.go b/pkg/stanza/operator/transformer/copy/copy.go index 2ee5525c70e8..901e746328bf 100644 --- a/pkg/stanza/operator/transformer/copy/copy.go +++ b/pkg/stanza/operator/transformer/copy/copy.go @@ -80,7 +80,7 @@ type Transformer struct { } // Process will process an entry with a copy transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/copy/copy_test.go b/pkg/stanza/operator/transformer/copy/copy_test.go index 4aee60f00fd7..590a62d9f3f0 100644 --- a/pkg/stanza/operator/transformer/copy/copy_test.go +++ b/pkg/stanza/operator/transformer/copy/copy_test.go @@ -305,12 +305,14 @@ func TestBuildAndProcess(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, copy.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = copy.Process(context.Background(), val) + processed, err := copy.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) + require.Equal(t, 0, processed) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) + require.Equal(t, 1, processed) } }) } diff --git a/pkg/stanza/operator/transformer/filter/filter.go b/pkg/stanza/operator/transformer/filter/filter.go index 384695ab4fb6..d2d15e3b547f 100644 --- a/pkg/stanza/operator/transformer/filter/filter.go +++ b/pkg/stanza/operator/transformer/filter/filter.go @@ -91,35 +91,34 @@ type Transformer struct { } // Process will drop incoming entries that match the filter expression -func (f *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (f *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { env := helper.GetExprEnv(entry) defer helper.PutExprEnv(env) matches, err := vm.Run(f.expression, env) if err != nil { f.Errorf("Running expressing returned an error", zap.Error(err)) - return nil + return 0, nil } filtered, ok := matches.(bool) if !ok { f.Errorf("Expression did not compile as a boolean") - return nil + return 0, nil } if !filtered { - f.Write(ctx, entry) - return nil + return f.Write(ctx, entry), nil } i, err := randInt(rand.Reader, upperBound) if err != nil { - return err + return 0, err } if i.Cmp(f.dropCutoff) >= 0 { - f.Write(ctx, entry) + return f.Write(ctx, entry), nil } - return nil + return 0, nil } diff --git a/pkg/stanza/operator/transformer/filter/filter_test.go b/pkg/stanza/operator/transformer/filter/filter_test.go index aadf5291faa5..379d689f7fbf 100644 --- a/pkg/stanza/operator/transformer/filter/filter_test.go +++ b/pkg/stanza/operator/transformer/filter/filter_test.go @@ -175,7 +175,7 @@ func TestTransformer(t *testing.T) { filtered := true mockOutput := testutil.NewMockOperator("output") - mockOutput.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + mockOutput.On("Process", mock.Anything, mock.Anything).Return(1, nil).Run(func(args mock.Arguments) { filtered = false }) @@ -183,9 +183,15 @@ func TestTransformer(t *testing.T) { require.True(t, ok) filterOperator.OutputOperators = []operator.Operator{mockOutput} - err = filterOperator.Process(context.Background(), tc.input) + processed, err := filterOperator.Process(context.Background(), tc.input) require.NoError(t, err) + if tc.filtered { + require.Equal(t, 0, processed) + } else { + require.Equal(t, 1, processed) + } + require.Equal(t, tc.filtered, filtered) }) } @@ -200,7 +206,7 @@ func TestFilterDropRatio(t *testing.T) { processedEntries := 0 mockOutput := testutil.NewMockOperator("output") - mockOutput.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + mockOutput.On("Process", mock.Anything, mock.Anything).Return(1, nil).Run(func(args mock.Arguments) { processedEntries++ }) @@ -223,15 +229,20 @@ func TestFilterDropRatio(t *testing.T) { return big.NewInt(randos[nextIndex]), nil } + processed := 0 + for i := 1; i < 11; i++ { - err = filterOperator.Process(context.Background(), testEntry) + p, err := filterOperator.Process(context.Background(), testEntry) require.NoError(t, err) + processed += p } for i := 1; i < 11; i++ { - err = filterOperator.Process(context.Background(), testEntry) + p, err := filterOperator.Process(context.Background(), testEntry) require.NoError(t, err) + processed += p } require.Equal(t, 10, processedEntries) + require.Equal(t, 10, processed) } diff --git a/pkg/stanza/operator/transformer/flatten/flatten.go b/pkg/stanza/operator/transformer/flatten/flatten.go index 749845fdc6f8..63ea0859b47e 100644 --- a/pkg/stanza/operator/transformer/flatten/flatten.go +++ b/pkg/stanza/operator/transformer/flatten/flatten.go @@ -75,7 +75,7 @@ type Transformer struct { } // Process will process an entry with a flatten transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/flatten/flatten_test.go b/pkg/stanza/operator/transformer/flatten/flatten_test.go index 8d0211aef7ae..d97a319f268e 100644 --- a/pkg/stanza/operator/transformer/flatten/flatten_test.go +++ b/pkg/stanza/operator/transformer/flatten/flatten_test.go @@ -282,13 +282,15 @@ func TestBuildAndProcess(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, flatten.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = flatten.Process(context.Background(), val) + processed, err := flatten.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) + require.Equal(t, 0, processed) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) + require.Equal(t, 1, processed) } }) } diff --git a/pkg/stanza/operator/transformer/move/move.go b/pkg/stanza/operator/transformer/move/move.go index 7cfcc12dac4f..b33ddf65b76e 100644 --- a/pkg/stanza/operator/transformer/move/move.go +++ b/pkg/stanza/operator/transformer/move/move.go @@ -76,7 +76,7 @@ type Transformer struct { } // Process will process an entry with a move transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/move/move_test.go b/pkg/stanza/operator/transformer/move/move_test.go index a3800f7a851b..5de8b8b5a24a 100644 --- a/pkg/stanza/operator/transformer/move/move_test.go +++ b/pkg/stanza/operator/transformer/move/move_test.go @@ -520,12 +520,14 @@ func TestProcessAndBuild(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, move.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = move.Process(context.Background(), val) + processed, err := move.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) + require.Equal(t, 0, processed) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) + require.Equal(t, 1, processed) } }) } diff --git a/pkg/stanza/operator/transformer/noop/noop.go b/pkg/stanza/operator/transformer/noop/noop.go index 367a0c04fcc3..1b019f578b4e 100644 --- a/pkg/stanza/operator/transformer/noop/noop.go +++ b/pkg/stanza/operator/transformer/noop/noop.go @@ -65,7 +65,6 @@ type Transformer struct { } // Process will forward the entry to the next output without any alterations. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { - p.Write(ctx, entry) - return nil +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { + return p.Write(ctx, entry), nil } diff --git a/pkg/stanza/operator/transformer/noop/noop_test.go b/pkg/stanza/operator/transformer/noop/noop_test.go index d34b2b267aab..2bf21153bbb9 100644 --- a/pkg/stanza/operator/transformer/noop/noop_test.go +++ b/pkg/stanza/operator/transformer/noop/noop_test.go @@ -56,8 +56,9 @@ func TestProcess(t *testing.T) { entry.TraceFlags = []byte{0x01} expected := entry.Copy() - err = op.Process(context.Background(), entry) + processed, err := op.Process(context.Background(), entry) require.NoError(t, err) + require.Equal(t, 1, processed) fake.ExpectEntry(t, expected) } diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index c08eeb5a8357..8d805f5ac8c4 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -200,7 +200,7 @@ func (r *Transformer) Stop() error { const DefaultSourceIdentifier = "DefaultSourceIdentifier" -func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { +func (r *Transformer) Process(ctx context.Context, e *entry.Entry) (int, error) { // Lock the recombine operator because process can't run concurrently r.Lock() defer r.Unlock() @@ -236,25 +236,25 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { // Flush the existing batch err := r.flushSource(s) if err != nil { - return err + return 0, err } // Add the current log to the new batch r.addToBatch(ctx, e, s) - return nil + return 1, nil // This is the last entry in a complete batch case matches && r.matchIndicatesLast(): fallthrough // When matching on first entry, never batch partial first. Just emit immediately case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0: r.addToBatch(ctx, e, s) - return r.flushSource(s) + return 1, r.flushSource(s) } // This is neither the first entry of a new log, // nor the last entry of a log, so just add it to the batch r.addToBatch(ctx, e, s) - return nil + return 1, nil } func (r *Transformer) matchIndicatesFirst() bool { diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index 6b805dbe1ccb..b1e606d7a299 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -298,7 +298,9 @@ func TestTransformer(t *testing.T) { require.NoError(t, err) for _, e := range tc.input { - require.NoError(t, recombine.Process(context.Background(), e)) + processed, err := recombine.Process(context.Background(), e) + require.NoError(t, err) + require.Equal(t, 1, processed) } for _, expected := range tc.expectedOutput { @@ -327,7 +329,9 @@ func TestTransformer(t *testing.T) { require.NoError(t, err) // Send an entry that isn't the last in a multiline - require.NoError(t, recombine.Process(context.Background(), entry.New())) + processed, err := recombine.Process(context.Background(), entry.New()) + require.NoError(t, err) + require.Equal(t, 1, processed) // Ensure that the entry isn't immediately sent select { @@ -374,11 +378,11 @@ func BenchmarkRecombine(b *testing.B) { ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { - require.NoError(b, recombine.Process(ctx, e)) - require.NoError(b, recombine.Process(ctx, e)) - require.NoError(b, recombine.Process(ctx, e)) - require.NoError(b, recombine.Process(ctx, e)) - require.NoError(b, recombine.Process(ctx, e)) + for j := 0; j < 5; j++ { + processed, err := recombine.Process(ctx, e) + require.NoError(b, err) + require.Equal(b, 1, processed) + } recombine.flushUncombined(ctx) } } @@ -405,7 +409,9 @@ func TestTimeout(t *testing.T) { ctx := context.Background() require.NoError(t, recombine.Start(nil)) - require.NoError(t, recombine.Process(ctx, e)) + processed, err := recombine.Process(ctx, e) + require.NoError(t, err) + require.Equal(t, 1, processed) select { case <-fake.Received: t.Logf("We shouldn't receive an entry before timeout") diff --git a/pkg/stanza/operator/transformer/remove/remove.go b/pkg/stanza/operator/transformer/remove/remove.go index 6b3836d60e8f..0a5b5fc43abc 100644 --- a/pkg/stanza/operator/transformer/remove/remove.go +++ b/pkg/stanza/operator/transformer/remove/remove.go @@ -74,7 +74,7 @@ type Transformer struct { } // Process will process an entry with a remove transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/remove/remove_test.go b/pkg/stanza/operator/transformer/remove/remove_test.go index 88b706898f85..4636269a3d16 100644 --- a/pkg/stanza/operator/transformer/remove/remove_test.go +++ b/pkg/stanza/operator/transformer/remove/remove_test.go @@ -270,12 +270,14 @@ func TestProcessAndBuild(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, remove.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = remove.Process(context.Background(), val) + processed, err := remove.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) + require.Equal(t, 0, processed) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) + require.Equal(t, 1, processed) } }) } diff --git a/pkg/stanza/operator/transformer/retain/retain.go b/pkg/stanza/operator/transformer/retain/retain.go index 626d9d55dc06..ea5ec04997c4 100644 --- a/pkg/stanza/operator/transformer/retain/retain.go +++ b/pkg/stanza/operator/transformer/retain/retain.go @@ -90,7 +90,7 @@ type Transformer struct { } // Process will process an entry with a retain transformation. -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { return p.ProcessWith(ctx, entry, p.Transform) } diff --git a/pkg/stanza/operator/transformer/retain/retain_test.go b/pkg/stanza/operator/transformer/retain/retain_test.go index a5ce0ec3b320..6b50cdd8c44d 100644 --- a/pkg/stanza/operator/transformer/retain/retain_test.go +++ b/pkg/stanza/operator/transformer/retain/retain_test.go @@ -402,12 +402,14 @@ func TestBuildAndProcess(t *testing.T) { fake := testutil.NewFakeOutput(t) require.NoError(t, retain.SetOutputs([]operator.Operator{fake})) val := tc.input() - err = retain.Process(context.Background(), val) + processed, err := retain.Process(context.Background(), val) if tc.expectErr { require.Error(t, err) + require.Equal(t, 0, processed) } else { require.NoError(t, err) fake.ExpectEntry(t, tc.output()) + require.Equal(t, 1, processed) } }) } diff --git a/pkg/stanza/operator/transformer/router/router.go b/pkg/stanza/operator/transformer/router/router.go index ca61eb0594f5..0c0b904252b7 100644 --- a/pkg/stanza/operator/transformer/router/router.go +++ b/pkg/stanza/operator/transformer/router/router.go @@ -120,9 +120,10 @@ func (p *Transformer) CanProcess() bool { } // Process will route incoming entries based on matching expressions -func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { +func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) (int, error) { env := helper.GetExprEnv(entry) defer helper.PutExprEnv(env) + processed := 0 for _, route := range p.routes { matches, err := vm.Run(route.Expression, env) @@ -135,17 +136,19 @@ func (p *Transformer) Process(ctx context.Context, entry *entry.Entry) error { if matches.(bool) { if err := route.Attribute(entry); err != nil { p.Errorf("Failed to label entry: %s", err) - return err + return 0, err } for _, output := range route.OutputOperators { - _ = output.Process(ctx, entry) + // ToDo; check if at least one process is ok + pr, _ := output.Process(ctx, entry) + processed += pr } break } } - return nil + return processed, nil } // CanOutput will always return true for a router operator diff --git a/pkg/stanza/operator/transformer/router/router_test.go b/pkg/stanza/operator/transformer/router/router_test.go index 6921a84bfdf9..e30b9aab6bd8 100644 --- a/pkg/stanza/operator/transformer/router/router_test.go +++ b/pkg/stanza/operator/transformer/router/router_test.go @@ -201,7 +201,7 @@ func TestTransformer(t *testing.T) { var attributes map[string]interface{} mock1 := testutil.NewMockOperator("output1") - mock1.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + mock1.On("Process", mock.Anything, mock.Anything).Return(1, nil).Run(func(args mock.Arguments) { results["output1"]++ if entry, ok := args[1].(*entry.Entry); ok { attributes = entry.Attributes @@ -209,7 +209,7 @@ func TestTransformer(t *testing.T) { }) mock2 := testutil.NewMockOperator("output2") - mock2.On("Process", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + mock2.On("Process", mock.Anything, mock.Anything).Return(1, nil).Run(func(args mock.Arguments) { results["output2"]++ if entry, ok := args[1].(*entry.Entry); ok { attributes = entry.Attributes @@ -220,8 +220,9 @@ func TestTransformer(t *testing.T) { err = routerOperator.SetOutputs([]operator.Operator{mock1, mock2}) require.NoError(t, err) - err = routerOperator.Process(context.Background(), tc.input) + processed, err := routerOperator.Process(context.Background(), tc.input) require.NoError(t, err) + require.Equal(t, len(tc.expectedCounts), processed) require.Equal(t, tc.expectedCounts, results) require.Equal(t, tc.expectedAttributes, attributes) diff --git a/pkg/stanza/pipeline_test.go b/pkg/stanza/pipeline_test.go new file mode 100644 index 000000000000..4a214461c4e0 --- /dev/null +++ b/pkg/stanza/pipeline_test.go @@ -0,0 +1,217 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/add" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/copy" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/flatten" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/move" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/noop" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/remove" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/retain" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" +) + +// buildPipeline returns fake output and pipeline with multiple operators +func buildPipeline(t *testing.T) (*testutil.FakeOutput, *pipeline.DirectedPipeline) { + opAdd := add.NewConfigWithID("add") + opAdd.Value = "test" + opAdd.Field = entry.NewResourceField("new_field") + + opCopy := operator.NewConfig(copy.NewConfigWithID("copy")) + err := opCopy.UnmarshalJSON([]byte(`{"from": "resource.new_field", "to": "resource.field_copy", "type": "copy"}`)) + require.NoError(t, err) + + opFilter := filter.NewConfigWithID("filter") + opFilter.Expression = "body.filter matches 'true'" + + opFlatten := operator.NewConfig(flatten.NewConfigWithID("flatten")) + err = opFlatten.UnmarshalJSON([]byte(`{"type": "flatten", "field": "body.to_flatten"}`)) + require.NoError(t, err) + + opMove := operator.NewConfig(move.NewConfigWithID("move")) + err = opMove.UnmarshalJSON([]byte(`{"type": "move", "from": "resource.field_copy", "to": "resource.field_copy_moved"}`)) + require.NoError(t, err) + + opNoop := operator.NewConfig(noop.NewConfigWithID("noop")) + err = opNoop.UnmarshalJSON([]byte(`{"type": "noop"}`)) + require.NoError(t, err) + + opRecombine := recombine.NewConfigWithID("recombine") + opRecombine.IsLastEntry = "body.recombine matches \"true\"" + opRecombine.CombineField = entry.NewBodyField("body") + + opRemove := operator.NewConfig(remove.NewConfigWithID("remove")) + err = opRemove.UnmarshalJSON([]byte(`{"type": "remove", "field": "body.to_remove"}`)) + require.NoError(t, err) + + opRetain := operator.NewConfig(retain.NewConfigWithID("retain")) + err = opRetain.UnmarshalJSON([]byte(`{"type": "retain", "fields": ["body"]}`)) + require.NoError(t, err) + + fake := testutil.NewFakeOutput(t) + + cfg := pipeline.Config{ + Operators: []operator.Config{ + { + Builder: opAdd, + }, + opCopy, + { + Builder: opFilter, + }, + opFlatten, + opMove, + opNoop, + { + Builder: opRecombine, + }, + opRemove, + opRetain, + }, + DefaultOutput: fake, + } + + pipeline, err := cfg.Build(testutil.Logger(t)) + require.NoError(t, err) + + return fake, pipeline +} + +func TestMe(t *testing.T) { + fake, pipeline := buildPipeline(t) + + testCases := []struct { + Name string + Bodies []map[string]interface{} + Expected map[string]interface{} + Processed int + }{ + { + Name: "Basic", + Bodies: []map[string]interface{}{ + { + "to_flatten": map[string]interface{}{ + "field1": "value1", + }, + "filter": "false", + "to_remove": "remove_me", + "recombine": "true", + "body": "This is my body", + }, + }, + Expected: map[string]interface{}{ + "body": "This is my body", + "filter": "false", + "recombine": "true", + "field1": "value1", + }, + Processed: 1, + }, + { + Name: "Filter out", + Bodies: []map[string]interface{}{ + { + "to_flatten": map[string]interface{}{ + "field1": "value1", + }, + "filter": "true", + "to_remove": "remove_me", + "recombine": "true", + "body": "This is my body", + }, + }, + Expected: map[string]interface{}{}, + Processed: 0, + }, + { + Name: "Recombine", + Bodies: []map[string]interface{}{ + { + "to_flatten": map[string]interface{}{ + "field1": "value1", + }, + "filter": "false", + "to_remove": "remove_me", + "recombine": "false", + "body": "This is first line", + }, + { + "to_flatten": map[string]interface{}{ + "field1": "value1", + }, + "filter": "false", + "to_remove": "remove_me", + "recombine": "true", + "body": "and the second one", + }, + }, + Expected: map[string]interface{}{ + "body": "This is first line\nand the second one", + "filter": "false", + "recombine": "false", + "field1": "value1", + }, + Processed: 2, + }, + } + + for _, tt := range testCases { + t.Run(tt.Name, func(t *testing.T) { + processed := 0 + + for _, body := range tt.Bodies { + e := entry.New() + e.Body = body + + p, err := pipeline.Operators()[0].Process(context.Background(), e) + require.NoError(t, err) + + processed += p + } + + assert.Equal(t, tt.Processed, processed) + + if tt.Processed == 0 { + return + } + + select { + case e := <-fake.Received: + assert.EqualValues(t, tt.Expected, e.Body) + assert.EqualValues(t, map[string]interface{}{ + "field_copy_moved": "test", + "new_field": "test", + }, e.Resource) + case <-time.After(5 * time.Second): + t.Logf("The entry should be flushed by now!") + t.FailNow() + } + }) + } +} diff --git a/pkg/stanza/testutil/mocks.go b/pkg/stanza/testutil/mocks.go index 34ad91bd705d..15a505c4412c 100644 --- a/pkg/stanza/testutil/mocks.go +++ b/pkg/stanza/testutil/mocks.go @@ -84,9 +84,9 @@ func (f *FakeOutput) Stop() error { return nil } func (f *FakeOutput) Type() string { return "fake_output" } // Process will place all incoming entries on the Received channel of a fake output -func (f *FakeOutput) Process(ctx context.Context, entry *entry.Entry) error { +func (f *FakeOutput) Process(ctx context.Context, entry *entry.Entry) (int, error) { f.Received <- entry - return nil + return 1, nil } // ExpectBody expects that a body will be received by the fake operator within a second diff --git a/pkg/stanza/testutil/operator.go b/pkg/stanza/testutil/operator.go index 9fcc486cd72a..029e4fef9a19 100644 --- a/pkg/stanza/testutil/operator.go +++ b/pkg/stanza/testutil/operator.go @@ -108,17 +108,20 @@ func (_m *Operator) Outputs() []operator.Operator { } // Process provides a mock function with given fields: _a0, _a1 -func (_m *Operator) Process(_a0 context.Context, _a1 *entry.Entry) error { +func (_m *Operator) Process(_a0 context.Context, _a1 *entry.Entry) (int, error) { ret := _m.Called(_a0, _a1) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *entry.Entry) error); ok { - r0 = rf(_a0, _a1) + var r1 error + var r0 int + + if rf, ok := ret.Get(1).(func(context.Context, *entry.Entry) (int, error)); ok { + r0, r1 = rf(_a0, _a1) } else { - r0 = ret.Error(0) + r1 = ret.Error(1) + r0 = ret.Int(0) } - return r0 + return r0, r1 } // SetOutputIDs provides a mock function with given fields: _a0 diff --git a/processor/logstransformprocessor/processor.go b/processor/logstransformprocessor/processor.go index fd0117fce060..472e4a98142b 100644 --- a/processor/logstransformprocessor/processor.go +++ b/processor/logstransformprocessor/processor.go @@ -151,7 +151,7 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) { for _, e := range entries { // Add item to the first operator of the pipeline manually - if err := ltp.firstOperator.Process(ctx, e); err != nil { + if _, err := ltp.firstOperator.Process(ctx, e); err != nil { ltp.logger.Error("processor encountered an issue with the pipeline", zap.Error(err)) break }