Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
TxPool sendersBatch optimizations (#927)
Browse files Browse the repository at this point in the history
Reduce memory allocations and using inverted index (instead of
brute-force).
Use common.Address type instead of []byte

[Related issue](erigontech/erigon#7002)
  • Loading branch information
Qjawko authored Mar 16, 2023
1 parent 0a69d98 commit 8234bab
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 48 deletions.
75 changes: 38 additions & 37 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg Config, cache kvca
search: &metaTx{Tx: &types.TxSlot{}},
senderIDTxnCount: map[uint64]int{},
}
tracedSenders := make(map[string]struct{})
tracedSenders := make(map[common.Address]struct{})
for _, sender := range cfg.TracedSenders {
tracedSenders[sender] = struct{}{}
tracedSenders[common.BytesToAddress([]byte(sender))] = struct{}{}
}
return &TxPool{
lock: &sync.Mutex{},
Expand Down Expand Up @@ -540,19 +540,19 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
//log.Info("[txpool] on new txs", "amount", len(newPendingTxs.txs), "in", time.Since(t))
return nil
}
func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender []byte, isLocal bool, err error) {
func (p *TxPool) getRlpLocked(tx kv.Tx, hash []byte) (rlpTxn []byte, sender common.Address, isLocal bool, err error) {
txn, ok := p.byHash[string(hash)]
if ok && txn.Tx.Rlp != nil {
return txn.Tx.Rlp, p.senders.senderID2Addr[txn.Tx.SenderID], txn.subPool&IsLocal > 0, nil
}
v, err := tx.GetOne(kv.PoolTransaction, hash)
if err != nil {
return nil, nil, false, err
return nil, common.Address{}, false, err
}
if v == nil {
return nil, nil, false, nil
return nil, common.Address{}, false, nil
}
return v[20:], v[:20], txn != nil && txn.subPool&IsLocal > 0, nil
return v[20:], *(*[20]byte)(v[:20]), txn != nil && txn.subPool&IsLocal > 0, nil
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
p.lock.Lock()
Expand Down Expand Up @@ -675,7 +675,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG
}

txs.Txs[count] = rlpTx
copy(txs.Senders.At(count), sender)
copy(txs.Senders.At(count), sender.Bytes())
txs.IsLocal[count] = isLocal
toSkip.Add(mt.Tx.IDHash)
count++
Expand Down Expand Up @@ -704,7 +704,7 @@ func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, avail
}

func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error) {
set := mapset.NewSet[[32]byte]()
set := mapset.NewThreadUnsafeSet[[32]byte]()
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, set)
return onTime, err
}
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func addTxsOnNewBlock(blockNum uint64, cacheView kvcache.CacheView, stateChanges
continue
}
addr := gointerfaces.ConvertH160toAddress(change.Address)
id, ok := senders.getID(addr[:])
id, ok := senders.getID(addr)
if !ok {
continue
}
Expand Down Expand Up @@ -1193,7 +1193,7 @@ func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) {
func (p *TxPool) NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool) {
p.lock.Lock()
defer p.lock.Unlock()
senderID, found := p.senders.getID(addr[:])
senderID, found := p.senders.getID(addr)
if !found {
return 0, false
}
Expand Down Expand Up @@ -1593,7 +1593,7 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
addr, ok := p.senders.senderID2Addr[id]
if ok {
delete(p.senders.senderID2Addr, id)
delete(p.senders.senderIDs, string(addr))
delete(p.senders.senderIDs, addr)
}
}
//fmt.Printf("del:%d,%d,%d\n", mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)
Expand Down Expand Up @@ -1627,12 +1627,14 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
continue
}
v = common.EnsureEnoughSize(v, 20+len(metaTx.Tx.Rlp))
for addr, id := range p.senders.senderIDs { // no inverted index - tradeoff flush speed for memory usage
if id == metaTx.Tx.SenderID {
copy(v[:20], addr)
break
}

addr, ok := p.senders.senderID2Addr[metaTx.Tx.SenderID]
if !ok {
log.Warn("[txpool] flush: sender address not found by ID", metaTx.Tx.SenderID)
continue
}

copy(v[:20], addr.Bytes())
copy(v[20:], metaTx.Tx.Rlp)

has, err := tx.Has(kv.PoolTransaction, []byte(txHash))
Expand Down Expand Up @@ -1701,7 +1703,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if err != nil {
return err
}
addr, txRlp := v[:20], v[20:]
addr, txRlp := *(*[20]byte)(v[:20]), v[20:]
txn := &types.TxSlot{}

_, err = parseCtx.ParseTransaction(txRlp, 0, txn, nil, false /* hasEnvelope */, nil)
Expand All @@ -1723,7 +1725,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
txs.Resize(uint(i + 1))
txs.Txs[i] = txn
txs.IsLocal[i] = isLocalTx
copy(txs.Senders.At(i), addr)
copy(txs.Senders.At(i), addr[:])
i++
}

Expand Down Expand Up @@ -1839,7 +1841,7 @@ func (p *TxPool) logStats() {
}

// Deprecated need switch to streaming-like
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) {
func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx) {
p.lock.Lock()
defer p.lock.Unlock()
p.all.ascendAll(func(mt *metaTx) bool {
Expand Down Expand Up @@ -1984,29 +1986,28 @@ func (sc *sendersBatch) printDebug(prefix string) {
// flushing to db periodicaly. it doesn't play as read-cache (because db is small and memory-mapped - doesn't need cache)
// non thread-safe
type sendersBatch struct {
senderIDs map[string]uint64
senderID2Addr map[uint64][]byte
tracedSenders map[string]struct{}
senderIDs map[common.Address]uint64
senderID2Addr map[uint64]common.Address
tracedSenders map[common.Address]struct{}
senderID uint64
}

func newSendersCache(tracedSenders map[string]struct{}) *sendersBatch {
return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64][]byte{}, tracedSenders: tracedSenders}
func newSendersCache(tracedSenders map[common.Address]struct{}) *sendersBatch {
return &sendersBatch{senderIDs: map[common.Address]uint64{}, senderID2Addr: map[uint64]common.Address{}, tracedSenders: tracedSenders}
}

func (sc *sendersBatch) getID(addr []byte) (uint64, bool) {
id, ok := sc.senderIDs[string(addr)]
func (sc *sendersBatch) getID(addr common.Address) (uint64, bool) {
id, ok := sc.senderIDs[addr]
return id, ok
}
func (sc *sendersBatch) getOrCreateID(addr []byte) (uint64, bool) {
_, traced := sc.tracedSenders[string(addr)]
id, ok := sc.senderIDs[string(addr)]
func (sc *sendersBatch) getOrCreateID(addr common.Address) (uint64, bool) {
_, traced := sc.tracedSenders[addr]
id, ok := sc.senderIDs[addr]
if !ok {
copyAddr := common.Copy(addr)
sc.senderID++
id = sc.senderID
sc.senderIDs[string(copyAddr)] = id
sc.senderID2Addr[id] = copyAddr
sc.senderIDs[addr] = id
sc.senderID2Addr[id] = addr
if traced {
log.Info(fmt.Sprintf("TX TRACING: allocated senderID %d to sender %x", id, addr))
}
Expand All @@ -2018,7 +2019,7 @@ func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint
if !ok {
panic("must not happen")
}
encoded, err := cacheView.Get(addr)
encoded, err := cacheView.Get(addr.Bytes())
if err != nil {
return 0, emptySender.balance, err
}
Expand All @@ -2034,23 +2035,23 @@ func (sc *sendersBatch) info(cacheView kvcache.CacheView, id uint64) (nonce uint

func (sc *sendersBatch) registerNewSenders(newTxs *types.TxSlots) (err error) {
for i, txn := range newTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(newTxs.Senders.At(i))
txn.SenderID, txn.Traced = sc.getOrCreateID(newTxs.Senders.AddressAt(i))
}
return nil
}
func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots) error {
for _, diff := range stateChanges.ChangeBatch {
for _, change := range diff.Changes { // merge state changes
addrB := gointerfaces.ConvertH160toAddress(change.Address)
sc.getOrCreateID(addrB[:])
sc.getOrCreateID(addrB)
}

for i, txn := range unwindTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(unwindTxs.Senders.At(i))
txn.SenderID, txn.Traced = sc.getOrCreateID(unwindTxs.Senders.AddressAt(i))
}

for i, txn := range minedTxs.Txs {
txn.SenderID, txn.Traced = sc.getOrCreateID(minedTxs.Senders.At(i))
txn.SenderID, txn.Traced = sc.getOrCreateID(minedTxs.Senders.AddressAt(i))
}
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions txpool/pool_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/binary"
"github.com/ledgerwatch/erigon-lib/common"
"testing"

"github.com/holiman/uint256"
Expand Down Expand Up @@ -164,7 +165,7 @@ func parseSenders(in []byte) (nonces []uint64, balances []uint256.Int) {
return
}

func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []byte) (sendersInfo map[uint64]*sender, senderIDs map[string]uint64, txs types.TxSlots, ok bool) {
func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []byte) (sendersInfo map[uint64]*sender, senderIDs map[common.Address]uint64, txs types.TxSlots, ok bool) {
if len(rawTxNonce) < 1 || len(rawValues) < 1 || len(rawTips) < 1 || len(rawFeeCap) < 1 || len(rawSender) < 1+1 {
return nil, nil, txs, false
}
Expand All @@ -187,13 +188,13 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b
}

sendersInfo = map[uint64]*sender{}
senderIDs = map[string]uint64{}
senderIDs = map[common.Address]uint64{}
senders := make(types.Addresses, 20*len(senderNonce))
for i := 0; i < len(senderNonce); i++ {
senderID := uint64(i + 1) //non-zero expected
binary.BigEndian.PutUint64(senders.At(i%senders.Len()), senderID)
sendersInfo[senderID] = newSender(senderNonce[i], senderBalance[i%len(senderBalance)])
senderIDs[string(senders.At(i%senders.Len()))] = senderID
senderIDs[senders.AddressAt(i%senders.Len())] = senderID
}
txs.Txs = make([]*types.TxSlot, len(txNonce))
parseCtx := types.NewTxParseContext(*u256.N1)
Expand Down Expand Up @@ -315,7 +316,7 @@ func FuzzOnNewBlocks(f *testing.F) {
assert.NoError(err)
pool.senders.senderIDs = senderIDs
for addr, id := range senderIDs {
pool.senders.senderID2Addr[id] = []byte(addr)
pool.senders.senderID2Addr[id] = addr
}
pool.senders.senderID = uint64(len(senderIDs))
check := func(unwindTxs, minedTxs types.TxSlots, msg string) {
Expand Down Expand Up @@ -476,8 +477,7 @@ func FuzzOnNewBlocks(f *testing.F) {
},
}
for id, sender := range senders {
var addr [20]byte
copy(addr[:], pool.senders.senderID2Addr[id])
addr := pool.senders.senderID2Addr[id]
v := make([]byte, types.EncodeSenderLengthForStorage(sender.nonce, sender.balance))
types.EncodeSender(sender.nonce, sender.balance, v)
change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remote.AccountChange{
Expand Down
8 changes: 3 additions & 5 deletions txpool/txpool_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type txPool interface {
PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas uint64) (bool, error)
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddLocalTxs(ctx context.Context, newTxs types.TxSlots, tx kv.Tx) ([]DiscardReason, error)
deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx)
deprecatedForEach(_ context.Context, f func(rlp []byte, sender common.Address, t SubPoolType), tx kv.Tx)
CountContent() (int, int, int)
IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
NonceFromAddress(addr [20]byte) (nonce uint64, inPool bool)
Expand Down Expand Up @@ -134,11 +134,9 @@ func (s *GrpcServer) All(ctx context.Context, _ *txpool_proto.AllRequest) (*txpo
defer tx.Rollback()
reply := &txpool_proto.AllReply{}
reply.Txs = make([]*txpool_proto.AllReply_Tx, 0, 32)
var senderArr [20]byte
s.txPool.deprecatedForEach(ctx, func(rlp, sender []byte, t SubPoolType) {
copy(senderArr[:], sender) // TODO: optimize
s.txPool.deprecatedForEach(ctx, func(rlp []byte, sender common.Address, t SubPoolType) {
reply.Txs = append(reply.Txs, &txpool_proto.AllReply_Tx{
Sender: gointerfaces.ConvertAddressToH160(senderArr),
Sender: gointerfaces.ConvertAddressToH160(sender),
TxnType: convertSubPoolType(t),
RlpTx: common.Copy(rlp),
})
Expand Down
6 changes: 6 additions & 0 deletions types/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,12 @@ func (a Announcements) Copy() Announcements {

type Addresses []byte // flatten list of 20-byte addresses

// AddressAt returns an address at the given index in the flattened list.
// Use this method if you want to reduce memory allocations
func (h Addresses) AddressAt(i int) common.Address {
return *(*[20]byte)(h[i*length.Addr : (i+1)*length.Addr])
}

func (h Addresses) At(i int) []byte { return h[i*length.Addr : (i+1)*length.Addr] }
func (h Addresses) Len() int { return len(h) / length.Addr }

Expand Down

0 comments on commit 8234bab

Please sign in to comment.