diff --git a/mutable_tree.go b/mutable_tree.go index 0c14af666..078c6eb2b 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -4,8 +4,10 @@ import ( "bytes" "crypto/sha256" "fmt" + "runtime" "sort" "sync" + "time" "github.com/pkg/errors" @@ -528,12 +530,17 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error) // Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might // be worth to delete the fast nodes from disk. fastItr := NewFastIterator(nil, nil, true, tree.ndb) + defer fastItr.Close() for ; fastItr.Valid(); fastItr.Next() { - tree.ndb.DeleteFastNode(fastItr.Key()) + if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil { + return false, err + } } - fastItr.Close() } + // Force garbage collection before we proceed to enabling fast storage. + runtime.GC() + if err := tree.enableFastStorageAndCommit(); err != nil { tree.ndb.storageVersion = defaultStorageVersionValue return false, err @@ -558,10 +565,45 @@ func (tree *MutableTree) enableFastStorageAndCommit() error { } }() + // We start a new thread to keep on checking if we are above 4GB, and if so garbage collect. + // This thread only lasts during the fast node migration. + // This is done to keep RAM usage down. + done := make(chan struct{}) + defer func() { + done <- struct{}{} + close(done) + }() + + go func () { + timer := time.NewTimer(time.Second) + var m runtime.MemStats + + for { + // Sample the current memory usage + runtime.ReadMemStats(&m) + + if m.Alloc > 4 * 1024 * 1024 * 1024 { + // If we are using more than 4GB of memory, we should trigger garbage collection + // to free up some memory. + runtime.GC() + } + + select { + case <-timer.C: + timer.Reset(time.Second) + case <-done: + if !timer.Stop() { + <-timer.C + } + return + } + } + }() + itr := NewIterator(nil, nil, true, tree.ImmutableTree) defer itr.Close() for ; itr.Valid(); itr.Next() { - if err = tree.ndb.SaveFastNode(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil { + if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil { return err } } diff --git a/nodedb.go b/nodedb.go index c1843a7b4..7bee811c5 100644 --- a/nodedb.go +++ b/nodedb.go @@ -215,11 +215,18 @@ func (ndb *nodeDB) SaveNode(node *Node) { ndb.cacheNode(node) } -// SaveNode saves a FastNode to disk. +// SaveNode saves a FastNode to disk and add to cache. func (ndb *nodeDB) SaveFastNode(node *FastNode) error { ndb.mtx.Lock() defer ndb.mtx.Unlock() - return ndb.saveFastNodeUnlocked(node) + return ndb.saveFastNodeUnlocked(node, true) +} + +// SaveNode saves a FastNode to disk without adding to cache. +func (ndb *nodeDB) SaveFastNodeNoCache(node *FastNode) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.saveFastNodeUnlocked(node, false) } // setFastStorageVersionToBatch sets storage version to fast where the version is @@ -274,7 +281,7 @@ func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool { } // SaveNode saves a FastNode to disk. -func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error { +func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode, shouldAddToCache bool) error { if node.key == nil { return fmt.Errorf("FastNode cannot have a nil value for key") } @@ -290,8 +297,9 @@ func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error { if err := ndb.batch.Set(ndb.fastNodeKey(node.key), buf.Bytes()); err != nil { return fmt.Errorf("error while writing key/val to nodedb batch. Err: %w", err) } - debug("BATCH SAVE %X %p\n", node.key, node) - ndb.cacheFastNode(node) + if shouldAddToCache { + ndb.cacheFastNode(node) + } return nil }