Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ope poc #203

Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
var mode execMode

switch {
case req.Type == abci.CheckTxType_New:
switch req.Type {
case abci.CheckTxType_New:
mode = execModeCheck

case req.Type == abci.CheckTxType_Recheck:
case abci.CheckTxType_Recheck:
mode = execModeReCheck

default:
Expand Down Expand Up @@ -775,48 +775,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var blockGasUsed uint64
for _, res := range txResults {
blockGasUsed += uint64(res.GasUsed)
}
sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed)

endBlock, err := app.endBlock(sdkCtx)
if err != nil {
return nil, err
}
Expand All @@ -840,6 +826,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], memTx, i, ms, incarnationCache)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if memTx, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, memTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down Expand Up @@ -1188,7 +1213,7 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
// use custom query multi-store if provided
qms := app.qms
if qms == nil {
qms = app.cms.(storetypes.MultiStore)
qms = storetypes.RootMultiStore(app.cms)
}

lastBlockHeight := qms.LatestVersion()
Expand Down
59 changes: 50 additions & 9 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type BaseApp struct {
name string // application name from abci.BlockInfo
db dbm.DB // common DB backend
cms storetypes.CommitMultiStore // Main (uncached) state
qms storetypes.MultiStore // Optional alternative multistore for querying only.
qms storetypes.RootMultiStore // Optional alternative multistore for querying only.
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
grpcQueryRouter *GRPCQueryRouter // router for redirecting gRPC query calls
msgServiceRouter *MsgServiceRouter // router for redirecting Msg service messages
Expand Down Expand Up @@ -194,6 +194,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -302,6 +305,9 @@ func (app *BaseApp) MountStores(keys ...storetypes.StoreKey) {
case *storetypes.MemoryStoreKey:
app.MountStore(key, storetypes.StoreTypeMemory)

case *storetypes.ObjectStoreKey:
app.MountStore(key, storetypes.StoreTypeObject)

default:
panic(fmt.Sprintf("Unrecognized store key type :%T", key))
}
Expand Down Expand Up @@ -341,6 +347,17 @@ func (app *BaseApp) MountMemoryStores(keys map[string]*storetypes.MemoryStoreKey
}
}

// MountObjectStores mounts all transient object stores with the BaseApp's internal
// commit multi-store.
func (app *BaseApp) MountObjectStores(keys map[string]*storetypes.ObjectStoreKey) {
skeys := maps.Keys(keys)
sort.Strings(skeys)
for _, key := range skeys {
memKey := keys[key]
app.MountStore(memKey, storetypes.StoreTypeObject)
}
}

// MountStore mounts a store to the provided key in the BaseApp multistore,
// using the default DB.
func (app *BaseApp) MountStore(key storetypes.StoreKey, typ storetypes.StoreType) {
Expand Down Expand Up @@ -659,14 +676,15 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes).
WithGasMeter(storetypes.NewInfiniteGasMeter())
WithGasMeter(storetypes.NewInfiniteGasMeter()).
WithTxIndex(txIndex)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
Expand Down Expand Up @@ -750,7 +768,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, memTx sdk.Tx, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, memTx, txIndex, nil, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, memTx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -763,7 +785,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, memTx, txIndex, txMultiStore, incarnationCache)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -821,12 +843,27 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, nil, -1, nil, nil)
}

func (app *BaseApp) runTxWithMultiStore(
mode execMode,
txBytes []byte,
tx sdk.Tx,
txIndex int,
txMultiStore storetypes.MultiStore,
incarnationCache map[string]any,
) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
ctx = ctx.WithIncarnationCache(incarnationCache)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -868,9 +905,11 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, res
defer consumeBlockGas()
}

tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, err
if tx == nil {
tx, err = app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, err
}
}

msgs := tx.GetMsgs()
Expand Down Expand Up @@ -1010,6 +1049,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, nil, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
12 changes: 11 additions & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -314,7 +319,7 @@ func (app *BaseApp) SetTxEncoder(txEncoder sdk.TxEncoder) {
// SetQueryMultiStore set a alternative MultiStore implementation to support grpc query service.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/13317
func (app *BaseApp) SetQueryMultiStore(ms storetypes.MultiStore) {
func (app *BaseApp) SetQueryMultiStore(ms storetypes.RootMultiStore) {
app.qms = ms
}

Expand Down Expand Up @@ -387,3 +392,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
app.grpcQueryRouter = grpcQueryRouter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
17 changes: 17 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"cosmossdk.io/store/types"
)

type TxExecutor func(
ctx context.Context,
block [][]byte,
cms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ require (

// Below are the long-lived replace of the Cosmos SDK
replace (
cosmossdk.io/store => github.com/b-harvest/cosmos-sdk/store v0.0.0-20241015155842-364149db166b
// use cosmos fork of keyring
github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0
// dgrijalva/jwt-go is deprecated and doesn't receive security updates.
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ cosmossdk.io/log v1.3.1 h1:UZx8nWIkfbbNEWusZqzAx3ZGvu54TZacWib3EzUYmGI=
cosmossdk.io/log v1.3.1/go.mod h1:2/dIomt8mKdk6vl3OWJcPk2be3pGOS8OQaLUM/3/tCM=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/store v1.1.0 h1:LnKwgYMc9BInn9PhpTFEQVbL9UK475G2H911CGGnWHk=
cosmossdk.io/store v1.1.0/go.mod h1:oZfW/4Fc/zYqu3JmQcQdUJ3fqu5vnYTn3LZFFy8P8ng=
cosmossdk.io/x/tx v0.13.3 h1:Ha4mNaHmxBc6RMun9aKuqul8yHiL78EKJQ8g23Zf73g=
cosmossdk.io/x/tx v0.13.3/go.mod h1:I8xaHv0rhUdIvIdptKIqzYy27+n2+zBVaxO6fscFhys=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -61,6 +59,8 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/b-harvest/cosmos-sdk/store v0.0.0-20241015155842-364149db166b h1:V8FGjyQqNZ4qTWGEFJQsInCWORqevhSs9oAENzuLEEE=
github.com/b-harvest/cosmos-sdk/store v0.0.0-20241015155842-364149db166b/go.mod h1:oZfW/4Fc/zYqu3JmQcQdUJ3fqu5vnYTn3LZFFy8P8ng=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
7 changes: 7 additions & 0 deletions runtime/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func init() {
ProvideKVStoreKey,
ProvideTransientStoreKey,
ProvideMemoryStoreKey,
ProvideObjectStoreKey,
ProvideGenesisTxHandler,
ProvideKVStoreService,
ProvideMemoryStoreService,
Expand Down Expand Up @@ -225,6 +226,12 @@ func ProvideMemoryStoreKey(key depinject.ModuleKey, app *AppBuilder) *storetypes
return storeKey
}

func ProvideObjectStoreKey(key depinject.ModuleKey, app *AppBuilder) *storetypes.ObjectStoreKey {
storeKey := storetypes.NewObjectStoreKey(fmt.Sprintf("object:%s", key.Name()))
registerStoreKey(app, storeKey)
return storeKey
}

func ProvideGenesisTxHandler(appBuilder *AppBuilder) genesis.TxHandler {
return appBuilder.app
}
Expand Down
Loading
Loading