
merkledb -- Add Clearer interface #2277

Merged · 18 commits · Nov 13, 2023
103 changes: 54 additions & 49 deletions proto/pb/sync/sync.pb.go

Generated file; diff not rendered by default.

37 changes: 37 additions & 0 deletions proto/pb/sync/sync_grpc.pb.go

Generated file; diff not rendered by default.

2 changes: 2 additions & 0 deletions proto/sync/sync.proto
@@ -21,6 +21,8 @@ message Request {
service DB {
  rpc GetMerkleRoot(google.protobuf.Empty) returns (GetMerkleRootResponse);

  rpc Clear(google.protobuf.Empty) returns (google.protobuf.Empty);

  rpc GetProof(GetProofRequest) returns (GetProofResponse);

  rpc GetChangeProof(GetChangeProofRequest) returns (GetChangeProofResponse);
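The new Clear RPC takes and returns google.protobuf.Empty, so a server that fronts a local MerkleDB only needs to forward the call and report any error. A minimal sketch, assuming a hypothetical dbServer wrapper (this is not the repo's actual x/sync server code):

package example

import (
    "context"

    "github.com/ava-labs/avalanchego/x/merkledb"
    "google.golang.org/protobuf/types/known/emptypb"
)

// dbServer is a hypothetical gRPC handler wrapping a local MerkleDB.
type dbServer struct {
    db merkledb.MerkleDB
}

// Clear forwards the RPC to the local database's Clearer implementation
// and returns an empty response on success.
func (s *dbServer) Clear(ctx context.Context, _ *emptypb.Empty) (*emptypb.Empty, error) {
    if err := s.db.Clear(); err != nil {
        return nil, err
    }
    return &emptypb.Empty{}, nil
}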
37 changes: 37 additions & 0 deletions x/merkledb/db.go
@@ -36,6 +36,7 @@ const (
    // TODO: name better
    rebuildViewSizeFractionOfCacheSize   = 50
    minRebuildViewSizePerCommit          = 1000
    clearBatchSize                       = units.MiB
    rebuildIntermediateDeletionWriteSize = units.MiB
    valueNodePrefixLen                   = 1
)
@@ -113,6 +114,12 @@ type RangeProofer interface {
    CommitRangeProof(ctx context.Context, start, end maybe.Maybe[[]byte], proof *RangeProof) error
}

type Clearer interface {
    // Deletes all key/value pairs from the database
    // and clears the change history.
    Clear() error
}

type Prefetcher interface {
    // PrefetchPath attempts to load all trie nodes on the path of [key]
    // into the cache.
@@ -128,6 +135,7 @@ type Prefetcher interface {

type MerkleDB interface {
    database.Database
    Clearer
    Trie
    MerkleRootGetter
    ProofGetter
@@ -1264,6 +1272,35 @@ func (db *merkleDB) getNode(key Key, hasValue bool) (*node, error) {
    return db.intermediateNodeDB.Get(key)
}

func (db *merkleDB) Clear() error {
    db.commitLock.Lock()
    defer db.commitLock.Unlock()

    db.lock.Lock()
    defer db.lock.Unlock()

    // Clear nodes from disk and caches
    if err := db.valueNodeDB.Clear(); err != nil {
        return err
    }
    if err := db.intermediateNodeDB.Clear(); err != nil {
        return err
    }

    // Clear root
    db.sentinelNode = newNode(Key{})
    db.sentinelNode.calculateID(db.metrics)

    // Clear history
    db.history = newTrieHistory(db.history.maxHistoryLen)
    db.history.record(&changeSummary{
        rootID: db.getMerkleRoot(),
        values: map[Key]*change[maybe.Maybe[[]byte]]{},
        nodes:  map[Key]*change[*node]{},
    })
    return nil
}

// Returns [key] prefixed by [prefix].
// The returned []byte is taken from [bufferPool] and
// should be returned to it when the caller is done with it.
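Since MerkleDB now embeds Clearer, a caller can wipe the trie through the public interface alone. A minimal usage sketch, assuming db was built elsewhere (for example with merkledb.New) and that resetDatabase is a hypothetical helper:

package example

import (
    "context"
    "fmt"

    "github.com/ava-labs/avalanchego/x/merkledb"
)

// resetDatabase writes a value, clears everything, and prints the resulting
// root, which should be back to the empty-trie root after Clear.
func resetDatabase(ctx context.Context, db merkledb.MerkleDB) error {
    if err := db.Put([]byte("key"), []byte("value")); err != nil {
        return err
    }

    // Drop every key/value pair and the change history in one call.
    if err := db.Clear(); err != nil {
        return err
    }

    root, err := db.GetMerkleRoot(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("root after clear: %s\n", root)
    return nil
}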
45 changes: 45 additions & 0 deletions x/merkledb/db_test.go
@@ -30,6 +30,8 @@ import (

const defaultHistoryLength = 300

var emptyKey Key

// newDB returns a new merkle database with the underlying type so that tests can access unexported fields
func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB, error) {
    db, err := New(ctx, db, config)
@@ -773,6 +775,49 @@ func Test_MerkleDB_Random_Insert_Ordering(t *testing.T) {
    }
}

func TestMerkleDBClear(t *testing.T) {
    require := require.New(t)

    // Make a database and insert some key-value pairs.
    db, err := getBasicDB()
    require.NoError(err)

    emptyRootID := db.getMerkleRoot()

    now := time.Now().UnixNano()
    t.Logf("seed: %d", now)
    r := rand.New(rand.NewSource(now)) // #nosec G404

    insertRandomKeyValues(
        require,
        r,
        []database.Database{db},
        1_000,
        0.25,
    )

    // Clear the database.
    require.NoError(db.Clear())

    // Assert that the database is empty.
    iter := db.NewIterator()
    defer iter.Release()
    require.False(iter.Next())
    require.Equal(emptyRootID, db.getMerkleRoot())
    require.Equal(emptyKey, db.sentinelNode.key)

    // Assert caches are empty.
    require.Zero(db.valueNodeDB.nodeCache.Len())
    require.Zero(db.intermediateNodeDB.nodeCache.currentSize)

    // Assert history has only the clearing change.
    require.Len(db.history.lastChanges, 1)
    change, ok := db.history.lastChanges[emptyRootID]
    require.True(ok)
    require.Empty(change.nodes)
    require.Empty(change.values)
}

func FuzzMerkleDBEmptyRandomizedActions(f *testing.F) {
f.Fuzz(
func(
11 changes: 11 additions & 0 deletions x/merkledb/intermediate_node_db.go
@@ -146,3 +146,14 @@ func (db *intermediateNodeDB) Flush() error {
func (db *intermediateNodeDB) Delete(key Key) error {
    return db.nodeCache.Put(key, nil)
}

func (db *intermediateNodeDB) Clear() error {
    // Reset the cache. Note we don't flush because that would cause us to
    // persist intermediate nodes we're about to delete.
    db.nodeCache = newOnEvictCache(
        db.nodeCache.maxSize,
        db.nodeCache.size,
        db.nodeCache.onEviction,
    )
    return database.AtomicClearPrefix(db.baseDB, db.baseDB, intermediateNodePrefix)
}
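database.AtomicClearPrefix deletes every key under intermediateNodePrefix from the backing store. A simplified sketch of that effect against the generic database.Database interface is below; the real helper takes separate reader and deleter databases and applies the deletes atomically, so this is an illustration rather than its implementation:

package example

import "github.com/ava-labs/avalanchego/database"

// clearPrefix deletes every key that starts with prefix by iterating the
// prefix range and staging the deletes in a single batch.
func clearPrefix(db database.Database, prefix []byte) error {
    it := db.NewIteratorWithPrefix(prefix)
    defer it.Release()

    batch := db.NewBatch()
    for it.Next() {
        if err := batch.Delete(it.Key()); err != nil {
            return err
        }
    }
    if err := it.Error(); err != nil {
        return err
    }
    return batch.Write()
}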
29 changes: 29 additions & 0 deletions x/merkledb/intermediate_node_db_test.go
@@ -213,3 +213,32 @@ func Test_IntermediateNodeDB_ConstructDBKey_DirtyBuffer(t *testing.T) {
    require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)])
    require.Equal(p.Extend(ToToken(1, 4)).Bytes(), constructedKey[len(intermediateNodePrefix):])
}

func TestIntermediateNodeDBClear(t *testing.T) {
    require := require.New(t)
    cacheSize := 200
    evictionBatchSize := cacheSize
    baseDB := memdb.New()
    db := newIntermediateNodeDB(
        baseDB,
        &sync.Pool{
            New: func() interface{} { return make([]byte, 0) },
        },
        &mockMetrics{},
        cacheSize,
        evictionBatchSize,
        4,
    )

    for _, b := range [][]byte{{1}, {2}, {3}} {
        require.NoError(db.Put(ToKey(b), newNode(ToKey(b))))
    }

    require.NoError(db.Clear())

    iter := baseDB.NewIteratorWithPrefix(intermediateNodePrefix)
    defer iter.Release()
    require.False(iter.Next())

    require.Zero(db.nodeCache.currentSize)
}