Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server/v2): Update prepare & process proposal #21237

Merged
merged 11 commits into from
Sep 4, 2024
3 changes: 2 additions & 1 deletion server/v2/appmanager/appmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func (a AppManager[T]) ValidateTx(ctx context.Context, tx T) (server.TxResult, e
if err != nil {
return server.TxResult{}, err
}
return a.stf.ValidateTx(ctx, latestState, a.config.ValidateTxGasLimit, tx), nil
res := a.stf.ValidateTx(ctx, latestState, a.config.ValidateTxGasLimit, tx)
return res, res.Error
}

// Simulate runs validation and execution flow of a Tx.
Expand Down
33 changes: 10 additions & 23 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,8 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("PrepareProposal called with invalid height")
}

decodedTxs := make([]T, len(req.Txs))
for i, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}

decodedTxs[i] = decTx
if c.prepareProposalHandler == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this fail at app build time?

Copy link
Member

@julienrbrt julienrbrt Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you don't pass anything to the comet server, we use the default options (no op handlers).
so to get this you would have to purposely pass nil. as we don't want to make those mandatory, making it fail at compile time isn't best.

return nil, errors.New("no prepare proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -345,7 +336,7 @@ func (c *Consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})

txs, err := c.prepareProposalHandler(ciCtx, c.app, decodedTxs, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
return nil, err
}
Expand All @@ -366,16 +357,12 @@ func (c *Consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
decodedTxs := make([]T, len(req.Txs))
for _, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs = append(decodedTxs, decTx)
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
}

if c.processProposalHandler == nil {
return nil, errors.New("no process proposal function was set")
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Expand All @@ -385,7 +372,7 @@ func (c *Consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})

err := c.processProposalHandler(ciCtx, c.app, decodedTxs, req)
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{
Expand Down
68 changes: 43 additions & 25 deletions server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"fmt"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

consensusv1 "cosmossdk.io/api/cosmos/consensus/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/cometbft/mempool"
consensustypes "cosmossdk.io/x/consensus/types"
)

type AppManager[T transaction.Tx] interface {
Expand All @@ -33,28 +32,25 @@ func NewDefaultProposalHandler[T transaction.Tx](mp mempool.Mempool[T]) *Default
}

func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
abciReq, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return nil, fmt.Errorf("expected abci.PrepareProposalRequest, invalid request type: %T,", req)
}

return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
var maxBlockGas uint64

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return nil, err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

txs := decodeTxs(codec, req.Txs)

defer h.txSelector.Clear()

// If the mempool is nil or NoOp we simply return the transactions
Expand All @@ -64,7 +60,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
for _, tx := range txs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, tx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx)
if stop {
break
}
Expand All @@ -88,7 +84,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return nil, err
}
} else {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, memTx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx)
if stop {
break
}
Expand All @@ -102,34 +98,40 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
}

func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) error {
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.ProcessProposalRequest) error {
// If the mempool is nil we simply return ACCEPT,
// because PrepareProposal may have included txs that could fail verification.
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
return nil
}

_, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return fmt.Errorf("invalid request type: %T", req)
}

res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return err
}

paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}

var maxBlockGas uint64
if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}

// Decode request txs bytes
// If there an tx decoded fail, return err
var txs []T
for _, tx := range req.Txs {
decTx, err := codec.Decode(tx)
if err != nil {
return fmt.Errorf("failed to decode tx: %w", err)
}
txs = append(txs, decTx)
}

var totalTxGas uint64
for _, tx := range txs {
_, err := app.ValidateTx(ctx, tx)
Expand All @@ -153,18 +155,34 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
}
}

// decodeTxs decodes the txs bytes into a decoded txs
// If there a fail decoding tx, remove from the list
// Used for prepare proposal
func decodeTxs[T transaction.Tx](codec transaction.Codec[T], txsBz [][]byte) []T {
var txs []T
for _, tx := range txsBz {
decTx, err := codec.Decode(tx)
if err != nil {
continue
}

txs = append(txs, decTx)
}
return txs
}

// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always
// return the transactions sent by the client's request.
func NoOpPrepareProposal[T transaction.Tx]() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
return txs, nil
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
return decodeTxs(codec, req.Txs), nil
}
}

// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always
// return ACCEPT.
func NoOpProcessProposal[T transaction.Tx]() ProcessHandler[T] {
return func(context.Context, AppManager[T], []T, proto.Message) error {
return func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error {
return nil
}
}
Expand Down
5 changes: 2 additions & 3 deletions server/v2/cometbft/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"

"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
Expand All @@ -13,11 +12,11 @@ import (
type (
// PrepareHandler passes in the list of Txs that are being proposed. The app can then do stateful operations
// over the list of proposed transactions. It can return a modified list of txs to include in the proposal.
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) ([]T, error)
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.PrepareProposalRequest) ([]T, error)

// ProcessHandler is a function that takes a list of transactions and returns a boolean and an error.
// If the verification of a transaction fails, the boolean is false and the error is non-nil.
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) error
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error

// VerifyVoteExtensionhandler is a function type that handles the verification of a vote extension request.
// It takes a context, a store reader map, and a request to verify a vote extension.
Expand Down
Loading