From 27688ee31850c50cce639e1cade3e2dd5b644206 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 15 Jun 2021 16:46:44 -0400 Subject: [PATCH] Fix issue where log entry could be duplicated on parse error In some circumstances, a log entry could be unsuccessfully parsed. It is intended that a failure can result in an entry being "sent" to the next operator, but when this happens, the parser should cease execution immediately. Prior to this change, this was not always the case, as some parsers would continue on and "send" the entry again. --- operator/builtin/parser/csv/csv_test.go | 4 +- operator/helper/parser_test.go | 108 +++++++++++++++++------- operator/helper/transformer.go | 1 - operator/helper/transformer_test.go | 66 ++++++++------- testutil/mocks.go | 10 +++ 5 files changed, 126 insertions(+), 63 deletions(-) diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index ed52e74b..3400896c 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) { entry := entry.New() entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent" err = op.Process(context.Background(), entry) - require.Nil(t, err, "Expected to parse a single csv record, got '2'") require.NoError(t, err) fake.ExpectBody(t, map[string]interface{}{ "name": "stanza", "sev": "INFO", "msg": "started agent", }) + fake.ExpectNoEntry(t, 100*time.Millisecond) }) } @@ -260,7 +260,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) { entry := entry.New() entry.Body = "{\"name\": \"stanza\"}" err = op.Process(context.Background(), entry) - require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") + require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") fake.ExpectBody(t, "{\"name\": \"stanza\"}") }) } diff --git a/operator/helper/parser_test.go b/operator/helper/parser_test.go index eea59aa8..8fdb1593 100644 --- a/operator/helper/parser_test.go +++ b/operator/helper/parser_test.go @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) { require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.") } -func TestParserInvalidParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), } @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidTimeParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + } + parse := func(i interface{}) (interface{}, error) { + return i, fmt.Errorf("parse failure") + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} + +func TestParserInvalidTimeParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), ParseTo: entry.NewBodyField(), @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidSeverityParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidTimeParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + ParseTo: entry.NewBodyField(), + TimeParser: &TimeParser{ + ParseFrom: func() *entry.Field { + f := entry.NewBodyField("missing-key") + return &f + }(), + }, + } + parse := func(i interface{}) (interface{}, error) { + return i, nil + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} +func TestParserInvalidSeverityParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, SeverityParser: &SeverityParser{ ParseFrom: entry.NewBodyField("missing-key"), @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } func TestParserInvalidTimeValidSeverityParse(t *testing.T) { @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, except, actual) } + +func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) { + buildContext := testutil.NewBuildContext(t) + fakeOut := testutil.NewFakeOutput(t) + writer := &WriterOperator{ + BasicOperator: BasicOperator{ + OperatorID: "test-id", + OperatorType: "test-type", + SugaredLogger: buildContext.Logger.SugaredLogger, + }, + OutputIDs: []string{fakeOut.ID()}, + } + writer.SetOutputs([]operator.Operator{fakeOut}) + return writer, fakeOut +} diff --git a/operator/helper/transformer.go b/operator/helper/transformer.go index 61051df0..d492039c 100644 --- a/operator/helper/transformer.go +++ b/operator/helper/transformer.go @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry)) if t.OnError == SendOnError { t.Write(ctx, entry) - return nil } return err } diff --git a/operator/helper/transformer_test.go b/operator/helper/transformer_test.go index be01ff4d..ea863d39 100644 --- a/operator/helper/transformer_test.go +++ b/operator/helper/transformer_test.go @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) { } err := transformer.ProcessWith(ctx, testEntry, transform) - require.NoError(t, err) + require.Error(t, err) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t *testing.T) { func TestTransformerIf(t *testing.T) { cases := []struct { - name string - ifExpr string - inputBody string - expected string + name string + ifExpr string + inputBody string + expected string + errExpected bool }{ { - "NoIf", - "", - "test", - "parsed", + name: "NoIf", + ifExpr: "", + inputBody: "test", + expected: "parsed", }, { - "TrueIf", - "true", - "test", - "parsed", + name: "TrueIf", + ifExpr: "true", + inputBody: "test", + expected: "parsed", }, { - "FalseIf", - "false", - "test", - "test", + name: "FalseIf", + ifExpr: "false", + inputBody: "test", + expected: "test", }, { - "EvaluatedTrue", - "$body == 'test'", - "test", - "parsed", + name: "EvaluatedTrue", + ifExpr: "$body == 'test'", + inputBody: "test", + expected: "parsed", }, { - "EvaluatedFalse", - "$body == 'notest'", - "test", - "test", + name: "EvaluatedFalse", + ifExpr: "$body == 'notest'", + inputBody: "test", + expected: "test", }, { - "FailingExpressionEvaluation", - "$body.test.noexist == 'notest'", - "test", - "test", + name: "FailingExpressionEvaluation", + ifExpr: "$body.test.noexist == 'notest'", + inputBody: "test", + expected: "test", + errExpected: true, }, } @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) { e.Body = "parsed" return nil }) - require.NoError(t, err) + if tc.errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } fake.ExpectBody(t, tc.expected) }) diff --git a/testutil/mocks.go b/testutil/mocks.go index 8698167b..6c31e390 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) { require.FailNow(t, "Timed out waiting for entry") } } + +// ExpectNoEntry expects that no entry will be received within the specified time +func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) { + select { + case <-f.Received: + require.FailNow(t, "Should not have received entry") + case <-time.After(timeout): + return + } +}