diff --git a/core/types/mvstates.go b/core/types/mvstates.go index 7c2ff828aa..80a5b68f9e 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "golang.org/x/exp/slices" ) @@ -21,7 +22,8 @@ var ( ) const ( - initSyncPoolSize = 4 + initSyncPoolSize = 4 + asyncSendInterval = 20 ) func init() { @@ -37,6 +39,34 @@ func init() { } } +type ChanPool struct { + ch chan any + new func() any +} + +func NewChanPool(size int, f func() any) *ChanPool { + return &ChanPool{ + ch: make(chan any, size), + new: f, + } +} + +func (p ChanPool) Get() any { + select { + case item := <-p.ch: + return item + default: + } + return p.new() +} + +func (p ChanPool) Put(item any) { + select { + case p.ch <- item: + default: + } +} + // RWSet record all read & write set in txs // Attention: this is not a concurrent safety structure type RWSet struct { @@ -275,21 +305,30 @@ func (w *StateWrites) Copy() *StateWrites { } var ( - rwEventCachePool = sync.Pool{New: func() any { + rwEventsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/rwevents/cnt", nil) + rwSetsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/rwsets/cnt", nil) + txDepsAllocMeter = metrics.GetOrRegisterMeter("mvstate/alloc/txdeps/cnt", nil) +) + +var ( + rwEventCachePool = NewChanPool(initSyncPoolSize*4, func() any { + rwEventsAllocMeter.Mark(1) buf := make([]RWEventItem, 0) return &buf - }} - stateWritesPool = sync.Pool{New: func() any { - return NewStateWrites() - }} + }) rwSetsPool = sync.Pool{New: func() any { + rwSetsAllocMeter.Mark(1) buf := make([]RWSet, 0) return &buf }} txDepsPool = sync.Pool{New: func() any { + txDepsAllocMeter.Mark(1) buf := make([]TxDep, 0) return &buf }} + stateWritesPool = sync.Pool{New: func() any { + return NewStateWrites() + }} accWriteSetPool = sync.Pool{New: func() any { return make(map[common.Address]map[AccountState]*StateWrites) }} @@ -347,7 +386,7 @@ func (s *MVStates) EnableAsyncGen() *MVStates { s.asyncWG.Add(1) s.asyncRunning = true s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem) - s.rwEventCache = (s.rwEventCache)[:cap(s.rwEventCache)] + s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)] s.rwEventCacheIndex = 0 s.asyncRWSet.index = -1 go s.asyncRWEventLoop() @@ -472,12 +511,12 @@ func (s *MVStates) RecordNewTx(index int) { if !s.asyncRunning { return } - if index%20 == 0 { + if index%asyncSendInterval == 0 { s.BatchRecordHandle() } if s.rwEventCacheIndex < len(s.rwEventCache) { - (s.rwEventCache)[s.rwEventCacheIndex].Event = NewTxRWEvent - (s.rwEventCache)[s.rwEventCacheIndex].Index = index + s.rwEventCache[s.rwEventCacheIndex].Event = NewTxRWEvent + s.rwEventCache[s.rwEventCacheIndex].Index = index } else { s.rwEventCache = append(s.rwEventCache, RWEventItem{ Event: NewTxRWEvent, @@ -502,9 +541,9 @@ func (s *MVStates) RecordAccountRead(addr common.Address, state AccountState) { return } if s.rwEventCacheIndex < len(s.rwEventCache) { - (s.rwEventCache)[s.rwEventCacheIndex].Event = ReadAccRWEvent - (s.rwEventCache)[s.rwEventCacheIndex].Addr = addr - (s.rwEventCache)[s.rwEventCacheIndex].State = state + s.rwEventCache[s.rwEventCacheIndex].Event = ReadAccRWEvent + s.rwEventCache[s.rwEventCacheIndex].Addr = addr + s.rwEventCache[s.rwEventCacheIndex].State = state s.rwEventCacheIndex++ return } @@ -592,9 +631,7 @@ func (s *MVStates) BatchRecordHandle() { if !s.asyncRunning || s.rwEventCacheIndex == 0 { return } - s.rwEventCache = s.rwEventCache[:s.rwEventCacheIndex] - s.rwEventCh <- s.rwEventCache - + s.rwEventCh <- s.rwEventCache[:s.rwEventCacheIndex] s.rwEventCache = *rwEventCachePool.Get().(*[]RWEventItem) s.rwEventCache = s.rwEventCache[:cap(s.rwEventCache)] s.rwEventCacheIndex = 0 diff --git a/core/types/mvstates_test.go b/core/types/mvstates_test.go index c30b45a781..e0f744d129 100644 --- a/core/types/mvstates_test.go +++ b/core/types/mvstates_test.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/gzip" "fmt" - "sync" "testing" "time" @@ -239,91 +238,6 @@ func BenchmarkMVStates_Finalise(b *testing.B) { } } -func checkMap(m map[int][10]byte) { - for i, j := range m { - m[i] = j - } -} - -func BenchmarkEmptyMap(b *testing.B) { - for i := 0; i < b.N; i++ { - m := make(map[int][10]byte) - for j := 0; j < 10000; j++ { - m[i] = [10]byte{byte(j)} - } - checkMap(m) - } -} - -func BenchmarkInitMapWithSize(b *testing.B) { - for i := 0; i < b.N; i++ { - m := make(map[int][10]byte, 1000) - for j := 0; j < 1000; j++ { - m[i] = [10]byte{byte(j)} - } - } -} - -func BenchmarkReuseMap(b *testing.B) { - sp := sync.Pool{New: func() interface{} { - return make(map[int][10]byte, 10) - }} - for i := 0; i < b.N; i++ { - m := sp.Get().(map[int][10]byte) - for j := 0; j < 1000; j++ { - m[i] = [10]byte{byte(j)} - } - for k := range m { - delete(m, k) - } - sp.Put(m) - } -} - -func BenchmarkExistArray(b *testing.B) { - for i := 0; i < b.N; i++ { - m := make(map[[20]byte]struct{}) - m[common.Address{1}] = struct{}{} - addr := common.Address{1} - if _, ok := m[addr]; ok { - continue - } - } -} - -func BenchmarkDonotExistArray(b *testing.B) { - for i := 0; i < b.N; i++ { - m := make(map[[20]byte]struct{}) - addr := common.Address{1} - if _, ok := m[addr]; !ok { - m[addr] = struct{}{} - delete(m, addr) - } - } -} -func BenchmarkSlicePtrPool_3(b *testing.B) { - pool := sync.Pool{ - New: func() interface{} { - s := make([]int, 0, 0) - return &s - }, - } - for i := 0; i < 4; i++ { - s := make([]int, 100) - pool.Put(&s) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ptr := pool.Get().(*[]int) - s := *ptr - s = s[0:0] - s = append(s, 123) - *ptr = s // Copy the stack header over to the heap - pool.Put(ptr) - } -} - func resolveTxDAGInMVStates(s *MVStates, txCnt int) TxDAG { txDAG := NewPlainTxDAG(txCnt) for i := 0; i < txCnt; i++ {