Skip to content

Commit

Permalink
go/storage/mkvs: Fix Finalize after checkpoint restore
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Sep 28, 2020
1 parent 34289ba commit 21b7131
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 2 deletions.
1 change: 1 addition & 0 deletions .changelog/3330.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/storage/mkvs: Fix Finalize after checkpoint restore
147 changes: 147 additions & 0 deletions go/storage/mkvs/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,150 @@ func TestFileCheckpointCreator(t *testing.T) {
_, err = fc.CreateCheckpoint(ctx, invalidRoot, 16*1024)
require.Error(err, "CreateCheckpoint should fail for invalid root")
}

func TestPruneGapAfterCheckpointRestore(t *testing.T) {
require := require.New(t)

// Generate some data.
dir, err := ioutil.TempDir("", "mkvs.checkpoint")
require.NoError(err, "TempDir")
defer os.RemoveAll(dir)

// Create two databases, the first will contain everything while the second one will only
// contain the first few versions.
ndb1, err := badgerDb.New(&db.Config{
DB: filepath.Join(dir, "db1"),
Namespace: testNs,
})
require.NoError(err, "New")

ndb2, err := badgerDb.New(&db.Config{
DB: filepath.Join(dir, "db2"),
Namespace: testNs,
})
require.NoError(err, "New")

ctx := context.Background()
root := node.Root{
Namespace: testNs,
Version: 0,
}
root.Hash.Empty()

const (
numVersions = 20
numKeysPerVersion = 50
numVersionsDb2 = 5
numExtraVersions = 5
)

for v := uint64(0); v < numVersions; v++ {
tree1 := mkvs.NewWithRoot(nil, ndb1, root)
var tree2 mkvs.Tree
if v < numVersionsDb2 {
tree2 = mkvs.NewWithRoot(nil, ndb2, root)
}

for i := 0; i < numKeysPerVersion; i++ {
err = tree1.Insert(ctx, []byte(strconv.Itoa(int(v)*1000+i)), []byte(strconv.Itoa(i)))
require.NoError(err, "Insert")

if tree2 != nil {
err = tree2.Insert(ctx, []byte(strconv.Itoa(int(v)*1000+i)), []byte(strconv.Itoa(i)))
require.NoError(err, "Insert")
}
}

var rootHash1 hash.Hash
_, rootHash1, err = tree1.Commit(ctx, testNs, v)
require.NoError(err, "Commit")
err = ndb1.Finalize(ctx, v, []hash.Hash{rootHash1})
require.NoError(err, "Finalize")
tree1.Close()

if tree2 != nil {
var rootHash2 hash.Hash
_, rootHash2, err = tree2.Commit(ctx, testNs, v)
require.NoError(err, "Commit")
require.EqualValues(rootHash1, rootHash2, "root hashes should be equal")
err = ndb2.Finalize(ctx, v, []hash.Hash{rootHash2})
require.NoError(err, "Finalize")
tree2.Close()
}

root.Version = v
root.Hash = rootHash1
}

// Create a file-based checkpoint creator for the first database.
fc, err := NewFileCreator(filepath.Join(dir, "checkpoints"), ndb1)
require.NoError(err, "NewFileCreator")

// Create a checkpoint and check that it has been created correctly.
cp, err := fc.CreateCheckpoint(ctx, root, 16*1024)
require.NoError(err, "CreateCheckpoint")

// Restore checkpoints in the second database.
rs, err := NewRestorer(ndb2)
require.NoError(err, "NewRestorer")

err = rs.StartRestore(ctx, cp)
require.NoError(err, "StartRestore")
for i := 0; i < len(cp.Chunks); i++ {
var cm *ChunkMetadata
cm, err = cp.GetChunkMetadata(uint64(i))
require.NoError(err, "GetChunkMetadata")

var buf bytes.Buffer
err = fc.GetCheckpointChunk(ctx, cm, &buf)
require.NoError(err, "GetChunk")

_, err = rs.RestoreChunk(ctx, uint64(i), &buf)
require.NoError(err, "RestoreChunk")
}
err = ndb2.Finalize(ctx, root.Version, []hash.Hash{root.Hash})
require.NoError(err, "Finalize")

// Now attempt to prune everything up to (but excluding) the current root.
for v := uint64(0); v < root.Version; v++ {
err = ndb2.Prune(ctx, v)
require.NoError(err, "Prune(%d)", v)
}
checkpointRootVersion := root.Version

// Insert some more stuff in the node database to make sure everything still works.
finalVersion := root.Version + numExtraVersions
for v := root.Version + 1; v <= finalVersion; v++ {
tree1 := mkvs.NewWithRoot(nil, ndb1, root)
tree2 := mkvs.NewWithRoot(nil, ndb2, root)

for i := 0; i < numKeysPerVersion; i++ {
err = tree1.Insert(ctx, []byte(strconv.Itoa(int(v)*1000+i)), []byte(strconv.Itoa(i)))
require.NoError(err, "Insert")
err = tree2.Insert(ctx, []byte(strconv.Itoa(int(v)*1000+i)), []byte(strconv.Itoa(i)))
require.NoError(err, "Insert")
}

var rootHash1 hash.Hash
_, rootHash1, err = tree1.Commit(ctx, testNs, v)
require.NoError(err, "Commit")
err = ndb1.Finalize(ctx, v, []hash.Hash{rootHash1})
require.NoError(err, "Finalize")
tree1.Close()

var rootHash2 hash.Hash
_, rootHash2, err = tree2.Commit(ctx, testNs, v)
require.NoError(err, "Commit")
require.EqualValues(rootHash1, rootHash2, "root hashes should be equal")
err = ndb2.Finalize(ctx, v, []hash.Hash{rootHash2})
require.NoError(err, "Finalize")
tree2.Close()

root.Version = v
root.Hash = rootHash1
}

// Prune the checkpoint root version.
err = ndb2.Prune(ctx, checkpointRootVersion)
require.NoError(err, "Prune(%d)", checkpointRootVersion)
}
4 changes: 2 additions & 2 deletions go/storage/mkvs/db/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,9 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, version uint64, roots []has
tx := d.db.NewTransactionAt(versionToTs(version), true)
defer tx.Discard()

// Make sure that the previous version has been finalized.
// Make sure that the previous version has been finalized (if we are not restoring).
lastFinalizedVersion, exists := d.meta.getLastFinalizedVersion()
if version > 0 && exists && lastFinalizedVersion < (version-1) {
if d.multipartVersion == multipartVersionNone && version > 0 && exists && lastFinalizedVersion < (version-1) {
return api.ErrNotFinalized
}
// Make sure that this version has not yet been finalized.
Expand Down

0 comments on commit 21b7131

Please sign in to comment.