Integration test
qdm12 committed Jan 5, 2023
1 parent 1611647 commit a1819b1
Showing 3 changed files with 252 additions and 115 deletions.
83 changes: 58 additions & 25 deletions internal/pruner/full.go
@@ -4,8 +4,10 @@
 package pruner
 
 import (
+    "bytes"
     "errors"
     "fmt"
+    "sort"
     "sync"
 
     "github.com/ChainSafe/chaindb"
@@ -53,12 +55,38 @@ type journalKey struct {
 }
 
 type journalRecord struct {
-    // InsertedNodeHashes is the set of node hashes of the trie nodes
+    // InsertedNodeHashes is the node hashes of the trie nodes
     // inserted in the trie for the block.
-    InsertedNodeHashes map[common.Hash]struct{}
-    // DeletedNodeHashes is the set of node hashes of the trie nodes
+    InsertedNodeHashes []common.Hash
+    // DeletedNodeHashes is the node hashes of the trie nodes
     // removed from the trie for the block.
-    DeletedNodeHashes map[common.Hash]struct{}
+    DeletedNodeHashes []common.Hash
 }

+// newJournalRecord creates a journal record. Note it converts inserted
+// and deleted node hashes from sets to sorted slices to have a deterministic
+// encoding, especially for tests.
+func newJournalRecord(insertedNodeHashes, deletedNodeHashes map[common.Hash]struct{}) journalRecord {
+    record := journalRecord{
+        InsertedNodeHashes: make([]common.Hash, 0, len(insertedNodeHashes)),
+        DeletedNodeHashes:  make([]common.Hash, 0, len(deletedNodeHashes)),
+    }
+
+    for hash := range insertedNodeHashes {
+        record.InsertedNodeHashes = append(record.InsertedNodeHashes, hash)
+    }
+    sort.Slice(record.InsertedNodeHashes, func(i, j int) bool {
+        return bytes.Compare(record.InsertedNodeHashes[i][:], record.InsertedNodeHashes[j][:]) < 0
+    })
+
+    for hash := range deletedNodeHashes {
+        record.DeletedNodeHashes = append(record.DeletedNodeHashes, hash)
+    }
+    sort.Slice(record.DeletedNodeHashes, func(i, j int) bool {
+        return bytes.Compare(record.DeletedNodeHashes[i][:], record.DeletedNodeHashes[j][:]) < 0
+    })
+
+    return record
+}
+
 // NewFullNode creates a full node pruner.
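
As a sketch of the determinism point in the comment on newJournalRecord: Go randomizes map iteration order, so encoding hashes straight out of a map can yield a different byte sequence on every run, while sorting them first gives a single canonical ordering. The Hash type below is a stand-in for common.Hash (a 32-byte array); the real record is SCALE-encoded rather than printed.

    package main

    import (
        "bytes"
        "fmt"
        "sort"
    )

    // Hash stands in for common.Hash, which is a 32-byte array.
    type Hash [32]byte

    // sortedHashes mirrors what newJournalRecord does for each set:
    // copy the map keys into a slice and sort them bytewise.
    func sortedHashes(set map[Hash]struct{}) []Hash {
        hashes := make([]Hash, 0, len(set))
        for hash := range set {
            hashes = append(hashes, hash)
        }
        sort.Slice(hashes, func(i, j int) bool {
            return bytes.Compare(hashes[i][:], hashes[j][:]) < 0
        })
        return hashes
    }

    func main() {
        set := map[Hash]struct{}{{3}: {}, {1}: {}, {2}: {}}
        // Ranging over `set` directly could yield 3,1,2 on one run and
        // 2,3,1 on the next; the sorted slice is always 1,2,3.
        for _, hash := range sortedHashes(set) {
            fmt.Println(hash[0])
        }
    }
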
@@ -141,10 +169,15 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[

     journalDBBatch = p.journalDatabase.NewBatch()
 
-    // Update highest block number only in memory so `pruneAll` can use it,
+    // Update highest block number in memory and on disk so `pruneAll` can use it,
     // prune and flush the deletions in the journal and storage databases.
     if blockNumber > p.highestBlockNumber {
         p.highestBlockNumber = blockNumber
+        err = storeBlockNumberAtKey(journalDBBatch, []byte(highestBlockNumberKey), blockNumber)
+        if err != nil {
+            journalDBBatch.Reset()
+            return fmt.Errorf("storing highest block number in journal database: %w", err)
+        }
     }
 
     // Prune before inserting a new journal record
@@ -154,14 +187,6 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
return fmt.Errorf("pruning database: %w", err)
}

if blockNumber > p.highestBlockNumber {
err = storeBlockNumberAtKey(journalDBBatch, []byte(highestBlockNumberKey), blockNumber)
if err != nil {
journalDBBatch.Reset()
return fmt.Errorf("storing highest block number in journal database: %w", err)
}
}

// Note we store block number <-> block hashes in the database
// so we can pick up the block hashes after a program restart
// using the stored last pruned block number and stored highest
@@ -172,10 +197,7 @@ func (p *FullNode) StoreJournalRecord(deletedNodeHashes, insertedNodeHashes map[
return fmt.Errorf("recording block hash in journal database: %w", err)
}

record := journalRecord{
InsertedNodeHashes: insertedNodeHashes,
DeletedNodeHashes: deletedNodeHashes,
}
record := newJournalRecord(insertedNodeHashes, deletedNodeHashes)
err = storeJournalRecord(journalDBBatch, blockNumber, blockHash, record)
if err != nil {
journalDBBatch.Reset()
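
The effect of the change above is atomicity: the highest block number is now queued in the same journal batch as the block's other writes, so a crash cannot persist one without the other. Below is a minimal sketch of the queue-or-reset batch pattern this relies on, using a toy in-memory batch; storeBlockNumberAtKey's body is not shown in this diff, so its little-endian encoding and the key string here are assumptions for illustration (the real helper may well use SCALE).

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // Putter is the minimal interface the helpers write through.
    type Putter interface {
        Put(key, value []byte) error
    }

    // memBatch is a toy stand-in for a chaindb write batch: operations
    // accumulate in memory and only hit the database when flushed.
    type memBatch struct {
        ops map[string][]byte
    }

    func (b *memBatch) Put(key, value []byte) error {
        b.ops[string(key)] = value
        return nil
    }

    // Reset drops all queued operations, as called on the error paths above.
    func (b *memBatch) Reset() {
        b.ops = map[string][]byte{}
    }

    // storeBlockNumberAtKey is a plausible shape for the helper used in
    // the diff (its body is not shown): encode the number, queue the put.
    func storeBlockNumberAtKey(batch Putter, key []byte, blockNumber uint32) error {
        encoded := make([]byte, 4)
        binary.LittleEndian.PutUint32(encoded, blockNumber)
        return batch.Put(key, encoded)
    }

    func main() {
        batch := &memBatch{ops: map[string][]byte{}}
        if err := storeBlockNumberAtKey(batch, []byte("highest_block_number"), 42); err != nil {
            batch.Reset() // drop everything queued so far, as the diff does
            return
        }
        fmt.Printf("queued %d operation(s)\n", len(batch.ops))
    }
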
@@ -238,7 +260,7 @@ func (p *FullNode) handleInsertedKey(insertedNodeHash common.Hash, blockNumber u
         return nil
     }
 
-    // Remove node hash from the deleted set of the block it was deleted in.
+    // Remove node hash from the deleted node hashes of the block it was deleted in.
     encodedJournalRecord, err := p.journalDatabase.Get(journalKeyDeletedAt)
     if err != nil {
         return fmt.Errorf("getting record from journal database: %w", err)
@@ -249,8 +271,16 @@ func (p *FullNode) handleInsertedKey(insertedNodeHash common.Hash, blockNumber u
     if err != nil {
         return fmt.Errorf("decoding journal record: %w", err)
     }
-
-    delete(record.DeletedNodeHashes, insertedNodeHash)
+    for i, deletedNodeHash := range record.DeletedNodeHashes {
+        if deletedNodeHash != insertedNodeHash {
+            continue
+        }
+        lastIndex := len(record.DeletedNodeHashes) - 1
+        record.DeletedNodeHashes[lastIndex], record.DeletedNodeHashes[i] =
+            record.DeletedNodeHashes[i], record.DeletedNodeHashes[lastIndex]
+        record.DeletedNodeHashes = record.DeletedNodeHashes[:lastIndex]
+        break
+    }
 
     encodedJournalRecord, err = scale.Marshal(record)
     if err != nil {
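
One trade-off in the loop above: the swap-with-last removal is O(1), but it breaks the sorted order newJournalRecord established, so a record modified this way no longer encodes in canonical order. If ordering must survive updates, an order-preserving removal is a small change; a sketch, reusing the Hash stand-in from the earlier example:

    // removeHash removes target from hashes while preserving order, so a
    // slice that was sorted stays sorted. The tail shifts left by one,
    // which is O(n) instead of the O(1) swap-with-last used above.
    func removeHash(hashes []Hash, target Hash) []Hash {
        for i, hash := range hashes {
            if hash == target {
                return append(hashes[:i], hashes[i+1:]...)
            }
        }
        return hashes // target absent: leave the slice unchanged
    }
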
@@ -266,13 +296,13 @@ func (p *FullNode) handleInsertedKey(insertedNodeHash common.Hash, blockNumber u
 }
 
 func (p *FullNode) pruneAll(journalDBBatch PutDeleter) (err error) {
-    if p.highestBlockNumber-p.nextBlockNumberToPrune <= p.retainBlocks {
+    if p.highestBlockNumber-p.nextBlockNumberToPrune < p.retainBlocks {
         return nil
     }
 
     storageBatch := p.storageDatabase.NewBatch()
     blockNumberToPrune := p.nextBlockNumberToPrune
-    for p.highestBlockNumber-blockNumberToPrune > p.retainBlocks {
+    for p.highestBlockNumber-blockNumberToPrune >= p.retainBlocks {
         err := prune(blockNumberToPrune, p.journalDatabase, journalDBBatch, storageBatch)
         if err != nil {
             storageBatch.Reset()
@@ -281,7 +311,10 @@ func (p *FullNode) pruneAll(journalDBBatch PutDeleter) (err error) {
         blockNumberToPrune++
     }
 
-    lastBlockNumberPruned := blockNumberToPrune - 1
+    var lastBlockNumberPruned uint32
+    if blockNumberToPrune > 0 {
+        lastBlockNumberPruned = blockNumberToPrune - 1
+    }
 
     err = storeBlockNumberAtKey(journalDBBatch, []byte(lastPrunedKey), lastBlockNumberPruned)
     if err != nil {
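
The comparison changes above (from <= to < in the early return and from > to >= in the loop) tighten the retention window by one block: a block is now pruned as soon as it falls retainBlocks behind the highest block, and the new var-plus-guard form avoids a uint32 underflow when blockNumberToPrune is zero. A standalone check of the loop bounds, with assumed example values:

    package main

    import "fmt"

    func main() {
        const retainBlocks = uint32(2)
        highestBlockNumber := uint32(10)
        blockNumberToPrune := uint32(5) // nextBlockNumberToPrune

        // Same bounds as the pruneAll loop: prune while the block is at
        // least retainBlocks behind the highest block.
        for highestBlockNumber-blockNumberToPrune >= retainBlocks {
            fmt.Println("pruning block", blockNumberToPrune) // 5, 6, 7, 8
            blockNumberToPrune++
        }
        fmt.Println("first retained block:", blockNumberToPrune) // 9
        // Blocks 9 and 10 survive: the highest block and retainBlocks-1
        // of its ancestors. With the old `>` comparison, block 8 would
        // also have been kept.
    }
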
@@ -329,7 +362,7 @@ func pruneStorage(blockNumber uint32, blockHashes []common.Hash,
return fmt.Errorf("getting journal record: %w", err)
}

for deletedNodeHash := range record.DeletedNodeHashes {
for _, deletedNodeHash := range record.DeletedNodeHashes {
err = batch.Del(deletedNodeHash.ToBytes())
if err != nil {
return fmt.Errorf("deleting key from batch: %w", err)
@@ -375,7 +408,7 @@ func storeJournalRecord(batch Putter, blockNumber uint32, blockHash common.Hash,
return fmt.Errorf("scale encoding journal key: %w", err)
}

for deletedNodeHash := range record.DeletedNodeHashes {
for _, deletedNodeHash := range record.DeletedNodeHashes {
// We store the block hash + block number for each deleted node hash
// so a node hash can quickly be checked for from the journal database
// when running `handleInsertedKey`.
Expand Down
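
The comment above describes a reverse index: besides the journal record keyed by block, each deleted node hash maps back to the block that deleted it, so handleInsertedKey can find the record with a single lookup instead of a scan. A toy sketch of the two mappings with in-memory maps; the journalKey field names are assumed, since the diff only shows the struct's opening line.

    package main

    import "fmt"

    // journalKey identifies the block a journal record belongs to.
    // Field names are assumed; the diff only shows the struct's name.
    type journalKey struct {
        BlockNumber uint32
        BlockHash   [32]byte
    }

    func main() {
        // Primary mapping: block -> encoded journal record.
        records := map[journalKey][]byte{}
        // Reverse index: deleted node hash -> block that deleted it.
        deletedAt := map[[32]byte]journalKey{}

        key := journalKey{BlockNumber: 7, BlockHash: [32]byte{1}}
        nodeHash := [32]byte{0xaa}
        records[key] = []byte("...scale-encoded journal record...")
        deletedAt[nodeHash] = key

        // handleInsertedKey-style lookup: one get instead of a scan.
        if at, ok := deletedAt[nodeHash]; ok {
            fmt.Printf("node hash was deleted at block %d (record %d bytes)\n",
                at.BlockNumber, len(records[at]))
        }
    }
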
