Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Fix issue where log entry could be duplicated on parse error
Browse files Browse the repository at this point in the history
In some circumstances, a log entry could be unsuccessfully parsed.
It is intended that a failure can result in an entry being "sent"
to the next operator, but when this happens, the parser should
cease execution immediately. Prior to this change, this was not
always the case, as some parsers would continue on and "send" the
entry again.
  • Loading branch information
djaglowski committed Jun 15, 2021
1 parent 44b6bf5 commit e283005
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 62 deletions.
2 changes: 1 addition & 1 deletion operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) {
entry := entry.New()
entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "Expected to parse a single csv record, got '2'")
require.NoError(t, err)
fake.ExpectBody(t, map[string]interface{}{
"name": "stanza",
"sev": "INFO",
"msg": "started agent",
})
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
}

Expand Down
108 changes: 78 additions & 30 deletions operator/helper/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) {
require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.")
}

func TestParserInvalidParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
}
Expand All @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
}
parse := func(i interface{}) (interface{}, error) {
return i, fmt.Errorf("parse failure")
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
Expand All @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidSeverityParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidTimeParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
TimeParser: &TimeParser{
ParseFrom: func() *entry.Field {
f := entry.NewBodyField("missing-key")
return &f
}(),
},
}
parse := func(i interface{}) (interface{}, error) {
return i, nil
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}
func TestParserInvalidSeverityParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
SeverityParser: &SeverityParser{
ParseFrom: entry.NewBodyField("missing-key"),
Expand All @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeValidSeverityParse(t *testing.T) {
Expand Down Expand Up @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, except, actual)
}

func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) {
buildContext := testutil.NewBuildContext(t)
fakeOut := testutil.NewFakeOutput(t)
writer := &WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
OutputIDs: []string{fakeOut.ID()},
}
writer.SetOutputs([]operator.Operator{fakeOut})
return writer, fakeOut
}
1 change: 0 additions & 1 deletion operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry
t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry))
if t.OnError == SendOnError {
t.Write(ctx, entry)
return nil
}
return err
}
Expand Down
66 changes: 36 additions & 30 deletions operator/helper/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) {
}

err := transformer.ProcessWith(ctx, testEntry, transform)
require.NoError(t, err)
require.Error(t, err)
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
}

Expand Down Expand Up @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t *testing.T) {

func TestTransformerIf(t *testing.T) {
cases := []struct {
name string
ifExpr string
inputBody string
expected string
name string
ifExpr string
inputBody string
expected string
errExpected bool
}{
{
"NoIf",
"",
"test",
"parsed",
name: "NoIf",
ifExpr: "",
inputBody: "test",
expected: "parsed",
},
{
"TrueIf",
"true",
"test",
"parsed",
name: "TrueIf",
ifExpr: "true",
inputBody: "test",
expected: "parsed",
},
{
"FalseIf",
"false",
"test",
"test",
name: "FalseIf",
ifExpr: "false",
inputBody: "test",
expected: "test",
},
{
"EvaluatedTrue",
"$body == 'test'",
"test",
"parsed",
name: "EvaluatedTrue",
ifExpr: "$body == 'test'",
inputBody: "test",
expected: "parsed",
},
{
"EvaluatedFalse",
"$body == 'notest'",
"test",
"test",
name: "EvaluatedFalse",
ifExpr: "$body == 'notest'",
inputBody: "test",
expected: "test",
},
{
"FailingExpressionEvaluation",
"$body.test.noexist == 'notest'",
"test",
"test",
name: "FailingExpressionEvaluation",
ifExpr: "$body.test.noexist == 'notest'",
inputBody: "test",
expected: "test",
errExpected: true,
},
}

Expand All @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) {
e.Body = "parsed"
return nil
})
require.NoError(t, err)
if tc.errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}

fake.ExpectBody(t, tc.expected)
})
Expand Down
10 changes: 10 additions & 0 deletions testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
require.FailNow(t, "Timed out waiting for entry")
}
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
case <-f.Received:
require.FailNow(t, "Should not have received entry")
case <-time.After(timeout):
return
}
}

0 comments on commit e283005

Please sign in to comment.