Skip to content

Commit

Permalink
Optimize diff algorithm with insightes from cosmos#646
Browse files Browse the repository at this point in the history
fix test

rename

try to lower memory usage

fix lint
  • Loading branch information
yihuang committed Jan 26, 2023
1 parent 9f9c2a0 commit 21d1932
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 44 deletions.
158 changes: 121 additions & 37 deletions diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package iavl

import (
"bytes"
"sort"

ibytes "github.com/cosmos/iavl/internal/bytes"
)

// ChangeSet represents the state changes extracted from diffing iavl versions.
Expand All @@ -18,59 +15,146 @@ type KVPair struct {
Value []byte
}

// KVPairReceiver is callback parameter of method `extractStateChanges` to receive stream of `KVPair`s.
type KVPairReceiver func(pair *KVPair) error

// extractStateChanges extracts the state changes by between two versions of the tree.
// it first traverse the `root` tree to find out the `newKeys` and `sharedNodes`,
// `newKeys` are the keys of the newly added leaf nodes, which represents the inserts and updates,
// `sharedNodes` are the referenced nodes that are created in previous versions,
// then we traverse the `prevRoot` tree to find out the deletion entries, we can skip the subtrees
// marked by the `sharedNodes`.
func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot []byte, root []byte) (*ChangeSet, error) {
// it first traverse the `root` tree until the first `sharedNode` and record the new leave nodes,
// then traverse the `prevRoot` tree until the current `sharedNode` to find out orphaned leave nodes,
// compare orphaned leave nodes and new leave nodes to produce stream of `KVPair`s and passed to callback.
//
// The algorithm don't run in constant memory strictly, but it tried the best the only
// keep minimal intermediate states in memory.
func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot []byte, root []byte, receiver KVPairReceiver) error {
curIter, err := NewNodeIterator(root, ndb)
if err != nil {
return nil, err
return err
}

prevIter, err := NewNodeIterator(prevRoot, ndb)
if err != nil {
return nil, err
return err
}

var changeSet []KVPair
sharedNodes := make(map[string]struct{})
newKeys := make(map[string]struct{})
for curIter.Valid() {
node := curIter.GetNode()
shared := node.version <= prevVersion
if shared {
sharedNodes[ibytes.UnsafeBytesToStr(node.hash)] = struct{}{}
} else if node.isLeaf() {
changeSet = append(changeSet, KVPair{Key: node.key, Value: node.value})
newKeys[ibytes.UnsafeBytesToStr(node.key)] = struct{}{}
var (
// current shared node between two versions
sharedNode *Node
// record the newly added leaf nodes during the traversal to the `sharedNode`,
// will be compared with found orphaned nodes to produce change set stream.
newLeaves []*Node
)

// consumeNewLeaves concumes remaining `newLeaves` nodes and produce insertion `KVPair`.
consumeNewLeaves := func() error {
for _, node := range newLeaves {
if err := receiver(&KVPair{
Key: node.key,
Value: node.value,
}); err != nil {
return err
}
}
// skip subtree of shared nodes
curIter.Next(shared)

newLeaves = nil
return nil
}
if err := curIter.Error(); err != nil {
return nil, err

// advanceSharedNode forward `curIter` until the next `sharedNode`,
// `sharedNode` will be `nil` if the new version is exhausted.
// it also records the new leaf nodes during the traversal.
advanceSharedNode := func() error {
if err := consumeNewLeaves(); err != nil {
return err
}

sharedNode = nil
for curIter.Valid() {
node := curIter.GetNode()
shared := node.version <= prevVersion
curIter.Next(shared)
if shared {
sharedNode = node
break
} else if node.isLeaf() {
newLeaves = append(newLeaves, node)
}
}

return nil
}
if err := advanceSharedNode(); err != nil {
return err
}

// addOrphanedLeave receives a new orphaned leave node found in previous version,
// compare with the current newLeaves, to produce `iavl.KVPair` stream.
addOrphanedLeave := func(orphaned *Node) error {
for len(newLeaves) > 0 {
new := newLeaves[0]
switch bytes.Compare(orphaned.key, new.key) {
case 1:
// consume a new node as insertion and continue
newLeaves = newLeaves[1:]
if err := receiver(&KVPair{
Key: new.key,
Value: new.value,
}); err != nil {
return err
}
continue

case -1:
// removal, don't consume new nodes
return receiver(&KVPair{
Delete: true,
Key: orphaned.key,
})

case 0:
// update, consume the new node and stop
newLeaves = newLeaves[1:]
return receiver(&KVPair{
Key: new.key,
Value: new.value,
})
}
}

// removal
return receiver(&KVPair{
Delete: true,
Key: orphaned.key,
})
}

// Traverse `prevIter` to find orphaned nodes in the previous version,
// and compare them with newLeaves to generate `KVPair` stream.
for prevIter.Valid() {
node := prevIter.GetNode()
_, shared := sharedNodes[ibytes.UnsafeBytesToStr(node.hash)]
if !shared && node.isLeaf() {
_, updated := newKeys[ibytes.UnsafeBytesToStr(node.key)]
if !updated {
changeSet = append(changeSet, KVPair{Delete: true, Key: node.key})
shared := sharedNode != nil && (node == sharedNode || bytes.Equal(node.hash, sharedNode.hash))
// skip sub-tree of shared nodes
prevIter.Next(shared)
if shared {
if err := advanceSharedNode(); err != nil {
return err
}
} else if node.isLeaf() {
if err := addOrphanedLeave(node); err != nil {
return err
}
}
prevIter.Next(shared)
}

if err := consumeNewLeaves(); err != nil {
return err
}

if err := curIter.Error(); err != nil {
return err
}
if err := prevIter.Error(); err != nil {
return nil, err
return err
}

sort.Slice(changeSet, func(i, j int) bool {
return bytes.Compare(changeSet[i].Key, changeSet[j].Key) == -1
})
return &ChangeSet{Pairs: changeSet}, nil
return nil
}
5 changes: 1 addition & 4 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,7 @@ func (iter *NodeIterator) GetNode() *Node {

// Valid checks if the validator is valid.
func (iter *NodeIterator) Valid() bool {
if iter.err != nil {
return false
}
return len(iter.nodesToVisit) > 0
return iter.err == nil && len(iter.nodesToVisit) > 0
}

// Error returns an error if any errors.
Expand Down
14 changes: 11 additions & 3 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,13 +1060,21 @@ func (ndb *nodeDB) traverseStateChanges(startVersion, endVersion int64, fn func(
return ndb.traverseRange(rootKeyFormat.Key(startVersion), rootKeyFormat.Key(endVersion), func(k, hash []byte) error {
var version int64
rootKeyFormat.Scan(k, &version)
changeSet, err := ndb.extractStateChanges(predecessor, prevRoot, hash)
if err != nil {

var changeSet ChangeSet
receiveKVPair := func(pair *KVPair) error {
changeSet.Pairs = append(changeSet.Pairs, *pair)
return nil
}

if err := ndb.extractStateChanges(predecessor, prevRoot, hash, receiveKVPair); err != nil {
return err
}
if err := fn(version, changeSet); err != nil {

if err := fn(version, &changeSet); err != nil {
return err
}

predecessor = version
prevRoot = hash
return nil
Expand Down

0 comments on commit 21d1932

Please sign in to comment.