Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cleanup unnecessary ndb.Commit calls (backport #902) #921

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 4 additions & 29 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
)

var (
// commitGap after upgrade/delete commitGap FastNodes when commit the batch
commitGap uint64 = 5000000

// ErrVersionDoesNotExist is returned if a requested version does not exist.
ErrVersionDoesNotExist = errors.New("version does not exist")

Expand Down Expand Up @@ -577,16 +574,6 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
return false, err
}
if deletedFastNodes%commitGap == 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}
}
if deletedFastNodes%commitGap != 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}

if err := tree.enableFastStorageAndCommit(); err != nil {
Expand All @@ -607,12 +594,6 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
if err = tree.ndb.SaveFastNodeNoCache(fastnode.NewNode(itr.Key(), itr.Value(), tree.version)); err != nil {
return err
}
if upgradedFastNodes%commitGap == 0 {
err := tree.ndb.Commit()
if err != nil {
return err
}
}
}

if err = itr.Error(); err != nil {
Expand Down Expand Up @@ -710,7 +691,6 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error)
// SaveVersion saves a new tree version to disk, based on the current state of
// the tree. Returns the hash and new version number.
func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
isGenesis := (tree.version == 0)
version := tree.WorkingVersion()

if tree.VersionExists(version) {
Expand Down Expand Up @@ -745,7 +725,7 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {

// save new fast nodes
if !tree.skipFastStorageUpgrade {
if err := tree.saveFastNodeVersion(version, isGenesis); err != nil {
if err := tree.saveFastNodeVersion(version); err != nil {
return nil, version, err
}
}
Expand Down Expand Up @@ -794,8 +774,8 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
return tree.Hash(), version, nil
}

func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool) error {
if err := tree.saveFastNodeAdditions(isGenesis); err != nil {
func (tree *MutableTree) saveFastNodeVersion(latestVersion int64) error {
if err := tree.saveFastNodeAdditions(); err != nil {
return err
}
if err := tree.saveFastNodeRemovals(); err != nil {
Expand Down Expand Up @@ -831,7 +811,7 @@ func (tree *MutableTree) addUnsavedAddition(key []byte, node *fastnode.Node) {
tree.unsavedFastNodeAdditions.Store(skey, node)
}

func (tree *MutableTree) saveFastNodeAdditions(batchCommmit bool) error {
func (tree *MutableTree) saveFastNodeAdditions() error {
keysToSort := make([]string, 0)
tree.unsavedFastNodeAdditions.Range(func(k, v interface{}) bool {
keysToSort = append(keysToSort, k.(string))
Expand All @@ -844,11 +824,6 @@ func (tree *MutableTree) saveFastNodeAdditions(batchCommmit bool) error {
if err := tree.ndb.SaveFastNode(val.(*fastnode.Node)); err != nil {
return err
}
if batchCommmit {
if err := tree.ndb.resetBatch(); err != nil {
return err
}
}
}
return nil
}
Expand Down
34 changes: 13 additions & 21 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)

dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(3)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(2)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
Expand All @@ -940,8 +940,8 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(2)
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
batchMock.EXPECT().Close().Return(nil).Times(2)
batchMock.EXPECT().Write().Return(nil).Times(1)
batchMock.EXPECT().Close().Return(nil).Times(1)

// iterMock is used to mock the underlying db iterator behing fast iterator
// Here, we want to mock the behavior of deleting fast nodes from disk when
Expand Down Expand Up @@ -1124,11 +1124,7 @@ func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T)
}

func TestUpgradeStorageToFast_Success(t *testing.T) {
tmpCommitGap := commitGap
commitGap = 1000
defer func() {
commitGap = tmpCommitGap
}()
commitGap := 1000

type fields struct {
nodeCount int
Expand All @@ -1138,10 +1134,10 @@ func TestUpgradeStorageToFast_Success(t *testing.T) {
fields fields
}{
{"less than commit gap", fields{nodeCount: 100}},
{"equal to commit gap", fields{nodeCount: int(commitGap)}},
{"great than commit gap", fields{nodeCount: int(commitGap) + 100}},
{"two times commit gap", fields{nodeCount: int(commitGap) * 2}},
{"two times plus commit gap", fields{nodeCount: int(commitGap)*2 + 1}},
{"equal to commit gap", fields{nodeCount: commitGap}},
{"great than commit gap", fields{nodeCount: commitGap + 100}},
{"two times commit gap", fields{nodeCount: commitGap * 2}},
{"two times plus commit gap", fields{nodeCount: commitGap*2 + 1}},
}

for _, tt := range tests {
Expand All @@ -1164,11 +1160,7 @@ func TestUpgradeStorageToFast_Success(t *testing.T) {

func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
// we delete fast node, in case of deadlock. we should limit the stale count lower than chBufferSize(64)
tmpCommitGap := commitGap
commitGap = 5
defer func() {
commitGap = tmpCommitGap
}()
commitGap := 5

valStale := "val_stale"
addStaleKey := func(ndb *nodeDB, staleCount int) {
Expand Down Expand Up @@ -1197,10 +1189,10 @@ func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
fields fields
}{
{"stale less than commit gap", fields{nodeCount: 100, staleCount: 4}},
{"stale equal to commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)}},
{"stale great than commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap)*2 - 1}},
{"stale twice commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap) * 2}},
{"stale great than twice commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)*2 + 1}},
{"stale equal to commit gap", fields{nodeCount: commitGap, staleCount: commitGap}},
{"stale great than commit gap", fields{nodeCount: commitGap + 100, staleCount: commitGap*2 - 1}},
{"stale twice commit gap", fields{nodeCount: commitGap + 100, staleCount: commitGap * 2}},
{"stale great than twice commit gap", fields{nodeCount: commitGap, staleCount: commitGap*2 + 1}},
}

for _, tt := range tests {
Expand Down
42 changes: 0 additions & 42 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,6 @@ func (ndb *nodeDB) SaveNode(node *Node) error {
return err
}

// resetBatch only working on generate a genesis block
if node.nodeKey.version <= genesisVersion {
if err := ndb.resetBatch(); err != nil {
return err
}
}

ndb.logger.Debug("BATCH SAVE", "node", node)
ndb.nodeCache.Add(node)
return nil
Expand Down Expand Up @@ -338,41 +331,6 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) {
return ndb.db.Has(ndb.nodeKey(nk))
}

// resetBatch reset the db batch, keep low memory used
func (ndb *nodeDB) resetBatch() error {
size, err := ndb.batch.GetByteSize()
if err != nil {
// just don't do an optimization here. write with batch size 1.
return ndb.writeBatch()
}
// write in ~64kb chunks. if less than 64kb, continue.
if size < 64*1024 {
return nil
}

return ndb.writeBatch()
}

func (ndb *nodeDB) writeBatch() error {
var err error
if ndb.opts.Sync {
err = ndb.batch.WriteSync()
} else {
err = ndb.batch.Write()
}
if err != nil {
return err
}
err = ndb.batch.Close()
if err != nil {
return err
}

ndb.batch = NewBatchWithFlusher(ndb.db, ndb.opts.FlushThreshold)

return nil
}

// deleteVersion deletes a tree version from disk.
// deletes orphans
func (ndb *nodeDB) deleteVersion(version int64) error {
Expand Down
Loading