Skip to content

Commit

Permalink
Add SafeProcessor wrapper (#34647)
Browse files Browse the repository at this point in the history
Each processor might end up in multiple processor groups.
Every group has its own `Close` that calls `Close` on each
processor of that group which leads to multiple `Close` calls
on the same processor.

If the `SafeProcessor` wrapper was not used,
the processor would have to handle multiple `Close` calls with
adding `sync.Once` in its `Close` function.

We make it easer for processor developers and take care of it
in the processor registry instead.
  • Loading branch information
rdner authored Feb 24, 2023
1 parent 994b94d commit 5007b58
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix panics when a processor is closed twice {pull}34647[34647]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var registry = NewNamespace()
func RegisterPlugin(name string, constructor Constructor) {
logp.L().Named(logName).Debugf("Register plugin %s", name)

err := registry.Register(name, constructor)
err := registry.Register(name, SafeWrap(constructor))
if err != nil {
panic(err)
}
Expand Down
72 changes: 72 additions & 0 deletions libbeat/processors/safe_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package processors

import (
"errors"
"sync/atomic"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/config"
)

var ErrClosed = errors.New("attempt to use a closed processor")

type SafeProcessor struct {
Processor
closed uint32
}

// Run allows to run processor only when `Close` was not called prior
func (p *SafeProcessor) Run(event *beat.Event) (*beat.Event, error) {
if atomic.LoadUint32(&p.closed) == 1 {
return nil, ErrClosed
}
return p.Processor.Run(event)
}

// Close makes sure the underlying `Close` function is called only once.
func (p *SafeProcessor) Close() (err error) {
if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
return Close(p.Processor)
}
return nil
}

// SafeWrap makes sure that the processor handles all the required edge-cases.
//
// Each processor might end up in multiple processor groups.
// Every group has its own `Close` that calls `Close` on each
// processor of that group which leads to multiple `Close` calls
// on the same processor.
//
// If `SafeWrap` is not used, the processor must handle multiple
// `Close` calls by using `sync.Once` in its `Close` function.
// We make it easer for processor developers and take care of it
// in the processor registry instead.
func SafeWrap(constructor Constructor) Constructor {
return func(config *config.C) (Processor, error) {
processor, err := constructor(config)
if err != nil {
return nil, err
}
return &SafeProcessor{
Processor: processor,
}, nil
}
}
97 changes: 97 additions & 0 deletions libbeat/processors/safe_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package processors

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/config"
)

var mockEvent = &beat.Event{}

type mockProcessor struct {
runCount int
closeCount int
}

func newMockConstructor() (Constructor, *mockProcessor) {
p := mockProcessor{}
constructor := func(config *config.C) (Processor, error) {
return &p, nil
}
return constructor, &p
}

func (p *mockProcessor) Run(event *beat.Event) (*beat.Event, error) {
p.runCount++
return mockEvent, nil
}

func (p *mockProcessor) Close() error {
p.closeCount++
return nil
}
func (p *mockProcessor) String() string {
return "mock-processor"
}

func TestSafeProcessor(t *testing.T) {
cons, p := newMockConstructor()
var (
sp Processor
err error
)
t.Run("creates a wrapped processor", func(t *testing.T) {
sw := SafeWrap(cons)
sp, err = sw(nil)
require.NoError(t, err)
})

t.Run("propagates Run to a processor", func(t *testing.T) {
e, err := sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)

e, err = sp.Run(nil)
require.NoError(t, err)
require.Equal(t, e, mockEvent)

require.Equal(t, 2, p.runCount)
})

t.Run("propagates Close to a processor only once", func(t *testing.T) {
err := Close(sp)
require.NoError(t, err)

err = Close(sp)
require.NoError(t, err)

require.Equal(t, 1, p.closeCount)
})

t.Run("does not propagate Run when closed", func(t *testing.T) {
e, err := sp.Run(nil)
require.Nil(t, e)
require.ErrorIs(t, err, ErrClosed)
require.Equal(t, 2, p.runCount) // still 2 from the previous test case
})
}

0 comments on commit 5007b58

Please sign in to comment.