diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 176dc0037daf..2e10adf79ffc 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -56,9 +56,9 @@ type consensus[T transaction.Tx] struct { appCodec codec.Codec txCodec transaction.Codec[T] store types.Store - streaming streaming.Manager listener *appdata.Listener snapshotManager *snapshots.Manager + streamingManager streaming.Manager mempool mempool.Mempool[T] cfg Config diff --git a/server/v2/cometbft/options.go b/server/v2/cometbft/options.go index df1f4b6d7ed3..b5936148b5a5 100644 --- a/server/v2/cometbft/options.go +++ b/server/v2/cometbft/options.go @@ -47,6 +47,7 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] { VerifyVoteExtensionHandler: handlers.NoOpVerifyVoteExtensionHandler(), ExtendVoteHandler: handlers.NoOpExtendVote(), Mempool: func(cfg map[string]any) mempool.Mempool[T] { return mempool.NoOpMempool[T]{} }, + StreamingManager: streaming.Manager{}, SnapshotOptions: func(cfg map[string]any) snapshots.SnapshotOptions { return snapshots.NewSnapshotOptions(0, 0) }, SnapshotExtensions: []snapshots.ExtensionSnapshotter{}, AddrPeerFilter: nil, diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index aa7d9bb32452..47c93d97c3b3 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -35,7 +35,6 @@ import ( cometlog "cosmossdk.io/server/v2/cometbft/log" "cosmossdk.io/server/v2/cometbft/mempool" "cosmossdk.io/server/v2/cometbft/types" - "cosmossdk.io/server/v2/streaming" "cosmossdk.io/store/v2/snapshots" "github.com/cosmos/cosmos-sdk/client" @@ -174,9 +173,9 @@ func New[T transaction.Tx]( logger: logger, txCodec: txCodec, appCodec: appCodec, - streaming: streaming.Manager{}, listener: listener, snapshotManager: snapshotManager, + streamingManager: srv.serverOptions.StreamingManager, mempool: srv.serverOptions.Mempool(cfg), lastCommittedHeight: atomic.Int64{}, prepareProposalHandler: srv.serverOptions.PrepareProposalHandler, diff --git a/server/v2/cometbft/streaming.go b/server/v2/cometbft/streaming.go index fc13f3e3f603..c9ca7fddcc7e 100644 --- a/server/v2/cometbft/streaming.go +++ b/server/v2/cometbft/streaming.go @@ -40,7 +40,7 @@ func (c *consensus[T]) streamDeliverBlockChanges( } } - for _, streamingListener := range c.streaming.Listeners { + for _, streamingListener := range c.streamingManager.Listeners { events, err := streaming.IntoStreamingEvents(events) if err != nil { return err