Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(event pool): simplify realization of event pool #616

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 51 additions & 88 deletions pipeline/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package pipeline

import (
"fmt"
"runtime"
"sync"
"time"

"github.com/ozontech/file.d/logger"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
)

type Event struct {
Expand Down Expand Up @@ -214,114 +211,80 @@ func (e *Event) String() string {

// channels are slower than this implementation by ~20%
type eventPool struct {
capacity int

avgEventSize int
inUseEvents atomic.Int64
getCounter atomic.Int64
backCounter atomic.Int64
events []*Event
free1 []atomic.Bool
free2 []atomic.Bool

getMu *sync.Mutex
getCond *sync.Cond
capacity int
ptr2idx map[*Event]int
obj []*Event
freeptr int
cond sync.Cond
}

func newEventPool(capacity, avgEventSize int) *eventPool {
eventPool := &eventPool{
pool := &eventPool{
avgEventSize: avgEventSize,
capacity: capacity,
getMu: &sync.Mutex{},
backCounter: *atomic.NewInt64(int64(capacity)),
obj: make([]*Event, capacity),
freeptr: capacity - 1,
cond: sync.Cond{L: &sync.Mutex{}},
ptr2idx: make(map[*Event]int, capacity),
}

eventPool.getCond = sync.NewCond(eventPool.getMu)

for i := 0; i < capacity; i++ {
eventPool.free1 = append(eventPool.free1, *atomic.NewBool(true))
eventPool.free2 = append(eventPool.free2, *atomic.NewBool(true))
eventPool.events = append(eventPool.events, newEvent())
event := newEvent()
pool.obj[i] = event
pool.ptr2idx[event] = i
}

return eventPool
return pool
}

const maxTries = 3

func (p *eventPool) get() *Event {
x := (p.getCounter.Inc() - 1) % int64(p.capacity)
var tries int
for {
if x < p.backCounter.Load() {
// fast path
if p.free1[x].CAS(true, false) {
break
}
if p.free1[x].CAS(true, false) {
break
}
if p.free1[x].CAS(true, false) {
break
}
}
tries++
if tries%maxTries != 0 {
// slow path
runtime.Gosched()
} else {
// slowest path
p.getMu.Lock()
p.getCond.Wait()
p.getMu.Unlock()
tries = 0
}
p.cond.L.Lock()
for p.freeptr < 0 {
p.cond.Wait()
}
event := p.events[x]
p.events[x] = nil
p.free2[x].Store(false)
p.inUseEvents.Inc()
event := p.obj[p.freeptr]
p.freeptr--
p.cond.L.Unlock()

event.stage = eventStageInput

return event
}

func (p *eventPool) back(event *Event) {
event.stage = eventStagePool
x := (p.backCounter.Inc() - 1) % int64(p.capacity)
var tries int
for {
// fast path
if p.free2[x].CAS(false, true) {
break
}
if p.free2[x].CAS(false, true) {
break
}
if p.free2[x].CAS(false, true) {
break
}
tries++
if tries%maxTries != 0 {
// slow path
runtime.Gosched()
} else {
// slowest path, sleep instead of cond.Wait because of potential deadlock.
time.Sleep(5 * time.Millisecond)
tries = 0
}
}
event.reset(p.avgEventSize)
p.events[x] = event
p.free1[x].Store(true)
p.inUseEvents.Dec()
p.getCond.Broadcast()
// back returns freeEvent to the pool and wakes one goroutine blocked in get().
//
// Invariant maintained: obj[0..freeptr] hold free events, obj[freeptr+1..]
// hold events currently checked out, and ptr2idx maps every event pointer to
// its slot in obj. To restore the invariant after freeptr is advanced, the
// returned event is swapped with whatever in-use event currently occupies the
// new free slot.
func (p *eventPool) back(freeEvent *Event) {
freeEvent.stage = eventStagePool
// reset is done outside the lock to keep the critical section short.
freeEvent.reset(p.avgEventSize)

p.cond.L.Lock()
p.freeptr++

// oldEvent is the in-use event whose pointer happens to sit in the slot
// that just became part of the free region; freeEventIdx is where the
// returned event currently lives (somewhere in the in-use region).
oldEvent := p.obj[p.freeptr]
freeEventIdx := p.ptr2idx[freeEvent]

// put free event
p.obj[p.freeptr] = freeEvent
p.ptr2idx[freeEvent] = p.freeptr

// put old event
p.obj[freeEventIdx] = oldEvent
p.ptr2idx[oldEvent] = freeEventIdx

p.cond.L.Unlock()
// Signal (not Broadcast): exactly one event became available, so waking a
// single waiter is sufficient. Signaling after Unlock is valid for sync.Cond.
p.cond.Signal()
}

// size reports the number of events currently checked out of the pool,
// i.e. in use. Free events occupy obj[0..freeptr], so the in-use count is
// capacity minus the free count.
//
// NOTE(review): this replaces the removed inUseEvents counter, and both call
// sites (logChanges and maintenance in pipeline.go) treat the result as the
// in-use count. freeptr+1 is the number of FREE events — returning it, as the
// previous revision of this function did, inverts the reported metric.
func (p *eventPool) size() int {
	p.cond.L.Lock()
	s := p.capacity - (p.freeptr + 1)
	p.cond.L.Unlock()
	return s
}

func (p *eventPool) dump() string {
out := logger.Cond(len(p.events) == 0, logger.Header("no events"), func() string {
out := logger.Cond(len(p.obj) == 0, logger.Header("no events"), func() string {
o := logger.Header("events")
for i := 0; i < p.capacity; i++ {
event := p.events[i]
for _, event := range p.obj {
eventStr := event.String()
if eventStr == "" {
eventStr = "nil"
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func (p *Pipeline) logChanges(myDeltas *deltas) {
if ce := p.logger.Check(zapcore.InfoLevel, "pipeline stats"); ce != nil {
inputSize := p.inputSize.Load()
inputEvents := p.inputEvents.Load()
inUseEvents := p.eventPool.inUseEvents.Load()
inUseEvents := p.eventPool.size()

interval := p.settings.MaintenanceInterval
rate := int(myDeltas.deltaInputEvents * float64(time.Second) / float64(interval))
Expand Down Expand Up @@ -766,7 +766,7 @@ func (p *Pipeline) maintenance() {
p.metricHolder.Maintenance()

myDeltas := p.incMetrics(inputEvents, inputSize, outputEvents, outputSize, readOps)
p.setMetrics(p.eventPool.inUseEvents.Load())
p.setMetrics(int64(p.eventPool.size()))
p.logChanges(myDeltas)
}
}
Expand Down
39 changes: 4 additions & 35 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,7 +27,6 @@ func getFakeInputInfo() *pipeline.InputPluginInfo {
}

func TestInUnparsableMessages(t *testing.T) {
name := "invalid_json"
message := []byte("{wHo Is Json: YoU MeAn SoN oF JoHn???")
pipelineSettings := &pipeline.Settings{
Capacity: 5,
Expand All @@ -36,47 +36,16 @@ func TestInUnparsableMessages(t *testing.T) {
offset := int64(666)
sourceID := pipeline.SourceID(3<<16 + int(10))

t.Run(name, func(t *testing.T) {
t.Run("invalid_json", func(t *testing.T) {
pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry())

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(sourceID, "kafka", offset, message, false)
require.Equal(t, pipeline.EventSeqIDError, seqID)

refPipe := reflect.ValueOf(pipe)
eventPool := reflect.Indirect(refPipe).FieldByName("eventPool")

free1slice := reflect.
Indirect(
reflect.
Indirect(eventPool).
FieldByName("free1")).
Slice(0, pipelineSettings.Capacity)
free2slice := reflect.
Indirect(
reflect.
Indirect(eventPool).
FieldByName("free2")).
Slice(0, pipelineSettings.Capacity)

for i := 0; i < pipelineSettings.Capacity; i++ {
// free1, free2 are []atomic.Bool which underlying v is really uint32
// so if v val == uint32(1) event was released.
free1idxUint := reflect.
Indirect(free1slice.Index(i)).
FieldByName("v").
FieldByName("v").
Uint()
require.EqualValues(t, uint32(1), free1idxUint)

free2idxUint := reflect.
Indirect(free2slice.Index(i)).
FieldByName("v").
FieldByName("v").
Uint()
require.EqualValues(t, uint32(1), free2idxUint)
}
eventPool := reflect.ValueOf(pipe).Elem().FieldByName("eventPool").Elem().FieldByName("freeptr")
assert.Equal(t, int64(pipelineSettings.Capacity), eventPool.Int()+1)
})
}

Expand Down