feat: more compact exported snapshot (#703)
Co-authored-by: cool-developer <[email protected]>
Co-authored-by: Marko <[email protected]>
(cherry picked from commit b544dc0)

# Conflicts:
#	CHANGELOG.md
yihuang authored and mergify[bot] committed Apr 13, 2023
1 parent 6553fd6 commit b435b63
Showing 3 changed files with 240 additions and 37 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,7 +4,11 @@

### Improvements

<<<<<<< HEAD
- [#726](https://github.com/cosmos/iavl/pull/726) Make `KVPair` and `ChangeSet` serializable with protobuf.
=======
- [#703](https://github.com/cosmos/iavl/pull/703) New APIs `NewCompressExporter`/`NewCompressImporter` to support a more compact snapshot format.
>>>>>>> b544dc0 (feat: more compact exported snapshot (#703))
### Breaking Changes

144 changes: 144 additions & 0 deletions compress.go
@@ -0,0 +1,144 @@
package iavl

import (
	"encoding/binary"
	"fmt"
)

// NodeExporter reads exported nodes one by one.
type NodeExporter interface {
	Next() (*ExportNode, error)
}

// NodeImporter consumes exported nodes one by one.
type NodeImporter interface {
	Add(*ExportNode) error
}

// CompressExporter wraps the regular exporter and applies compression to each `ExportNode`:
//   - branch keys are skipped
//   - leaf keys are delta-encoded against the previous leaf key
//   - branch node versions are delta-encoded against the max version of their children
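//
// For example, exporting the leaves "a" (version 1) and "abc" (version 3) followed by
// their parent branch (version 3) yields the keys {0x00, 'a'}, {0x01, 'b', 'c'} and nil,
// and a branch version delta of 0 (see TestExporterCompress).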
type CompressExporter struct {
	inner        NodeExporter
	lastKey      []byte
	versionStack []int64
}

var _ NodeExporter = (*CompressExporter)(nil)

func NewCompressExporter(exporter NodeExporter) NodeExporter {
	return &CompressExporter{inner: exporter}
}

func (e *CompressExporter) Next() (*ExportNode, error) {
	n, err := e.inner.Next()
	if err != nil {
		return nil, err
	}

	if n.Height == 0 {
		// apply delta encoding to leaf keys
		n.Key, e.lastKey = deltaEncode(n.Key, e.lastKey), n.Key

		e.versionStack = append(e.versionStack, n.Version)
	} else {
		// branch keys can be derived on the fly during import, so they are safe to skip
		n.Key = nil

		// delta encode the version
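		// the top two stack entries are the versions of this branch's two children;
		// replace them with the branch's own version for its parent to consume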
		maxVersion := maxInt64(e.versionStack[len(e.versionStack)-1], e.versionStack[len(e.versionStack)-2])
		e.versionStack = e.versionStack[:len(e.versionStack)-1]
		e.versionStack[len(e.versionStack)-1] = n.Version
		n.Version -= maxVersion
	}

	return n, nil
}

// CompressImporter wraps the regular importer and decompresses each `ExportNode` before
// passing it on to the inner importer.
type CompressImporter struct {
	inner        NodeImporter
	lastKey      []byte
	minKeyStack  [][]byte
	versionStack []int64
}

var _ NodeImporter = (*CompressImporter)(nil)

func NewCompressImporter(importer NodeImporter) NodeImporter {
	return &CompressImporter{inner: importer}
}

func (i *CompressImporter) Add(node *ExportNode) error {
	if node.Height == 0 {
		key, err := deltaDecode(node.Key, i.lastKey)
		if err != nil {
			return err
		}
		node.Key = key
		i.lastKey = key

		i.minKeyStack = append(i.minKeyStack, key)
		i.versionStack = append(i.versionStack, node.Version)
	} else {
		// use the min key of the right branch as the node key
		node.Key = i.minKeyStack[len(i.minKeyStack)-1]
		// leave the min key of the left branch on the stack
		i.minKeyStack = i.minKeyStack[:len(i.minKeyStack)-1]

		// decode the branch node version
		maxVersion := maxInt64(i.versionStack[len(i.versionStack)-1], i.versionStack[len(i.versionStack)-2])
		node.Version += maxVersion
		i.versionStack = i.versionStack[:len(i.versionStack)-1]
		i.versionStack[len(i.versionStack)-1] = node.Version
	}

	return i.inner.Add(node)
}

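// deltaEncode prefix-compresses key against lastKey: it stores the length of their shared
// prefix as a uvarint, followed by the remaining suffix of key. For example, with
// lastKey "a" and key "abc" the result is {0x01, 'b', 'c'}.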
func deltaEncode(key, lastKey []byte) []byte {
	var sizeBuf [binary.MaxVarintLen64]byte
	shared := diffOffset(lastKey, key)
	n := binary.PutUvarint(sizeBuf[:], uint64(shared))
	return append(sizeBuf[:n], key[shared:]...)
}

// deltaDecode reverses deltaEncode: it reads the shared-prefix length, then rebuilds the
// full key from lastKey's prefix and the encoded suffix.
func deltaDecode(key, lastKey []byte) ([]byte, error) {
	shared, n := binary.Uvarint(key)
	if n <= 0 {
		return nil, fmt.Errorf("uvarint parse failed %d", n)
	}

	key = key[n:]
	if shared == 0 {
		return key, nil
	}

	newKey := make([]byte, shared+uint64(len(key)))
	copy(newKey, lastKey[:shared])
	copy(newKey[shared:], key)
	return newKey, nil
}

// diffOffset returns the length of the common prefix of the two byte slices, i.e. the
// index of the first byte at which they differ.
func diffOffset(a, b []byte) int {
	var off int
	var l int
	if len(a) < len(b) {
		l = len(a)
	} else {
		l = len(b)
	}
	for ; off < l; off++ {
		if a[off] != b[off] {
			break
		}
	}
	return off
}

func maxInt64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}
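
For reference, a minimal sketch of how the new wrappers compose with the existing export/import APIs, mirroring TestExporter_Import below. copyTreeCompressed is a hypothetical helper (not part of this change), db refers to the tm-db package as imported in the tests, and error handling is left to the caller's taste.

// copyTreeCompressed copies tree into a fresh in-memory tree using the compact
// snapshot format produced by CompressExporter and consumed by CompressImporter.
func copyTreeCompressed(tree *ImmutableTree) (*MutableTree, error) {
	innerExporter, err := tree.Export()
	if err != nil {
		return nil, err
	}
	defer innerExporter.Close()
	exporter := NewCompressExporter(innerExporter)

	newTree, err := NewMutableTree(db.NewMemDB(), 0, false)
	if err != nil {
		return nil, err
	}
	innerImporter, err := newTree.Import(tree.Version())
	if err != nil {
		return nil, err
	}
	defer innerImporter.Close()
	importer := NewCompressImporter(innerImporter)

	for {
		node, err := exporter.Next()
		if err == ErrorExportDone {
			break
		}
		if err != nil {
			return nil, err
		}
		if err := importer.Add(node); err != nil {
			return nil, err
		}
	}
	if err := innerImporter.Commit(); err != nil {
		return nil, err
	}
	return newTree, nil
}
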
129 changes: 92 additions & 37 deletions export_test.go
@@ -49,6 +49,8 @@ func setupExportTreeBasic(t require.TestingT) *ImmutableTree {
	require.NoError(t, err)
	_, _, err = tree.Remove([]byte("z"))
	require.NoError(t, err)
	_, err = tree.Set([]byte("abc"), []byte{6})
	require.NoError(t, err)
	_, version, err := tree.SaveVersion()
	require.NoError(t, err)

@@ -162,10 +164,12 @@ func TestExporter(t *testing.T) {

	expect := []*ExportNode{
		{Key: []byte("a"), Value: []byte{1}, Version: 1, Height: 0},
		{Key: []byte("abc"), Value: []byte{6}, Version: 3, Height: 0},
		{Key: []byte("abc"), Value: nil, Version: 3, Height: 1},
		{Key: []byte("b"), Value: []byte{2}, Version: 3, Height: 0},
		{Key: []byte("b"), Value: nil, Version: 3, Height: 1},
		{Key: []byte("c"), Value: []byte{3}, Version: 3, Height: 0},
		{Key: []byte("c"), Value: nil, Version: 3, Height: 2},
		{Key: []byte("c"), Value: nil, Version: 3, Height: 1},
		{Key: []byte("b"), Value: nil, Version: 3, Height: 2},
		{Key: []byte("d"), Value: []byte{4}, Version: 2, Height: 0},
		{Key: []byte("e"), Value: []byte{5}, Version: 3, Height: 0},
		{Key: []byte("e"), Value: nil, Version: 3, Height: 1},
@@ -188,6 +192,41 @@
	assert.Equal(t, expect, actual)
}

func TestExporterCompress(t *testing.T) {
	tree := setupExportTreeBasic(t)

	expect := []*ExportNode{
		{Key: []byte{0, 'a'}, Value: []byte{1}, Version: 1, Height: 0},
		{Key: []byte{1, 'b', 'c'}, Value: []byte{6}, Version: 3, Height: 0},
		{Key: nil, Value: nil, Version: 0, Height: 1},
		{Key: []byte{0, 'b'}, Value: []byte{2}, Version: 3, Height: 0},
		{Key: []byte{0, 'c'}, Value: []byte{3}, Version: 3, Height: 0},
		{Key: nil, Value: nil, Version: 0, Height: 1},
		{Key: nil, Value: nil, Version: 0, Height: 2},
		{Key: []byte{0, 'd'}, Value: []byte{4}, Version: 2, Height: 0},
		{Key: []byte{0, 'e'}, Value: []byte{5}, Version: 3, Height: 0},
		{Key: nil, Value: nil, Version: 0, Height: 1},
		{Key: nil, Value: nil, Version: 0, Height: 3},
	}

	actual := make([]*ExportNode, 0, len(expect))
	innerExporter, err := tree.Export()
	require.NoError(t, err)
	defer innerExporter.Close()

	exporter := NewCompressExporter(innerExporter)
	for {
		node, err := exporter.Next()
		if err == ErrorExportDone {
			break
		}
		require.NoError(t, err)
		actual = append(actual, node)
	}

	assert.Equal(t, expect, actual)
}

func TestExporter_Import(t *testing.T) {
	testcases := map[string]*ImmutableTree{
		"empty tree": NewImmutableTree(db.NewMemDB(), 0, false),
@@ -200,50 +239,66 @@ func TestExporter_Import(t *testing.T) {

for desc, tree := range testcases {
tree := tree
t.Run(desc, func(t *testing.T) {
t.Parallel()

exporter, err := tree.Export()
require.NoError(t, err)
defer exporter.Close()

newTree, err := NewMutableTree(db.NewMemDB(), 0, false)
require.NoError(t, err)
importer, err := newTree.Import(tree.Version())
require.NoError(t, err)
defer importer.Close()

for {
item, err := exporter.Next()
if err == ErrorExportDone {
err = importer.Commit()
require.NoError(t, err)
break
for _, compress := range []bool{false, true} {
if compress {
desc += "-compress"
}
compress := compress
t.Run(desc, func(t *testing.T) {
t.Parallel()

innerExporter, err := tree.Export()
require.NoError(t, err)
defer innerExporter.Close()

exporter := NodeExporter(innerExporter)
if compress {
exporter = NewCompressExporter(innerExporter)
}

newTree, err := NewMutableTree(db.NewMemDB(), 0, false)
require.NoError(t, err)
err = importer.Add(item)
innerImporter, err := newTree.Import(tree.Version())
require.NoError(t, err)
}
defer innerImporter.Close()

treeHash, err := tree.Hash()
require.NoError(t, err)
newTreeHash, err := newTree.Hash()
require.NoError(t, err)
importer := NodeImporter(innerImporter)
if compress {
importer = NewCompressImporter(innerImporter)
}

require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch")
require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch")
require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch")
for {
item, err := exporter.Next()
if err == ErrorExportDone {
err = innerImporter.Commit()
require.NoError(t, err)
break
}
require.NoError(t, err)
err = importer.Add(item)
require.NoError(t, err)
}

tree.Iterate(func(key, value []byte) bool { //nolint:errcheck
index, _, err := tree.GetWithIndex(key)
treeHash, err := tree.Hash()
require.NoError(t, err)
newIndex, newValue, err := newTree.GetWithIndex(key)
newTreeHash, err := newTree.Hash()
require.NoError(t, err)
require.Equal(t, index, newIndex, "Index mismatch for key %v", key)
require.Equal(t, value, newValue, "Value mismatch for key %v", key)
return false

require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch")
require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch")
require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch")

tree.Iterate(func(key, value []byte) bool { //nolint:errcheck
index, _, err := tree.GetWithIndex(key)
require.NoError(t, err)
newIndex, newValue, err := newTree.GetWithIndex(key)
require.NoError(t, err)
require.Equal(t, index, newIndex, "Index mismatch for key %v", key)
require.Equal(t, value, newValue, "Value mismatch for key %v", key)
return false
})
})
})
}
}
}
