From e98bc04a53a082af4f670d0c9b24a8fa1630d766 Mon Sep 17 00:00:00 2001 From: Julien Robert <julien@rbrt.fr> Date: Wed, 11 Dec 2024 21:36:48 +0100 Subject: [PATCH 1/4] test(systemtest): fix cometbft client (#22835) (cherry picked from commit f2663280c41366cb87a02037eac4df232235f727) # Conflicts: # store/v2/root/store.go # tests/systemtests/cometbft_client_test.go --- store/v2/root/store.go | 388 ++++++++++++++++++++++ tests/systemtests/cometbft_client_test.go | 23 +- 2 files changed, 410 insertions(+), 1 deletion(-) create mode 100644 store/v2/root/store.go diff --git a/store/v2/root/store.go b/store/v2/root/store.go new file mode 100644 index 000000000000..ab22d3f90117 --- /dev/null +++ b/store/v2/root/store.go @@ -0,0 +1,388 @@ +package root + +import ( + "crypto/sha256" + "errors" + "fmt" + "io" + "sync" + "time" + + corelog "cosmossdk.io/core/log" + corestore "cosmossdk.io/core/store" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/migration" + "cosmossdk.io/store/v2/proof" + "cosmossdk.io/store/v2/pruning" +) + +var ( + _ store.RootStore = (*Store)(nil) + _ store.UpgradeableStore = (*Store)(nil) +) + +// Store defines the SDK's default RootStore implementation. It contains a single +// State Storage (SS) backend and a single State Commitment (SC) backend. The SC +// backend may or may not support multiple store keys and is implementation +// dependent. +type Store struct { + logger corelog.Logger + + // holds the db instance for closing it + dbCloser io.Closer + + // stateCommitment reflects the state commitment (SC) backend + stateCommitment store.Committer + + // lastCommitInfo reflects the last version/hash that has been committed + lastCommitInfo *proof.CommitInfo + + // telemetry reflects a telemetry agent responsible for emitting metrics (if any) + telemetry metrics.StoreMetrics + + // pruningManager reflects the pruning manager used to prune state of the SS and SC backends + pruningManager *pruning.Manager + + // Migration related fields + // migrationManager reflects the migration manager used to migrate state from v1 to v2 + migrationManager *migration.Manager + // chChangeset reflects the channel used to send the changeset to the migration manager + chChangeset chan *migration.VersionedChangeset + // chDone reflects the channel used to signal the migration manager that the migration + // is done + chDone chan struct{} + // isMigrating reflects whether the store is currently migrating + isMigrating bool +} + +// New creates a new root Store instance. +// +// NOTE: The migration manager is optional and can be nil if no migration is required. +func New( + dbCloser io.Closer, + logger corelog.Logger, + sc store.Committer, + pm *pruning.Manager, + mm *migration.Manager, + m metrics.StoreMetrics, +) (store.RootStore, error) { + return &Store{ + dbCloser: dbCloser, + logger: logger, + stateCommitment: sc, + pruningManager: pm, + migrationManager: mm, + telemetry: m, + isMigrating: mm != nil, + }, nil +} + +// Close closes the store and resets all internal fields. Note, Close() is NOT +// idempotent and should only be called once. +func (s *Store) Close() (err error) { + err = errors.Join(err, s.stateCommitment.Close()) + err = errors.Join(err, s.dbCloser.Close()) + + s.stateCommitment = nil + s.lastCommitInfo = nil + + return err +} + +func (s *Store) SetMetrics(m metrics.Metrics) { + s.telemetry = m +} + +func (s *Store) SetInitialVersion(v uint64) error { + return s.stateCommitment.SetInitialVersion(v) +} + +// getVersionedReader returns a VersionedReader based on the given version. If the +// version exists in the state storage, it returns the state storage. +// If not, it checks if the state commitment implements the VersionedReader interface +// and the version exists in the state commitment, since the state storage will be +// synced during migration. +func (s *Store) getVersionedReader(version uint64) (store.VersionedReader, error) { + isExist, err := s.stateCommitment.VersionExists(version) + if err != nil { + return nil, err + } + if isExist { + return s.stateCommitment, nil + } + return nil, fmt.Errorf("version %d does not exist", version) +} + +func (s *Store) StateLatest() (uint64, corestore.ReaderMap, error) { + v, err := s.GetLatestVersion() + if err != nil { + return 0, nil, err + } + vReader, err := s.getVersionedReader(v) + if err != nil { + return 0, nil, err + } + + return v, NewReaderMap(v, vReader), nil +} + +// StateAt returns a read-only view of the state at a given version. +func (s *Store) StateAt(v uint64) (corestore.ReaderMap, error) { + vReader, err := s.getVersionedReader(v) + return NewReaderMap(v, vReader), err +} + +func (s *Store) GetStateCommitment() store.Committer { + return s.stateCommitment +} + +// LastCommitID returns a CommitID based off of the latest internal CommitInfo. +// If an internal CommitInfo is not set, a new one will be returned with only the +// latest version set, which is based off of the SC view. +func (s *Store) LastCommitID() (proof.CommitID, error) { + if s.lastCommitInfo != nil { + return s.lastCommitInfo.CommitID(), nil + } + + latestVersion, err := s.stateCommitment.GetLatestVersion() + if err != nil { + return proof.CommitID{}, err + } + // if the latest version is 0, we return a CommitID with version 0 and a hash of an empty byte slice + bz := sha256.Sum256([]byte{}) + + return proof.CommitID{Version: latestVersion, Hash: bz[:]}, nil +} + +// GetLatestVersion returns the latest version based on the latest internal +// CommitInfo. An error is returned if the latest CommitInfo or version cannot +// be retrieved. +func (s *Store) GetLatestVersion() (uint64, error) { + lastCommitID, err := s.LastCommitID() + if err != nil { + return 0, err + } + + return lastCommitID.Version, nil +} + +func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) (store.QueryResult, error) { + if s.telemetry != nil { + now := time.Now() + defer s.telemetry.MeasureSince(now, "root_store", "query") + } + + val, err := s.stateCommitment.Get(storeKey, version, key) + if err != nil { + return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", err) + } + + result := store.QueryResult{ + Key: key, + Value: val, + Version: version, + } + + if prove { + result.ProofOps, err = s.stateCommitment.GetProof(storeKey, version, key) + if err != nil { + return store.QueryResult{}, fmt.Errorf("failed to get SC store proof: %w", err) + } + } + + return result, nil +} + +func (s *Store) LoadLatestVersion() error { + if s.telemetry != nil { + now := time.Now() + defer s.telemetry.MeasureSince(now, "root_store", "load_latest_version") + } + + lv, err := s.GetLatestVersion() + if err != nil { + return err + } + + return s.loadVersion(lv, nil, false) +} + +func (s *Store) LoadVersion(version uint64) error { + if s.telemetry != nil { + now := time.Now() + defer s.telemetry.MeasureSince(now, "root_store", "load_version") + } + + return s.loadVersion(version, nil, false) +} + +func (s *Store) LoadVersionForOverwriting(version uint64) error { + if s.telemetry != nil { + now := time.Now() + defer s.telemetry.MeasureSince(now, "root_store", "load_version_for_overwriting") + } + + return s.loadVersion(version, nil, true) +} + +// LoadVersionAndUpgrade implements the UpgradeableStore interface. +// +// NOTE: It cannot be called while the store is migrating. +func (s *Store) LoadVersionAndUpgrade(version uint64, upgrades *corestore.StoreUpgrades) error { + if upgrades == nil { + return errors.New("upgrades cannot be nil") + } + + if s.telemetry != nil { + defer s.telemetry.MeasureSince(time.Now(), "root_store", "load_version_and_upgrade") + } + + if s.isMigrating { + return errors.New("cannot upgrade while migrating") + } + + if err := s.loadVersion(version, upgrades, true); err != nil { + return err + } + + return nil +} + +func (s *Store) loadVersion(v uint64, upgrades *corestore.StoreUpgrades, overrideAfter bool) error { + s.logger.Debug("loading version", "version", v) + + if upgrades == nil { + if !overrideAfter { + if err := s.stateCommitment.LoadVersion(v); err != nil { + return fmt.Errorf("failed to load SC version %d: %w", v, err) + } + } else { + if err := s.stateCommitment.LoadVersionForOverwriting(v); err != nil { + return fmt.Errorf("failed to load SC version %d: %w", v, err) + } + } + } else { + // if upgrades are provided, we need to load the version and apply the upgrades + if err := s.stateCommitment.LoadVersionAndUpgrade(v, upgrades); err != nil { + return fmt.Errorf("failed to load SS version with upgrades %d: %w", v, err) + } + } + + // set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1 + var err error + s.lastCommitInfo, err = s.stateCommitment.GetCommitInfo(v) + if err != nil { + return fmt.Errorf("failed to get commit info for version %d: %w", v, err) + } + + // if we're migrating, we need to start the migration process + if s.isMigrating { + s.startMigration() + } + + return nil +} + +// Commit commits all state changes to the underlying SS and SC backends. It +// writes a batch of the changeset to the SC tree, and retrieves the CommitInfo +// from the SC tree. Finally, it commits the SC tree and returns the hash of +// the CommitInfo. +func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { + if s.telemetry != nil { + now := time.Now() + defer s.telemetry.MeasureSince(now, "root_store", "commit") + } + + if err := s.handleMigration(cs); err != nil { + return nil, err + } + + // signal to the pruning manager that a new version is about to be committed + // this may be required if the SS and SC backends implementation have the + // background pruning process (iavl v1 for example) which must be paused during the commit + s.pruningManager.PausePruning() + + var cInfo *proof.CommitInfo + if err := s.stateCommitment.WriteChangeset(cs); err != nil { + return nil, fmt.Errorf("failed to write batch to SC store: %w", err) + } + + cInfo, err := s.stateCommitment.Commit(cs.Version) + if err != nil { + return nil, fmt.Errorf("failed to commit SC store: %w", err) + } + + if cInfo.Version != cs.Version { + return nil, fmt.Errorf("commit version mismatch: got %d, expected %d", cInfo.Version, cs.Version) + } + s.lastCommitInfo = cInfo + + // signal to the pruning manager that the commit is done + if err := s.pruningManager.ResumePruning(s.lastCommitInfo.Version); err != nil { + s.logger.Error("failed to signal commit done to pruning manager", "err", err) + } + + return s.lastCommitInfo.Hash(), nil +} + +// startMigration starts a migration process to migrate the RootStore/v1 to the +// SS and SC backends of store/v2 and initializes the channels. +// It runs in a separate goroutine and replaces the current RootStore with the +// migrated new backends once the migration is complete. +// +// NOTE: This method should only be called once after loadVersion. +func (s *Store) startMigration() { + // buffer at most 1 changeset, if the receiver is behind attempting to buffer + // more than 1 will block. + s.chChangeset = make(chan *migration.VersionedChangeset, 1) + // it is used to signal the migration manager that the migration is done + s.chDone = make(chan struct{}) + + mtx := sync.Mutex{} + mtx.Lock() + go func() { + version := s.lastCommitInfo.Version + s.logger.Info("starting migration", "version", version) + mtx.Unlock() + if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { + s.logger.Error("failed to start migration", "err", err) + } + }() + + // wait for the migration manager to start + mtx.Lock() + defer mtx.Unlock() +} + +func (s *Store) handleMigration(cs *corestore.Changeset) error { + if s.isMigrating { + // if the migration manager has already migrated to the version, close the + // channels and replace the state commitment + if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { + close(s.chDone) + close(s.chChangeset) + s.isMigrating = false + // close the old state commitment and replace it with the new one + if err := s.stateCommitment.Close(); err != nil { + return fmt.Errorf("failed to close the old SC store: %w", err) + } + newStateCommitment := s.migrationManager.GetStateCommitment() + if newStateCommitment != nil { + s.stateCommitment = newStateCommitment + } + if err := s.migrationManager.Close(); err != nil { + return fmt.Errorf("failed to close migration manager: %w", err) + } + s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) + } else { + // queue the next changeset to the migration manager + s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} + } + } + return nil +} + +func (s *Store) Prune(version uint64) error { + return s.pruningManager.Prune(version) +} diff --git a/tests/systemtests/cometbft_client_test.go b/tests/systemtests/cometbft_client_test.go index 732b39a480ae..855594833cc8 100644 --- a/tests/systemtests/cometbft_client_test.go +++ b/tests/systemtests/cometbft_client_test.go @@ -22,6 +22,25 @@ import ( qtypes "github.com/cosmos/cosmos-sdk/types/query" ) +<<<<<<< HEAD +======= +func TestQueryStatus(t *testing.T) { + systest.Sut.ResetChain(t) + cli := systest.NewCLIWrapper(t, systest.Sut, systest.Verbose) + systest.Sut.StartChain(t) + + var resp string + if systest.IsV2() { + resp = cli.CustomQuery("comet", "status") + } else { + resp = cli.CustomQuery("status") + } + + // make sure the output has the validator moniker. + assert.Contains(t, resp, "\"moniker\":\"node0\"") +} + +>>>>>>> f2663280c (test(systemtest): fix cometbft client (#22835)) func TestQueryNodeInfo(t *testing.T) { systest.Sut.ResetChain(t) systest.Sut.StartChain(t) @@ -213,7 +232,7 @@ func TestValidatorSetByHeight(t *testing.T) { } } -func TestValidatorSetByHeight_GRPCRestGateway(t *testing.T) { +func TestValidatorSetByHeight_GRPCGateway(t *testing.T) { systest.Sut.ResetChain(t) systest.Sut.StartChain(t) @@ -247,7 +266,9 @@ func TestValidatorSetByHeight_GRPCRestGateway(t *testing.T) { } func TestABCIQuery(t *testing.T) { + systest.Sut.ResetChain(t) systest.Sut.StartChain(t) + _ = systest.Sut.AwaitNextBlock(t, time.Second*3) qc := cmtservice.NewServiceClient(systest.Sut.RPCClient(t)) cdc := codec.NewProtoCodec(codectypes.NewInterfaceRegistry()) From f9db860965a47e5ddd3f763f8013f4d0d503e14c Mon Sep 17 00:00:00 2001 From: Julien Robert <julien@rbrt.fr> Date: Wed, 11 Dec 2024 21:59:15 +0100 Subject: [PATCH 2/4] Delete store/v2/root/store.go --- store/v2/root/store.go | 388 ----------------------------------------- 1 file changed, 388 deletions(-) delete mode 100644 store/v2/root/store.go diff --git a/store/v2/root/store.go b/store/v2/root/store.go deleted file mode 100644 index ab22d3f90117..000000000000 --- a/store/v2/root/store.go +++ /dev/null @@ -1,388 +0,0 @@ -package root - -import ( - "crypto/sha256" - "errors" - "fmt" - "io" - "sync" - "time" - - corelog "cosmossdk.io/core/log" - corestore "cosmossdk.io/core/store" - "cosmossdk.io/store/v2" - "cosmossdk.io/store/v2/metrics" - "cosmossdk.io/store/v2/migration" - "cosmossdk.io/store/v2/proof" - "cosmossdk.io/store/v2/pruning" -) - -var ( - _ store.RootStore = (*Store)(nil) - _ store.UpgradeableStore = (*Store)(nil) -) - -// Store defines the SDK's default RootStore implementation. It contains a single -// State Storage (SS) backend and a single State Commitment (SC) backend. The SC -// backend may or may not support multiple store keys and is implementation -// dependent. -type Store struct { - logger corelog.Logger - - // holds the db instance for closing it - dbCloser io.Closer - - // stateCommitment reflects the state commitment (SC) backend - stateCommitment store.Committer - - // lastCommitInfo reflects the last version/hash that has been committed - lastCommitInfo *proof.CommitInfo - - // telemetry reflects a telemetry agent responsible for emitting metrics (if any) - telemetry metrics.StoreMetrics - - // pruningManager reflects the pruning manager used to prune state of the SS and SC backends - pruningManager *pruning.Manager - - // Migration related fields - // migrationManager reflects the migration manager used to migrate state from v1 to v2 - migrationManager *migration.Manager - // chChangeset reflects the channel used to send the changeset to the migration manager - chChangeset chan *migration.VersionedChangeset - // chDone reflects the channel used to signal the migration manager that the migration - // is done - chDone chan struct{} - // isMigrating reflects whether the store is currently migrating - isMigrating bool -} - -// New creates a new root Store instance. -// -// NOTE: The migration manager is optional and can be nil if no migration is required. -func New( - dbCloser io.Closer, - logger corelog.Logger, - sc store.Committer, - pm *pruning.Manager, - mm *migration.Manager, - m metrics.StoreMetrics, -) (store.RootStore, error) { - return &Store{ - dbCloser: dbCloser, - logger: logger, - stateCommitment: sc, - pruningManager: pm, - migrationManager: mm, - telemetry: m, - isMigrating: mm != nil, - }, nil -} - -// Close closes the store and resets all internal fields. Note, Close() is NOT -// idempotent and should only be called once. -func (s *Store) Close() (err error) { - err = errors.Join(err, s.stateCommitment.Close()) - err = errors.Join(err, s.dbCloser.Close()) - - s.stateCommitment = nil - s.lastCommitInfo = nil - - return err -} - -func (s *Store) SetMetrics(m metrics.Metrics) { - s.telemetry = m -} - -func (s *Store) SetInitialVersion(v uint64) error { - return s.stateCommitment.SetInitialVersion(v) -} - -// getVersionedReader returns a VersionedReader based on the given version. If the -// version exists in the state storage, it returns the state storage. -// If not, it checks if the state commitment implements the VersionedReader interface -// and the version exists in the state commitment, since the state storage will be -// synced during migration. -func (s *Store) getVersionedReader(version uint64) (store.VersionedReader, error) { - isExist, err := s.stateCommitment.VersionExists(version) - if err != nil { - return nil, err - } - if isExist { - return s.stateCommitment, nil - } - return nil, fmt.Errorf("version %d does not exist", version) -} - -func (s *Store) StateLatest() (uint64, corestore.ReaderMap, error) { - v, err := s.GetLatestVersion() - if err != nil { - return 0, nil, err - } - vReader, err := s.getVersionedReader(v) - if err != nil { - return 0, nil, err - } - - return v, NewReaderMap(v, vReader), nil -} - -// StateAt returns a read-only view of the state at a given version. -func (s *Store) StateAt(v uint64) (corestore.ReaderMap, error) { - vReader, err := s.getVersionedReader(v) - return NewReaderMap(v, vReader), err -} - -func (s *Store) GetStateCommitment() store.Committer { - return s.stateCommitment -} - -// LastCommitID returns a CommitID based off of the latest internal CommitInfo. -// If an internal CommitInfo is not set, a new one will be returned with only the -// latest version set, which is based off of the SC view. -func (s *Store) LastCommitID() (proof.CommitID, error) { - if s.lastCommitInfo != nil { - return s.lastCommitInfo.CommitID(), nil - } - - latestVersion, err := s.stateCommitment.GetLatestVersion() - if err != nil { - return proof.CommitID{}, err - } - // if the latest version is 0, we return a CommitID with version 0 and a hash of an empty byte slice - bz := sha256.Sum256([]byte{}) - - return proof.CommitID{Version: latestVersion, Hash: bz[:]}, nil -} - -// GetLatestVersion returns the latest version based on the latest internal -// CommitInfo. An error is returned if the latest CommitInfo or version cannot -// be retrieved. -func (s *Store) GetLatestVersion() (uint64, error) { - lastCommitID, err := s.LastCommitID() - if err != nil { - return 0, err - } - - return lastCommitID.Version, nil -} - -func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) (store.QueryResult, error) { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "query") - } - - val, err := s.stateCommitment.Get(storeKey, version, key) - if err != nil { - return store.QueryResult{}, fmt.Errorf("failed to query SC store: %w", err) - } - - result := store.QueryResult{ - Key: key, - Value: val, - Version: version, - } - - if prove { - result.ProofOps, err = s.stateCommitment.GetProof(storeKey, version, key) - if err != nil { - return store.QueryResult{}, fmt.Errorf("failed to get SC store proof: %w", err) - } - } - - return result, nil -} - -func (s *Store) LoadLatestVersion() error { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "load_latest_version") - } - - lv, err := s.GetLatestVersion() - if err != nil { - return err - } - - return s.loadVersion(lv, nil, false) -} - -func (s *Store) LoadVersion(version uint64) error { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "load_version") - } - - return s.loadVersion(version, nil, false) -} - -func (s *Store) LoadVersionForOverwriting(version uint64) error { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "load_version_for_overwriting") - } - - return s.loadVersion(version, nil, true) -} - -// LoadVersionAndUpgrade implements the UpgradeableStore interface. -// -// NOTE: It cannot be called while the store is migrating. -func (s *Store) LoadVersionAndUpgrade(version uint64, upgrades *corestore.StoreUpgrades) error { - if upgrades == nil { - return errors.New("upgrades cannot be nil") - } - - if s.telemetry != nil { - defer s.telemetry.MeasureSince(time.Now(), "root_store", "load_version_and_upgrade") - } - - if s.isMigrating { - return errors.New("cannot upgrade while migrating") - } - - if err := s.loadVersion(version, upgrades, true); err != nil { - return err - } - - return nil -} - -func (s *Store) loadVersion(v uint64, upgrades *corestore.StoreUpgrades, overrideAfter bool) error { - s.logger.Debug("loading version", "version", v) - - if upgrades == nil { - if !overrideAfter { - if err := s.stateCommitment.LoadVersion(v); err != nil { - return fmt.Errorf("failed to load SC version %d: %w", v, err) - } - } else { - if err := s.stateCommitment.LoadVersionForOverwriting(v); err != nil { - return fmt.Errorf("failed to load SC version %d: %w", v, err) - } - } - } else { - // if upgrades are provided, we need to load the version and apply the upgrades - if err := s.stateCommitment.LoadVersionAndUpgrade(v, upgrades); err != nil { - return fmt.Errorf("failed to load SS version with upgrades %d: %w", v, err) - } - } - - // set lastCommitInfo explicitly s.t. Commit commits the correct version, i.e. v+1 - var err error - s.lastCommitInfo, err = s.stateCommitment.GetCommitInfo(v) - if err != nil { - return fmt.Errorf("failed to get commit info for version %d: %w", v, err) - } - - // if we're migrating, we need to start the migration process - if s.isMigrating { - s.startMigration() - } - - return nil -} - -// Commit commits all state changes to the underlying SS and SC backends. It -// writes a batch of the changeset to the SC tree, and retrieves the CommitInfo -// from the SC tree. Finally, it commits the SC tree and returns the hash of -// the CommitInfo. -func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { - if s.telemetry != nil { - now := time.Now() - defer s.telemetry.MeasureSince(now, "root_store", "commit") - } - - if err := s.handleMigration(cs); err != nil { - return nil, err - } - - // signal to the pruning manager that a new version is about to be committed - // this may be required if the SS and SC backends implementation have the - // background pruning process (iavl v1 for example) which must be paused during the commit - s.pruningManager.PausePruning() - - var cInfo *proof.CommitInfo - if err := s.stateCommitment.WriteChangeset(cs); err != nil { - return nil, fmt.Errorf("failed to write batch to SC store: %w", err) - } - - cInfo, err := s.stateCommitment.Commit(cs.Version) - if err != nil { - return nil, fmt.Errorf("failed to commit SC store: %w", err) - } - - if cInfo.Version != cs.Version { - return nil, fmt.Errorf("commit version mismatch: got %d, expected %d", cInfo.Version, cs.Version) - } - s.lastCommitInfo = cInfo - - // signal to the pruning manager that the commit is done - if err := s.pruningManager.ResumePruning(s.lastCommitInfo.Version); err != nil { - s.logger.Error("failed to signal commit done to pruning manager", "err", err) - } - - return s.lastCommitInfo.Hash(), nil -} - -// startMigration starts a migration process to migrate the RootStore/v1 to the -// SS and SC backends of store/v2 and initializes the channels. -// It runs in a separate goroutine and replaces the current RootStore with the -// migrated new backends once the migration is complete. -// -// NOTE: This method should only be called once after loadVersion. -func (s *Store) startMigration() { - // buffer at most 1 changeset, if the receiver is behind attempting to buffer - // more than 1 will block. - s.chChangeset = make(chan *migration.VersionedChangeset, 1) - // it is used to signal the migration manager that the migration is done - s.chDone = make(chan struct{}) - - mtx := sync.Mutex{} - mtx.Lock() - go func() { - version := s.lastCommitInfo.Version - s.logger.Info("starting migration", "version", version) - mtx.Unlock() - if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { - s.logger.Error("failed to start migration", "err", err) - } - }() - - // wait for the migration manager to start - mtx.Lock() - defer mtx.Unlock() -} - -func (s *Store) handleMigration(cs *corestore.Changeset) error { - if s.isMigrating { - // if the migration manager has already migrated to the version, close the - // channels and replace the state commitment - if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { - close(s.chDone) - close(s.chChangeset) - s.isMigrating = false - // close the old state commitment and replace it with the new one - if err := s.stateCommitment.Close(); err != nil { - return fmt.Errorf("failed to close the old SC store: %w", err) - } - newStateCommitment := s.migrationManager.GetStateCommitment() - if newStateCommitment != nil { - s.stateCommitment = newStateCommitment - } - if err := s.migrationManager.Close(); err != nil { - return fmt.Errorf("failed to close migration manager: %w", err) - } - s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) - } else { - // queue the next changeset to the migration manager - s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} - } - } - return nil -} - -func (s *Store) Prune(version uint64) error { - return s.pruningManager.Prune(version) -} From f046aa6bb6ddc5c2454217b298b83e8cf3ba700a Mon Sep 17 00:00:00 2001 From: Julien Robert <julien@rbrt.fr> Date: Wed, 11 Dec 2024 22:02:40 +0100 Subject: [PATCH 3/4] updates --- tests/systemtests/cometbft_client_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/systemtests/cometbft_client_test.go b/tests/systemtests/cometbft_client_test.go index 855594833cc8..c183e27a7da3 100644 --- a/tests/systemtests/cometbft_client_test.go +++ b/tests/systemtests/cometbft_client_test.go @@ -22,8 +22,6 @@ import ( qtypes "github.com/cosmos/cosmos-sdk/types/query" ) -<<<<<<< HEAD -======= func TestQueryStatus(t *testing.T) { systest.Sut.ResetChain(t) cli := systest.NewCLIWrapper(t, systest.Sut, systest.Verbose) @@ -40,7 +38,6 @@ func TestQueryStatus(t *testing.T) { assert.Contains(t, resp, "\"moniker\":\"node0\"") } ->>>>>>> f2663280c (test(systemtest): fix cometbft client (#22835)) func TestQueryNodeInfo(t *testing.T) { systest.Sut.ResetChain(t) systest.Sut.StartChain(t) From bae697114b374743474d5e06ebf9cb4ef902e26b Mon Sep 17 00:00:00 2001 From: Julien Robert <julien@rbrt.fr> Date: Wed, 11 Dec 2024 22:03:33 +0100 Subject: [PATCH 4/4] sync with main --- tests/systemtests/cometbft_client_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/systemtests/cometbft_client_test.go b/tests/systemtests/cometbft_client_test.go index c183e27a7da3..dbaa3e6d14ee 100644 --- a/tests/systemtests/cometbft_client_test.go +++ b/tests/systemtests/cometbft_client_test.go @@ -192,7 +192,6 @@ func TestLatestValidatorSet_GRPCGateway(t *testing.T) { } rsp := systest.GetRequest(t, baseurl+tc.url) assert.Equal(t, len(vals), int(gjson.GetBytes(rsp, "pagination.total").Int())) - }) } } @@ -255,9 +254,9 @@ func TestValidatorSetByHeight_GRPCGateway(t *testing.T) { if tc.expErr { errMsg := gjson.GetBytes(rsp, "message").String() assert.Contains(t, errMsg, tc.expErrMsg) - } else { - assert.Equal(t, len(vals), int(gjson.GetBytes(rsp, "pagination.total").Int())) + return } + assert.Equal(t, len(vals), int(gjson.GetBytes(rsp, "pagination.total").Int())) }) } }