Skip to content

Commit

Permalink
Fix issue where log entry could be duplicated on parse error (open-te…
Browse files Browse the repository at this point in the history
…lemetry#188)

In some circumstances, a log entry could be unsuccessfully parsed.
It is intended that a failure can result in an entry being "sent"
to the next operator, but when this happens, the parser should
cease execution immediately. Prior to this change, this was not
always the case, as some parsers would continue on and "send" the
entry again.
  • Loading branch information
djaglowski authored Jun 16, 2021
1 parent cdbb6d6 commit 841966e
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 63 deletions.
4 changes: 2 additions & 2 deletions operator/builtin/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,13 @@ func TestParserCSVMultipleBodys(t *testing.T) {
entry := entry.New()
entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "Expected to parse a single csv record, got '2'")
require.NoError(t, err)
fake.ExpectBody(t, map[string]interface{}{
"name": "stanza",
"sev": "INFO",
"msg": "started agent",
})
fake.ExpectNoEntry(t, 100*time.Millisecond)
})
}

Expand All @@ -260,7 +260,7 @@ func TestParserCSVInvalidJSONInput(t *testing.T) {
entry := entry.New()
entry.Body = "{\"name\": \"stanza\"}"
err = op.Process(context.Background(), entry)
require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
require.Error(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
fake.ExpectBody(t, "{\"name\": \"stanza\"}")
})
}
Expand Down
108 changes: 78 additions & 30 deletions operator/helper/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,12 @@ func TestParserMissingField(t *testing.T) {
require.Contains(t, err.Error(), "Entry is missing the expected parse_from field.")
}

func TestParserInvalidParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
}
Expand All @@ -112,20 +106,36 @@ func TestParserInvalidParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
}
parse := func(i interface{}) (interface{}, error) {
return i, fmt.Errorf("parse failure")
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "parse failure")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
Expand All @@ -144,20 +154,42 @@ func TestParserInvalidTimeParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidSeverityParse(t *testing.T) {
buildContext := testutil.NewBuildContext(t)
func TestParserInvalidTimeParseSend(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
},
OnError: DropOnError,
WriterOperator: *writer,
OnError: SendOnError,
},
ParseFrom: entry.NewBodyField(),
ParseTo: entry.NewBodyField(),
TimeParser: &TimeParser{
ParseFrom: func() *entry.Field {
f := entry.NewBodyField("missing-key")
return &f
}(),
},
}
parse := func(i interface{}) (interface{}, error) {
return i, nil
}
ctx := context.Background()
testEntry := entry.New()
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "time parser: log entry does not have the expected parse_from field")
fakeOut.ExpectEntry(t, testEntry)
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}
func TestParserInvalidSeverityParseDrop(t *testing.T) {
writer, fakeOut := writerWithFakeOut(t)
parser := ParserOperator{
TransformerOperator: TransformerOperator{
WriterOperator: *writer,
OnError: DropOnError,
},
SeverityParser: &SeverityParser{
ParseFrom: entry.NewBodyField("missing-key"),
Expand All @@ -173,6 +205,7 @@ func TestParserInvalidSeverityParse(t *testing.T) {
err := parser.ProcessWith(ctx, testEntry, parse)
require.Error(t, err)
require.Contains(t, err.Error(), "severity parser: log entry does not have the expected parse_from field")
fakeOut.ExpectNoEntry(t, 100*time.Millisecond)
}

func TestParserInvalidTimeValidSeverityParse(t *testing.T) {
Expand Down Expand Up @@ -457,3 +490,18 @@ func TestMapStructureDecodeParserConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, except, actual)
}

func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) {
buildContext := testutil.NewBuildContext(t)
fakeOut := testutil.NewFakeOutput(t)
writer := &WriterOperator{
BasicOperator: BasicOperator{
OperatorID: "test-id",
OperatorType: "test-type",
SugaredLogger: buildContext.Logger.SugaredLogger,
},
OutputIDs: []string{fakeOut.ID()},
}
writer.SetOutputs([]operator.Operator{fakeOut})
return writer, fakeOut
}
1 change: 0 additions & 1 deletion operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry
t.Errorw("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError), zap.Any("entry", entry))
if t.OnError == SendOnError {
t.Write(ctx, entry)
return nil
}
return err
}
Expand Down
66 changes: 36 additions & 30 deletions operator/helper/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestTransformerSendOnError(t *testing.T) {
}

err := transformer.ProcessWith(ctx, testEntry, transform)
require.NoError(t, err)
require.Error(t, err)
output.AssertCalled(t, "Process", mock.Anything, mock.Anything)
}

Expand Down Expand Up @@ -156,46 +156,48 @@ func TestTransformerProcessWithValid(t *testing.T) {

func TestTransformerIf(t *testing.T) {
cases := []struct {
name string
ifExpr string
inputBody string
expected string
name string
ifExpr string
inputBody string
expected string
errExpected bool
}{
{
"NoIf",
"",
"test",
"parsed",
name: "NoIf",
ifExpr: "",
inputBody: "test",
expected: "parsed",
},
{
"TrueIf",
"true",
"test",
"parsed",
name: "TrueIf",
ifExpr: "true",
inputBody: "test",
expected: "parsed",
},
{
"FalseIf",
"false",
"test",
"test",
name: "FalseIf",
ifExpr: "false",
inputBody: "test",
expected: "test",
},
{
"EvaluatedTrue",
"$body == 'test'",
"test",
"parsed",
name: "EvaluatedTrue",
ifExpr: "$body == 'test'",
inputBody: "test",
expected: "parsed",
},
{
"EvaluatedFalse",
"$body == 'notest'",
"test",
"test",
name: "EvaluatedFalse",
ifExpr: "$body == 'notest'",
inputBody: "test",
expected: "test",
},
{
"FailingExpressionEvaluation",
"$body.test.noexist == 'notest'",
"test",
"test",
name: "FailingExpressionEvaluation",
ifExpr: "$body.test.noexist == 'notest'",
inputBody: "test",
expected: "test",
errExpected: true,
},
}

Expand All @@ -216,7 +218,11 @@ func TestTransformerIf(t *testing.T) {
e.Body = "parsed"
return nil
})
require.NoError(t, err)
if tc.errExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}

fake.ExpectBody(t, tc.expected)
})
Expand Down
10 changes: 10 additions & 0 deletions testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,13 @@ func (f *FakeOutput) ExpectEntry(t testing.TB, expected *entry.Entry) {
require.FailNow(t, "Timed out waiting for entry")
}
}

// ExpectNoEntry expects that no entry will be received within the specified time
func (f *FakeOutput) ExpectNoEntry(t testing.TB, timeout time.Duration) {
select {
case <-f.Received:
require.FailNow(t, "Should not have received entry")
case <-time.After(timeout):
return
}
}

0 comments on commit 841966e

Please sign in to comment.