Skip to content

Commit

Permalink
rpc bottleneck: block files mutex (e2) (#11155)
Browse files Browse the repository at this point in the history
for #11090

thank you [tholcman](https://github.com/tholcman) for finding
  • Loading branch information
AskAlexSharov authored Jul 15, 2024
1 parent 81c28cd commit 7e56d99
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 70 deletions.
141 changes: 75 additions & 66 deletions turbo/snapshotsync/freezeblocks/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"time"

borsnaptype "github.com/ledgerwatch/erigon/polygon/bor/snaptype"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/common/hexutility"
Expand Down Expand Up @@ -386,12 +387,11 @@ func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHei
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand All @@ -410,9 +410,8 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo
return h, nil
}

view := r.sn.View()
defer view.Close()
segments := view.Headers()
segments, release := r.sn.ViewType(coresnaptype.Headers)
defer release()

buf := make([]byte, 128)
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -442,12 +441,11 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig
return h, nil
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

header, _, err := r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
Expand Down Expand Up @@ -477,12 +475,12 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash
}
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
return
}
defer release()

h, _, err = r.headerFromSnapshot(blockHeight, seg, nil)
if err != nil {
return h, err
Expand Down Expand Up @@ -517,40 +515,46 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha
}
}

view := r.sn.View()
defer view.Close()

var baseTxnID uint64
var txsAmount uint32
var buf []byte
seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return nil, nil
}
body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
defer release()

var baseTxnID uint64
var txCount uint32
var buf []byte
body, baseTxnID, txCount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf)
if err != nil {
return nil, err
}
release()

if body == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
}
return nil, nil
}
txnSeg, ok := view.TxsSegment(blockHeight)

txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load())
}
return nil, nil
}
txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf)
defer release()

txs, senders, err := r.txsFromSnapshot(baseTxnID, txCount, txnSeg, buf)
if err != nil {
return nil, err
}
release()

if txs == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil txs from file")
Expand Down Expand Up @@ -586,13 +590,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash,
body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight)
return body, txAmount, nil
}
view := r.sn.View()
defer view.Close()

seg, ok := view.BodiesSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
return
}
defer release()

body, _, txAmount, _, err = r.bodyFromSnapshot(blockHeight, seg, nil)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -656,15 +660,14 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return
}

view := r.sn.View()
defer view.Close()
seg, ok := view.HeadersSegment(blockHeight)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no header files for this block num")
}
return
}
defer release()

var buf []byte
h, buf, err := r.headerFromSnapshot(blockHeight, seg, buf)
Expand All @@ -677,21 +680,26 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
}
return
}
release()

var b *types.Body
var baseTxnId uint64
var txsAmount uint32
bodySeg, ok := view.BodiesSegment(blockHeight)
bodySeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix + "no bodies file for this block num")
}
return
}
defer release()

b, baseTxnId, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, bodySeg, buf)
if err != nil {
return nil, nil, err
}
release()

if b == nil {
if dbgLogs {
log.Info(dbgPrefix + "got nil body from file")
Expand All @@ -710,18 +718,21 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c
return block, senders, nil
}

txnSeg, ok := view.TxsSegment(blockHeight)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight)
if !ok {
if dbgLogs {
log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load())
}
return
}
defer release()
var txs []types.Transaction
txs, senders, err = r.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf)
if err != nil {
return nil, nil, err
}
release()

block = types.NewBlockFromStorage(hash, h, txs, b.Uncles, b.Withdrawals)
if len(senders) != block.Transactions().Len() {
if dbgLogs {
Expand Down Expand Up @@ -976,18 +987,18 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return rawdb.TxnByIdxInBlock(tx, canonicalHash, blockNum, txIdxInBlock)
}

view := r.sn.View()
defer view.Close()
seg, ok := view.BodiesSegment(blockNum)
seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockNum)
if !ok {
return
}
defer release()

var b *types.BodyForStorage
b, _, err = r.bodyForStorageFromSnapshot(blockNum, seg, nil)
if err != nil {
return nil, err
}
release()
if b == nil {
return
}
Expand All @@ -997,10 +1008,12 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu
return nil, nil
}

txnSeg, ok := view.TxsSegment(blockNum)
txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockNum)
if !ok {
return
}
defer release()

// +1 because block has system-txn in the beginning of block
return r.txnByID(b.BaseTxId+1+uint64(txIdxInBlock), txnSeg, nil)
}
Expand All @@ -1015,10 +1028,9 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
return *n, true, nil
}

view := r.sn.View()
defer view.Close()

_, blockNum, ok, err := r.txnByHash(txnHash, view.Txs(), nil)
txns, release := r.sn.ViewType(coresnaptype.Transactions)
defer release()
_, blockNum, ok, err := r.txnByHash(txnHash, txns, nil)
if err != nil {
return 0, false, err
}
Expand All @@ -1027,13 +1039,11 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common.
}

func (r *BlockReader) FirstTxnNumNotInSnapshots() uint64 {
view := r.sn.View()
defer view.Close()

sn, ok := view.TxsSegment(r.sn.BlocksAvailable())
sn, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable())
if !ok {
return 0
}
defer release()

lastTxnID := sn.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.Count())
return lastTxnID
Expand Down Expand Up @@ -1247,10 +1257,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common
}

borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()

segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.from > blockHeight {
Expand Down Expand Up @@ -1328,9 +1338,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H
return result, nil
}
borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash)
view := r.borSn.View()
defer view.Close()
segments := view.Events()

segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

var buf []byte
result := []rlp.RawValue{}
for i := len(segments) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -1368,10 +1379,9 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H

// EventsByIdFromSnapshot returns the list of records limited by time, or the number of records along with a bool value to signify if the records were limited by time
func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) {
view := r.borSn.View()
defer view.Close()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

segments := view.Events()
var buf []byte
var result []*heimdall.EventRecordWithTime
stateContract := bor.GenesisContractStateReceiverABI()
Expand Down Expand Up @@ -1448,9 +1458,9 @@ func (r *BlockReader) LastFrozenEventId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Events()
segments, release := r.borSn.ViewType(borsnaptype.BorEvents)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1505,9 +1515,9 @@ func (r *BlockReader) LastFrozenSpanId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

if len(segments) == 0 {
return 0
}
Expand Down Expand Up @@ -1549,9 +1559,9 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([]
}
return common.Copy(v), nil
}
view := r.borSn.View()
defer view.Close()
segments := view.Spans()
segments, release := r.borSn.ViewType(borsnaptype.BorSpans)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
idx := sn.Index()
Expand Down Expand Up @@ -1653,9 +1663,9 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId
return common.Copy(v), nil
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()

for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
index := sn.Index()
Expand All @@ -1679,9 +1689,8 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 {
return 0
}

view := r.borSn.View()
defer view.Close()
segments := view.Checkpoints()
segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints)
defer release()
if len(segments) == 0 {
return 0
}
Expand Down
Loading

0 comments on commit 7e56d99

Please sign in to comment.