Skip to content

Commit

Permalink
chore: cleanup unnecessary ndb.Commit calls (backport #902) (#921)
Browse files Browse the repository at this point in the history
Co-authored-by: cool-developer <[email protected]>
  • Loading branch information
mergify[bot] and cool-develope authored Mar 25, 2024
1 parent 162216b commit ff86011
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 92 deletions.
33 changes: 4 additions & 29 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
)

var (
// commitGap after upgrade/delete commitGap FastNodes when commit the batch
commitGap uint64 = 5000000

// ErrVersionDoesNotExist is returned if a requested version does not exist.
ErrVersionDoesNotExist = errors.New("version does not exist")

Expand Down Expand Up @@ -577,16 +574,6 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
return false, err
}
if deletedFastNodes%commitGap == 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}
}
if deletedFastNodes%commitGap != 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}

if err := tree.enableFastStorageAndCommit(); err != nil {
Expand All @@ -607,12 +594,6 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
if err = tree.ndb.SaveFastNodeNoCache(fastnode.NewNode(itr.Key(), itr.Value(), tree.version)); err != nil {
return err
}
if upgradedFastNodes%commitGap == 0 {
err := tree.ndb.Commit()
if err != nil {
return err
}
}
}

if err = itr.Error(); err != nil {
Expand Down Expand Up @@ -710,7 +691,6 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error)
// SaveVersion saves a new tree version to disk, based on the current state of
// the tree. Returns the hash and new version number.
func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
isGenesis := (tree.version == 0)
version := tree.WorkingVersion()

if tree.VersionExists(version) {
Expand Down Expand Up @@ -745,7 +725,7 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {

// save new fast nodes
if !tree.skipFastStorageUpgrade {
if err := tree.saveFastNodeVersion(version, isGenesis); err != nil {
if err := tree.saveFastNodeVersion(version); err != nil {
return nil, version, err
}
}
Expand Down Expand Up @@ -794,8 +774,8 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
return tree.Hash(), version, nil
}

func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool) error {
if err := tree.saveFastNodeAdditions(isGenesis); err != nil {
func (tree *MutableTree) saveFastNodeVersion(latestVersion int64) error {
if err := tree.saveFastNodeAdditions(); err != nil {
return err
}
if err := tree.saveFastNodeRemovals(); err != nil {
Expand Down Expand Up @@ -831,7 +811,7 @@ func (tree *MutableTree) addUnsavedAddition(key []byte, node *fastnode.Node) {
tree.unsavedFastNodeAdditions.Store(skey, node)
}

func (tree *MutableTree) saveFastNodeAdditions(batchCommmit bool) error {
func (tree *MutableTree) saveFastNodeAdditions() error {
keysToSort := make([]string, 0)
tree.unsavedFastNodeAdditions.Range(func(k, v interface{}) bool {
keysToSort = append(keysToSort, k.(string))
Expand All @@ -844,11 +824,6 @@ func (tree *MutableTree) saveFastNodeAdditions(batchCommmit bool) error {
if err := tree.ndb.SaveFastNode(val.(*fastnode.Node)); err != nil {
return err
}
if batchCommmit {
if err := tree.ndb.resetBatch(); err != nil {
return err
}
}
}
return nil
}
Expand Down
34 changes: 13 additions & 21 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)

dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(3)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(2)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
Expand All @@ -940,8 +940,8 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(2)
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
batchMock.EXPECT().Close().Return(nil).Times(2)
batchMock.EXPECT().Write().Return(nil).Times(1)
batchMock.EXPECT().Close().Return(nil).Times(1)

// iterMock is used to mock the underlying db iterator behing fast iterator
// Here, we want to mock the behavior of deleting fast nodes from disk when
Expand Down Expand Up @@ -1124,11 +1124,7 @@ func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T)
}

func TestUpgradeStorageToFast_Success(t *testing.T) {
tmpCommitGap := commitGap
commitGap = 1000
defer func() {
commitGap = tmpCommitGap
}()
commitGap := 1000

type fields struct {
nodeCount int
Expand All @@ -1138,10 +1134,10 @@ func TestUpgradeStorageToFast_Success(t *testing.T) {
fields fields
}{
{"less than commit gap", fields{nodeCount: 100}},
{"equal to commit gap", fields{nodeCount: int(commitGap)}},
{"great than commit gap", fields{nodeCount: int(commitGap) + 100}},
{"two times commit gap", fields{nodeCount: int(commitGap) * 2}},
{"two times plus commit gap", fields{nodeCount: int(commitGap)*2 + 1}},
{"equal to commit gap", fields{nodeCount: commitGap}},
{"great than commit gap", fields{nodeCount: commitGap + 100}},
{"two times commit gap", fields{nodeCount: commitGap * 2}},
{"two times plus commit gap", fields{nodeCount: commitGap*2 + 1}},
}

for _, tt := range tests {
Expand All @@ -1164,11 +1160,7 @@ func TestUpgradeStorageToFast_Success(t *testing.T) {

func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
// we delete fast node, in case of deadlock. we should limit the stale count lower than chBufferSize(64)
tmpCommitGap := commitGap
commitGap = 5
defer func() {
commitGap = tmpCommitGap
}()
commitGap := 5

valStale := "val_stale"
addStaleKey := func(ndb *nodeDB, staleCount int) {
Expand Down Expand Up @@ -1197,10 +1189,10 @@ func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
fields fields
}{
{"stale less than commit gap", fields{nodeCount: 100, staleCount: 4}},
{"stale equal to commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)}},
{"stale great than commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap)*2 - 1}},
{"stale twice commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap) * 2}},
{"stale great than twice commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)*2 + 1}},
{"stale equal to commit gap", fields{nodeCount: commitGap, staleCount: commitGap}},
{"stale great than commit gap", fields{nodeCount: commitGap + 100, staleCount: commitGap*2 - 1}},
{"stale twice commit gap", fields{nodeCount: commitGap + 100, staleCount: commitGap * 2}},
{"stale great than twice commit gap", fields{nodeCount: commitGap, staleCount: commitGap*2 + 1}},
}

for _, tt := range tests {
Expand Down
42 changes: 0 additions & 42 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,6 @@ func (ndb *nodeDB) SaveNode(node *Node) error {
return err
}

// resetBatch only working on generate a genesis block
if node.nodeKey.version <= genesisVersion {
if err := ndb.resetBatch(); err != nil {
return err
}
}

ndb.logger.Debug("BATCH SAVE", "node", node)
ndb.nodeCache.Add(node)
return nil
Expand Down Expand Up @@ -338,41 +331,6 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) {
return ndb.db.Has(ndb.nodeKey(nk))
}

// resetBatch reset the db batch, keep low memory used
func (ndb *nodeDB) resetBatch() error {
size, err := ndb.batch.GetByteSize()
if err != nil {
// just don't do an optimization here. write with batch size 1.
return ndb.writeBatch()
}
// write in ~64kb chunks. if less than 64kb, continue.
if size < 64*1024 {
return nil
}

return ndb.writeBatch()
}

func (ndb *nodeDB) writeBatch() error {
var err error
if ndb.opts.Sync {
err = ndb.batch.WriteSync()
} else {
err = ndb.batch.Write()
}
if err != nil {
return err
}
err = ndb.batch.Close()
if err != nil {
return err
}

ndb.batch = NewBatchWithFlusher(ndb.db, ndb.opts.FlushThreshold)

return nil
}

// deleteVersion deletes a tree version from disk.
// deletes orphans
func (ndb *nodeDB) deleteVersion(version int64) error {
Expand Down

0 comments on commit ff86011

Please sign in to comment.