Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate orphan storage #32

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 70 additions & 26 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ var ErrVersionDoesNotExist = errors.New("version does not exist")
//
// The inner ImmutableTree should not be used directly by callers.
type MutableTree struct {
*ImmutableTree // The current, working tree.
lastSaved *ImmutableTree // The most recently saved tree.
orphans map[string]int64 // Nodes removed by changes to working tree.
versions map[int64]bool // The previous, saved versions of the tree.
allRootLoaded bool // Whether all roots are loaded or not(by LazyLoadVersion)
unsavedFastNodeAdditions map[string]*FastNode // FastNodes that have not yet been saved to disk
unsavedFastNodeRemovals map[string]interface{} // FastNodes that have not yet been removed from disk
ndb *nodeDB
skipFastStorageUpgrade bool // If true, the tree will work like no fast storage and always not upgrade fast storage
mtx *sync.Mutex
*ImmutableTree // The current, working tree.
lastSaved *ImmutableTree // The most recently saved tree.
orphans map[string]int64 // Nodes removed by changes to working tree.
versions map[int64]bool // The previous, saved versions of the tree.
allRootLoaded bool // Whether all roots are loaded or not(by LazyLoadVersion)
unsavedFastNodeAdditions map[string]*FastNode // FastNodes that have not yet been saved to disk
unsavedFastNodeRemovals map[string]interface{} // FastNodes that have not yet been removed from disk
ndb *nodeDB
skipFastStorageUpgrade bool // If true, the tree will work like no fast storage and always not upgrade fast storage
separateOrphanStorage bool
separateOrphanVersionsToKeep int64
orphandb *orphanDB
mtx *sync.Mutex
}

// NewMutableTree returns a new tree with the specified cache size and datastore.
Expand All @@ -49,18 +52,29 @@ func NewMutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool) (*Mut
func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool) (*MutableTree, error) {
ndb := newNodeDB(db, cacheSize, opts)
head := &ImmutableTree{ndb: ndb, skipFastStorageUpgrade: skipFastStorageUpgrade}
if opts == nil {
defaultOpts := DefaultOptions()
opts = &defaultOpts
}
var orphandb *orphanDB
if opts.SeparateOrphanStorage {
orphandb = NewOrphanDB(opts)
}

return &MutableTree{
ImmutableTree: head,
lastSaved: head.clone(),
orphans: map[string]int64{},
versions: map[int64]bool{},
allRootLoaded: false,
unsavedFastNodeAdditions: make(map[string]*FastNode),
unsavedFastNodeRemovals: make(map[string]interface{}),
ndb: ndb,
skipFastStorageUpgrade: skipFastStorageUpgrade,
mtx: &sync.Mutex{},
ImmutableTree: head,
lastSaved: head.clone(),
orphans: map[string]int64{},
versions: map[int64]bool{},
allRootLoaded: false,
unsavedFastNodeAdditions: make(map[string]*FastNode),
unsavedFastNodeRemovals: make(map[string]interface{}),
ndb: ndb,
skipFastStorageUpgrade: skipFastStorageUpgrade,
separateOrphanStorage: opts.SeparateOrphanStorage,
separateOrphanVersionsToKeep: opts.SeparateOphanVersionsToKeep,
orphandb: orphandb,
mtx: &sync.Mutex{},
}, nil
}

Expand Down Expand Up @@ -951,12 +965,44 @@ func (tree *MutableTree) saveFastNodeVersion() error {
return tree.ndb.setFastStorageVersionToBatch()
}

func (tree *MutableTree) handleOrphans(version int64) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this function threadsafe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not threadsafe, but it would not run concurrently

if !tree.separateOrphanStorage {
// store orphan in the same levelDB as application data
return tree.ndb.SaveOrphans(version, tree.orphans)
}

if tree.separateOrphanVersionsToKeep == 0 {
panic("must keep at least one version")
}

// optimization for the 1 version case so that we don't have to save and immediately delete the same version
if tree.separateOrphanVersionsToKeep == 1 {
for orphan := range tree.orphans {
if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return nil
}

if err := tree.orphandb.SaveOrphans(version, tree.orphans); err != nil {
return err
}
oldOrphans := tree.orphandb.GetOrphans(version - tree.separateOrphanVersionsToKeep + 1)
for orphan := range oldOrphans {
if err := tree.ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return tree.orphandb.DeleteOrphans(version - tree.separateOrphanVersionsToKeep + 1)
}

func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool) (int64, error) {
if tree.root == nil {
// There can still be orphans, for example if the root is the node being
// removed.
logger.Debug("SAVE EMPTY TREE %v\n", version)
if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil {
if err := tree.handleOrphans(version); err != nil {
return 0, err
}
if err := tree.ndb.SaveEmptyRoot(version); !silentSaveRootError && err != nil {
Expand All @@ -967,7 +1013,7 @@ func (tree *MutableTree) commitVersion(version int64, silentSaveRootError bool)
if _, err := tree.ndb.SaveBranch(tree.root); err != nil {
return 0, err
}
if err := tree.ndb.SaveOrphans(version, tree.orphans); err != nil {
if err := tree.handleOrphans(version); err != nil {
return 0, err
}
if err := tree.ndb.SaveRoot(tree.root, version); !silentSaveRootError && err != nil {
Expand Down Expand Up @@ -1070,10 +1116,8 @@ func (tree *MutableTree) SetInitialVersion(version uint64) {
func (tree *MutableTree) DeleteVersions(versions ...int64) error {
logger.Debug("DELETING VERSIONS: %v\n", versions)

if tree.ndb.ShouldNotUseVersion() {
// no need to delete versions since there is no version to be
// deleted except the current one, which shouldn't be deleted
// in any circumstance
if tree.separateOrphanStorage {
// no need to delete versions if we are keeping orphans separately
return nil
}

Expand Down
16 changes: 0 additions & 16 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,18 +634,6 @@ func (ndb *nodeDB) SaveOrphans(version int64, orphans map[string]int64) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

// instead of saving orphan metadata and later read orphan metadata->delete
// orphan data->delete orphan metadata, we directly delete orphan data here
// without doing anything for orphan metadata, if versioning is not needed.
if ndb.ShouldNotUseVersion() {
for orphan := range orphans {
if err := ndb.deleteOrphanedData([]byte(orphan)); err != nil {
return err
}
}
return nil
}

toVersion, err := ndb.getPreviousVersion(version)
if err != nil {
return err
Expand Down Expand Up @@ -1073,10 +1061,6 @@ func (ndb *nodeDB) traverseNodes(fn func(hash []byte, node *Node) error) error {
return nil
}

func (ndb *nodeDB) ShouldNotUseVersion() bool {
return ndb.opts.NoVersioning
}

func (ndb *nodeDB) String() (string, error) {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
Expand Down
22 changes: 18 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,26 @@ type Options struct {
// When Stat is not nil, statistical logic needs to be executed
Stat *Statistics

// When set to true, the DB will only keep the most recent version and immediately delete
// obsolete data upon new data's commit
NoVersioning bool
// When true, orphan data will be stored in separate directory than application data, and
// the pruning of application data will happen during commit (rather than after commit)
SeparateOrphanStorage bool

// Only meaningful if SeparateOrphanStorage is true.
// The number of application data versions to keep in the application database.
SeparateOphanVersionsToKeep int64

// Only meaningful if SeparateOrphanStorage is true.
// The max number of orphan entries to store in the separate orphan files.
NumOrphansPerFile int

// Only meaningful if SeparateOrphanStorage is true.
// The directory to store orphan files.
OrphanDirectory string
}

// DefaultOptions returns the default options for IAVL.
func DefaultOptions() Options {
return Options{}
return Options{
NumOrphansPerFile: 100000,
}
}
75 changes: 75 additions & 0 deletions orphandb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package iavl

import (
"fmt"
"io/fs"
"io/ioutil"
"os"
"path"
"strings"
)

type orphanDB struct {
cache map[int64]map[string]int64 // key: version, value: orphans
directory string
numOrphansPerFile int
}

func NewOrphanDB(opts *Options) *orphanDB {
return &orphanDB{
cache: map[int64]map[string]int64{},
directory: opts.OrphanDirectory,
numOrphansPerFile: opts.NumOrphansPerFile,
}
}

func (o *orphanDB) SaveOrphans(version int64, orphans map[string]int64) error {
o.cache[version] = orphans
chunks := [][]string{{}}
for orphan := range orphans {
if len(chunks[len(chunks)-1]) == o.numOrphansPerFile {
chunks = append(chunks, []string{})
}
chunks[len(chunks)-1] = append(chunks[len(chunks)-1], orphan)
}
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
os.RemoveAll(dir)
os.MkdirAll(dir, fs.ModePerm)
for i, chunk := range chunks {
f, err := os.Create(path.Join(dir, fmt.Sprintf("%d", i)))
if err != nil {
return err
}
f.WriteString(strings.Join(chunk, "\n"))
f.Close()
}
return nil
}

func (o *orphanDB) GetOrphans(version int64) map[string]int64 {
if _, ok := o.cache[version]; !ok {
o.cache[version] = map[string]int64{}
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
files, err := ioutil.ReadDir(dir)
if err != nil {
// no orphans found
return o.cache[version]
}
for _, file := range files {
content, err := ioutil.ReadFile(path.Join(dir, file.Name()))
if err != nil {
return o.cache[version]
}
for _, orphan := range strings.Split(string(content), "\n") {
o.cache[version][orphan] = version
}
}
}
return o.cache[version]
}

func (o *orphanDB) DeleteOrphans(version int64) error {
delete(o.cache, version)
dir := path.Join(o.directory, fmt.Sprintf("%d", version))
return os.RemoveAll(dir)
}
73 changes: 73 additions & 0 deletions orphandb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package iavl

import (
"fmt"
"io/ioutil"
"path"
"testing"

"github.com/stretchr/testify/require"
)

func TestOrphanDBSaveGet(t *testing.T) {
dir := t.TempDir()
db := NewOrphanDB(&Options{
NumOrphansPerFile: 2,
OrphanDirectory: dir,
})
err := db.SaveOrphans(123, map[string]int64{
"o1": 123,
"o2": 123,
"o3": 123,
})
require.Nil(t, err)
files, err := ioutil.ReadDir(path.Join(dir, fmt.Sprintf("%d", 123)))
require.Nil(t, err)
require.Equal(t, 2, len(files)) // 3 orphans would result in 2 files
orphans := db.GetOrphans(123)
require.Equal(t, map[string]int64{
"o1": 123,
"o2": 123,
"o3": 123,
}, orphans)
orphans = db.GetOrphans(456) // not exist
require.Equal(t, map[string]int64{}, orphans)

// flush cache
db = NewOrphanDB(&Options{
NumOrphansPerFile: 2,
OrphanDirectory: dir,
})
orphans = db.GetOrphans(123) // would load from disk
require.Equal(t, map[string]int64{
"o1": 123,
"o2": 123,
"o3": 123,
}, orphans)
}

func TestOrphanDelete(t *testing.T) {
dir := t.TempDir()
db := NewOrphanDB(&Options{
NumOrphansPerFile: 2,
OrphanDirectory: dir,
})
err := db.SaveOrphans(123, map[string]int64{
"o1": 123,
"o2": 123,
"o3": 123,
})
require.Nil(t, err)
err = db.DeleteOrphans(123)
require.Nil(t, err)
orphans := db.GetOrphans(123) // not exist in cache
require.Equal(t, map[string]int64{}, orphans)

// flush cache
db = NewOrphanDB(&Options{
NumOrphansPerFile: 2,
OrphanDirectory: dir,
})
orphans = db.GetOrphans(123) // would load from disk
require.Equal(t, map[string]int64{}, orphans)
}