From 3ac6fd3d799f36d9d187c08258d372874eff7afa Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 15 Jun 2021 16:01:34 -0400 Subject: [PATCH 1/3] Transformer.HandleEntryError incorrectly returned nil when on_error=send --- operator/builtin/parser/csv/csv_test.go | 2 +- operator/helper/parser_test.go | 31 +++++++++++++ operator/helper/transformer.go | 1 - operator/helper/transformer_test.go | 58 ++++++++++++++----------- testutil/mocks.go | 10 +++++ 5 files changed, 74 insertions(+), 28 deletions(-) diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index 56635d7cb..8d21e08af 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -238,7 +238,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) { entry := entry.New() entry.Record = "{\"name\": \"stanza\"}" err = op.Process(context.Background(), entry) - require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") + require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") fake.ExpectRecord(t, "{\"name\": \"stanza\"}") }) } diff --git a/operator/helper/parser_test.go b/operator/helper/parser_test.go index 6dd42243c..20730c723 100644 --- a/operator/helper/parser_test.go +++ b/operator/helper/parser_test.go @@ -98,6 +98,37 @@ func TestParserInvalidParse(t *testing.T) { require.Contains(t, err.Error(), "parse failure") } +func TestParserInvalidParseSend(t *testing.T) { + buildContext := testutil.NewBuildContext(t) + fakeOut := testutil.NewFakeOutput(t) + writer := WriterOperator{ + BasicOperator: BasicOperator{ + OperatorID: "test-id", + OperatorType: "test-type", + SugaredLogger: buildContext.Logger.SugaredLogger, + }, + OutputIDs: []string{fakeOut.ID()}, + } + writer.SetOutputs([]operator.Operator{fakeOut}) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewRecordField(), + } + parse := func(i interface{}) (interface{}, error) { + return i, fmt.Errorf("parse failure") + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} + func TestParserInvalidTimeParse(t *testing.T) { buildContext := testutil.NewBuildContext(t) parser := ParserOperator{ diff --git a/operator/helper/transformer.go b/operator/helper/transformer.go index e8bfec8f2..3bd112624 100644 --- a/operator/helper/transformer.go +++ b/operator/helper/transformer.go @@ -96,7 +96,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry)) if t.OnError == SendOnError { t.Write(ctx, entry) - return nil } return err } diff --git a/operator/helper/transformer_test.go b/operator/helper/transformer_test.go index 51b8317a0..fc4e834f1 100644 --- a/operator/helper/transformer_test.go +++ b/operator/helper/transformer_test.go @@ -107,7 +107,7 @@ func TestTransformerSendOnError(t *testing.T) { } err := transformer.ProcessWith(ctx, testEntry, transform) - require.NoError(t, err) + require.Error(t, err) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } @@ -145,42 +145,44 @@ func TestTransformerIf(t *testing.T) { ifExpr string inputRecord string expected string + errExpected bool }{ { - "NoIf", - "", - "test", - "parsed", + name: "NoIf", + ifExpr: "", + inputRecord: "test", + expected: "parsed", }, { - "TrueIf", - "true", - "test", - "parsed", + name: "TrueIf", + ifExpr: "true", + inputRecord: "test", + expected: "parsed", }, { - "FalseIf", - "false", - "test", - "test", + name: "FalseIf", + ifExpr: "false", + inputRecord: "test", + expected: "test", }, { - "EvaluatedTrue", - "$record == 'test'", - "test", - "parsed", + name: "EvaluatedTrue", + ifExpr: "$record == 'test'", + inputRecord: "test", + expected: "parsed", }, { - "EvaluatedFalse", - "$record == 'notest'", - "test", - "test", + name: "EvaluatedFalse", + ifExpr: "$record == 'notest'", + inputRecord: "test", + expected: "test", }, { - "FailingExpressionEvaluation", - "$record.test.noexist == 'notest'", - "test", - "test", + name: "FailingExpressionEvaluation", + ifExpr: "$record.test.noexist == 'notest'", + inputRecord: "test", + expected: "test", + errExpected: true, }, } @@ -201,7 +203,11 @@ func TestTransformerIf(t *testing.T) { e.Record = "parsed" return nil }) - require.NoError(t, err) + if tc.errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } fake.ExpectRecord(t, tc.expected) }) diff --git a/testutil/mocks.go b/testutil/mocks.go index fb23b9a38..4d021186c 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -89,3 +89,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) { require.FailNow(t, "Timed out waiting for entry") } } + +// ExpectNoEntry expects that no entry will be received within the specified time +func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) { + select { + case <-f.Received: + require.FailNow(t, "Should not have received entry") + case <-time.After(timeout): + return + } +} From 27bf52da30bccd5b2be44ef973f3c071a2671284 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 15 Jun 2021 16:42:40 -0400 Subject: [PATCH 2/3] Test more parse failure cases --- operator/helper/parser_test.go | 101 +++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 42 deletions(-) diff --git a/operator/helper/parser_test.go b/operator/helper/parser_test.go index 20730c723..639d4b9e4 100644 --- a/operator/helper/parser_test.go +++ b/operator/helper/parser_test.go @@ -73,18 +73,12 @@ func TestParserMissingField(t *testing.T) { require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.") } -func TestParserInvalidParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewRecordField(), } @@ -96,23 +90,14 @@ func TestParserInvalidParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } func TestParserInvalidParseSend(t *testing.T) { - buildContext := testutil.NewBuildContext(t) - fakeOut := testutil.NewFakeOutput(t) - writer := WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - OutputIDs: []string{fakeOut.ID()}, - } - writer.SetOutputs([]operator.Operator{fakeOut}) + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: writer, + WriterOperator: *writer, OnError: SendOnError, }, ParseFrom: entry.NewRecordField(), @@ -129,18 +114,12 @@ func TestParserInvalidParseSend(t *testing.T) { fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidTimeParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidTimeParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewRecordField(), ParseTo: entry.NewRecordField(), @@ -159,20 +138,42 @@ func TestParserInvalidTimeParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidSeverityParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidTimeParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), + TimeParser: &TimeParser{ + ParseFrom: func() *entry.Field { + f := entry.NewRecordField("missing-key") + return &f + }(), + }, + } + parse := func(i interface{}) (interface{}, error) { + return i, nil + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} +func TestParserInvalidSeverityParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, SeverityParser: &SeverityParser{ ParseFrom: entry.NewRecordField("missing-key"), @@ -188,6 +189,7 @@ func TestParserInvalidSeverityParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } func TestParserInvalidTimeValidSeverityParse(t *testing.T) { @@ -407,3 +409,18 @@ func TestParserPreserve(t *testing.T) { }) } } + +func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) { + buildContext := testutil.NewBuildContext(t) + fakeOut := testutil.NewFakeOutput(t) + writer := &WriterOperator{ + BasicOperator: BasicOperator{ + OperatorID: "test-id", + OperatorType: "test-type", + SugaredLogger: buildContext.Logger.SugaredLogger, + }, + OutputIDs: []string{fakeOut.ID()}, + } + writer.SetOutputs([]operator.Operator{fakeOut}) + return writer, fakeOut +} From fde54759c17defdb9e74e9188836df3591ea4472 Mon Sep 17 00:00:00 2001 From: jsirianni Date: Wed, 16 Jun 2021 11:42:34 -0400 Subject: [PATCH 3/3] Fixed bug where logs can be duplicated when a parser has on_error=send --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83b1dae1c..23943ac67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fixed panic during shutdown when Google Cloud Output credential file not found [Issue 264](https://github.com/observIQ/stanza/issues/264) +- Fixed bug where logs can be duplicated when a parser has on_error=send [PR 330](https://github.com/observIQ/stanza/pull/330) ## [1.0.0] - 2021-05-27