Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Fix issue where log entry could be duplicated on parse error #188

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) {
entry := entry.New()
entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "Expected to parse a single csv record, got '2'")
require.NoError(t, err)
fake.ExpectBody(t, map[string]interface{}{
"name": "stanza",
"sev": "INFO",
"msg": "started agent",
})
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
}

Expand All @@ -260,7 +260,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) {
entry := entry.New()
entry.Body = "{\"name\": \"stanza\"}"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
fake.ExpectBody(t, "{\"name\": \"stanza\"}")
})
}
Expand Down
108 changes: 78 additions & 30 deletions operator/helper/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) {
require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.")
}

func TestParserInvalidParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
}
Expand All @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
}
parse := func(i interface{}) (interface{}, error) {
return i, fmt.Errorf("parse failure")
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
Expand All @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidSeverityParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidTimeParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
TimeParser: &TimeParser{
ParseFrom: func() *entry.Field {
f := entry.NewBodyField("missing-key")
return &f
}(),
},
}
parse := func(i interface{}) (interface{}, error) {
return i, nil
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}
func TestParserInvalidSeverityParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
SeverityParser: &SeverityParser{
ParseFrom: entry.NewBodyField("missing-key"),
Expand All @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeValidSeverityParse(t *testing.T) {
Expand Down Expand Up @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, except, actual)
}

func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) {
buildContext := testutil.NewBuildContext(t)
fakeOut := testutil.NewFakeOutput(t)
writer := &WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
OutputIDs: []string{fakeOut.ID()},
}
writer.SetOutputs([]operator.Operator{fakeOut})
return writer, fakeOut
}
1 change: 0 additions & 1 deletion operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry
t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry))
if t.OnError == SendOnError {
t.Write(ctx, entry)
return nil
}
return err
}
Expand Down
66 changes: 36 additions & 30 deletions operator/helper/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) {
}

err := transformer.ProcessWith(ctx, testEntry, transform)
require.NoError(t, err)
require.Error(t, err)
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
}

Expand Down Expand Up @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t *testing.T) {

func TestTransformerIf(t *testing.T) {
cases := []struct {
name string
ifExpr string
inputBody string
expected string
name string
ifExpr string
inputBody string
expected string
errExpected bool
}{
{
"NoIf",
"",
"test",
"parsed",
name: "NoIf",
ifExpr: "",
inputBody: "test",
expected: "parsed",
},
{
"TrueIf",
"true",
"test",
"parsed",
name: "TrueIf",
ifExpr: "true",
inputBody: "test",
expected: "parsed",
},
{
"FalseIf",
"false",
"test",
"test",
name: "FalseIf",
ifExpr: "false",
inputBody: "test",
expected: "test",
},
{
"EvaluatedTrue",
"$body == 'test'",
"test",
"parsed",
name: "EvaluatedTrue",
ifExpr: "$body == 'test'",
inputBody: "test",
expected: "parsed",
},
{
"EvaluatedFalse",
"$body == 'notest'",
"test",
"test",
name: "EvaluatedFalse",
ifExpr: "$body == 'notest'",
inputBody: "test",
expected: "test",
},
{
"FailingExpressionEvaluation",
"$body.test.noexist == 'notest'",
"test",
"test",
name: "FailingExpressionEvaluation",
ifExpr: "$body.test.noexist == 'notest'",
inputBody: "test",
expected: "test",
errExpected: true,
},
}

Expand All @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) {
e.Body = "parsed"
return nil
})
require.NoError(t, err)
if tc.errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}

fake.ExpectBody(t, tc.expected)
})
Expand Down
10 changes: 10 additions & 0 deletions testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
require.FailNow(t, "Timed out waiting for entry")
}
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
case <-f.Received:
require.FailNow(t, "Should not have received entry")
case <-time.After(timeout):
return
}
}