Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): pruning manager #20430

Merged
merged 12 commits into from
May 29, 2024
11 changes: 11 additions & 0 deletions store/v2/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (

corestore "cosmossdk.io/core/store"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

var _ commitment.Tree = (*IavlTree)(nil)
var _ store.PausablePruner = (*IavlTree)(nil)

// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
Expand Down Expand Up @@ -98,6 +100,15 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// PausePruning pauses the pruning process.
func (t *IavlTree) PausePruning(pause bool) {
if pause {
t.tree.SetCommitting()
} else {
t.tree.UnsetCommitting()
}
}

// Export exports the tree exporter at the given version.
func (t *IavlTree) Export(version uint64) (commitment.Exporter, error) {
tree, err := t.tree.GetImmutable(int64(version))
Expand Down
5 changes: 2 additions & 3 deletions store/v2/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,20 @@ import (

corestore "cosmossdk.io/core/store"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
dbm "cosmossdk.io/store/v2/db"
)

func TestCommitterSuite(t *testing.T) {
s := &commitment.CommitStoreTestSuite{
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*commitment.CommitStore, error) {
NewStore: func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*commitment.CommitStore, error) {
multiTrees := make(map[string]commitment.Tree)
cfg := DefaultConfig()
for _, storeKey := range storeKeys {
prefixDB := dbm.NewPrefixDB(db, []byte(storeKey))
multiTrees[storeKey] = NewIavlTree(prefixDB, logger, cfg)
}
return commitment.NewCommitStore(multiTrees, db, pruneOpts, logger)
return commitment.NewCommitStore(multiTrees, db, logger)
},
}

Expand Down
34 changes: 15 additions & 19 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
var (
_ store.Committer = (*CommitStore)(nil)
_ snapshots.CommitSnapshotter = (*CommitStore)(nil)
_ store.PausablePruner = (*CommitStore)(nil)
)

// CommitStore is a wrapper around multiple Tree objects mapped by a unique store
Expand All @@ -39,22 +40,14 @@
logger log.Logger
db corestore.KVStoreWithBatch
multiTrees map[string]Tree

// pruneOptions is the pruning configuration.
pruneOptions *store.PruneOptions // TODO are there no default prune options?
}

// NewCommitStore creates a new CommitStore instance.
func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error) {
if pruneOpts == nil {
pruneOpts = store.DefaultPruneOptions()
}

func NewCommitStore(trees map[string]Tree, db corestore.KVStoreWithBatch, logger log.Logger) (*CommitStore, error) {
return &CommitStore{
logger: logger,
db: db,
multiTrees: trees,
pruneOptions: pruneOpts,
logger: logger,
db: db,
multiTrees: trees,
}, nil
}

Expand Down Expand Up @@ -237,13 +230,6 @@
return nil, err
}

// Prune the old versions.
if prune, pruneVersion := c.pruneOptions.ShouldPrune(version); prune {
if err := c.Prune(pruneVersion); err != nil {
c.logger.Info("failed to prune SC", "prune_version", pruneVersion, "err", err)
}
}

return cInfo, nil
}

Expand Down Expand Up @@ -297,6 +283,7 @@
return bz, nil
}

// Prune implements store.PausablePruner.
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
func (c *CommitStore) Prune(version uint64) (ferr error) {
// prune the metadata
batch := c.db.NewBatch()
Expand All @@ -322,6 +309,15 @@
return ferr
}

// PausePruning implements store.PausablePruner.
func (c *CommitStore) PausePruning(pause bool) {
for _, tree := range c.multiTrees {
if pruner, ok := tree.(store.PausablePruner); ok {
pruner.PausePruning(pause)
}
}
Dismissed Show dismissed Hide dismissed
}

// Snapshot implements snapshotstypes.CommitSnapshotter.
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
Expand Down
13 changes: 9 additions & 4 deletions store/v2/commitment/store_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ const (
type CommitStoreTestSuite struct {
suite.Suite

NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, pruneOpts *store.PruneOptions, logger log.Logger) (*CommitStore, error)
NewStore func(db corestore.KVStoreWithBatch, storeKeys []string, logger log.Logger) (*CommitStore, error)
}

func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
storeKeys := []string{storeKey1, storeKey2}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(10)
Expand Down Expand Up @@ -64,7 +64,7 @@ func (s *CommitStoreTestSuite) TestStore_Snapshotter() {
},
}

targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, nil, log.NewNopLogger())
targetStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {
KeepRecent: 10,
Interval: 5,
}
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, pruneOpts, log.NewNopLogger())
commitStore, err := s.NewStore(dbm.NewMemDB(), storeKeys, log.NewNopLogger())
s.Require().NoError(err)

latestVersion := uint64(100)
Expand All @@ -148,6 +148,11 @@ func (s *CommitStoreTestSuite) TestStore_Pruning() {

_, err = commitStore.Commit(i)
s.Require().NoError(err)

if prune, pruneVersion := pruneOpts.ShouldPrune(i); prune {
s.Require().NoError(commitStore.Prune(pruneVersion))
}

}

pruneVersion := latestVersion - pruneOpts.KeepRecent - 1
Expand Down
2 changes: 1 addition & 1 deletion store/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cockroachdb/pebble v1.1.0
github.com/cometbft/cometbft v0.38.7
github.com/cosmos/gogoproto v1.4.12
github.com/cosmos/iavl v1.1.4
github.com/cosmos/iavl v1.2.0
github.com/cosmos/ics23/go v0.10.0
github.com/google/btree v1.1.2
github.com/hashicorp/go-metrics v0.5.3
Expand Down
4 changes: 2 additions & 2 deletions store/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK
github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA=
github.com/cosmos/gogoproto v1.4.12 h1:vB6Lbe/rtnYGjQuFxkPiPYiCybqFT8QvLipDZP8JpFE=
github.com/cosmos/gogoproto v1.4.12/go.mod h1:LnZob1bXRdUoqMMtwYlcR3wjiElmlC+FkjaZRv1/eLY=
github.com/cosmos/iavl v1.1.4 h1:Z0cVVjeQqOUp78/nWt/uhQy83vYluWlAMGQ4zbH9G34=
github.com/cosmos/iavl v1.1.4/go.mod h1:vCYmRQUJU1wwj0oRD3wMEtOM9sJNDP+GFMaXmIxZ/rU=
github.com/cosmos/iavl v1.2.0 h1:kVxTmjTh4k0Dh1VNL046v6BXqKziqMDzxo93oh3kOfM=
github.com/cosmos/iavl v1.2.0/go.mod h1:HidWWLVAtODJqFD6Hbne2Y0q3SdxByJepHUOeoH4LiI=
github.com/cosmos/ics23/go v0.10.0 h1:iXqLLgp2Lp+EdpIuwXTYIQU+AiHj9mOC2X9ab++bZDM=
github.com/cosmos/ics23/go v0.10.0/go.mod h1:ZfJSmng/TBNTBkFemHHHj5YY7VAU/MBU980F4VU1NG0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down
6 changes: 3 additions & 3 deletions store/v2/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

commitStore, err := commitment.NewCommitStore(multiTrees, db, nil, log.NewNopLogger())
commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger())
require.NoError(t, err)

snapshotsStore, err := snapshots.NewStore(t.TempDir())
Expand All @@ -38,7 +38,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm

storageDB, err := pebbledb.New(t.TempDir())
require.NoError(t, err)
newStorageStore := storage.NewStorageStore(storageDB, nil, log.NewNopLogger()) // for store/v2
newStorageStore := storage.NewStorageStore(storageDB, log.NewNopLogger()) // for store/v2

db1 := dbm.NewMemDB()
multiTrees1 := make(map[string]commitment.Tree)
Expand All @@ -47,7 +47,7 @@ func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitm
multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig())
}

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
Expand Down
67 changes: 67 additions & 0 deletions store/v2/pruning/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pruning
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a small readme/doc.go on the design with a small diagram on how this is meant to be used


import "cosmossdk.io/store/v2"

// Manager is a struct that manages the pruning of old versions of the SC and SS.
type Manager struct {
// scPruner is the pruner for the SC.
scPruner store.Pruner
// scPruningOptions are the pruning options for the SC.
scPruningOptions *store.PruneOptions
// ssPruner is the pruner for the SS.
ssPruner store.Pruner
// ssPruningOptions are the pruning options for the SS.
ssPruningOptions *store.PruneOptions
}

// NewManager creates a new Pruning Manager.
func NewManager(scPruner, ssPruner store.Pruner, scPruningOptions, ssPruningOptions *store.PruneOptions) *Manager {
return &Manager{
scPruner: scPruner,
scPruningOptions: scPruningOptions,
ssPruner: ssPruner,
ssPruningOptions: ssPruningOptions,
}
}

// Prune prunes the SC and SS to the provided version.
//
// NOTE: It can be called outside of the store manually.
func (m *Manager) Prune(version uint64) error {
// Prune the SC.
if m.scPruningOptions != nil {
if prune, pruneTo := m.scPruningOptions.ShouldPrune(version); prune {
if err := m.scPruner.Prune(pruneTo); err != nil {
return err
}
}
}

// Prune the SS.
if m.ssPruningOptions != nil {
if prune, pruneTo := m.ssPruningOptions.ShouldPrune(version); prune {
if err := m.ssPruner.Prune(pruneTo); err != nil {
return err
}
}
}

return nil
}

// SignalCommit signals to the manager that a commit has started or finished.
// This is used to avoid concurrent writes while pruning.
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
func (m *Manager) SignalCommit(start bool, version uint64) error {
if scPausablePruner, ok := m.scPruner.(store.PausablePruner); ok {
scPausablePruner.PausePruning(start)
}
if ssPausablePruner, ok := m.ssPruner.(store.PausablePruner); ok {
ssPausablePruner.PausePruning(start)
}

if !start {
return m.Prune(version)
}

return nil
}
29 changes: 16 additions & 13 deletions store/v2/root/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"cosmossdk.io/store/v2/commitment/mem"
"cosmossdk.io/store/v2/db"
"cosmossdk.io/store/v2/internal"
"cosmossdk.io/store/v2/pruning"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/pebbledb"
"cosmossdk.io/store/v2/storage/sqlite"
Expand All @@ -31,14 +32,15 @@ const (
)

type FactoryOptions struct {
Logger log.Logger
RootDir string
SSType SSType
SCType SCType
PruneOptions *store.PruneOptions
IavlConfig *iavl.Config
StoreKeys []string
SCRawDB corestore.KVStoreWithBatch
Logger log.Logger
RootDir string
SSType SSType
SCType SCType
SSPruneOptions *store.PruneOptions
SCPruneOptions *store.PruneOptions
IavlConfig *iavl.Config
StoreKeys []string
SCRawDB corestore.KVStoreWithBatch
}

// CreateRootStore is a convenience function to create a root store based on the
Expand Down Expand Up @@ -71,15 +73,15 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
if err = ensureDir(dir); err != nil {
return nil, err
}
ssDb, err = pebbledb.New(fmt.Sprintf("%s/data/ss/pebble", opts.RootDir))
ssDb, err = pebbledb.New(dir)
case SSTypeRocks:
// TODO: rocksdb requires build tags so is not supported here by default
return nil, fmt.Errorf("rocksdb not supported")
}
if err != nil {
return nil, err
}
ss = storage.NewStorageStore(ssDb, opts.PruneOptions, opts.Logger)
ss = storage.NewStorageStore(ssDb, opts.Logger)

trees := make(map[string]commitment.Tree)
for _, key := range opts.StoreKeys {
Expand All @@ -93,12 +95,13 @@ func CreateRootStore(opts *FactoryOptions) (store.RootStore, error) {
return nil, fmt.Errorf("iavl v2 not supported")
}
}
sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.PruneOptions, opts.Logger)
}

sc, err = commitment.NewCommitStore(trees, opts.SCRawDB, opts.Logger)
if err != nil {
return nil, err
}

return New(opts.Logger, ss, sc, nil, nil)
pm := pruning.NewManager(sc, ss, opts.SCPruneOptions, opts.SSPruneOptions)

return New(opts.Logger, ss, sc, pm, nil, nil)
}
Loading
Loading