diff --git a/taraxa/state/state_db_rocksdb/db.go b/taraxa/state/state_db_rocksdb/db.go index 0a21c99a8..28fcf1d6e 100644 --- a/taraxa/state/state_db_rocksdb/db.go +++ b/taraxa/state/state_db_rocksdb/db.go @@ -6,6 +6,7 @@ import ( "math/big" "runtime" "strconv" + "sync" "github.com/Taraxa-project/taraxa-evm/common" "github.com/Taraxa-project/taraxa-evm/core/types" @@ -139,31 +140,66 @@ func (self *DB) RecreateColumn(column state_db.Column, nodes map[common.Hash][]b self.db.CompactRangeCF(self.cf_handles[column], range32) } -func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) { - var member struct{} - current_block_state := state_db.GetBlockState(self, blk_num) - +func (self *DB) deleteStateValues(blk_num types.BlockNum) { range_limit := [common.VersionedKeyLength]byte{255} range40 := grocksdb.Range{ Start: make([]byte, common.VersionedKeyLength), Limit: make([]byte, common.VersionedKeyLength), } copy(range40.Limit, range_limit[0:common.VersionedKeyLength]) + prev_key := make([]byte, common.VersionedKeyLength) + + //Select account storage values to prune + set_account_storage_value_to_prune := make([][]byte, 0) + itr := self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_acc_trie_value]) + itr.SeekToFirst() + for { + copy(prev_key, itr.Key().Data()) + itr.Key().Free() + itr.Next() + if !itr.Valid() { + break + } + if bytes.Compare(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) == 0 { + ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength]) + if ver_blk_num < blk_num { + set_account_storage_value_to_prune = append(set_account_storage_value_to_prune, prev_key) + } + } + } + itr.Close() + for _, value_to_remove := range set_account_storage_value_to_prune { + self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_value], value_to_remove) + } + self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_value], range40) +} + +func (self *DB) recreateMainTrie(state_root_to_keep *[]common.Hash, blk_num types.BlockNum) { + current_block_state := state_db.GetBlockState(self, blk_num) //Select nodes which are not to be deleted nodes_to_keep := make(map[common.Hash][]byte) - for _, root_to_keep := range state_root_to_keep { + for _, root_to_keep := range *state_root_to_keep { current_block_state.ForEachMainNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) { bytes_s := common.CopyBytes(b) nodes_to_keep[*h] = bytes_s }) } self.RecreateColumn(state_db.COL_main_trie_node, nodes_to_keep) +} + +func (self *DB) deleteStateRoot(blk_num types.BlockNum) { + range_limit := [common.VersionedKeyLength]byte{255} + range40 := grocksdb.Range{ + Start: make([]byte, common.VersionedKeyLength), + Limit: make([]byte, common.VersionedKeyLength), + } + copy(range40.Limit, range_limit[0:common.VersionedKeyLength]) //Select main trie values to prune/remove - set_value_to_prune := make(map[string]struct{}) - set_storage_root_to_keep := make(map[common.Hash]struct{}) - set_storage_root_to_prune := make(map[common.Hash]struct{}) + set_value_to_prune := make([][]byte, 0) + set_storage_root_to_keep := make([]common.Hash, 0) + current_block_state := state_db.GetBlockState(self, blk_num) //Iterate over all values and select which to keep itr := self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_main_trie_value]) @@ -177,7 +213,7 @@ func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) itr.Next() if !itr.Valid() { if account.StorageRootHash != nil { - set_storage_root_to_keep[*account.StorageRootHash] = member + set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash) } break } @@ -185,63 +221,71 @@ func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) //Only prune the previous value if current value for the same account is below blk_num ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength]) if ver_blk_num < blk_num { - set_value_to_prune[string(prev_key)] = member - if account.StorageRootHash != nil { - set_storage_root_to_prune[*account.StorageRootHash] = member - } + set_value_to_prune = append(set_value_to_prune, prev_key) } else { if account.StorageRootHash != nil { - set_storage_root_to_keep[*account.StorageRootHash] = member + set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash) } } } else { if account.StorageRootHash != nil { - set_storage_root_to_keep[*account.StorageRootHash] = member + set_storage_root_to_keep = append(set_storage_root_to_keep, *account.StorageRootHash) } } } itr.Close() - for value_to_remove, _ := range set_value_to_prune { - self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_value], []byte(value_to_remove)) - } - self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_value], range40) - - //Select account nodes to prune - account_nodes_to_keep := make(map[common.Hash][]byte) - //set_account_node_to_remove := make(map[common.Hash]struct{}) - for root_to_keep, _ := range set_storage_root_to_keep { - current_block_state.ForEachAccountNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) { - b = common.CopyBytes(b) - account_nodes_to_keep[*h] = b - }) - } - self.RecreateColumn(state_db.COL_acc_trie_node, account_nodes_to_keep) - //Select account storage values to prune - set_account_storage_value_to_prune := make(map[string]struct{}) - itr = self.db.NewIteratorCF(self.opts_r_itr, self.cf_handles[state_db.COL_acc_trie_value]) - itr.SeekToFirst() - prev_key = make([]byte, common.VersionedKeyLength) - for { - copy(prev_key, itr.Key().Data()) - itr.Key().Free() - itr.Next() - if !itr.Valid() { - break + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for _, value_to_remove := range set_value_to_prune { + self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_main_trie_value], value_to_remove) } - if bytes.Compare(prev_key[0:common.HashLength], itr.Key().Data()[0:common.HashLength]) == 0 { - ver_blk_num := binary.BigEndian.Uint64(itr.Key().Data()[common.HashLength:common.VersionedKeyLength]) - if ver_blk_num < blk_num { - set_account_storage_value_to_prune[string(prev_key)] = member - } + self.db.CompactRangeCF(self.cf_handles[state_db.COL_main_trie_value], range40) + }() + + wg.Add(1) + go func() { + defer wg.Done() + //Select account nodes to prune + account_nodes_to_keep := make(map[common.Hash][]byte) + for _, root_to_keep := range set_storage_root_to_keep { + current_block_state.ForEachAccountNodeHashByRoot(&root_to_keep, func(h *common.Hash, b []byte) { + b = common.CopyBytes(b) + account_nodes_to_keep[*h] = b + }) } - } - itr.Close() + self.RecreateColumn(state_db.COL_acc_trie_node, account_nodes_to_keep) + }() + wg.Wait() +} - for value_to_remove, _ := range set_account_storage_value_to_prune { - self.db.DeleteCF(self.latest_state.opts_w, self.cf_handles[state_db.COL_acc_trie_value], []byte(value_to_remove)) - } - self.db.CompactRangeCF(self.cf_handles[state_db.COL_acc_trie_value], range40) +func (self *DB) Prune(state_root_to_keep []common.Hash, blk_num types.BlockNum) { + var wg sync.WaitGroup + + // Asynchronously delete state values + wg.Add(1) + go func() { + defer wg.Done() + self.deleteStateValues(blk_num) + }() + + // Asynchronously recreate Main trie + wg.Add(1) + go func() { + defer wg.Done() + self.recreateMainTrie(&state_root_to_keep, blk_num) + }() + + // Asynchronously delete state root and main trie values + wg.Add(1) + go func() { + defer wg.Done() + self.deleteStateRoot(blk_num) + }() + + wg.Wait() } func (self *DB) Close() {