diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index ed52e74b..3400896c 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) { entry := entry.New() entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent" err = op.Process(context.Background(), entry) - require.Nil(t, err, "Expected to parse a single csv record, got '2'") require.NoError(t, err) fake.ExpectBody(t, map[string]interface{}{ "name": "stanza", "sev": "INFO", "msg": "started agent", }) + fake.ExpectNoEntry(t, 100*time.Millisecond) }) } @@ -260,7 +260,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) { entry := entry.New() entry.Body = "{\"name\": \"stanza\"}" err = op.Process(context.Background(), entry) - require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") + require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field") fake.ExpectBody(t, "{\"name\": \"stanza\"}") }) } diff --git a/operator/helper/parser_test.go b/operator/helper/parser_test.go index eea59aa8..8fdb1593 100644 --- a/operator/helper/parser_test.go +++ b/operator/helper/parser_test.go @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) { require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.") } -func TestParserInvalidParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), } @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidTimeParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + } + parse := func(i interface{}) (interface{}, error) { + return i, fmt.Errorf("parse failure") + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "parse failure") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} + +func TestParserInvalidTimeParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, ParseFrom: entry.NewBodyField(), ParseTo: entry.NewBodyField(), @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } -func TestParserInvalidSeverityParse(t *testing.T) { - buildContext := testutil.NewBuildContext(t) +func TestParserInvalidTimeParseSend(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) parser := ParserOperator{ TransformerOperator: TransformerOperator{ - WriterOperator: WriterOperator{ - BasicOperator: BasicOperator{ - OperatorID: "test-id", - OperatorType: "test-type", - SugaredLogger: buildContext.Logger.SugaredLogger, - }, - }, - OnError: DropOnError, + WriterOperator: *writer, + OnError: SendOnError, + }, + ParseFrom: entry.NewBodyField(), + ParseTo: entry.NewBodyField(), + TimeParser: &TimeParser{ + ParseFrom: func() *entry.Field { + f := entry.NewBodyField("missing-key") + return &f + }(), + }, + } + parse := func(i interface{}) (interface{}, error) { + return i, nil + } + ctx := context.Background() + testEntry := entry.New() + err := parser.ProcessWith(ctx, testEntry, parse) + require.Error(t, err) + require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field") + fakeOut.ExpectEntry(t, testEntry) + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) +} +func TestParserInvalidSeverityParseDrop(t *testing.T) { + writer, fakeOut := writerWithFakeOut(t) + parser := ParserOperator{ + TransformerOperator: TransformerOperator{ + WriterOperator: *writer, + OnError: DropOnError, }, SeverityParser: &SeverityParser{ ParseFrom: entry.NewBodyField("missing-key"), @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) { err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field") + fakeOut.ExpectNoEntry(t, 100*time.Millisecond) } func TestParserInvalidTimeValidSeverityParse(t *testing.T) { @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, except, actual) } + +func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) { + buildContext := testutil.NewBuildContext(t) + fakeOut := testutil.NewFakeOutput(t) + writer := &WriterOperator{ + BasicOperator: BasicOperator{ + OperatorID: "test-id", + OperatorType: "test-type", + SugaredLogger: buildContext.Logger.SugaredLogger, + }, + OutputIDs: []string{fakeOut.ID()}, + } + writer.SetOutputs([]operator.Operator{fakeOut}) + return writer, fakeOut +} diff --git a/operator/helper/transformer.go b/operator/helper/transformer.go index 61051df0..d492039c 100644 --- a/operator/helper/transformer.go +++ b/operator/helper/transformer.go @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry)) if t.OnError == SendOnError { t.Write(ctx, entry) - return nil } return err } diff --git a/operator/helper/transformer_test.go b/operator/helper/transformer_test.go index be01ff4d..ea863d39 100644 --- a/operator/helper/transformer_test.go +++ b/operator/helper/transformer_test.go @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) { } err := transformer.ProcessWith(ctx, testEntry, transform) - require.NoError(t, err) + require.Error(t, err) output.AssertCalled(t, "Process", mock.Anything, mock.Anything) } @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t *testing.T) { func TestTransformerIf(t *testing.T) { cases := []struct { - name string - ifExpr string - inputBody string - expected string + name string + ifExpr string + inputBody string + expected string + errExpected bool }{ { - "NoIf", - "", - "test", - "parsed", + name: "NoIf", + ifExpr: "", + inputBody: "test", + expected: "parsed", }, { - "TrueIf", - "true", - "test", - "parsed", + name: "TrueIf", + ifExpr: "true", + inputBody: "test", + expected: "parsed", }, { - "FalseIf", - "false", - "test", - "test", + name: "FalseIf", + ifExpr: "false", + inputBody: "test", + expected: "test", }, { - "EvaluatedTrue", - "$body == 'test'", - "test", - "parsed", + name: "EvaluatedTrue", + ifExpr: "$body == 'test'", + inputBody: "test", + expected: "parsed", }, { - "EvaluatedFalse", - "$body == 'notest'", - "test", - "test", + name: "EvaluatedFalse", + ifExpr: "$body == 'notest'", + inputBody: "test", + expected: "test", }, { - "FailingExpressionEvaluation", - "$body.test.noexist == 'notest'", - "test", - "test", + name: "FailingExpressionEvaluation", + ifExpr: "$body.test.noexist == 'notest'", + inputBody: "test", + expected: "test", + errExpected: true, }, } @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) { e.Body = "parsed" return nil }) - require.NoError(t, err) + if tc.errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } fake.ExpectBody(t, tc.expected) }) diff --git a/testutil/mocks.go b/testutil/mocks.go index 8698167b..6c31e390 100644 --- a/testutil/mocks.go +++ b/testutil/mocks.go @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) { require.FailNow(t, "Timed out waiting for entry") } } + +// ExpectNoEntry expects that no entry will be received within the specified time +func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) { + select { + case <-f.Received: + require.FailNow(t, "Should not have received entry") + case <-time.After(timeout): + return + } +}