Skip to content

Commit

Permalink
Fix data race in convert processor (elastic#17032) (elastic#17045)
Browse files Browse the repository at this point in the history
If the convert processor was used in the global context it could lead to data races because there was a
variable that was reused across executions. When processors are used in the global context they are
shared across individual publisher clients so you could end up with a data race.

The fix here was to replace the shared state with a local variable. In testing this didn't make much of
a difference in the number of allocations.

(cherry picked from commit b6167ae)
  • Loading branch information
andrewkroh authored Mar 19, 2020
1 parent 21a282b commit 5ecd98e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `NewContainerMetadataEnricher` to use default config for kubernetes module. {pull}16857[16857]
- Improve some logging messages for add_kubernetes_metadata processor {pull}16866{16866}
- Fix k8s metadata issue regarding node labels not shown up on root level of metadata. {pull}16834[16834]
- Fix concurrency issues in convert processor when used in the global context. {pull}17032[17032]

*Auditbeat*

Expand Down
28 changes: 10 additions & 18 deletions libbeat/processors/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (

const logName = "processor.convert"

var ignoredFailure = struct{}{}

func init() {
processors.RegisterPlugin("convert", New)
jsprocessor.RegisterPlugin("Convert", New)
Expand All @@ -43,8 +45,6 @@ func init() {
type processor struct {
config
log *logp.Logger

converted []interface{} // Temporary storage for converted values.
}

// New constructs a new convert processor.
Expand All @@ -63,27 +63,19 @@ func newConvert(c config) (*processor, error) {
log = log.With("instance_id", c.Tag)
}

return &processor{config: c, log: log, converted: make([]interface{}, len(c.Fields))}, nil
return &processor{config: c, log: log}, nil
}

func (p *processor) String() string {
json, _ := json.Marshal(p.config)
return "convert=" + string(json)
}

var ignoredFailure = struct{}{}

func resetValues(s []interface{}) {
for i := range s {
s[i] = nil
}
}

func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
defer resetValues(p.converted)
converted := make([]interface{}, len(p.Fields))

// Convert the fields and write the results to temporary storage.
if err := p.convertFields(event); err != nil {
if err := p.convertFields(event, converted); err != nil {
return event, err
}

Expand All @@ -99,14 +91,14 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
}

// Update the event with the converted values.
if err := p.writeToEvent(event); err != nil {
if err := p.writeToEvent(event, converted); err != nil {
return &saved, err
}

return event, nil
}

func (p *processor) convertFields(event *beat.Event) error {
func (p *processor) convertFields(event *beat.Event, converted []interface{}) error {
// Write conversion results to temporary storage.
for i, conv := range p.Fields {
v, err := p.convertField(event, conv)
Expand All @@ -116,7 +108,7 @@ func (p *processor) convertFields(event *beat.Event) error {
}
v = ignoredFailure
}
p.converted[i] = v
converted[i] = v
}

return nil
Expand All @@ -142,9 +134,9 @@ func (p *processor) convertField(event *beat.Event, conversion field) (interface
return v, nil
}

func (p *processor) writeToEvent(event *beat.Event) error {
func (p *processor) writeToEvent(event *beat.Event, converted []interface{}) error {
for i, conversion := range p.Fields {
v := p.converted[i]
v := converted[i]
if v == ignoredFailure {
continue
}
Expand Down
43 changes: 43 additions & 0 deletions libbeat/processors/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,46 @@ func TestDataTypes(t *testing.T) {
})
}
}

func BenchmarkTestConvertRun(b *testing.B) {
c := defaultConfig()
c.IgnoreMissing = true
c.Fields = append(c.Fields,
field{From: "source.address", To: "source.ip", Type: IP},
field{From: "destination.address", To: "destination.ip", Type: IP},
field{From: "a", To: "b"},
field{From: "c", To: "d"},
field{From: "e", To: "f"},
field{From: "g", To: "h"},
field{From: "i", To: "j"},
field{From: "k", To: "l"},
field{From: "m", To: "n"},
field{From: "o", To: "p"},
)

p, err := newConvert(c)
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
event := &beat.Event{
Fields: common.MapStr{
"source": common.MapStr{
"address": "192.51.100.1",
},
"destination": common.MapStr{
"address": "192.0.2.51",
},
},
}

_, err := p.Run(event)
if err != nil {
b.Fatal(err)
}
}
})
}

0 comments on commit 5ecd98e

Please sign in to comment.