From 21d19324770d53fffb08d189eea17bf226364d7f Mon Sep 17 00:00:00 2001 From: HuangYi Date: Wed, 11 Jan 2023 10:39:02 +0800 Subject: [PATCH] Optimize diff algorithm with insights from #646 fix test rename try to lower memory usage fix lint --- diff.go | 158 ++++++++++++++++++++++++++++++++++++++++------------ iterator.go | 5 +- nodedb.go | 14 ++++- 3 files changed, 133 insertions(+), 44 deletions(-) diff --git a/diff.go b/diff.go index b16cd32fa..2013a3c8f 100644 --- a/diff.go +++ b/diff.go @@ -2,9 +2,6 @@ package iavl import ( "bytes" - "sort" - - ibytes "github.com/cosmos/iavl/internal/bytes" ) // ChangeSet represents the state changes extracted from diffing iavl versions. @@ -18,59 +15,146 @@ type KVPair struct { Value []byte } +// KVPairReceiver is the callback parameter of `extractStateChanges` that receives the stream of `KVPair`s. +type KVPairReceiver func(pair *KVPair) error + // extractStateChanges extracts the state changes by between two versions of the tree. -// it first traverse the `root` tree to find out the `newKeys` and `sharedNodes`, -// `newKeys` are the keys of the newly added leaf nodes, which represents the inserts and updates, -// `sharedNodes` are the referenced nodes that are created in previous versions, -// then we traverse the `prevRoot` tree to find out the deletion entries, we can skip the subtrees -// marked by the `sharedNodes`. -func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot []byte, root []byte) (*ChangeSet, error) { +// It first traverses the `root` tree until the first `sharedNode` and records the new leaf nodes, +// then traverses the `prevRoot` tree until the current `sharedNode` to find the orphaned leaf nodes, +// and compares the orphaned leaf nodes with the new leaf nodes to produce a stream of `KVPair`s that is passed to the callback. +// +// The algorithm does not run in strictly constant memory, but it tries its best to only +// keep minimal intermediate states in memory. 
+func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot []byte, root []byte, receiver KVPairReceiver) error { curIter, err := NewNodeIterator(root, ndb) if err != nil { - return nil, err + return err } prevIter, err := NewNodeIterator(prevRoot, ndb) if err != nil { - return nil, err + return err } - var changeSet []KVPair - sharedNodes := make(map[string]struct{}) - newKeys := make(map[string]struct{}) - for curIter.Valid() { - node := curIter.GetNode() - shared := node.version <= prevVersion - if shared { - sharedNodes[ibytes.UnsafeBytesToStr(node.hash)] = struct{}{} - } else if node.isLeaf() { - changeSet = append(changeSet, KVPair{Key: node.key, Value: node.value}) - newKeys[ibytes.UnsafeBytesToStr(node.key)] = struct{}{} + var ( + // current shared node between two versions + sharedNode *Node + // records the newly added leaf nodes found during the traversal to the `sharedNode`; + // they are compared with the orphaned nodes found to produce the change set stream. + newLeaves []*Node + ) + + // consumeNewLeaves consumes the remaining `newLeaves` nodes and produces an insertion `KVPair` for each. + consumeNewLeaves := func() error { + for _, node := range newLeaves { + if err := receiver(&KVPair{ + Key: node.key, + Value: node.value, + }); err != nil { + return err + } } - // skip subtree of shared nodes - curIter.Next(shared) + + newLeaves = nil + return nil } - if err := curIter.Error(); err != nil { - return nil, err + + // advanceSharedNode advances `curIter` until the next `sharedNode`; + // `sharedNode` will be `nil` if the new version is exhausted. + // It also records the new leaf nodes during the traversal. 
+ advanceSharedNode := func() error { + if err := consumeNewLeaves(); err != nil { + return err + } + + sharedNode = nil + for curIter.Valid() { + node := curIter.GetNode() + shared := node.version <= prevVersion + curIter.Next(shared) + if shared { + sharedNode = node + break + } else if node.isLeaf() { + newLeaves = append(newLeaves, node) + } + } + + return nil + } + if err := advanceSharedNode(); err != nil { + return err + } + + // addOrphanedLeave receives a new orphaned leaf node found in the previous version + // and compares it with the current newLeaves to produce the `iavl.KVPair` stream. + addOrphanedLeave := func(orphaned *Node) error { + for len(newLeaves) > 0 { + new := newLeaves[0] + switch bytes.Compare(orphaned.key, new.key) { + case 1: + // consume a new node as insertion and continue + newLeaves = newLeaves[1:] + if err := receiver(&KVPair{ + Key: new.key, + Value: new.value, + }); err != nil { + return err + } + continue + + case -1: + // removal, don't consume new nodes + return receiver(&KVPair{ + Delete: true, + Key: orphaned.key, + }) + + case 0: + // update, consume the new node and stop + newLeaves = newLeaves[1:] + return receiver(&KVPair{ + Key: new.key, + Value: new.value, + }) + } + } + + // removal + return receiver(&KVPair{ + Delete: true, + Key: orphaned.key, + }) } + // Traverse `prevIter` to find orphaned nodes in the previous version, + // and compare them with newLeaves to generate the `KVPair` stream. 
for prevIter.Valid() { node := prevIter.GetNode() - _, shared := sharedNodes[ibytes.UnsafeBytesToStr(node.hash)] - if !shared && node.isLeaf() { - _, updated := newKeys[ibytes.UnsafeBytesToStr(node.key)] - if !updated { - changeSet = append(changeSet, KVPair{Delete: true, Key: node.key}) + shared := sharedNode != nil && (node == sharedNode || bytes.Equal(node.hash, sharedNode.hash)) + // skip sub-tree of shared nodes + prevIter.Next(shared) + if shared { + if err := advanceSharedNode(); err != nil { + return err + } + } else if node.isLeaf() { + if err := addOrphanedLeave(node); err != nil { + return err } } - prevIter.Next(shared) + } + + if err := consumeNewLeaves(); err != nil { + return err + } + + if err := curIter.Error(); err != nil { + return err } if err := prevIter.Error(); err != nil { - return nil, err + return err } - sort.Slice(changeSet, func(i, j int) bool { - return bytes.Compare(changeSet[i].Key, changeSet[j].Key) == -1 - }) - return &ChangeSet{Pairs: changeSet}, nil + return nil } diff --git a/iterator.go b/iterator.go index 384aeedfe..1ae5c8c1a 100644 --- a/iterator.go +++ b/iterator.go @@ -294,10 +294,7 @@ func (iter *NodeIterator) GetNode() *Node { // Valid checks if the validator is valid. func (iter *NodeIterator) Valid() bool { - if iter.err != nil { - return false - } - return len(iter.nodesToVisit) > 0 + return iter.err == nil && len(iter.nodesToVisit) > 0 } // Error returns an error if any errors. 
diff --git a/nodedb.go b/nodedb.go index 44721ca10..499059c47 100644 --- a/nodedb.go +++ b/nodedb.go @@ -1060,13 +1060,21 @@ func (ndb *nodeDB) traverseStateChanges(startVersion, endVersion int64, fn func( return ndb.traverseRange(rootKeyFormat.Key(startVersion), rootKeyFormat.Key(endVersion), func(k, hash []byte) error { var version int64 rootKeyFormat.Scan(k, &version) - changeSet, err := ndb.extractStateChanges(predecessor, prevRoot, hash) - if err != nil { + + var changeSet ChangeSet + receiveKVPair := func(pair *KVPair) error { + changeSet.Pairs = append(changeSet.Pairs, *pair) + return nil + } + + if err := ndb.extractStateChanges(predecessor, prevRoot, hash, receiveKVPair); err != nil { return err } - if err := fn(version, changeSet); err != nil { + + if err := fn(version, &changeSet); err != nil { return err } + predecessor = version prevRoot = hash return nil