Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2/cometbft,stf): Listener integration in server/v2 (backport #21917) #22051

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/log"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/client/grpc/cmtservice"
"cosmossdk.io/server/v2/cometbft/handlers"
Expand All @@ -40,6 +41,7 @@ type Consensus[T transaction.Tx] struct {
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]

Expand Down Expand Up @@ -104,6 +106,11 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
12 changes: 12 additions & 0 deletions server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@ require (
cosmossdk.io/core v1.0.0-alpha.3
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.4.1
<<<<<<< HEAD
cosmossdk.io/server/v2 v2.0.0-20240927165321-7fe95fc3f945 // main
cosmossdk.io/server/v2/appmanager v0.0.0-20240920095614-aa90bb43d8f8 // main
cosmossdk.io/server/v2/stf v0.0.0-20240926131628-f927e9b55173 // main
cosmossdk.io/store/v2 v2.0.0-20240916221850-7856d226038c // main
=======
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
>>>>>>> bf95c814f (feat(server/v2/cometbft,stf): Listener integration in server/v2 (#21917))
cosmossdk.io/x/consensus v0.0.0-00010101000000-000000000000
github.com/cometbft/cometbft v1.0.0-rc1.0.20240908111210-ab0be101882f
github.com/cometbft/cometbft/api v1.0.0-rc.1
Expand All @@ -43,8 +51,12 @@ require (
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/depinject v1.0.0 // indirect
cosmossdk.io/math v1.3.0 // indirect
<<<<<<< HEAD
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/store v1.1.1-0.20240909133312-50288938d1b6 // indirect
=======
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect
>>>>>>> bf95c814f (feat(server/v2/cometbft,stf): Listener integration in server/v2 (#21917))
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
cosmossdk.io/x/tx v0.13.4-0.20240918094839-0c8ad9d2c64b // indirect; main
Expand Down
50 changes: 50 additions & 0 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
)

Expand Down Expand Up @@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges(
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}

if c.listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}

return nil
}

Expand Down
Loading
Loading