Skip to content

Commit

Permalink
fix syslog parser data race problem (#32)
Browse files Browse the repository at this point in the history
* fix syslog parser data race
  • Loading branch information
wph95 authored Mar 1, 2021
1 parent a2b90b7 commit 620dfee
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ local/*
artifacts/*
.vscode/*
gen/
.idea/*
3 changes: 1 addition & 2 deletions operator/builtin/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,15 @@ func SyslogInputTest(t *testing.T, cfg *SyslogInputConfig, tc syslog.Case) {
conn.Close()
require.NoError(t, err)

defer p.Stop()
select {
case e := <-fake.Received:
// close pipeline to avoid data race
p.Stop()
require.Equal(t, tc.ExpectedRecord, e.Record)
require.Equal(t, tc.ExpectedTimestamp, e.Timestamp)
require.Equal(t, tc.ExpectedSeverity, e.Severity)
require.Equal(t, tc.ExpectedSeverityText, e.SeverityText)
case <-time.After(time.Second):
p.Stop()
require.FailNow(t, "Timed out waiting for entry to be processed")
}
}
Expand Down
5 changes: 1 addition & 4 deletions operator/builtin/parser/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ type SyslogParser struct {

// Process will parse an entry field as syslog.
func (s *SyslogParser) Process(ctx context.Context, entry *entry.Entry) error {
if err := s.ParserOperator.ProcessWith(ctx, entry, s.parse); err != nil {
return err
}
return promoteSeverity(entry)
return s.ParserOperator.ProcessWithCallback(ctx, entry, s.parse, promoteSeverity)
}

// parse will parse a value as syslog.
Expand Down
11 changes: 11 additions & 0 deletions operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type ParserOperator struct {

// ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators.
func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, parse ParseFunction) error {
return p.ProcessWithCallback(ctx, entry, parse, nil)
}

func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error {
// Short circuit if the "if" condition does not match
skip, err := p.Skip(ctx, entry)
if err != nil {
Expand All @@ -100,6 +104,13 @@ func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, pa
if err := p.ParseWith(ctx, entry, parse); err != nil {
return err
}
if cb != nil {
err = cb(entry)
if err != nil {
return err
}
}

p.Write(ctx, entry)
return nil
}
Expand Down

0 comments on commit 620dfee

Please sign in to comment.