diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index 5b5d4fac0e1..77804a8ced1 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -10,6 +10,7 @@ import ( "sort" "time" + borsnaptype "github.com/ledgerwatch/erigon/polygon/bor/snaptype" "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/common/hexutility" @@ -395,12 +396,11 @@ func (r *BlockReader) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHei } } - view := r.sn.View() - defer view.Close() - seg, ok := view.HeadersSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight) if !ok { return } + defer release() h, _, err = r.headerFromSnapshot(blockHeight, seg, nil) if err != nil { @@ -419,9 +419,8 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo return h, nil } - view := r.sn.View() - defer view.Close() - segments := view.Headers() + segments, release := r.sn.ViewType(coresnaptype.Headers) + defer release() buf := make([]byte, 128) for i := len(segments) - 1; i >= 0; i-- { @@ -451,12 +450,11 @@ func (r *BlockReader) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeig return h, nil } - view := r.sn.View() - defer view.Close() - seg, ok := view.HeadersSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight) if !ok { return } + defer release() header, _, err := r.headerFromSnapshot(blockHeight, seg, nil) if err != nil { @@ -486,12 +484,12 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash } } - view := r.sn.View() - defer view.Close() - seg, ok := view.HeadersSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight) if !ok { return } + defer release() + h, _, err = r.headerFromSnapshot(blockHeight, seg, nil) if err != nil { return h, err @@ -526,40 +524,46 @@ func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, ha } } - view := r.sn.View() - defer view.Close() - - var baseTxnID uint64 - var txsAmount uint32 - var buf []byte - seg, ok := view.BodiesSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight) if !ok { if dbgLogs { log.Info(dbgPrefix + "no bodies file for this block num") } return nil, nil } - body, baseTxnID, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf) + defer release() + + var baseTxnID uint64 + var txCount uint32 + var buf []byte + body, baseTxnID, txCount, buf, err = r.bodyFromSnapshot(blockHeight, seg, buf) if err != nil { return nil, err } + release() + if body == nil { if dbgLogs { log.Info(dbgPrefix + "got nil body from file") } return nil, nil } - txnSeg, ok := view.TxsSegment(blockHeight) + + txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight) if !ok { if dbgLogs { log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.idxMax", r.sn.idxMax.Load(), "r.sn.segmetntsMax", r.sn.segmentsMax.Load()) } return nil, nil } - txs, senders, err := r.txsFromSnapshot(baseTxnID, txsAmount, txnSeg, buf) + defer release() + + txs, senders, err := r.txsFromSnapshot(baseTxnID, txCount, txnSeg, buf) if err != nil { return nil, err } + release() + if txs == nil { if dbgLogs { log.Info(dbgPrefix + "got nil txs from file") @@ -606,13 +610,13 @@ func (r *BlockReader) Body(ctx context.Context, tx kv.Getter, hash common.Hash, body, _, txAmount = rawdb.ReadBody(tx, hash, blockHeight) return body, txAmount, nil } - view := r.sn.View() - defer view.Close() - seg, ok := view.BodiesSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight) if !ok { return } + defer release() + body, _, txAmount, _, err = r.bodyFromSnapshot(blockHeight, seg, nil) if err != nil { return nil, 0, err @@ -676,15 +680,14 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c return } - view := r.sn.View() - defer view.Close() - seg, ok := view.HeadersSegment(blockHeight) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Headers, blockHeight) if !ok { if dbgLogs { log.Info(dbgPrefix + "no header files for this block num") } return } + defer release() var buf []byte h, buf, err := r.headerFromSnapshot(blockHeight, seg, buf) @@ -697,21 +700,26 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c } return } + release() var b *types.Body var baseTxnId uint64 var txsAmount uint32 - bodySeg, ok := view.BodiesSegment(blockHeight) + bodySeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockHeight) if !ok { if dbgLogs { log.Info(dbgPrefix + "no bodies file for this block num") } return } + defer release() + b, baseTxnId, txsAmount, buf, err = r.bodyFromSnapshot(blockHeight, bodySeg, buf) if err != nil { return nil, nil, err } + release() + if b == nil { if dbgLogs { log.Info(dbgPrefix + "got nil body from file") @@ -730,18 +738,21 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c return block, senders, nil } - txnSeg, ok := view.TxsSegment(blockHeight) + txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight) if !ok { if dbgLogs { log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", r.sn.indicesReady.Load()) } return } + defer release() var txs []types.Transaction txs, senders, err = r.txsFromSnapshot(baseTxnId, txsAmount, txnSeg, buf) if err != nil { return nil, nil, err } + release() + block = types.NewBlockFromStorage(hash, h, txs, b.Uncles, b.Withdrawals) if len(senders) != block.Transactions().Len() { if dbgLogs { @@ -997,18 +1008,18 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu return rawdb.TxnByIdxInBlock(tx, canonicalHash, blockNum, txIdxInBlock) } - view := r.sn.View() - defer view.Close() - seg, ok := view.BodiesSegment(blockNum) + seg, ok, release := r.sn.ViewSingleFile(coresnaptype.Bodies, blockNum) if !ok { return } + defer release() var b *types.BodyForStorage b, _, err = r.bodyForStorageFromSnapshot(blockNum, seg, nil) if err != nil { return nil, err } + release() if b == nil { return } @@ -1018,10 +1029,12 @@ func (r *BlockReader) TxnByIdxInBlock(ctx context.Context, tx kv.Getter, blockNu return nil, nil } - txnSeg, ok := view.TxsSegment(blockNum) + txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockNum) if !ok { return } + defer release() + // +1 because block has system-txn in the beginning of block return r.txnByID(b.BaseTxId+1+uint64(txIdxInBlock), txnSeg, nil) } @@ -1036,10 +1049,9 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common. return *n, true, nil } - view := r.sn.View() - defer view.Close() - - _, blockNum, ok, err := r.txnByHash(txnHash, view.Txs(), nil) + txns, release := r.sn.ViewType(coresnaptype.Transactions) + defer release() + _, blockNum, ok, err := r.txnByHash(txnHash, txns, nil) if err != nil { return 0, false, err } @@ -1048,13 +1060,11 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common. } func (r *BlockReader) FirstTxnNumNotInSnapshots() uint64 { - view := r.sn.View() - defer view.Close() - - sn, ok := view.TxsSegment(r.sn.BlocksAvailable()) + sn, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, r.sn.BlocksAvailable()) if !ok { return 0 } + defer release() lastTxnID := sn.Index(coresnaptype.Indexes.TxnHash).BaseDataID() + uint64(sn.Count()) return lastTxnID @@ -1268,10 +1278,10 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common } borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash) - view := r.borSn.View() - defer view.Close() - segments := view.Events() + segments, release := r.borSn.ViewType(borsnaptype.BorEvents) + defer release() + for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] if sn.from > blockHeight { @@ -1349,9 +1359,10 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H return result, nil } borTxHash := bortypes.ComputeBorTxHash(blockHeight, hash) - view := r.borSn.View() - defer view.Close() - segments := view.Events() + + segments, release := r.borSn.ViewType(borsnaptype.BorEvents) + defer release() + var buf []byte result := []rlp.RawValue{} for i := len(segments) - 1; i >= 0; i-- { @@ -1389,10 +1400,9 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H // EventsByIdFromSnapshot returns the list of records limited by time, or the number of records along with a bool value to signify if the records were limited by time func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) { - view := r.borSn.View() - defer view.Close() + segments, release := r.borSn.ViewType(borsnaptype.BorEvents) + defer release() - segments := view.Events() var buf []byte var result []*heimdall.EventRecordWithTime stateContract := bor.GenesisContractStateReceiverABI() @@ -1469,9 +1479,9 @@ func (r *BlockReader) LastFrozenEventId() uint64 { return 0 } - view := r.borSn.View() - defer view.Close() - segments := view.Events() + segments, release := r.borSn.ViewType(borsnaptype.BorEvents) + defer release() + if len(segments) == 0 { return 0 } @@ -1526,9 +1536,9 @@ func (r *BlockReader) LastFrozenSpanId() uint64 { return 0 } - view := r.borSn.View() - defer view.Close() - segments := view.Spans() + segments, release := r.borSn.ViewType(borsnaptype.BorSpans) + defer release() + if len(segments) == 0 { return 0 } @@ -1570,9 +1580,9 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] } return common.Copy(v), nil } - view := r.borSn.View() - defer view.Close() - segments := view.Spans() + segments, release := r.borSn.ViewType(borsnaptype.BorSpans) + defer release() + for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] idx := sn.Index() @@ -1674,9 +1684,9 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId return common.Copy(v), nil } - view := r.borSn.View() - defer view.Close() - segments := view.Checkpoints() + segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints) + defer release() + for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] index := sn.Index() @@ -1700,9 +1710,8 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 { return 0 } - view := r.borSn.View() - defer view.Close() - segments := view.Checkpoints() + segments, release := r.borSn.ViewType(borsnaptype.BorCheckpoints) + defer release() if len(segments) == 0 { return 0 } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index b4d298236e6..2c203945194 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -808,8 +808,8 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, } func (s *RoSnapshots) PrintDebug() { - s.lockSegments() - defer s.unlockSegments() + view := s.View() + defer view.Close() s.segments.Scan(func(key snaptype.Enum, value *segments) bool { fmt.Println(" == [dbg] Snapshots,", key.String()) @@ -2040,9 +2040,13 @@ type View struct { closed bool } +// ViewSingleFile - RLock files of all types func (s *RoSnapshots) View() *View { v := &View{s: s, baseSegType: coresnaptype.Headers} - s.lockSegments() + s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { + value.lock.RLock() + return true + }) return v } @@ -2051,7 +2055,55 @@ func (v *View) Close() { return } v.closed = true - v.s.unlockSegments() + v.s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { + value.lock.RUnlock() + return true + }) +} + +var noop = func() {} + +// ViewSingleFile - RLock all files of given type +func (s *RoSnapshots) ViewType(t snaptype.Type) (segments []*Segment, release func()) { + segs, ok := s.segments.Get(t.Enum()) + if !ok { + return nil, noop + } + + segs.lock.RLock() + var released = false + return segs.segments, func() { + if released { + return + } + segs.lock.RUnlock() + released = true + } +} + +// ViewSingleFile - RLock all files of given type if has file with `blockNum` +func (s *RoSnapshots) ViewSingleFile(t snaptype.Type, blockNum uint64) (segment *Segment, ok bool, release func()) { + segs, ok := s.segments.Get(t.Enum()) + if !ok { + return nil, false, noop + } + + segs.lock.RLock() + var released = false + for _, seg := range segs.segments { + if !(blockNum >= seg.from && blockNum < seg.to) { + continue + } + return seg, true, func() { + if released { + return + } + segs.lock.RUnlock() + released = true + } + } + segs.lock.RUnlock() + return nil, false, noop } func (v *View) Segments(t snaptype.Type) []*Segment {