Skip to content

Commit

Permalink
Merge branch 'main' into fix-prestate-tracer-on-create-e3
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Jul 2, 2024
2 parents 288b287 + fa5d2dd commit fe7d504
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 163 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/qa-snap-download.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:

jobs:
snap-download-test:
runs-on: [self-hosted, Erigon3]
runs-on: self-hosted
timeout-minutes: 800
env:
ERIGON_DATA_DIR: ${{ github.workspace }}/erigon_data
Expand Down
3 changes: 3 additions & 0 deletions cl/cltypes/blob_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func VerifyCommitmentInclusionProof(commitment libcommon.Bytes48, commitmentIncl
commitmentsDepth := uint64(13) // log2(4096) + 1 = 13
bIndex := uint64(11)

if commitmentInclusionProof == nil || commitmentInclusionProof.Length() < bodyDepth+int(commitmentsDepth) {
return false
}
// Start by constructing the commitments subtree
for i := uint64(0); i < commitmentsDepth; i++ {
curr := commitmentInclusionProof.Get(int(i))
Expand Down
12 changes: 7 additions & 5 deletions cl/phase1/network/services/blob_sidecar_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/cl/beacon/synced_data"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/mock_services"
"github.com/ledgerwatch/erigon/cl/utils"
Expand Down Expand Up @@ -40,11 +41,12 @@ func getObjectsForBlobSidecarServiceTests(t *testing.T) (*state.CachingBeaconSta
proofBytes := common.Hex2Bytes(proofStr[2:])
copy(proof[:], proofBytes)
sidecar := &cltypes.BlobSidecar{
Index: uint64(0),
SignedBlockHeader: block.SignedBeaconBlockHeader(),
Blob: blob,
KzgCommitment: common.Bytes48(*block.Block.Body.BlobKzgCommitments.Get(0)),
KzgProof: proof,
Index: uint64(0),
SignedBlockHeader: block.SignedBeaconBlockHeader(),
Blob: blob,
KzgCommitment: common.Bytes48(*block.Block.Body.BlobKzgCommitments.Get(0)),
KzgProof: proof,
CommitmentInclusionProof: solid.NewHashVector(cltypes.CommitmentBranchSize),
}
return stateObj, block, sidecar
}
Expand Down
11 changes: 6 additions & 5 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,12 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
case <-rootCtx.Done():
return
case info := <-ch:
d.mu.Lock()
d.SetStagesList(info.StagesList)
d.mu.Unlock()

d.saveSyncStagesToDB()
func() {
d.mu.Lock()
defer d.mu.Unlock()
d.SetStagesList(info.StagesList)
d.saveSyncStagesToDB()
}()
}
}
}()
Expand Down
13 changes: 7 additions & 6 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi

cfg.ClientConfig.WebTransport = requestHandler

db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap)
db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig, cfg.MdbxWriteMap, logger)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
}
Expand Down Expand Up @@ -838,6 +838,7 @@ func (d *Downloader) mainLoop(silent bool) error {
if ok && err == nil {
_, _, err = addTorrentFile(d.ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
d.logger.Warn("[snapshots] addTorrentFile from webseed", "err", err)
continue
}
}
Expand Down Expand Up @@ -1092,10 +1093,10 @@ func (d *Downloader) mainLoop(silent bool) error {
}
}

d.lock.RLock()
d.lock.Lock()
downloadingLen := len(d.downloading)
d.stats.Downloading = int32(downloadingLen)
d.lock.RUnlock()
d.lock.Unlock()

// the call interval of the loop (elapsed sec) used to get slots/sec for
// calculating the number of files to download based on the loop speed
Expand Down Expand Up @@ -2139,7 +2140,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

if !stats.Completed {
logger.Debug("[snapshots] info",
logger.Debug("[snapshots] download info",
"len", len(torrents),
"webTransfers", webTransfers,
"torrent", torrentInfo,
Expand Down Expand Up @@ -2717,7 +2718,7 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error {

func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient }

func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientConfig, writeMap bool, logger log.Logger) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) {
dbCfg := mdbx.NewMDBX(log.New()).
Label(kv.DownloaderDB).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }).
Expand All @@ -2735,7 +2736,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
}
//c, err = NewMdbxPieceCompletion(db)
c, err = NewMdbxPieceCompletionBatch(db)
c, err = NewMdbxPieceCompletion(db, logger)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion erigon-lib/downloader/downloadercfg/downloadercfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package downloadercfg

import (
"github.com/ledgerwatch/erigon-lib/common/dbg"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -68,7 +69,7 @@ func Default() *torrent.ClientConfig {
// better don't increase because erigon periodically producing "new seedable files" - and adding them to downloader.
// it must not impact chain tip sync - so, limit resources to minimum by default.
// but when downloader is started as a separated process - rise it to max
//torrentConfig.PieceHashersPerTorrent = max(1, runtime.NumCPU()-1)
torrentConfig.PieceHashersPerTorrent = dbg.EnvInt("DL_HASHERS", min(16, max(2, runtime.NumCPU()-2)))

torrentConfig.MinDialTimeout = 6 * time.Second //default: 3s
torrentConfig.HandshakesTimeout = 8 * time.Second //default: 4s
Expand Down
165 changes: 101 additions & 64 deletions erigon-lib/downloader/mdbx_piece_completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package downloader
import (
"context"
"encoding/binary"
"sync"

"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/log/v3"
)

const (
Expand All @@ -33,17 +36,37 @@ const (
)

type mdbxPieceCompletion struct {
db kv.RwDB
db *mdbx.MdbxKV
mu sync.RWMutex
completed map[infohash.T]*roaring.Bitmap
flushed map[infohash.T]*roaring.Bitmap
logger log.Logger
}

var _ storage.PieceCompletion = (*mdbxPieceCompletion)(nil)

func NewMdbxPieceCompletion(db kv.RwDB) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletion{db: db}
func NewMdbxPieceCompletion(db kv.RwDB, logger log.Logger) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletion{
db: db.(*mdbx.MdbxKV),
logger: logger,
completed: map[infohash.T]*roaring.Bitmap{},
flushed: map[infohash.T]*roaring.Bitmap{}}
return
}

func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
func (m *mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
m.mu.RLock()
if completed, ok := m.completed[pk.InfoHash]; ok {
if completed.Contains(uint32(pk.Index)) {
m.mu.RUnlock()
return storage.Completion{
Complete: true,
Ok: true,
}, nil
}
}
m.mu.RUnlock()

err = m.db.View(context.Background(), func(tx kv.Tx) error {
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
Expand All @@ -66,11 +89,14 @@ func (m mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion, e
return
}

func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (m *mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}

m.mu.Lock()
defer m.mu.Unlock()

var tx kv.RwTx
var err error
// On power-off recent "no-sync" txs may be lost.
Expand All @@ -84,91 +110,102 @@ func (m mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
// 1K fsyncs/2minutes it's quite expensive, but even on cloud (high latency) drive it allow download 100mb/s
// and Erigon doesn't do anything when downloading snapshots
if b {
tx, err = m.db.BeginRwNosync(context.Background())
if err != nil {
return err
completed, ok := m.completed[pk.InfoHash]

if !ok {
completed = &roaring.Bitmap{}
m.completed[pk.InfoHash] = completed
}
} else {
tx, err = m.db.BeginRw(context.Background())
if err != nil {
return err

completed.Add(uint32(pk.Index))

if flushed, ok := m.flushed[pk.InfoHash]; !ok || !flushed.Contains(uint32(pk.Index)) {
return nil
}
}

tx, err = m.db.BeginRw(context.Background())
if err != nil {
return err
}

defer tx.Rollback()

var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
err = putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)

v := []byte(incomplete)
if b {
v = []byte(complete)
}
err = tx.Put(kv.BittorrentCompletion, key[:], v)
if err != nil {
return err
}

return tx.Commit()
}

func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}
func putCompletion(tx kv.RwTx, infoHash infohash.T, index uint32, c bool) error {
var key [infohash.Size + 4]byte
copy(key[:], infoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], index)

type mdbxPieceCompletionBatch struct {
db *mdbx.MdbxKV
v := []byte(incomplete)
if c {
v = []byte(complete)
}
//fmt.Println("PUT", infoHash, index, c)
return tx.Put(kv.BittorrentCompletion, key[:], v)
}

var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil)
func (m *mdbxPieceCompletion) Flushed(infoHash infohash.T, flushed *roaring.Bitmap) {
m.mu.Lock()
defer m.mu.Unlock()

func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) {
ret = &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)}
return
tx, err := m.db.BeginRw(context.Background())

if err != nil {
m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
return
}

defer tx.Rollback()

m.putFlushed(tx, infoHash, flushed)

err = tx.Commit()

if err != nil {
m.logger.Warn("[snapshots] failed to flush piece completions", "hash", infoHash, err, err)
}
}

func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
err = m.db.View(context.Background(), func(tx kv.Tx) error {
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))
cn.Ok = true
v, err := tx.GetOne(kv.BittorrentCompletion, key[:])
if err != nil {
return err
}
switch string(v) {
case complete:
cn.Complete = true
case incomplete:
cn.Complete = false
default:
cn.Ok = false
func (m *mdbxPieceCompletion) putFlushed(tx kv.RwTx, infoHash infohash.T, flushed *roaring.Bitmap) {
if completed, ok := m.completed[infoHash]; ok {
setters := flushed.Clone()
setters.And(completed)

if setters.GetCardinality() > 0 {
setters.Iterate(func(piece uint32) bool {
// TODO deal with error (? don't remove from bitset ?)
_ = putCompletion(tx, infoHash, piece, true)
return true
})
}
return nil
})
return
}

func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
completed.AndNot(setters)

if completed.IsEmpty() {
delete(m.completed, infoHash)
}
}
var key [infohash.Size + 4]byte
copy(key[:], pk.InfoHash[:])
binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

v := []byte(incomplete)
if b {
v = []byte(complete)
allFlushed, ok := m.flushed[infoHash]

if !ok {
allFlushed = &roaring.Bitmap{}
m.flushed[infoHash] = allFlushed
}
return m.db.Batch(func(tx kv.RwTx) error {
return tx.Put(kv.BittorrentCompletion, key[:], v)
})

allFlushed.Or(flushed)
}

func (m *mdbxPieceCompletionBatch) Close() error {
func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}
Loading

0 comments on commit fe7d504

Please sign in to comment.