Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problem where parser errors would result in duplicate calls to the next operator #330

Merged
merged 3 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed
- Fixed panic during shutdown when Google Cloud Output credential file not found [Issue 264](https://github.com/observIQ/stanza/issues/264)
- Fixed bug where logs can be duplicated when a parser has on_error=send [PR 330](https://github.com/observIQ/stanza/pull/330)

## [1.0.0] - 2021-05-27

Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) {
entry := entry.New()
entry.Record = "{\"name\": \"stanza\"}"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
fake.ExpectRecord(t, "{\"name\": \"stanza\"}")
})
}
Expand Down
108 changes: 78 additions & 30 deletions operator/helper/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,12 @@ func TestParserMissingField(t *testing.T) {
require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.")
}

func TestParserInvalidParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewRecordField(),
}
Expand All @@ -96,20 +90,36 @@ func TestParserInvalidParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewRecordField(),
}
parse := func(i interface{}) (interface{}, error) {
return i, fmt.Errorf("parse failure")
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewRecordField(),
ParseTo: entry.NewRecordField(),
Expand All @@ -128,20 +138,42 @@ func TestParserInvalidTimeParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidSeverityParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidTimeParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewRecordField(),
ParseTo: entry.NewRecordField(),
TimeParser: &TimeParser{
ParseFrom: func() *entry.Field {
f := entry.NewRecordField("missing-key")
return &f
}(),
},
}
parse := func(i interface{}) (interface{}, error) {
return i, nil
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}
func TestParserInvalidSeverityParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
SeverityParser: &SeverityParser{
ParseFrom: entry.NewRecordField("missing-key"),
Expand All @@ -157,6 +189,7 @@ func TestParserInvalidSeverityParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeValidSeverityParse(t *testing.T) {
Expand Down Expand Up @@ -376,3 +409,18 @@ func TestParserPreserve(t *testing.T) {
})
}
}

func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) {
buildContext := testutil.NewBuildContext(t)
fakeOut := testutil.NewFakeOutput(t)
writer := &WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
OutputIDs: []string{fakeOut.ID()},
}
writer.SetOutputs([]operator.Operator{fakeOut})
return writer, fakeOut
}
1 change: 0 additions & 1 deletion operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry
t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry))
if t.OnError == SendOnError {
t.Write(ctx, entry)
return nil
}
return err
}
Expand Down
58 changes: 32 additions & 26 deletions operator/helper/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestTransformerSendOnError(t *testing.T) {
}

err := transformer.ProcessWith(ctx, testEntry, transform)
require.NoError(t, err)
require.Error(t, err)
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
}

Expand Down Expand Up @@ -145,42 +145,44 @@ func TestTransformerIf(t *testing.T) {
ifExpr string
inputRecord string
expected string
errExpected bool
}{
{
"NoIf",
"",
"test",
"parsed",
name: "NoIf",
ifExpr: "",
inputRecord: "test",
expected: "parsed",
},
{
"TrueIf",
"true",
"test",
"parsed",
name: "TrueIf",
ifExpr: "true",
inputRecord: "test",
expected: "parsed",
},
{
"FalseIf",
"false",
"test",
"test",
name: "FalseIf",
ifExpr: "false",
inputRecord: "test",
expected: "test",
},
{
"EvaluatedTrue",
"$record == 'test'",
"test",
"parsed",
name: "EvaluatedTrue",
ifExpr: "$record == 'test'",
inputRecord: "test",
expected: "parsed",
},
{
"EvaluatedFalse",
"$record == 'notest'",
"test",
"test",
name: "EvaluatedFalse",
ifExpr: "$record == 'notest'",
inputRecord: "test",
expected: "test",
},
{
"FailingExpressionEvaluation",
"$record.test.noexist == 'notest'",
"test",
"test",
name: "FailingExpressionEvaluation",
ifExpr: "$record.test.noexist == 'notest'",
inputRecord: "test",
expected: "test",
errExpected: true,
},
}

Expand All @@ -201,7 +203,11 @@ func TestTransformerIf(t *testing.T) {
e.Record = "parsed"
return nil
})
require.NoError(t, err)
if tc.errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}

fake.ExpectRecord(t, tc.expected)
})
Expand Down
10 changes: 10 additions & 0 deletions testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
require.FailNow(t, "Timed out waiting for entry")
}
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
case <-f.Received:
require.FailNow(t, "Should not have received entry")
case <-time.After(timeout):
return
}
}