Skip to content

Commit

Permalink
fix: state listener observe writes at wrong time
Browse files Browse the repository at this point in the history
Closes: cosmos#13457

Currently state listener is notified when the cache store write, which happens in commit event only, which breaks the current design.
The solution (as discussed in the issue) is to listen state writes on rootmulti store only.

It also changes the file streamer to output single data file for the writes in the whole block, since we can't distinguish writes from different stage of abci events.

It adds new config items for file streamer:
- streamers.file.output-metadata
- streamers.file.stop-node-on-error
- streamers.file.fsync
  • Loading branch information
yihuang committed Nov 27, 2022
1 parent 45cc1b0 commit ac86866
Show file tree
Hide file tree
Showing 36 changed files with 2,916 additions and 665 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ extension interfaces. `module.Manager.Modules` is now of type `map[string]interf
`cosmossdk.io/core/appmodule.AppModule` API.
* (x/group) [#13876](https://github.com/cosmos/cosmos-sdk/pull/13876) Add `GetMinExecutionPeriod` method on DecisionPolicy interface.
* (x/auth)[#13780](https://github.com/cosmos/cosmos-sdk/pull/13780) Querying with `id` (type of int64) in `AccountAddressByID` grpc query now throws error, use account-id(type of uint64) instead.
* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) Add method `ListenCommit` to `ABCIListener`, move `ListeningEnabled` `AddListener` methods to `CommitMultiStore`, remove `CacheWrapWithListeners` from `CacheWrap` and `CacheWrapper` interfaces, remove listening apis from caching layer, should only listen to the `rootmulti.Store`, add three new options to file streaming service constructor.

### CLI Breaking Changes

Expand Down Expand Up @@ -206,6 +207,7 @@ extension interfaces. `module.Manager.Modules` is now of type `map[string]interf
* (server) [#13778](https://github.com/cosmos/cosmos-sdk/pull/13778) Set Cosmos SDK default endpoints to localhost to avoid unknown exposure of endpoints.
* (x/auth) [#13877](https://github.com/cosmos/cosmos-sdk/pull/13877) Handle missing account numbers during `InitGenesis`.
* (x/gov) [#13918](https://github.com/cosmos/cosmos-sdk/pull/13918) Fix propagation of message errors when executing a proposal.
* (store) [#13516](https://github.com/cosmos/cosmos-sdk/pull/13516) fix state listener observe writes at wrong time.

### Deprecated

Expand Down
1,670 changes: 1,639 additions & 31 deletions api/cosmos/base/store/v1beta1/listening.pulsar.go

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliv
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
func (app *BaseApp) Commit() abci.ResponseCommit {
header := app.deliverState.ctx.BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

Expand All @@ -373,6 +373,19 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
// MultiStore (app.cms) so when Commit() is called is persists those values.
app.deliverState.ms.Write()
commitID := app.cms.Commit()

res := abci.ResponseCommit{
Data: commitID.Hash,
RetainHeight: retainHeight,
}

// call the hooks with the Commit message
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenCommit(app.deliverState.ctx, res); err != nil {
app.logger.Error("Commit listening hook failed", "height", header.Height, "err", err)
}
}

app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
Expand Down Expand Up @@ -406,10 +419,7 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {

go app.snapshotManager.SnapshotIfApplicable(header.Height)

return abci.ResponseCommit{
Data: commitID.Hash,
RetainHeight: retainHeight,
}
return res
}

// halt attempts to gracefully shutdown the node via SIGINT and SIGTERM falling
Expand Down
5 changes: 4 additions & 1 deletion baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import (
"github.com/cosmos/cosmos-sdk/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp.
// the error result won't stop consensus state machine, if you want to stop if, need to call panic explicitly.
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenCommit updates the steaming service with the latest Commit event
ListenCommit(ctx types.Context, res abci.ResponseCommit) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
Expand Down
Loading

0 comments on commit ac86866

Please sign in to comment.