Skip to content

Commit

Permalink
Problem: parallel tx execution is not supported (cosmos#205)
Browse files Browse the repository at this point in the history
add basic support in sdk:
- add a TxExecutor baseapp option
- add TxIndex/TxCount/MsgIndex in context

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

fix misspell

fix lint

run gci

fix lint

gci seems not compatible with gofumpt
  • Loading branch information
yihuang authored and mmsqe committed Dec 14, 2024
1 parent d62bcbd commit 07e85db
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 49 deletions.
9 changes: 0 additions & 9 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ linters:
- exportloopref
- goconst
- gocritic
- gci
- gofumpt
- gosec
- gosimple
Expand Down Expand Up @@ -63,14 +62,6 @@ issues:
max-same-issues: 10000

linters-settings:
gci:
custom-order: true
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- prefix(cosmossdk.io)
- prefix(github.com/cosmos/cosmos-sdk)

gosec:
# To select a subset of rules to run.
# Available rules: https://github.com/securego/gosec#available-rules
Expand Down
91 changes: 58 additions & 33 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
var mode execMode

switch {
case req.Type == abci.CheckTxType_New:
switch req.Type {
case abci.CheckTxType_New:
mode = execModeCheck

case req.Type == abci.CheckTxType_Recheck:
case abci.CheckTxType_Recheck:
mode = execModeReCheck

default:
Expand Down Expand Up @@ -778,48 +778,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var blockGasUsed uint64
for _, res := range txResults {
blockGasUsed += uint64(res.GasUsed)
}
sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed)

endBlock, err := app.endBlock(sdkCtx)
if err != nil {
return nil, err
}
Expand All @@ -843,6 +829,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], i, ms)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down
25 changes: 21 additions & 4 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -659,13 +662,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes).
WithTxIndex(txIndex).
WithGasMeter(storetypes.NewInfiniteGasMeter())
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -752,7 +756,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, txIndex, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -765,7 +773,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -823,12 +831,19 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, -1, nil)
}

func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -1012,6 +1027,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -378,6 +383,11 @@ func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) {
app.disableBlockGasMeter = disableBlockGasMeter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}

// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
app.msgServiceRouter = msgServiceRouter
Expand Down
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
16 changes: 16 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/store/types"
)

type TxExecutor func(
ctx context.Context,
blockSize int,
cms types.MultiStore,
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
35 changes: 35 additions & 0 deletions types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ type Context struct {
streamingManager storetypes.StreamingManager
cometInfo comet.BlockInfo
headerInfo header.Info

// the index of the current tx in the block, -1 means not in finalize block context
txIndex int
// the index of the current msg in the tx, -1 means not in finalize block context
msgIndex int
// the total number of transactions in current block
txCount int
// sum the gas used by all the transactions in the current block, only accessible by end blocker
blockGasUsed uint64
}

// Proposed rename, not done to avoid API breakage
Expand Down Expand Up @@ -93,6 +102,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
func (c Context) TxIndex() int { return c.txIndex }
func (c Context) MsgIndex() int { return c.msgIndex }
func (c Context) TxCount() int { return c.txCount }
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }

// clone the header before returning
func (c Context) BlockHeader() cmtproto.Header {
Expand Down Expand Up @@ -140,6 +153,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
eventManager: NewEventManager(),
kvGasConfig: storetypes.KVGasConfig(),
transientKVGasConfig: storetypes.TransientGasConfig(),
txIndex: -1,
msgIndex: -1,
}
}

Expand Down Expand Up @@ -319,6 +334,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
return c
}

func (c Context) WithTxIndex(txIndex int) Context {
c.txIndex = txIndex
return c
}

func (c Context) WithTxCount(txCount int) Context {
c.txCount = txCount
return c
}

func (c Context) WithMsgIndex(msgIndex int) Context {
c.msgIndex = msgIndex
return c
}

func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
c.blockGasUsed = gasUsed
return c
}

// TODO: remove???
func (c Context) IsZero() bool {
return c.ms == nil
Expand Down

0 comments on commit 07e85db

Please sign in to comment.