Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallel execution of state pruning #166

Merged
merged 1 commit into from
May 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 96 additions & 52 deletions taraxa/state/state_db_rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/big"
"runtime"
"strconv"
"sync"

"github.com/Taraxa-project/taraxa-evm/common"
"github.com/Taraxa-project/taraxa-evm/core/types"
Expand Down Expand Up @@ -139,31 +140,66 @@ func (self *DB) RecreateColumn(column state_db.Column, nodes map[common.Hash][]b
self.db.CompactRangeCF(self.cf_handles[column], range32)
}

func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) {
var member struct{}
current_block_state := state_db.GetBlockState(self, blk_num)

// deleteStateValues prunes superseded entries from the account storage value
// column (COL_acc_trie_value) and then compacts that column.
//
// Every key is read as a common.HashLength-byte prefix followed by a
// big-endian uint64 block number. The column is walked in iterator order and
// an entry is deleted when the entry immediately after it has the same prefix
// and a block number strictly below blk_num.
func (self *DB) deleteStateValues(blk_num types.BlockNum) {
	// NOTE(review): only the first byte of range_limit is 0xFF, the remaining
	// bytes are zero, so the compaction range ends at 0xFF00..00. Confirm this
	// is the intended upper bound.
	range_limit := [common.VersionedKeyLength]byte{255}
	range40 := grocksdb.Range{
		Start: make([]byte, common.VersionedKeyLength),
		Limit: make([]byte, common.VersionedKeyLength),
	}
	copy(range40.Limit, range_limit[0:common.VersionedKeyLength])

	// Select account storage values to prune.
	var keys_to_prune [][]byte
	prev_key := make([]byte, common.VersionedKeyLength)
	itr := self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_acc_trie_value])
	itr.SeekToFirst()
	// Check Valid() before the first Key() call so that an empty column is
	// never read through an invalid iterator.
	if itr.Valid() {
		first_key := itr.Key()
		copy(prev_key, first_key.Data())
		first_key.Free()
		for itr.Next(); itr.Valid(); itr.Next() {
			cur_key := itr.Key()
			cur := cur_key.Data()
			same_prefix := bytes.Equal(prev_key[0:common.HashLength], cur[0:common.HashLength])
			if same_prefix && binary.BigEndian.Uint64(cur[common.HashLength:common.VersionedKeyLength]) < blk_num {
				// prev_key is a scratch buffer overwritten on every step, so
				// store a private copy; appending prev_key itself would make
				// every collected entry alias the same bytes.
				keys_to_prune = append(keys_to_prune, common.CopyBytes(prev_key))
			}
			copy(prev_key, cur)
			cur_key.Free()
		}
	}
	itr.Close()

	for _, key_to_remove := range keys_to_prune {
		self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_value], key_to_remove)
	}
	self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_value], range40)
}

func (self *DB) recreateMainTrie(state_root_to_keep *[]common.Hash, blk_num types.BlockNum) {
current_block_state := state_db.GetBlockState(self, blk_num)
//Select nodes which are not to be deleted
nodes_to_keep := make(map[common.Hash][]byte)
for _, root_to_keep := range state_root_to_keep {
for _, root_to_keep := range *state_root_to_keep {
current_block_state.ForEachMainNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) {
bytes_s := common.CopyBytes(b)
nodes_to_keep[*h] = bytes_s
})
}
self.RecreateColumn(state_db.COL_main_trie_node, nodes_to_keep)
}

func (self *DB) deleteStateRoot(blk_num types.BlockNum) {
range_limit := [common.VersionedKeyLength]byte{255}
range40 := grocksdb.Range{
Start: make([]byte, common.VersionedKeyLength),
Limit: make([]byte, common.VersionedKeyLength),
}
copy(range40.Limit, range_limit[0:common.VersionedKeyLength])

//Select main trie values to prune/remove
set_value_to_prune := make(map[string]struct{})
set_storage_root_to_keep := make(map[common.Hash]struct{})
set_storage_root_to_prune := make(map[common.Hash]struct{})
set_value_to_prune := make([][]byte, 0)
set_storage_root_to_keep := make([]common.Hash, 0)
current_block_state := state_db.GetBlockState(self, blk_num)

//Iterate over all values and select which to keep
itr := self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_main_trie_value])
Expand All @@ -177,71 +213,79 @@ func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum)
itr.Next()
if !itr.Valid() {
if account.StorageRootHash != nil {
set_storage_root_to_keep[*account.StorageRootHash] = member
set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash)
}
break
}
if bytes.Compare(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) == 0 {
//Only prune the previous value if current value for the same account is below blk_num
ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength])
if ver_blk_num < blk_num {
set_value_to_prune[string(prev_key)] = member
if account.StorageRootHash != nil {
set_storage_root_to_prune[*account.StorageRootHash] = member
}
set_value_to_prune = append(set_value_to_prune, prev_key)
} else {
if account.StorageRootHash != nil {
set_storage_root_to_keep[*account.StorageRootHash] = member
set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash)
}
}
} else {
if account.StorageRootHash != nil {
set_storage_root_to_keep[*account.StorageRootHash] = member
set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash)
}
}
}
itr.Close()
for value_to_remove, _ := range set_value_to_prune {
self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_value], []byte(value_to_remove))
}
self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_value], range40)

//Select account nodes to prune
account_nodes_to_keep := make(map[common.Hash][]byte)
//set_account_node_to_remove := make(map[common.Hash]struct{})
for root_to_keep, _ := range set_storage_root_to_keep {
current_block_state.ForEachAccountNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) {
b = common.CopyBytes(b)
account_nodes_to_keep[*h] = b
})
}
self.RecreateColumn(state_db.COL_acc_trie_node, account_nodes_to_keep)

//Select account storage values to prune
set_account_storage_value_to_prune := make(map[string]struct{})
itr = self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_acc_trie_value])
itr.SeekToFirst()
prev_key = make([]byte, common.VersionedKeyLength)
for {
copy(prev_key, itr.Key().Data())
itr.Key().Free()
itr.Next()
if !itr.Valid() {
break
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for _, value_to_remove := range set_value_to_prune {
self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_value], value_to_remove)
}
if bytes.Compare(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) == 0 {
ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength])
if ver_blk_num < blk_num {
set_account_storage_value_to_prune[string(prev_key)] = member
}
self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_value], range40)
}()

wg.Add(1)
go func() {
defer wg.Done()
//Select account nodes to prune
account_nodes_to_keep := make(map[common.Hash][]byte)
for _, root_to_keep := range set_storage_root_to_keep {
current_block_state.ForEachAccountNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) {
b = common.CopyBytes(b)
account_nodes_to_keep[*h] = b
})
}
}
itr.Close()
self.RecreateColumn(state_db.COL_acc_trie_node, account_nodes_to_keep)
}()
wg.Wait()
}

for value_to_remove, _ := range set_account_storage_value_to_prune {
self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_value], []byte(value_to_remove))
}
self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_value], range40)
// Prune runs the three pruning steps concurrently and returns once all of
// them have finished: deleting state values, recreating the main trie for the
// roots in state_root_to_keep, and deleting state roots / main trie values.
func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) {
	steps := []func(){
		// Delete state values.
		func() { self.deleteStateValues(blk_num) },
		// Recreate main trie.
		func() { self.recreateMainTrie(&state_root_to_keep, blk_num) },
		// Delete state root and main trie values.
		func() { self.deleteStateRoot(blk_num) },
	}

	var pending sync.WaitGroup
	pending.Add(len(steps))
	for _, step := range steps {
		// The step is passed as an argument so each goroutine gets its own
		// value regardless of the Go version's loop-variable semantics.
		go func(run func()) {
			defer pending.Done()
			run()
		}(step)
	}
	pending.Wait()
}

func (self *DB) Close() {
Expand Down