Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merkledb -- Add Clearer interface #2277

Merged
merged 18 commits into from
Nov 13, 2023
103 changes: 54 additions & 49 deletions proto/pb/sync/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions proto/pb/sync/sync_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/sync/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ message Request {
service DB {
rpc GetMerkleRoot(google.protobuf.Empty) returns (GetMerkleRootResponse);

rpc Clear(google.protobuf.Empty) returns (google.protobuf.Empty);

rpc GetProof(GetProofRequest) returns (GetProofResponse);

rpc GetChangeProof(GetChangeProofRequest) returns (GetChangeProofResponse);
Expand Down
45 changes: 45 additions & 0 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
// TODO: name better
rebuildViewSizeFractionOfCacheSize = 50
minRebuildViewSizePerCommit = 1000
clearBatchSize = units.MiB
rebuildIntermediateDeletionWriteSize = units.MiB
valueNodePrefixLen = 1
)
Expand Down Expand Up @@ -114,6 +115,12 @@
CommitRangeProof(ctx context.Context, start, end maybe.Maybe[[]byte], proof *RangeProof) error
}

type Clearer interface {
// Deletes all key/value pairs from the database
// and clears the change history.
Clear() error
}

type Prefetcher interface {
// PrefetchPath attempts to load all trie nodes on the path of [key]
// into the cache.
Expand All @@ -129,6 +136,7 @@

type MerkleDB interface {
database.Database
Clearer
Trie
MerkleRootGetter
ProofGetter
Expand Down Expand Up @@ -1250,9 +1258,46 @@
return db.intermediateNodeDB.Get(key)
}

func (db *merkleDB) Clear() error {
db.commitLock.Lock()
defer db.commitLock.Unlock()

db.lock.Lock()
defer db.lock.Unlock()

// Clear caches
if err := db.valueNodeDB.Clear(); err != nil {
return err
}
if err := db.intermediateNodeDB.Clear(); err != nil {
return err
}

// Clear nodes from disk
if err := database.ClearPrefix(db.baseDB, valueNodePrefix, clearBatchSize); err != nil {
danlaine marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if err := database.ClearPrefix(db.baseDB, intermediateNodePrefix, clearBatchSize); err != nil {
return err
}

// Clear root
db.root = newNode(Key{})
db.root.calculateID(db.metrics)

// Clear history
db.history = newTrieHistory(db.history.maxHistoryLen)
db.history.record(&changeSummary{
rootID: db.getMerkleRoot(),
values: map[Key]*change[maybe.Maybe[[]byte]]{},
nodes: map[Key]*change[*node]{},
})
return nil
}

// Returns [key] prefixed by [prefix].
// The returned []byte is taken from [bufferPool] and

Check failure on line 1299 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1299 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1299 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1299 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / unit_tests (self-hosted, linux, ARM64, focal)

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1299 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / unit_tests (self-hosted, linux, ARM64, jammy)

db.root undefined (type *merkleDB has no field or method root)
// should be returned to it when the caller is done with it.

Check failure on line 1300 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)) (typecheck)

Check failure on line 1300 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1300 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root)) (typecheck)

Check failure on line 1300 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / unit_tests (self-hosted, linux, ARM64, focal)

db.root undefined (type *merkleDB has no field or method root)

Check failure on line 1300 in x/merkledb/db.go

View workflow job for this annotation

GitHub Actions / unit_tests (self-hosted, linux, ARM64, jammy)

db.root undefined (type *merkleDB has no field or method root)
func addPrefixToKey(bufferPool *sync.Pool, prefix []byte, key []byte) []byte {
prefixLen := len(prefix)
keyLen := prefixLen + len(key)
Expand Down
45 changes: 45 additions & 0 deletions x/merkledb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

const defaultHistoryLength = 300

var emptyKey Key

// newDB returns a new merkle database with the underlying type so that tests can access unexported fields
func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB, error) {
db, err := New(ctx, db, config)
Expand Down Expand Up @@ -773,6 +775,49 @@
}
}

func TestMerkleDBClear(t *testing.T) {
require := require.New(t)

// Make a database and insert some key-value pairs.
db, err := getBasicDB()
require.NoError(err)

emptyRootID := db.getMerkleRoot()

now := time.Now().UnixNano()
t.Logf("seed: %d", now)
r := rand.New(rand.NewSource(now)) // #nosec G404

insertRandomKeyValues(
require,
r,
[]database.Database{db},
1_000,
0.25,
)

// Clear the database.
require.NoError(db.Clear())

// Assert that the database is empty.
iter := db.NewIterator()
defer iter.Release()
require.False(iter.Next())
require.Equal(emptyRootID, db.getMerkleRoot())
require.Equal(emptyKey, db.root.key)

Check failure on line 807 in x/merkledb/db_test.go

View workflow job for this annotation

GitHub Actions / Static analysis

db.root undefined (type *merkleDB has no field or method root) (typecheck)

// Assert caches are empty.
require.Zero(db.valueNodeDB.nodeCache.Len())
require.Zero(db.intermediateNodeDB.nodeCache.currentSize)

// Assert history has only the clearing change.
require.Len(db.history.lastChanges, 1)
change, ok := db.history.lastChanges[emptyRootID]
require.True(ok)
require.Empty(change.nodes)
require.Empty(change.values)
}

func FuzzMerkleDBEmptyRandomizedActions(f *testing.F) {
f.Fuzz(
func(
Expand Down
11 changes: 11 additions & 0 deletions x/merkledb/intermediate_node_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,14 @@ func (db *intermediateNodeDB) Flush() error {
func (db *intermediateNodeDB) Delete(key Key) error {
return db.nodeCache.Put(key, nil)
}

func (db *intermediateNodeDB) Clear() error {
// Reset the cache. Note we don't flush because that would cause us to
// persist intermediate nodes we're about to delete.
db.nodeCache = newOnEvictCache(
db.nodeCache.maxSize,
db.nodeCache.size,
db.nodeCache.onEviction,
)
return database.AtomicClearPrefix(db.baseDB, db.baseDB, valueNodePrefix)
danlaine marked this conversation as resolved.
Show resolved Hide resolved
}
29 changes: 29 additions & 0 deletions x/merkledb/intermediate_node_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,32 @@ func Test_IntermediateNodeDB_ConstructDBKey_DirtyBuffer(t *testing.T) {
require.Equal(intermediateNodePrefix, constructedKey[:len(intermediateNodePrefix)])
require.Equal(p.Extend(ToToken(1, 4)).Bytes(), constructedKey[len(intermediateNodePrefix):])
}

func TestIntermediateNodeDBClear(t *testing.T) {
require := require.New(t)
cacheSize := 200
evictionBatchSize := cacheSize
baseDB := memdb.New()
db := newIntermediateNodeDB(
baseDB,
&sync.Pool{
New: func() interface{} { return make([]byte, 0) },
},
&mockMetrics{},
cacheSize,
evictionBatchSize,
4,
)

for _, b := range [][]byte{{1}, {2}, {3}} {
require.NoError(db.Put(ToKey(b), newNode(ToKey(b))))
}

require.NoError(db.Clear())

iter := baseDB.NewIteratorWithPrefix(intermediateNodePrefix)
defer iter.Release()
require.False(iter.Next())

require.Zero(db.nodeCache.currentSize)
}
Loading
Loading