// The inner ImmutableTree should not be used directly by callers.
type MutableTree struct {
	*ImmutableTree                                          // The current, working tree.
	lastSaved                    *ImmutableTree             // The most recently saved tree.
	orphans                      map[string]int64           // Nodes removed by changes to working tree.
	versions                     map[int64]bool             // The previous, saved versions of the tree.
	allRootLoaded                bool                       // Whether all roots are loaded or not(by LazyLoadVersion)
	unsavedFastNodeAdditions     map[string]*FastNode       // FastNodes that have not yet been saved to disk
	unsavedFastNodeRemovals      map[string]interface{}     // FastNodes that have not yet been removed from disk
	ndb                          *nodeDB
	skipFastStorageUpgrade       bool                       // If true, the tree will work like no fast storage and always not upgrade fast storage
	separateOrphanStorage        bool                       // Copied from Options.SeparateOrphanStorage; when true, orphans are not saved through ndb.SaveOrphans and DeleteVersions is a no-op
	separateOrphanVersionsToKeep int64                      // Copied from Options.SeparateOphanVersionsToKeep; only read when separateOrphanStorage is true, and must then be at least 1
	orphandb                     *orphanDB                  // File-backed orphan store; nil unless separateOrphanStorage is true
	mtx                          *sync.Mutex
}
@@ -49,18 +52,29 @@ func NewMutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool) (*Mut func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool) (*MutableTree, error) { ndb := newNodeDB(db, cacheSize, opts) head := &ImmutableTree{ndb: ndb, skipFastStorageUpgrade: skipFastStorageUpgrade} + if opts == nil { + defaultOpts := DefaultOptions() + opts = &defaultOpts + } + var orphandb *orphanDB + if opts.SeparateOrphanStorage { + orphandb = NewOrphanDB(opts) + } return &MutableTree{ - ImmutableTree: head, - lastSaved: head.clone(), - orphans: map[string]int64{}, - versions: map[int64]bool{}, - allRootLoaded: false, - unsavedFastNodeAdditions: make(map[string]*FastNode), - unsavedFastNodeRemovals: make(map[string]interface{}), - ndb: ndb, - skipFastStorageUpgrade: skipFastStorageUpgrade, - mtx: &sync.Mutex{}, + ImmutableTree: head, + lastSaved: head.clone(), + orphans: map[string]int64{}, + versions: map[int64]bool{}, + allRootLoaded: false, + unsavedFastNodeAdditions: make(map[string]*FastNode), + unsavedFastNodeRemovals: make(map[string]interface{}), + ndb: ndb, + skipFastStorageUpgrade: skipFastStorageUpgrade, + separateOrphanStorage: opts.SeparateOrphanStorage, + separateOrphanVersionsToKeep: opts.SeparateOphanVersionsToKeep, + orphandb: orphandb, + mtx: &sync.Mutex{}, }, nil } @@ -951,12 +965,44 @@ func (tree *MutableTree) saveFastNodeVersion() error { return tree.ndb.setFastStorageVersionToBatch() } +func (tree *MutableTree) handleOrphans(version int64) error { + if !tree.separateOrphanStorage { + // store orphan in the same levelDB as application data + return tree.ndb.SaveOrphans(version, tree.orphans) + } + + if tree.separateOrphanVersionsToKeep == 0 { + panic("must keep at least one version") + } + + // optimization for the 1 version case so that we don't have to save and immediately delete the same version + if tree.separateOrphanVersionsToKeep == 1 { + for orphan := range tree.orphans { + if err := 
tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil { + return err + } + } + return nil + } + + if err := tree.orphandb.SaveOrphans(version, tree.orphans); err != nil { + return err + } + oldOrphans := tree.orphandb.GetOrphans(version - tree.separateOrphanVersionsToKeep + 1) + for orphan := range oldOrphans { + if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil { + return err + } + } + return tree.orphandb.DeleteOrphans(version - tree.separateOrphanVersionsToKeep + 1) +} + func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) (int64, error) { if tree.root == nil { // There can still be orphans, for example if the root is the node being // removed. logger.Debug("SAVE EMPTY TREE %v\n", version) - if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil { + if err := tree.handleOrphans(version); err != nil { return 0, err } if err := tree.ndb.SaveEmptyRoot(version); !silentSaveRootError && err != nil { @@ -967,7 +1013,7 @@ func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) if _, err := tree.ndb.SaveBranch(tree.root); err != nil { return 0, err } - if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil { + if err := tree.handleOrphans(version); err != nil { return 0, err } if err := tree.ndb.SaveRoot(tree.root, version); !silentSaveRootError && err != nil { @@ -1070,10 +1116,8 @@ func (tree *MutableTree) SetInitialVersion(version uint64) { func (tree *MutableTree) DeleteVersions(versions ...int64) error { logger.Debug("DELETING VERSIONS: %v\n", versions) - if tree.ndb.ShouldNotUseVersion() { - // no need to delete versions since there is no version to be - // deleted except the current one, which shouldn't be deleted - // in any circumstance + if tree.separateOrphanStorage { + // no need to delete versions if we are keeping orphans separately return nil } diff --git a/nodedb.go b/nodedb.go index 49bc846..18bd44a 100644 --- a/nodedb.go +++ b/nodedb.go @@ -634,18 +634,6 @@ func 
(ndb *nodeDB) SaveOrphans(version int64, orphans map[string]int64) error { ndb.mtx.Lock() defer ndb.mtx.Unlock() - // instead of saving orphan metadata and later read orphan metadata->delete - // orphan data->delete orphan metadata, we directly delete orphan data here - // without doing anything for orphan metadata, if versioning is not needed. - if ndb.ShouldNotUseVersion() { - for orphan := range orphans { - if err := ndb.deleteOrphanedData([]byte(orphan)); err != nil { - return err - } - } - return nil - } - toVersion, err := ndb.getPreviousVersion(version) if err != nil { return err @@ -1073,10 +1061,6 @@ func (ndb *nodeDB) traverseNodes(fn func(hash []byte, node *Node) error) error { return nil } -func (ndb *nodeDB) ShouldNotUseVersion() bool { - return ndb.opts.NoVersioning -} - func (ndb *nodeDB) String() (string, error) { buf := bufPool.Get().(*bytes.Buffer) defer bufPool.Put(buf) diff --git a/options.go b/options.go index 58dfcab..e42defd 100644 --- a/options.go +++ b/options.go @@ -82,12 +82,26 @@ type Options struct { // When Stat is not nil, statistical logic needs to be executed Stat *Statistics - // When set to true, the DB will only keep the most recent version and immediately delete - // obsolete data upon new data's commit - NoVersioning bool + // When true, orphan data will be stored in separate directory than application data, and + // the pruning of application data will happen during commit (rather than after commit) + SeparateOrphanStorage bool + + // Only meaningful if SeparateOrphanStorage is true. + // The number of application data versions to keep in the application database. + SeparateOphanVersionsToKeep int64 + + // Only meaningful if SeparateOrphanStorage is true. + // The max number of orphan entries to store in the separate orphan files. + NumOrphansPerFile int + + // Only meaningful if SeparateOrphanStorage is true. + // The directory to store orphan files. 
+ OrphanDirectory string } // DefaultOptions returns the default options for IAVL. func DefaultOptions() Options { - return Options{} + return Options{ + NumOrphansPerFile: 100000, + } } diff --git a/orphandb.go b/orphandb.go new file mode 100644 index 0000000..fb714e1 --- /dev/null +++ b/orphandb.go @@ -0,0 +1,75 @@ +package iavl + +import ( + "fmt" + "io/fs" + "io/ioutil" + "os" + "path" + "strings" +) + +type orphanDB struct { + cache map[int64]map[string]int64 // key: version, value: orphans + directory string + numOrphansPerFile int +} + +func NewOrphanDB(opts *Options) *orphanDB { + return &orphanDB{ + cache: map[int64]map[string]int64{}, + directory: opts.OrphanDirectory, + numOrphansPerFile: opts.NumOrphansPerFile, + } +} + +func (o *orphanDB) SaveOrphans(version int64, orphans map[string]int64) error { + o.cache[version] = orphans + chunks := [][]string{{}} + for orphan := range orphans { + if len(chunks[len(chunks)-1]) == o.numOrphansPerFile { + chunks = append(chunks, []string{}) + } + chunks[len(chunks)-1] = append(chunks[len(chunks)-1], orphan) + } + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + os.RemoveAll(dir) + os.MkdirAll(dir, fs.ModePerm) + for i, chunk := range chunks { + f, err := os.Create(path.Join(dir, fmt.Sprintf("%d", i))) + if err != nil { + return err + } + f.WriteString(strings.Join(chunk, "\n")) + f.Close() + } + return nil +} + +func (o *orphanDB) GetOrphans(version int64) map[string]int64 { + if _, ok := o.cache[version]; !ok { + o.cache[version] = map[string]int64{} + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + files, err := ioutil.ReadDir(dir) + if err != nil { + // no orphans found + return o.cache[version] + } + for _, file := range files { + content, err := ioutil.ReadFile(path.Join(dir, file.Name())) + if err != nil { + return o.cache[version] + } + for _, orphan := range strings.Split(string(content), "\n") { + o.cache[version][orphan] = version + } + } + } + return o.cache[version] +} + +func (o 
*orphanDB) DeleteOrphans(version int64) error { + delete(o.cache, version) + dir := path.Join(o.directory, fmt.Sprintf("%d", version)) + return os.RemoveAll(dir) +} diff --git a/orphandb_test.go b/orphandb_test.go new file mode 100644 index 0000000..51a7dd8 --- /dev/null +++ b/orphandb_test.go @@ -0,0 +1,73 @@ +package iavl + +import ( + "fmt" + "io/ioutil" + "path" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestOrphanDBSaveGet(t *testing.T) { + dir := t.TempDir() + db := NewOrphanDB(&Options{ + NumOrphansPerFile: 2, + OrphanDirectory: dir, + }) + err := db.SaveOrphans(123, map[string]int64{ + "o1": 123, + "o2": 123, + "o3": 123, + }) + require.Nil(t, err) + files, err := ioutil.ReadDir(path.Join(dir, fmt.Sprintf("%d", 123))) + require.Nil(t, err) + require.Equal(t, 2, len(files)) // 3 orphans would result in 2 files + orphans := db.GetOrphans(123) + require.Equal(t, map[string]int64{ + "o1": 123, + "o2": 123, + "o3": 123, + }, orphans) + orphans = db.GetOrphans(456) // not exist + require.Equal(t, map[string]int64{}, orphans) + + // flush cache + db = NewOrphanDB(&Options{ + NumOrphansPerFile: 2, + OrphanDirectory: dir, + }) + orphans = db.GetOrphans(123) // would load from disk + require.Equal(t, map[string]int64{ + "o1": 123, + "o2": 123, + "o3": 123, + }, orphans) +} + +func TestOrphanDelete(t *testing.T) { + dir := t.TempDir() + db := NewOrphanDB(&Options{ + NumOrphansPerFile: 2, + OrphanDirectory: dir, + }) + err := db.SaveOrphans(123, map[string]int64{ + "o1": 123, + "o2": 123, + "o3": 123, + }) + require.Nil(t, err) + err = db.DeleteOrphans(123) + require.Nil(t, err) + orphans := db.GetOrphans(123) // not exist in cache + require.Equal(t, map[string]int64{}, orphans) + + // flush cache + db = NewOrphanDB(&Options{ + NumOrphansPerFile: 2, + OrphanDirectory: dir, + }) + orphans = db.GetOrphans(123) // would load from disk + require.Equal(t, map[string]int64{}, orphans) +}