Skip to content

Commit

Permalink
etl: distinct empty values from nil (erigontech#7039)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored and calmbeing committed Mar 16, 2023
1 parent 8a10c61 commit ada4723
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 56 deletions.
9 changes: 5 additions & 4 deletions cmd/devnet/devnetutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"os/exec"
"strconv"
"strings"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cmd/devnet/models"
"github.com/ledgerwatch/erigon/cmd/rpctest/rpctest"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/crypto"
"math/big"
"os/exec"
"strconv"
"strings"
)

// ClearDevDB cleans up the dev folder used for the operations
Expand Down
8 changes: 4 additions & 4 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,10 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {

must(sync.SetCurrentStage(stages.Senders))

if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
}

tx, err := db.BeginRw(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -672,10 +676,6 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
return nil
}

if reset {
return db.Update(ctx, func(tx kv.RwTx) error { return reset2.ResetSenders(ctx, db, tx) })
}

s := stage(sync, tx, nil, stages.Senders)
log.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

Expand Down
2 changes: 0 additions & 2 deletions cmd/release/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sort"
"strings"
"time"

"github.com/hashicorp/go-version"
)

type Binary struct {
Expand Down
60 changes: 30 additions & 30 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,30 @@ func (rs *StateV3) puts(table string, key string, val []byte) {
}
}

func (rs *StateV3) Get(table string, key []byte) []byte {
func (rs *StateV3) Get(table string, key []byte) (v []byte, ok bool) {
rs.lock.RLock()
v := rs.get(table, key)
v, ok = rs.get(table, key)
rs.lock.RUnlock()
return v
return v, ok
}

func (rs *StateV3) get(table string, key []byte) (v []byte) {
func (rs *StateV3) get(table string, key []byte) (v []byte, ok bool) {
keyS := *(*string)(unsafe.Pointer(&key))
switch table {
case StorageTable:
v, _ = rs.chStorage.Get(keyS)
v, ok = rs.chStorage.Get(keyS)
case kv.PlainState:
v = rs.chAccs[keyS]
v, ok = rs.chAccs[keyS]
case kv.Code:
v = rs.chCode[keyS]
v, ok = rs.chCode[keyS]
case kv.IncarnationMap:
v = rs.chIncs[keyS]
v, ok = rs.chIncs[keyS]
case kv.PlainContractCode:
v = rs.chContractCode[keyS]
v, ok = rs.chContractCode[keyS]
default:
panic(table)
}
return v
return v, ok
}

func (rs *StateV3) flushMap(ctx context.Context, rwTx kv.RwTx, table string, m map[string][]byte, logPrefix string, logEvery *time.Ticker) error {
Expand Down Expand Up @@ -351,8 +351,8 @@ func (rs *StateV3) writeStateHistory(roTx kv.Tx, txTask *exec22.TxTask, agg *lib
return err
}
codeHashBytes := original.CodeHash.Bytes()
codePrev := rs.get(kv.Code, codeHashBytes)
if codePrev == nil {
codePrev, ok := rs.get(kv.Code, codeHashBytes)
if !ok || codePrev == nil {
var err error
codePrev, err = roTx.GetOne(kv.Code, codeHashBytes)
if err != nil {
Expand Down Expand Up @@ -411,8 +411,8 @@ func (rs *StateV3) writeStateHistory(roTx kv.Tx, txTask *exec22.TxTask, agg *lib
copy(k, addr)
binary.BigEndian.PutUint64(k[20:], incarnation)

codeHash := rs.get(kv.PlainContractCode, k)
if codeHash == nil {
codeHash, ok := rs.get(kv.PlainContractCode, k)
if !ok || codeHash == nil {
var err error
codeHash, err = roTx.GetOne(kv.PlainContractCode, k)
if err != nil {
Expand All @@ -421,8 +421,8 @@ func (rs *StateV3) writeStateHistory(roTx kv.Tx, txTask *exec22.TxTask, agg *lib
}
var codePrev []byte
if codeHash != nil {
codePrev = rs.get(kv.Code, codeHash)
if codePrev == nil {
codePrev, ok = rs.get(kv.Code, codeHash)
if !ok || codePrev == nil {
var err error
codePrev, err = roTx.GetOne(kv.Code, codeHash)
if err != nil {
Expand All @@ -445,8 +445,8 @@ func (rs *StateV3) applyState(roTx kv.Tx, txTask *exec22.TxTask, agg *libstate.A
for addr, increase := range txTask.BalanceIncreaseSet {
increase := increase
addrBytes := addr.Bytes()
enc0 := rs.get(kv.PlainState, addrBytes)
if enc0 == nil {
enc0, ok := rs.get(kv.PlainState, addrBytes)
if !ok {
var err error
enc0, err = roTx.GetOne(kv.PlainState, addrBytes)
if err != nil {
Expand All @@ -464,7 +464,7 @@ func (rs *StateV3) applyState(roTx kv.Tx, txTask *exec22.TxTask, agg *libstate.A
a.Balance.Add(&a.Balance, &increase)
var enc1 []byte
if emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash() {
enc1 = []byte{}
enc1 = nil
} else {
enc1 = make([]byte, a.EncodingLengthForStorage())
a.EncodeForStorage(enc1)
Expand Down Expand Up @@ -789,7 +789,7 @@ func (w *StateWriterV3) UpdateAccountCode(address common.Address, incarnation ui
func (w *StateWriterV3) DeleteAccount(address common.Address, original *accounts.Account) error {
addressBytes := address.Bytes()
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, string(addressBytes))
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, []byte{})
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, nil)
if original.Incarnation > 0 {
var b [8]byte
binary.BigEndian.PutUint64(b[:], original.Incarnation)
Expand Down Expand Up @@ -852,8 +852,8 @@ func (r *StateReaderV3) ResetReadSet() { r.readLists = newR

func (r *StateReaderV3) ReadAccountData(address common.Address) (*accounts.Account, error) {
addr := address.Bytes()
enc := r.rs.Get(kv.PlainState, addr)
if enc == nil {
enc, ok := r.rs.Get(kv.PlainState, addr)
if !ok {
var err error
enc, err = r.tx.GetOne(kv.PlainState, addr)
if err != nil {
Expand All @@ -880,8 +880,8 @@ func (r *StateReaderV3) ReadAccountData(address common.Address) (*accounts.Accou

func (r *StateReaderV3) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
composite := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())
enc := r.rs.Get(StorageTable, composite)
if enc == nil {
enc, ok := r.rs.Get(StorageTable, composite)
if !ok || enc == nil {
var err error
enc, err = r.tx.GetOne(kv.PlainState, composite)
if err != nil {
Expand All @@ -907,8 +907,8 @@ func (r *StateReaderV3) ReadAccountStorage(address common.Address, incarnation u

func (r *StateReaderV3) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
addr, codeHashBytes := address.Bytes(), codeHash.Bytes()
enc := r.rs.Get(kv.Code, codeHashBytes)
if enc == nil {
enc, ok := r.rs.Get(kv.Code, codeHashBytes)
if !ok || enc == nil {
var err error
enc, err = r.tx.GetOne(kv.Code, codeHashBytes)
if err != nil {
Expand All @@ -927,8 +927,8 @@ func (r *StateReaderV3) ReadAccountCode(address common.Address, incarnation uint

func (r *StateReaderV3) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
codeHashBytes := codeHash.Bytes()
enc := r.rs.Get(kv.Code, codeHashBytes)
if enc == nil {
enc, ok := r.rs.Get(kv.Code, codeHashBytes)
if !ok || enc == nil {
var err error
enc, err = r.tx.GetOne(kv.Code, codeHashBytes)
if err != nil {
Expand All @@ -950,8 +950,8 @@ func (r *StateReaderV3) ReadAccountCodeSize(address common.Address, incarnation

func (r *StateReaderV3) ReadAccountIncarnation(address common.Address) (uint64, error) {
addrBytes := address[:]
enc := r.rs.Get(kv.IncarnationMap, addrBytes)
if enc == nil {
enc, ok := r.rs.Get(kv.IncarnationMap, addrBytes)
if !ok || enc == nil {
var err error
enc, err = r.tx.GetOne(kv.IncarnationMap, addrBytes)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions eth/integrity/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfAccounts, nil, math.MaxInt32)
defer clear()

trieAcc2, err := tx.Cursor(kv.TrieOfAccounts)
if err != nil {
Expand All @@ -56,7 +57,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer accC.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
if errc != nil {
Expand Down Expand Up @@ -157,7 +159,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer c.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
clear := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.TrieOfStorage, nil, math.MaxInt32)
defer clear()

trieStorage, err := tx.Cursor(kv.TrieOfStorage)
if err != nil {
Expand All @@ -170,7 +173,8 @@ func Trie(db kv.RoDB, tx kv.Tx, slowChecks bool, ctx context.Context) {
panic(err)
}
defer storageC.Close()
kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
clear2 := kv.ReadAhead(readAheadCtx, db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxInt32)
defer clear2()

for k, v, errc := c.First(); k != nil; k, v, errc = c.Next() {
if errc != nil {
Expand Down
21 changes: 14 additions & 7 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
libstate "github.com/ledgerwatch/erigon-lib/state"
state2 "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
atomic2 "go.uber.org/atomic"

"github.com/ledgerwatch/erigon/cmd/state/exec22"
"github.com/ledgerwatch/erigon/cmd/state/exec3"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
Expand Down Expand Up @@ -1109,48 +1109,55 @@ func reconstituteStep(last bool,
plainContractCollector := etl.NewCollector(fmt.Sprintf("%s recon plainContract", s.LogPrefix()), dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer plainContractCollector.Close()
var transposedKey []byte

if err = db.View(ctx, func(roTx kv.Tx) error {
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
clear := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateR, nil, math.MaxUint32)
defer clear()
if err = roTx.ForEach(kv.PlainStateR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainStateCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
clear2 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainStateD, nil, math.MaxUint32)
defer clear2()
if err = roTx.ForEach(kv.PlainStateD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return plainStateCollector.Collect(transposedKey, nil)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
clear3 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeR, nil, math.MaxUint32)
defer clear3()
if err = roTx.ForEach(kv.CodeR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return codeCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
clear4 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.CodeD, nil, math.MaxUint32)
defer clear4()
if err = roTx.ForEach(kv.CodeD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
return codeCollector.Collect(transposedKey, nil)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
clear5 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractR, nil, math.MaxUint32)
defer clear5()
if err = roTx.ForEach(kv.PlainContractR, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], k[8:]...)
transposedKey = append(transposedKey, k[:8]...)
return plainContractCollector.Collect(transposedKey, v)
}); err != nil {
return err
}
kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
clear6 := kv.ReadAhead(ctx, db, atomic2.NewBool(false), kv.PlainContractD, nil, math.MaxUint32)
defer clear6()
if err = roTx.ForEach(kv.PlainContractD, nil, func(k, v []byte) error {
transposedKey = append(transposedKey[:0], v...)
transposedKey = append(transposedKey, k...)
Expand Down
6 changes: 4 additions & 2 deletions eth/stagedsync/stage_interhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func RegenerateIntermediateHashes(logPrefix string, db kv.RwTx, cfg TrieCfg, exp
defer log.Info(fmt.Sprintf("[%s] Regeneration ended", logPrefix))
_ = db.ClearBucket(kv.TrieOfAccounts)
_ = db.ClearBucket(kv.TrieOfStorage)
kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxUint32)
kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxUint32)
clean := kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedAccounts, nil, math.MaxUint32)
defer clean()
clean2 := kv.ReadAhead(ctx, cfg.db, atomic.NewBool(false), kv.HashedStorage, nil, math.MaxUint32)
defer clean2()

accTrieCollector := etl.NewCollector(logPrefix, cfg.tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
defer accTrieCollector.Close()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18

require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230306114514-2c4c92fd1fce
github.com/ledgerwatch/erigon-lib v0.0.0-20230307023045-f4a02864a931
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20230306083105-1391330d62a3
github.com/ledgerwatch/log/v3 v3.7.0
github.com/ledgerwatch/secp256k1 v1.0.0
Expand Down
6 changes: 4 additions & 2 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,10 +1395,12 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
return true, nil
}
if doWarmup && !warmupSenders.Load() && blockNum%1_000 == 0 {
kv.ReadAhead(warmupCtx, db, warmupSenders, kv.Senders, hexutility.EncodeTs(blockNum), 10_000)
clean := kv.ReadAhead(warmupCtx, db, warmupSenders, kv.Senders, hexutility.EncodeTs(blockNum), 10_000)
defer clean()
}
if doWarmup && !warmupTxs.Load() && blockNum%1_000 == 0 {
kv.ReadAhead(warmupCtx, db, warmupTxs, kv.EthTx, hexutility.EncodeTs(body.BaseTxId), 100*10_000)
clean := kv.ReadAhead(warmupCtx, db, warmupTxs, kv.EthTx, hexutility.EncodeTs(body.BaseTxId), 100*10_000)
defer clean()
}
senders, err := rawdb.ReadSenders(tx, h, blockNum)
if err != nil {
Expand Down

0 comments on commit ada4723

Please sign in to comment.