Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): add WorkingHash #20706

Merged
merged 9 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions runtime/v2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type Store interface {
// state. Must error when the version does not exist.
StateAt(version uint64) (store.ReaderMap, error)

// SetInitialVersion sets the initial version of the store.
SetInitialVersion(uint64) error

WorkingHash(changeset *store.Changeset) (store.Hash, error)
Commit(changeset *store.Changeset) (store.Hash, error)
cool-develope marked this conversation as resolved.
Show resolved Hide resolved

// Query is a key/value query directly to the underlying database. This skips the appmanager
Expand Down
3 changes: 0 additions & 3 deletions scripts/simapp-v2-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ jq '.app_state.gov.voting_params.voting_period = "600s"' genesis.json > temp.jso
# to change the inflation
jq '.app_state.mint.minter.inflation = "0.300000000000000000"' genesis.json > temp.json && mv temp.json genesis.json

# change the initial height to 2 to work around store/v2 and iavl limitations with a genesis block
jq '.initial_height = 2' genesis.json > temp.json && mv temp.json genesis.json

$SIMD keys add test_validator --indiscreet
VALIDATOR_ADDRESS=$($SIMD keys show test_validator -a --keyring-backend test)

Expand Down
25 changes: 23 additions & 2 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

// store chainID to be used later on in execution
c.chainID = req.ChainId
c.cfg.InitialHeight = uint64(req.InitialHeight) // TODO: cfg is loaded initially ???
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @julienrbrt also we should document why this is being set in a small godoc so the next person understands the reasoning

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, c.cfg is empty, I am not sure why needs an empty config ❓


// On a new chain, we consider the init chain block height as 0, even though
// req.InitialHeight is 1 by default.
Expand Down Expand Up @@ -282,16 +283,21 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

validatorUpdates := intoABCIValidatorUpdates(blockresponse.ValidatorUpdates)

// set the initial version of the store
if err := c.store.SetInitialVersion(uint64(req.InitialHeight)); err != nil {
return nil, fmt.Errorf("failed to set initial version: %w", err)
}

stateChanges, err := genesisState.GetStateChanges()
if err != nil {
return nil, err
}
cs := &store.Changeset{
Changes: stateChanges,
}
stateRoot, err := c.store.Commit(cs)
stateRoot, err := c.store.WorkingHash(cs)
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
return nil, fmt.Errorf("unable to write the changeset: %w", err)
}

return &abci.InitChainResponse{
Expand Down Expand Up @@ -413,6 +419,21 @@ func (c *Consensus[T]) FinalizeBlock(
// LastCommit: sdktypes.ToSDKCommitInfo(req.DecidedLastCommit),
// })

// we don't need to deliver the block in the genesis block
if req.Height == int64(c.cfg.InitialHeight) {
appHash, err := c.store.Commit(store.NewChangeset())
if err != nil {
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
}
c.lastCommittedBlock.Store(&BlockData{
Height: req.Height,
Hash: appHash,
})
return &abciproto.FinalizeBlockResponse{
AppHash: appHash,
}, nil
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
Expand Down
7 changes: 7 additions & 0 deletions server/v2/cometbft/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ type Store interface {
// associated with it.
StateLatest() (uint64, store.ReaderMap, error)

// SetInitialVersion sets the initial version of the store.
SetInitialVersion(uint64) error

// WorkingHash writes the provided changeset to the state and returns
// the working hash of the state.
WorkingHash(*store.Changeset) (store.Hash, error)

// Commit commits the provided changeset and returns
// the new state root of the state.
Commit(*store.Changeset) (store.Hash, error)
Expand Down
25 changes: 6 additions & 19 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,6 @@ func (c *CommitStore) GetLatestVersion() (uint64, error) {
return version, nil
}

// IsEmpty returns true if the CommitStore is empty.
func (c *CommitStore) IsEmpty() (bool, error) {
value, err := c.db.Get([]byte(latestVersionKey))
if err != nil {
return false, err
}
if value == nil {
return true, nil
} else {
return false, nil
}
}

func (c *CommitStore) LoadVersion(targetVersion uint64) error {
// Rollback the metadata to the target version.
latestVersion, err := c.GetLatestVersion()
Expand Down Expand Up @@ -219,18 +206,18 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
// If a commit event execution is interrupted, a new iavl store's version
// will be larger than the RMS's metadata, when the block is replayed, we
// should avoid committing that iavl store again.
var (
commitID proof.CommitID
latestVersion = tree.GetLatestVersion()
)
if latestVersion != 0 && latestVersion >= version {
var commitID proof.CommitID
if tree.GetLatestVersion() >= version {
commitID.Version = version
commitID.Hash = tree.Hash()
} else {
hash, version, err := tree.Commit()
hash, cversion, err := tree.Commit()
if err != nil {
return nil, err
}
if cversion != version {
return nil, fmt.Errorf("commit version %d does not match the target version %d", cversion, version)
}
commitID = proof.CommitID{
Version: version,
Hash: hash,
Expand Down
3 changes: 0 additions & 3 deletions store/v2/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type Committer interface {
// GetCommitInfo returns the CommitInfo for the given version.
GetCommitInfo(version uint64) (*proof.CommitInfo, error)

// IsEmpty returns true if the database is empty.
IsEmpty() (bool, error)

// Close releases associated resources. It should NOT be idempotent. It must
// only be called once and any call after may panic.
io.Closer
Expand Down
44 changes: 37 additions & 7 deletions store/v2/root/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type Store struct {
// stateCommitment reflects the state commitment (SC) backend
stateCommitment store.Committer

// commitHeader reflects the header used when committing state (note, this isn't required and only used for query purposes)
// commitHeader reflects the header used when committing state
// note, this isn't required and only used for query purposes)
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify the use of commitHeader in the Store struct.

The comment for commitHeader should clarify its exact role, especially how it interacts with other components and any implications it has on the state or behavior of the store.

commitHeader *coreheader.Info

// lastCommitInfo reflects the last version/hash that has been committed
Expand Down Expand Up @@ -257,6 +258,40 @@ func (s *Store) SetCommitHeader(h *coreheader.Info) {
s.commitHeader = h
}

// WorkingHash writes the changeset to SC and SS and returns the workingHash
// of the CommitInfo.
func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
if s.telemetry != nil {
now := time.Now()
Dismissed Show dismissed Hide dismissed
defer s.telemetry.MeasureSince(now, "root_store", "working_hash")
}

// write the changeset to the SC and SS backends
eg := new(errgroup.Group)
eg.Go(func() error {
if err := s.writeSC(cs); err != nil {
return fmt.Errorf("failed to write SC: %w", err)
}

return nil
})
eg.Go(func() error {
if err := s.stateStorage.ApplyChangeset(s.initialVersion, cs); err != nil {
return fmt.Errorf("failed to commit SS: %w", err)
}

return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}

workingHash := s.lastCommitInfo.Hash()
s.lastCommitInfo.Version -= 1 // reset lastCommitInfo to allow Commit() to work correctly

return workingHash, nil
}
Comment on lines +265 to +297
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the WorkingHash method for better error handling and separation of concerns.

The WorkingHash method should separate the concerns of writing to the SC and SS backends more clearly and ensure that errors from these operations are handled correctly without affecting each other.

- if err := s.writeSC(cs); err != nil {
-     return fmt.Errorf("failed to write SC: %w", err)
- }
+ scErr := s.writeSC(cs)
+ ssErr := s.stateStorage.ApplyChangeset(s.initialVersion, cs)
+ if scErr != nil || ssErr != nil {
+     return nil, fmt.Errorf("SC error: %w, SS error: %w", scErr, ssErr)
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// WorkingHash writes the changeset to SC and SS and returns the workingHash
// of the CommitInfo.
func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
if s.telemetry != nil {
now := time.Now()
defer s.telemetry.MeasureSince(now, "root_store", "working_hash")
}
// write the changeset to the SC and SS backends
eg := new(errgroup.Group)
eg.Go(func() error {
if err := s.writeSC(cs); err != nil {
return fmt.Errorf("failed to write SC: %w", err)
}
return nil
})
eg.Go(func() error {
if err := s.stateStorage.ApplyChangeset(s.initialVersion, cs); err != nil {
return fmt.Errorf("failed to commit SS: %w", err)
}
return nil
})
if err := eg.Wait(); err != nil {
return nil, err
}
workingHash := s.lastCommitInfo.Hash()
s.lastCommitInfo.Version -= 1 // reset lastCommitInfo to allow Commit() to work correctly
return workingHash, nil
}
// WorkingHash writes the changeset to SC and SS and returns the workingHash
// of the CommitInfo.
func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
if s.telemetry != nil {
now := time.Now()
defer s.telemetry.MeasureSince(now, "root_store", "working_hash")
}
// write the changeset to the SC and SS backends
eg := new(errgroup.Group)
scErr := s.writeSC(cs)
ssErr := s.stateStorage.ApplyChangeset(s.initialVersion, cs)
if scErr != nil || ssErr != nil {
return nil, fmt.Errorf("SC error: %w, SS error: %w", scErr, ssErr)
}
if err := eg.Wait(); err != nil {
return nil, err
}
workingHash := s.lastCommitInfo.Hash()
s.lastCommitInfo.Version -= 1 // reset lastCommitInfo to allow Commit() to work correctly
return workingHash, nil
}


// Commit commits all state changes to the underlying SS and SC backends. It
// writes a batch of the changeset to the SC tree, and retrieves the CommitInfo
// from the SC tree. Finally, it commits the SC tree and returns the hash of the
Expand Down Expand Up @@ -388,13 +423,8 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
return fmt.Errorf("failed to write batch to SC store: %w", err)
}

isEmpty, err := s.stateCommitment.IsEmpty()
if err != nil {
return fmt.Errorf("failed to check if SC store is empty: %w", err)
}

var previousHeight, version uint64
if isEmpty {
if s.lastCommitInfo.GetVersion() == 0 && s.initialVersion > 1 {
// This case means that no commit has been made in the store, we
// start from initialVersion.
version = s.initialVersion
Expand Down
56 changes: 55 additions & 1 deletion store/v2/root/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,38 @@ func (s *RootStoreTestSuite) TestGetStateStorage() {
}

func (s *RootStoreTestSuite) TestSetInitialVersion() {
s.Require().NoError(s.rootStore.SetInitialVersion(100))
initialVersion := uint64(5)
s.Require().NoError(s.rootStore.SetInitialVersion(initialVersion))

// perform the initial commit
cs := corestore.NewChangeset()
cs.Add(testStoreKeyBytes, []byte("foo"), []byte("bar"), false)

wHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
cHash, err := s.rootStore.Commit(corestore.NewChangeset())
s.Require().NoError(err)
s.Require().Equal(wHash, cHash)

// check the latest version
lVersion, err := s.rootStore.GetLatestVersion()
s.Require().NoError(err)
s.Require().Equal(initialVersion, lVersion)

// set the initial version again
rInitialVersion := uint64(100)
s.Require().NoError(s.rootStore.SetInitialVersion(rInitialVersion))

// perform the commit
cs = corestore.NewChangeset()
cs.Add(testStoreKey2Bytes, []byte("foo"), []byte("bar"), false)
_, err = s.rootStore.Commit(cs)
s.Require().NoError(err)
lVersion, err = s.rootStore.GetLatestVersion()
s.Require().NoError(err)
// SetInitialVersion only works once
s.Require().NotEqual(rInitialVersion, lVersion)
s.Require().Equal(initialVersion+1, lVersion)
}

func (s *RootStoreTestSuite) TestSetCommitHeader() {
Expand Down Expand Up @@ -309,3 +340,26 @@ func (s *RootStoreTestSuite) TestStateAt() {
}
}
}

func (s *RootStoreTestSuite) TestWorkingHash() {
// write keys over multiple versions
for v := uint64(1); v <= 5; v++ {
// perform changes
cs := corestore.NewChangeset()
for _, storeKeyBytes := range [][]byte{testStoreKeyBytes, testStoreKey2Bytes, testStoreKey3Bytes} {
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key_%x_%03d", i, storeKeyBytes) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d_%03d", i, v) // val000_1, val001_1, ..., val099_1

cs.Add(storeKeyBytes, []byte(key), []byte(val), false)
}
}

wHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
// execute Commit with empty changeset
cHash, err := s.rootStore.Commit(corestore.NewChangeset())
s.Require().NoError(err)
s.Require().Equal(wHash, cHash)
}
}
Comment on lines +382 to +403
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved TestWorkingHash, but consider additional scenarios.

The updated TestWorkingHash method now covers scenarios with multiple versions, which is a significant improvement. However, it would be beneficial to include tests where changes are made between calls to WorkingHash and Commit to ensure the method behaves as expected in more complex situations.

Consider adding tests that modify the state between WorkingHash and Commit calls to fully ensure the robustness of this functionality.

Would you like me to help draft these additional test scenarios?

13 changes: 9 additions & 4 deletions store/v2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ type RootStore interface {
// queries based on block time need to be supported.
SetCommitHeader(h *coreheader.Info)

// WorkingHash returns the current WIP commitment hash by applying the Changeset
// to the SC backend. It is only used to get the hash of the intermediate state
// before committing, the typical use case is for the genesis block.
// NOTE: It also writes the changeset to the SS backend.
WorkingHash(cs *corestore.Changeset) ([]byte, error)
Comment on lines +52 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a detailed method comment for WorkingHash.

The WorkingHash method lacks a detailed comment explaining its parameters, return values, and any side effects. This is crucial for understanding the method's impact, especially since it modifies the state.


// Commit should be responsible for taking the provided changeset and flushing
// it to disk. Note, depending on the implementation, the changeset, at this
// point, may already be written to the SC backends. Commit() should ensure
// the changeset is committed to all SC and SC backends and flushed to disk.
// It must return a hash of the merkle-ized committed state.
// it to disk. Note, it will overwrite the changeset if WorkingHash() was called.
// Commit() should ensure the changeset is committed to all SC and SS backends
// and flushed to disk. It must return a hash of the merkle-ized committed state.
Comment on lines +59 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify the Commit method's interaction with WorkingHash.

The comment on the Commit method should be expanded to clearly explain how it interacts with WorkingHash, especially regarding the state changes and the conditions under which the changeset is overwritten.

Commit(cs *corestore.Changeset) ([]byte, error)

// LastCommitID returns a CommitID pertaining to the last commitment.
Expand Down
Loading