txdag: using a new mem pool;
galaio committed Sep 11, 2024
1 parent 48c219c commit 5cbabb9
Showing 2 changed files with 53 additions and 102 deletions.
69 changes: 53 additions & 16 deletions core/types/mvstates.go
@@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"golang.org/x/exp/slices"
)

@@ -21,7 +22,8 @@
)

const (
initSyncPoolSize = 4
initSyncPoolSize = 4
asyncSendInterval = 20
)

func init() {
@@ -37,6 +39,34 @@ func init() {
}
}

// ChanPool is a fixed-capacity object pool backed by a buffered channel; unlike
// sync.Pool it never blocks and keeps at most cap(ch) idle items around.
type ChanPool struct {
ch chan any
new func() any
}

func NewChanPool(size int, f func() any) *ChanPool {
return &ChanPool{
ch: make(chan any, size),
new: f,
}
}

// Get returns a buffered idle item if one is available, otherwise allocates a new one.
func (p ChanPool) Get() any {
select {
case item := <-p.ch:
return item
default:
}
return p.new()
}

// Put offers the item back to the pool, dropping it when the pool is already full.
func (p ChanPool) Put(item any) {
select {
case p.ch <- item:
default:
}
}
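
This commit swaps sync.Pool for this channel-backed pool for the RW-event buffers: Get drains an idle item when one is buffered and otherwise falls back to the constructor, while Put keeps an item only while the channel has capacity, so neither path ever blocks. A minimal usage sketch follows; the byte-buffer element type and the capacity of 8 are illustrative, not part of the commit:

// Hypothetical standalone use of ChanPool; element type and sizes are
// illustrative only.
func exampleChanPoolUsage() {
	pool := NewChanPool(8, func() any {
		buf := make([]byte, 0, 1024) // allocated only when no idle buffer is available
		return &buf
	})

	p := pool.Get().(*[]byte) // reuse an idle buffer or fall back to the constructor
	*p = (*p)[:0]             // reset length, keep capacity
	*p = append(*p, "payload"...)
	pool.Put(p) // kept if the channel has room, silently dropped otherwise
}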

// RWSet records all read & write sets in txs
// Attention: this is not a concurrency-safe structure
type RWSet struct {
@@ -275,21 +305,30 @@ func (w *StateWrites) Copy() *StateWrites {
}

var (
rwEventCachePool = sync.Pool{New: func() any {
rwEventsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/rwevents/cnt", nil)
rwSetsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/rwsets/cnt", nil)
txDepsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/txdeps/cnt", nil)
)

var (
rwEventCachePool = NewChanPool(initSyncPoolSize*4, func() any {
rwEventsAllocMeter.Mark(1)
buf := make([]RWEventItem, 0)
return &buf
}}
stateWritesPool = sync.Pool{New: func() any {
return NewStateWrites()
}}
})
rwSetsPool = sync.Pool{New: func() any {
rwSetsAllocMeter.Mark(1)
buf := make([]RWSet, 0)
return &buf
}}
txDepsPool = sync.Pool{New: func() any {
txDepsAllocMeter.Mark(1)
buf := make([]TxDep, 0)
return &buf
}}
stateWritesPool = sync.Pool{New: func() any {
return NewStateWrites()
}}
accWriteSetPool = sync.Pool{New: func() any {
return make(map[common.Address]map[AccountState]*StateWrites)
}}
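
The slice pools above hold pointers to slices rather than slice values, so recycling a buffer does not re-box the slice header, and the Mark(1) calls fire only inside New, i.e. only on pool misses, which makes the new meters a direct count of real allocations. A rough sketch of the get/reset/put cycle these pools support; borrowRWSets is a hypothetical helper, only the pattern is taken from the commit:

// Hypothetical helper illustrating the pooled slice-pointer pattern.
func borrowRWSets(n int) (*[]RWSet, func()) {
	ptr := rwSetsPool.Get().(*[]RWSet) // a pool miss runs New and ticks rwSetsAllocMeter
	buf := (*ptr)[:0]                  // reuse the backing array, drop stale contents
	for i := 0; i < n; i++ {
		buf = append(buf, RWSet{})
	}
	*ptr = buf // store the possibly regrown slice header back through the pointer
	return ptr, func() { rwSetsPool.Put(ptr) } // caller invokes the release func when done
}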
@@ -347,7 +386,7 @@ func (s *MVStates) EnableAsyncGen() *MVStates {
s.asyncWG.Add(1)
s.asyncRunning = true
s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem)
s.rwEventCache = (s.rwEventCache)[:cap(s.rwEventCache)]
s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)]
s.rwEventCacheIndex = 0
s.asyncRWSet.index = -1
go s.asyncRWEventLoop()
@@ -472,12 +511,12 @@ func (s *MVStates) RecordNewTx(index int) {
if !s.asyncRunning {
return
}
if index%20 == 0 {
if index%asyncSendInterval == 0 {
s.BatchRecordHandle()
}
if s.rwEventCacheIndex < len(s.rwEventCache) {
(s.rwEventCache)[s.rwEventCacheIndex].Event = NewTxRWEvent
(s.rwEventCache)[s.rwEventCacheIndex].Index = index
s.rwEventCache[s.rwEventCacheIndex].Event = NewTxRWEvent
s.rwEventCache[s.rwEventCacheIndex].Index = index
} else {
s.rwEventCache = append(s.rwEventCache, RWEventItem{
Event: NewTxRWEvent,
Expand All @@ -502,9 +541,9 @@ func (s *MVStates) RecordAccountRead(addr common.Address, state AccountState) {
return
}
if s.rwEventCacheIndex < len(s.rwEventCache) {
(s.rwEventCache)[s.rwEventCacheIndex].Event = ReadAccRWEvent
(s.rwEventCache)[s.rwEventCacheIndex].Addr = addr
(s.rwEventCache)[s.rwEventCacheIndex].State = state
s.rwEventCache[s.rwEventCacheIndex].Event = ReadAccRWEvent
s.rwEventCache[s.rwEventCacheIndex].Addr = addr
s.rwEventCache[s.rwEventCacheIndex].State = state
s.rwEventCacheIndex++
return
}
@@ -592,9 +631,7 @@ func (s *MVStates) BatchRecordHandle() {
if !s.asyncRunning || s.rwEventCacheIndex == 0 {
return
}
s.rwEventCache = s.rwEventCache[:s.rwEventCacheIndex]
s.rwEventCh <- s.rwEventCache

s.rwEventCh <- s.rwEventCache[:s.rwEventCacheIndex]
s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem)
s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)]
s.rwEventCacheIndex = 0
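
With the cache index tracked separately, BatchRecordHandle now sends only the filled prefix of the cache over rwEventCh and immediately refills rwEventCache from the pool, while RecordNewTx triggers a flush every asyncSendInterval transactions. Below is a simplified stand-in for that producer/consumer handoff, assuming the consumer returns processed buffers to the same pool; the events channel, consumer goroutine, and batch contents are illustrative, not the real asyncRWEventLoop:

// Standalone sketch of the flush/refresh handoff under the assumption above.
func exampleBatchHandoff() {
	events := make(chan []RWEventItem, initSyncPoolSize)

	go func() { // consumer: process each batch, then recycle its backing array
		for batch := range events {
			// ... derive RW sets / dependencies from the items ...
			recycled := batch[:0]
			rwEventCachePool.Put(&recycled)
		}
	}()

	cache := (*rwEventCachePool.Get().(*[]RWEventItem))[:0]
	for i := 0; i < 3*asyncSendInterval; i++ {
		cache = append(cache, RWEventItem{Event: NewTxRWEvent, Index: i})
		if (i+1)%asyncSendInterval == 0 { // flush on the interval, as RecordNewTx does
			events <- cache
			cache = (*rwEventCachePool.Get().(*[]RWEventItem))[:0]
		}
	}
	rwEventCachePool.Put(&cache) // hand the unused buffer back to the pool
	close(events)
}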
86 changes: 0 additions & 86 deletions core/types/mvstates_test.go
@@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"fmt"
"sync"
"testing"
"time"

@@ -239,91 +238,6 @@ func BenchmarkMVStates_Finalise(b *testing.B) {
}
}

func checkMap(m map[int][10]byte) {
for i, j := range m {
m[i] = j
}
}

func BenchmarkEmptyMap(b *testing.B) {
for i := 0; i < b.N; i++ {
m := make(map[int][10]byte)
for j := 0; j < 10000; j++ {
m[i] = [10]byte{byte(j)}
}
checkMap(m)
}
}

func BenchmarkInitMapWithSize(b *testing.B) {
for i := 0; i < b.N; i++ {
m := make(map[int][10]byte, 1000)
for j := 0; j < 1000; j++ {
m[i] = [10]byte{byte(j)}
}
}
}

func BenchmarkReuseMap(b *testing.B) {
sp := sync.Pool{New: func() interface{} {
return make(map[int][10]byte, 10)
}}
for i := 0; i < b.N; i++ {
m := sp.Get().(map[int][10]byte)
for j := 0; j < 1000; j++ {
m[i] = [10]byte{byte(j)}
}
for k := range m {
delete(m, k)
}
sp.Put(m)
}
}

func BenchmarkExistArray(b *testing.B) {
for i := 0; i < b.N; i++ {
m := make(map[[20]byte]struct{})
m[common.Address{1}] = struct{}{}
addr := common.Address{1}
if _, ok := m[addr]; ok {
continue
}
}
}

func BenchmarkDonotExistArray(b *testing.B) {
for i := 0; i < b.N; i++ {
m := make(map[[20]byte]struct{})
addr := common.Address{1}
if _, ok := m[addr]; !ok {
m[addr] = struct{}{}
delete(m, addr)
}
}
}
func BenchmarkSlicePtrPool_3(b *testing.B) {
pool := sync.Pool{
New: func() interface{} {
s := make([]int, 0, 0)
return &s
},
}
for i := 0; i < 4; i++ {
s := make([]int, 100)
pool.Put(&s)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
ptr := pool.Get().(*[]int)
s := *ptr
s = s[0:0]
s = append(s, 123)
*ptr = s // Copy the stack header over to the heap
pool.Put(ptr)
}
}

func resolveTxDAGInMVStates(s *MVStates, txCnt int) TxDAG {
txDAG := NewPlainTxDAG(txCnt)
for i := 0; i < txCnt; i++ {
