From 93d2950d7dae23a51ddd1a8a6d92a89c913e3708 Mon Sep 17 00:00:00 2001
From: Quentin McGaw
Date: Tue, 24 Jan 2023 14:50:54 +0000
Subject: [PATCH 1/2] chore(state): remove Full online pruner

---
 cmd/gossamer/config.go                |   2 +-
 cmd/gossamer/flags.go                 |   4 +-
 dot/state/pruner/interfaces.go        |  11 -
 dot/state/pruner/pruner.go            | 338 --------------------------
 dot/state/service_integration_test.go |   4 +-
 dot/state/storage.go                  |  19 +-
 6 files changed, 6 insertions(+), 372 deletions(-)
 delete mode 100644 dot/state/pruner/interfaces.go

diff --git a/cmd/gossamer/config.go b/cmd/gossamer/config.go
index 0042542484..4eecb1ed43 100644
--- a/cmd/gossamer/config.go
+++ b/cmd/gossamer/config.go
@@ -158,7 +158,7 @@ func createInitConfig(ctx *cli.Context) (*dot.Config, error) {
 	}
 
 	if !cfg.Global.Pruning.IsValid() {
-		return nil, fmt.Errorf("--%s must be either %s or %s", PruningFlag.Name, pruner.Full, pruner.Archive)
+		return nil, fmt.Errorf("--%s must be %s", PruningFlag.Name, pruner.Archive)
 	}
 
 	if cfg.Global.RetainBlocks < dev.DefaultRetainBlocks {
diff --git a/cmd/gossamer/flags.go b/cmd/gossamer/flags.go
index 48df62beab..7c03e1906e 100644
--- a/cmd/gossamer/flags.go
+++ b/cmd/gossamer/flags.go
@@ -366,11 +366,9 @@ var (
 	}
 
 	// PruningFlag triggers the online pruning of historical state tries.
-	// It's either full or archive.
-	// To enable pruning the value should be set to `full`.
 	PruningFlag = cli.StringFlag{
 		Name:  "pruning",
-		Usage: `State trie online pruning ("full", "archive")`,
+		Usage: `State trie online pruning (""archive")`,
 		Value: dev.DefaultPruningMode,
 	}
 )
diff --git a/dot/state/pruner/interfaces.go b/dot/state/pruner/interfaces.go
deleted file mode 100644
index b3d1ecabec..0000000000
--- a/dot/state/pruner/interfaces.go
+++ /dev/null
@@ -1,11 +0,0 @@
-// Copyright 2022 ChainSafe Systems (ON)
-// SPDX-License-Identifier: LGPL-3.0-only
-
-package pruner
-
-// Logger logs formatted strings at the different log levels.
-type Logger interface {
-	Debug(s string)
-	Debugf(format string, args ...interface{})
-	Warnf(format string, args ...interface{})
-}
diff --git a/dot/state/pruner/pruner.go b/dot/state/pruner/pruner.go
index 6660076090..d7bd0d8298 100644
--- a/dot/state/pruner/pruner.go
+++ b/dot/state/pruner/pruner.go
@@ -4,25 +4,10 @@
 package pruner
 
 import (
-	"errors"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/ChainSafe/chaindb"
 	"github.com/ChainSafe/gossamer/lib/common"
-	"github.com/ChainSafe/gossamer/pkg/scale"
-)
-
-const (
-	journalPrefix = "journal"
-	lastPrunedKey = "last_pruned"
-	pruneInterval = time.Second
 )
 
 const (
-	// Full pruner mode.
-	Full = Mode("full")
 	// Archive pruner mode.
 	Archive = Mode("archive")
 )
@@ -33,8 +18,6 @@ type Mode string
 // IsValid checks whether the pruning mode is valid
 func (p Mode) IsValid() bool {
 	switch p {
-	case Full:
-		return true
 	case Archive:
 		return true
 	default:
@@ -62,324 +45,3 @@ func (*ArchiveNode) StoreJournalRecord(_, _ map[string]struct{},
 	_ common.Hash, _ int64) error {
 	return nil
 }
-
-type deathRecord struct {
-	blockHash                       common.Hash
-	deletedMerkleValueToBlockNumber map[string]int64
-}
-
-type deathRow []*deathRecord
-
-// FullNode stores state trie diff and allows online state trie pruning
-type FullNode struct {
-	logger    Logger
-	deathList []deathRow
-	storageDB chaindb.Database
-	journalDB chaindb.Database
-	// deathIndex is the mapping from deleted node Merkle value to block number.
-	deathIndex map[string]int64
-	// pendingNumber is the block number to be pruned.
-	// Initial value is set to 1 and is incremented after every block pruning.
-	pendingNumber int64
-	retainBlocks  uint32
-	sync.RWMutex
-}
-
-type journalRecord struct {
-	// blockHash of the block corresponding to journal record
-	blockHash common.Hash
-	// Merkle values of nodes inserted in the state trie of the block
-	insertedMerkleValues map[string]struct{}
-	// Merkle values of nodes deleted from the state trie of the block
-	deletedMerkleValues map[string]struct{}
-}
-
-type journalKey struct {
-	blockNum  int64
-	blockHash common.Hash
-}
-
-func newJournalRecord(hash common.Hash, insertedMerkleValues,
-	deletedMerkleValues map[string]struct{}) *journalRecord {
-	return &journalRecord{
-		blockHash:            hash,
-		insertedMerkleValues: insertedMerkleValues,
-		deletedMerkleValues:  deletedMerkleValues,
-	}
-}
-
-// NewFullNode creates a Pruner for full node.
-func NewFullNode(db, storageDB chaindb.Database, retainBlocks uint32, l Logger) (Pruner, error) {
-	p := &FullNode{
-		deathList:    make([]deathRow, 0),
-		deathIndex:   make(map[string]int64),
-		storageDB:    storageDB,
-		journalDB:    chaindb.NewTable(db, journalPrefix),
-		retainBlocks: retainBlocks,
-		logger:       l,
-	}
-
-	blockNum, err := p.getLastPrunedIndex()
-	if err != nil {
-		return nil, err
-	}
-
-	p.logger.Debugf("last pruned block is %d", blockNum)
-	blockNum++
-
-	p.pendingNumber = blockNum
-
-	err = p.loadDeathList()
-	if err != nil {
-		return nil, err
-	}
-
-	go p.start()
-
-	return p, nil
-}
-
-// StoreJournalRecord stores journal record into DB and add deathRow into deathList
-func (p *FullNode) StoreJournalRecord(deletedMerkleValues, insertedMerkleValues map[string]struct{},
-	blockHash common.Hash, blockNum int64) error {
-	jr := newJournalRecord(blockHash, insertedMerkleValues, deletedMerkleValues)
-
-	key := &journalKey{blockNum, blockHash}
-	err := p.storeJournal(key, jr)
-	if err != nil {
-		return fmt.Errorf("failed to store journal record for %d: %w", blockNum, err)
-	}
-
-	p.logger.Debugf("journal record stored for block number %d", blockNum)
-	p.addDeathRow(jr, blockNum)
-	return nil
-}
-
-func (p *FullNode) addDeathRow(jr *journalRecord, blockNum int64) {
-	if blockNum == 0 {
-		return
-	}
-
-	p.Lock()
-	defer p.Unlock()
-
-	// The block is already pruned.
-	if blockNum < p.pendingNumber {
-		return
-	}
-
-	p.processInsertedKeys(jr.insertedMerkleValues, jr.blockHash)
-
-	// add deleted node Merkle values from journal to death index
-	deletedMerkleValueToBlockNumber := make(map[string]int64, len(jr.deletedMerkleValues))
-	for k := range jr.deletedMerkleValues {
-		p.deathIndex[k] = blockNum
-		deletedMerkleValueToBlockNumber[k] = blockNum
-	}
-
-	blockIndex := blockNum - p.pendingNumber
-	for idx := blockIndex - int64(len(p.deathList)); idx >= 0; idx-- {
-		p.deathList = append(p.deathList, deathRow{})
-	}
-
-	record := &deathRecord{
-		blockHash:                       jr.blockHash,
-		deletedMerkleValueToBlockNumber: deletedMerkleValueToBlockNumber,
-	}
-
-	// add deathRow to deathList
-	p.deathList[blockIndex] = append(p.deathList[blockIndex], record)
-}
-
-// Remove re-inserted keys
-func (p *FullNode) processInsertedKeys(insertedMerkleValues map[string]struct{}, blockHash common.Hash) {
-	for k := range insertedMerkleValues {
-		num, ok := p.deathIndex[k]
-		if !ok {
-			continue
-		}
-		records := p.deathList[num-p.pendingNumber]
-		for _, v := range records {
-			if v.blockHash == blockHash {
-				delete(v.deletedMerkleValueToBlockNumber, k)
-			}
-		}
-		delete(p.deathIndex, k)
-	}
-}
-
-func (p *FullNode) start() {
-	p.logger.Debug("pruning started")
-
-	var canPrune bool
-	checkPruning := func() {
-		p.Lock()
-		defer p.Unlock()
-		if uint32(len(p.deathList)) <= p.retainBlocks {
-			canPrune = false
-			return
-		}
-		canPrune = true
-
-		// pop first element from death list
-		row := p.deathList[0]
-		blockNum := p.pendingNumber
-
-		p.logger.Debugf("pruning block number %d", blockNum)
-
-		sdbBatch := p.storageDB.NewBatch()
-		for _, record := range row {
-			err := p.deleteKeys(sdbBatch, record.deletedMerkleValueToBlockNumber)
-			if err != nil {
-				p.logger.Warnf("failed to prune keys for block number %d: %s", blockNum, err)
-				sdbBatch.Reset()
-				return
-			}
-
-			for k := range record.deletedMerkleValueToBlockNumber {
-				delete(p.deathIndex, k)
-			}
-		}
-
-		if err := sdbBatch.Flush(); err != nil {
-			p.logger.Warnf("failed to prune keys for block number %d: %s", blockNum, err)
-			return
-		}
-
-		err := p.storeLastPrunedIndex(blockNum)
-		if err != nil {
-			p.logger.Warnf("failed to store last pruned index for block number %d: %s", blockNum, err)
-			return
-		}
-
-		p.deathList = p.deathList[1:]
-		p.pendingNumber++
-
-		jdbBatch := p.journalDB.NewBatch()
-		for _, record := range row {
-			jk := &journalKey{blockNum, record.blockHash}
-			err = p.deleteJournalRecord(jdbBatch, jk)
-			if err != nil {
-				p.logger.Warnf("failed to delete journal record for block number %d: %s", blockNum, err)
-				jdbBatch.Reset()
-				return
-			}
-		}
-
-		if err = jdbBatch.Flush(); err != nil {
-			p.logger.Warnf("failed to flush delete journal record for block number %d: %s", blockNum, err)
-			return
-		}
-		p.logger.Debugf("pruned block number %d", blockNum)
-	}
-
-	for {
-		checkPruning()
-		// Don't sleep if we have data to prune.
-		if !canPrune {
-			time.Sleep(pruneInterval)
-		}
-	}
-}
-
-func (p *FullNode) storeJournal(key *journalKey, jr *journalRecord) error {
-	encKey, err := scale.Marshal(*key)
-	if err != nil {
-		return fmt.Errorf("failed to encode journal key block num %d: %w", key.blockNum, err)
-	}
-
-	encRecord, err := scale.Marshal(*jr)
-	if err != nil {
-		return fmt.Errorf("failed to encode journal record block num %d: %w", key.blockNum, err)
-	}
-
-	err = p.journalDB.Put(encKey, encRecord)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-// loadDeathList loads deathList and deathIndex from journalRecord.
-func (p *FullNode) loadDeathList() error {
-	itr := p.journalDB.NewIterator()
-	defer itr.Release()
-
-	for itr.Next() {
-		key := &journalKey{}
-		err := scale.Unmarshal(itr.Key(), key)
-		if err != nil {
-			return fmt.Errorf("failed to decode journal key %w", err)
-		}
-
-		val := itr.Value()
-
-		jr := &journalRecord{}
-		err = scale.Unmarshal(val, jr)
-		if err != nil {
-			return fmt.Errorf("failed to decode journal record block num %d : %w", key.blockNum, err)
-		}
-
-		p.addDeathRow(jr, key.blockNum)
-	}
-	return nil
-}
-
-func (*FullNode) deleteJournalRecord(b chaindb.Batch, key *journalKey) error {
-	encKey, err := scale.Marshal(*key)
-	if err != nil {
-		return err
-	}
-
-	err = b.Del(encKey)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (p *FullNode) storeLastPrunedIndex(blockNum int64) error {
-	encNum, err := scale.Marshal(blockNum)
-	if err != nil {
-		return err
-	}
-
-	err = p.journalDB.Put([]byte(lastPrunedKey), encNum)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func (p *FullNode) getLastPrunedIndex() (int64, error) {
-	val, err := p.journalDB.Get([]byte(lastPrunedKey))
-	if errors.Is(err, chaindb.ErrKeyNotFound) {
-		return 0, nil
-	}
-
-	if err != nil {
-		return 0, err
-	}
-
-	blockNum := int64(0)
-	err = scale.Unmarshal(val, &blockNum)
-	if err != nil {
-		return 0, err
-	}
-
-	return blockNum, nil
-}
-
-func (*FullNode) deleteKeys(b chaindb.Batch, deletedMerkleValueToBlockNumber map[string]int64) error {
-	for merkleValue := range deletedMerkleValueToBlockNumber {
-		err := b.Del([]byte(merkleValue))
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
diff --git a/dot/state/service_integration_test.go b/dot/state/service_integration_test.go
index 7309890c56..2c7d7e4480 100644
--- a/dot/state/service_integration_test.go
+++ b/dot/state/service_integration_test.go
@@ -168,6 +168,8 @@ func TestService_BlockTree(t *testing.T) {
 }
 
 func TestService_StorageTriePruning(t *testing.T) {
+	t.Skip() // Unskip once https://github.com/ChainSafe/gossamer/pull/2831 is done
+
 	ctrl := gomock.NewController(t)
 	telemetryMock := NewMockTelemetry(ctrl)
 	telemetryMock.EXPECT().SendMessage(gomock.Any()).AnyTimes()
@@ -177,7 +179,7 @@
 		Path:      t.TempDir(),
 		LogLevel:  log.Info,
 		PrunerCfg: pruner.Config{
-			Mode:           pruner.Full,
+			// Mode: pruner.Full,
 			RetainedBlocks: uint32(retainBlocks),
 		},
 		Telemetry: telemetryMock,
diff --git a/dot/state/storage.go b/dot/state/storage.go
index 705db2fe8d..0a4db11ec2 100644
--- a/dot/state/storage.go
+++ b/dot/state/storage.go
@@ -48,23 +48,12 @@
 func NewStorageState(db *chaindb.BadgerDB, blockState *BlockState,
 	tries *Tries, onlinePruner pruner.Config) (*StorageState, error) {
 	storageTable := chaindb.NewTable(db, storagePrefix)
 
-	var p pruner.Pruner
-	if onlinePruner.Mode == pruner.Full {
-		var err error
-		p, err = pruner.NewFullNode(db, storageTable, onlinePruner.RetainedBlocks, logger)
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		p = &pruner.ArchiveNode{}
-	}
-
 	return &StorageState{
 		blockState:   blockState,
 		tries:        tries,
 		db:           storageTable,
 		observerList: []Observer{},
-		pruner:       p,
+		pruner:       &pruner.ArchiveNode{},
 	}, nil
 }
@@ -74,12 +63,6 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header)
 
 	s.tries.softSet(root, ts.Trie())
 
-	if header == nil {
-		if _, ok := s.pruner.(*pruner.FullNode); ok {
-			panic("block header cannot be empty for Full node pruner")
-		}
-	}
-
 	if header != nil {
 		insertedMerkleValues, deletedMerkleValues, err := ts.GetChangedNodeHashes()
 		if err != nil {

From b5b1fd91dd6fae7c7f03f07d64dcd4787fb390c5 Mon Sep 17 00:00:00 2001
From: Quentin McGaw
Date: Thu, 26 Jan 2023 13:31:30 +0000
Subject: [PATCH 2/2] PR feedback fix typo

---
 cmd/gossamer/flags.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/gossamer/flags.go b/cmd/gossamer/flags.go
index 7c03e1906e..1f795f3e96 100644
--- a/cmd/gossamer/flags.go
+++ b/cmd/gossamer/flags.go
@@ -368,7 +368,7 @@ var (
 	// PruningFlag triggers the online pruning of historical state tries.
 	PruningFlag = cli.StringFlag{
 		Name:  "pruning",
-		Usage: `State trie online pruning (""archive")`,
+		Usage: `State trie online pruning ("archive")`,
 		Value: dev.DefaultPruningMode,
 	}
 )
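
For readers skimming the series: after both patches the pruner package keeps only the "archive" mode and the no-op ArchiveNode, and NewStorageState always wires in &pruner.ArchiveNode{}. Below is a minimal, self-contained sketch of that surviving surface, for illustration only. It repackages the remaining pruner.go declarations as a standalone program; the package main wrapper, the stub Hash type standing in for gossamer's common.Hash, and the demo in main are all additions and not part of the patch.

package main

import "fmt"

// Hash is a stand-in for gossamer's common.Hash in this sketch.
type Hash [32]byte

// Mode is a pruning mode. After patch 1, "archive" is the only
// valid value; the "full" mode and its FullNode pruner are gone.
type Mode string

// Archive pruner mode.
const Archive = Mode("archive")

// IsValid mirrors the trimmed-down switch left in pruner.go.
func (p Mode) IsValid() bool {
	return p == Archive
}

// ArchiveNode is the only remaining pruner: an archive node keeps
// every historical state trie, so recording a journal entry for
// later pruning does nothing and always succeeds.
type ArchiveNode struct{}

// StoreJournalRecord is a no-op.
func (*ArchiveNode) StoreJournalRecord(_, _ map[string]struct{}, _ Hash, _ int64) error {
	return nil
}

func main() {
	for _, mode := range []Mode{"archive", "full"} {
		fmt.Printf("--pruning=%s valid: %t\n", mode, mode.IsValid())
	}
}

Running the sketch prints that "archive" is valid while "full" no longer is, which is the same check behind the updated error message in createInitConfig ("--pruning must be archive").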