Skip to content

Commit

Permalink
Merge pull request #3843 from oasisprotocol/kostko/fix/mkvs-migrate-w…
Browse files Browse the repository at this point in the history
…l-and-deps

go/storage/mkvs: Prune old write logs and do not depend on root order
  • Loading branch information
kostko authored Apr 9, 2021
2 parents b58a0aa + d3efbcb commit 474daee
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 31 deletions.
Empty file added .changelog/3843.trivial.md
Empty file.
33 changes: 21 additions & 12 deletions go/storage/mkvs/db/badger/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,8 @@ func checkSanityInternal(ctx context.Context, db *badgerNodeDB, display DisplayH
return fmt.Errorf("mkvs/badger/check: error reading roots metadata for version %d: %w", version, err)
}

for rootHash, dstRoots := range rootsMeta.Roots {
// Make sure a writelog exists for each root pair.
for _, dstRoot := range dstRoots {
dstVersion, ok := lastRoots[dstRoot]
if !ok {
return fmt.Errorf("mkvs/badger/check: missing target root (%s -> %s)", rootHash, dstRoot)
}
_, err = txn.Get(writeLogKeyFmt.Encode(dstVersion, &dstRoot, &rootHash)) //nolint: gosec
if err != nil {
return fmt.Errorf("mkvs/badger/check: missing write log (%d, %s, %s)", dstVersion, dstRoot, rootHash)
}
}
// Check tree consistenncy.
for rootHash := range rootsMeta.Roots {
lastRoots[rootHash] = version

root := node.Root{
Expand All @@ -164,6 +154,25 @@ func checkSanityInternal(ctx context.Context, db *badgerNodeDB, display DisplayH
}
}

// Make sure a writelog exists for each root pair.
for rootHash, dstRoots := range rootsMeta.Roots {
for _, dstRoot := range dstRoots {
dstVersion, ok := lastRoots[dstRoot]
if !ok {
return fmt.Errorf("mkvs/badger/check: missing target root (%s -> %s)", rootHash, dstRoot)
}
_, err = txn.Get(writeLogKeyFmt.Encode(dstVersion, &dstRoot, &rootHash)) //nolint: gosec
if err != nil {
return fmt.Errorf("mkvs/badger/check: missing write log (%d, %s, %s)", dstVersion, dstRoot, rootHash)
}
}
}
for rootHash, v := range lastRoots {
if v != version {
delete(lastRoots, rootHash)
}
}

doneVersions++
display.DisplayProgress("versions checked", doneVersions, totalVersions)
}
Expand Down
67 changes: 48 additions & 19 deletions go/storage/mkvs/db/badger/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func (v5 *v5Migrator) migrateVersion(version uint64, migratedRoots map[typedHash
}

newRoots := make(map[typedHash][]typedHash)
for root, dstRoots := range roots.Roots {
for root := range roots.Roots {
// Migrate the tree (if not empty).
var newRootHash hash.Hash
if rootHash := root.Hash(); !rootHash.IsEmpty() {
Expand All @@ -1025,22 +1025,6 @@ func (v5 *v5Migrator) migrateVersion(version uint64, migratedRoots map[typedHash

newRoot := typedHashFromParts(root.Type(), newRootHash)
newRoots[newRoot] = []typedHash{}
for _, dstRoot := range dstRoots {
if migratedRoot, exists := migratedRoots[dstRoot]; exists {
newRoots[newRoot] = append(newRoots[newRoot], migratedRoot.Hash)

// Migrate write log.
if err = v5.migrateWriteLog(root, dstRoot, newRoot, migratedRoot); err != nil {
return false, err
}
} else {
return false, fmt.Errorf("internal error: derived root %s not migrated", dstRoot)
}
}
for _, dstRoot := range dstRoots {
delete(migratedRoots, dstRoot)
}

migratedRoots[root] = v5MigratedRoot{Hash: newRoot, Version: version}

// Check for a write log from empty root.
Expand Down Expand Up @@ -1081,6 +1065,28 @@ func (v5 *v5Migrator) migrateVersion(version uint64, migratedRoots map[typedHash

v5.helper.Display(fmt.Sprintf("migrated root %s -> %s", root, newRoot))
}
for root, dstRoots := range roots.Roots {
newRoot := migratedRoots[root].Hash

for _, dstRoot := range dstRoots {
migratedRoot, exists := migratedRoots[dstRoot]
if !exists {
return false, fmt.Errorf("internal error: derived root %s not migrated", dstRoot)
}
newRoots[newRoot] = append(newRoots[newRoot], migratedRoot.Hash)

// Migrate write log.
if err = v5.migrateWriteLog(root, dstRoot, newRoot, migratedRoot); err != nil {
return false, err
}
}
}
// Remove any migrated roots that are not in this version.
for root, meta := range migratedRoots {
if meta.Version != version {
delete(migratedRoots, root)
}
}

// Update roots metadata.
roots.Roots = newRoots
Expand Down Expand Up @@ -1200,13 +1206,13 @@ func (v5 *v5Migrator) pruneVersion(version uint64) error {
}

// Prune all write logs in version.
prefix := writeLogKeyFmt.Encode(version)
prefix := v4WriteLogKeyFmt.Encode(version)
it := v5.readTxn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()

for it.Rewind(); it.Valid(); it.Next() {
if err = v5.changeBatch.DeleteAt(it.Item().KeyCopy(nil), it.Item().Version()); err != nil {
return err
return fmt.Errorf("error pruning writelog: %w", err)
}
}

Expand All @@ -1220,6 +1226,19 @@ func (v5 *v5Migrator) pruneVersion(version uint64) error {
return nil
}

func (v5 *v5Migrator) pruneWriteLog(version uint64, oldRoot typedHash) error {
prefix := v4WriteLogKeyFmt.Encode(version, &oldRoot)
it := v5.readTxn.NewIterator(badger.IteratorOptions{Prefix: prefix})
defer it.Close()

for it.Rewind(); it.Valid(); it.Next() {
if err := v5.changeBatch.DeleteAt(it.Item().KeyCopy(nil), it.Item().Version()); err != nil {
return fmt.Errorf("error pruning writelog: %w", err)
}
}
return nil
}

func (v5 *v5Migrator) TargetVersion() uint64 {
return 5
}
Expand Down Expand Up @@ -1306,6 +1325,16 @@ func (v5 *v5Migrator) Migrate() (rversion uint64, rerr error) {
}
}

// Make sure to prune any write logs which originate before the first version.
for old, new := range migratedRoots {
if new.Version != firstVersion {
continue
}
if err = v5.pruneWriteLog(new.Version, old); err != nil {
return 0, fmt.Errorf("error pruning old writelogs: %w", err)
}
}

// Remove any data and metadata for versions less than firstVersion.
v5.db.db.SetDiscardTs(versionToTs(firstVersion))

Expand Down

0 comments on commit 474daee

Please sign in to comment.