Skip to content

Commit

Permalink
Problem: transient store usage not compatible with parallel tx execut…
Browse files Browse the repository at this point in the history
…ion (#450)

* Problem: transient store usage not compatible with parallel tx execution

Currently we use shared transient store keys to accumulate some states,
which cause issues when developing parallel tx execution

Solution:
- remove some transient stores.
- the others are used in a per-tx fasion.

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

cleanup

fix test

* fix

* cleanup
  • Loading branch information
yihuang authored Apr 3, 2024
1 parent 078d3b0 commit 4e07a42
Show file tree
Hide file tree
Showing 20 changed files with 175 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (statedb) [#446](https://github.com/crypto-org-chain/ethermint/pull/446) Re-use the cache store implementation with sdk.
* (evm) [#447](https://github.com/crypto-org-chain/ethermint/pull/447) Deduct fee through virtual bank transfer.
* (evm) [#448](https://github.com/crypto-org-chain/ethermint/pull/448) Refactor the evm transfer to be more efficient.
* (evm) [#450](https://github.com/crypto-org-chain/ethermint/pull/450) Refactor transient stores to be compatible with parallel tx execution.

### State Machine Breaking

Expand Down
2 changes: 1 addition & 1 deletion app/ante/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func newEthAnteHandler(options HandlerOptions) sdk.AnteHandler {
}

// We need to setup an empty gas config so that the gas is consistent with Ethereum.
ctx, err = SetupEthContext(ctx, options.EvmKeeper)
ctx, err = SetupEthContext(ctx)
if err != nil {
return ctx, err
}
Expand Down
2 changes: 0 additions & 2 deletions app/ante/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type EVMKeeper interface {
ChainID() *big.Int

DeductTxCostsFromUserBalance(ctx sdk.Context, fees sdk.Coins, from common.Address) error
ResetTransientGasUsed(ctx sdk.Context)
GetTxIndexTransient(ctx sdk.Context) uint64
}

type protoTxProvider interface {
Expand Down
6 changes: 1 addition & 5 deletions app/ante/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@ import (

// SetupEthContext is adapted from SetUpContextDecorator from cosmos-sdk, it ignores gas consumption
// by setting the gas meter to infinite
func SetupEthContext(ctx sdk.Context, evmKeeper EVMKeeper) (newCtx sdk.Context, err error) {
func SetupEthContext(ctx sdk.Context) (newCtx sdk.Context, err error) {
// We need to setup an empty gas config so that the gas is consistent with Ethereum.
newCtx = ctx.WithGasMeter(storetypes.NewInfiniteGasMeter()).
WithKVGasConfig(storetypes.GasConfig{}).
WithTransientKVGasConfig(storetypes.GasConfig{})

// Reset transient gas used to prepare the execution of current cosmos tx.
// Transient gas-used is necessary to sum the gas-used of cosmos tx, when it contains multiple eth msgs.
evmKeeper.ResetTransientGasUsed(ctx)

return newCtx, nil
}

Expand Down
2 changes: 1 addition & 1 deletion app/ante/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (suite *AnteTestSuite) TestEthSetupContextDecorator() {

for _, tc := range testCases {
suite.Run(tc.name, func() {
ctx, err := ante.SetupEthContext(suite.ctx, suite.app.EvmKeeper)
ctx, err := ante.SetupEthContext(suite.ctx)

if tc.expPass {
suite.Require().NoError(err)
Expand Down
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func NewEthermintApp(
bApp.SetVersion(version.Version)
bApp.SetInterfaceRegistry(interfaceRegistry)
bApp.SetTxEncoder(txConfig.TxEncoder())
bApp.SetTxExecutor(DefaultTxExecutor)

keys := storetypes.NewKVStoreKeys(
// SDK keys
Expand Down
21 changes: 21 additions & 0 deletions app/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package app

import (
"context"

storetypes "cosmossdk.io/store/types"
abci "github.com/cometbft/cometbft/abci/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
)

func DefaultTxExecutor(_ context.Context,
blockSize int,
ms storetypes.MultiStore,
deliverTxWithMultiStore func(int, storetypes.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error) {
results := make([]*abci.ExecTxResult, blockSize)
for i := 0; i < blockSize; i++ {
results[i] = deliverTxWithMultiStore(i, ms)
}
return evmtypes.PatchTxResponses(results), nil
}
18 changes: 3 additions & 15 deletions docs/architecture/adr-001-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,10 @@ func (k *Keeper) BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock) {
func (k Keeper) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) []abci.ValidatorUpdate {
// NOTE: UpdateAccounts, Commit and Reset execution steps have been removed in favor of directly
// updating the state.

// Gas costs are handled within msg handler so costs should be ignored
infCtx := ctx.WithGasMeter(sdk.NewInfiniteGasMeter())
k.WithContext(ctx)

// get the block bloom bytes from the transient store and set it to the persistent storage
bloomBig, found := k.GetBlockBloomTransient()
if !found {
bloomBig = big.NewInt(0)
}

bloom := ethtypes.BytesToBloom(bloomBig.Bytes())
k.SetBlockBloom(infCtx, req.Height, bloom)
k.WithContext(ctx)

return []abci.ValidatorUpdate{}
infCtx := ctx.WithGasMeter(types.NewInfiniteGasMeter())
k.CollectTxBloom(infCtx)
return nil
}
```

Expand Down
8 changes: 4 additions & 4 deletions tests/importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ func (suite *ImporterTestSuite) TestImportBlocks() {
applyDAOHardFork(vmdb)
}

for _, tx := range block.Transactions() {
for i, tx := range block.Transactions() {

receipt, gas, err := applyTransaction(
ctx, chainConfig, chainContext, nil, gp, suite.app.EvmKeeper, vmdb, header, tx, usedGas, vmConfig,
ctx, chainConfig, chainContext, nil, gp, suite.app.EvmKeeper, vmdb, header, tx, usedGas, vmConfig, uint(i),
)
suite.Require().NoError(err, "failed to apply tx at block %d; tx: %X; gas %d; receipt:%v", block.NumberU64(), tx.Hash(), gas, receipt)
suite.Require().NotNil(receipt)
Expand Down Expand Up @@ -230,7 +230,7 @@ func applyDAOHardFork(vmdb ethvm.StateDB) {
func applyTransaction(
ctx sdk.Context, config *ethparams.ChainConfig, bc ethcore.ChainContext, author *common.Address,
gp *ethcore.GasPool, evmKeeper *evmkeeper.Keeper, vmdb *statedb.StateDB, header *ethtypes.Header,
tx *ethtypes.Transaction, usedGas *uint64, cfg ethvm.Config,
tx *ethtypes.Transaction, usedGas *uint64, cfg ethvm.Config, index uint,
) (*ethtypes.Receipt, uint64, error) {
msg, err := ethcore.TransactionToMessage(tx, ethtypes.MakeSigner(config, header.Number), sdkmath.ZeroInt().BigInt())
if err != nil {
Expand Down Expand Up @@ -271,7 +271,7 @@ func applyTransaction(
receipt.Bloom = ethtypes.CreateBloom(ethtypes.Receipts{receipt})
receipt.BlockHash = header.Hash()
receipt.BlockNumber = header.Number
receipt.TransactionIndex = uint(evmKeeper.GetTxIndexTransient(ctx))
receipt.TransactionIndex = index

return receipt, execResult.UsedGas, err
}
7 changes: 1 addition & 6 deletions x/evm/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"

ethtypes "github.com/ethereum/go-ethereum/core/types"
)

// BeginBlock sets the sdk Context and EIP155 chain id to the Keeper.
Expand All @@ -35,9 +33,6 @@ func (k *Keeper) BeginBlock(ctx sdk.Context) error {
func (k *Keeper) EndBlock(ctx sdk.Context) error {
// Gas costs are handled within msg handler so costs should be ignored
infCtx := ctx.WithGasMeter(types.NewInfiniteGasMeter())

bloom := ethtypes.BytesToBloom(k.GetBlockBloomTransient(infCtx).Bytes())
k.EmitBlockBloomEvent(infCtx, bloom)

k.CollectTxBloom(infCtx)
return nil
}
27 changes: 27 additions & 0 deletions x/evm/keeper/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package keeper

import (
"math/big"

"cosmossdk.io/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/evmos/ethermint/x/evm/types"
)

func (k Keeper) SetTxBloom(ctx sdk.Context, bloom []byte) {
store := ctx.KVStore(k.transientKey)
store.Set(types.TransientBloomKey(ctx.TxIndex(), ctx.MsgIndex()), bloom)
}

func (k Keeper) CollectTxBloom(ctx sdk.Context) {
store := prefix.NewStore(ctx.KVStore(k.transientKey), types.KeyPrefixTransientBloom)
it := store.Iterator(nil, nil)
defer it.Close()

bloom := new(big.Int)
for ; it.Valid(); it.Next() {
bloom.Or(bloom, big.NewInt(0).SetBytes(it.Value()))
}

k.EmitBlockBloomEvent(ctx, bloom.Bytes())
}
3 changes: 1 addition & 2 deletions x/evm/keeper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func (k *Keeper) TxConfig(ctx sdk.Context, txHash common.Hash) statedb.TxConfig
return statedb.NewTxConfig(
common.BytesToHash(ctx.HeaderHash()), // BlockHash
txHash, // TxHash
uint(k.GetTxIndexTransient(ctx)), // TxIndex
uint(k.GetLogSizeTransient(ctx)), // LogIndex
0, 0,
)
}

Expand Down
78 changes: 4 additions & 74 deletions x/evm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
corestoretypes "cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors"
"cosmossdk.io/log"
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -146,11 +145,11 @@ func (k Keeper) ChainID() *big.Int {
// ----------------------------------------------------------------------------

// EmitBlockBloomEvent emit block bloom events
func (k Keeper) EmitBlockBloomEvent(ctx sdk.Context, bloom ethtypes.Bloom) {
func (k Keeper) EmitBlockBloomEvent(ctx sdk.Context, bloom []byte) {
ctx.EventManager().EmitEvent(
sdk.NewEvent(
types.EventTypeBlockBloom,
sdk.NewAttribute(types.AttributeKeyEthereumBloom, string(bloom.Bytes())),
sdk.NewAttribute(types.AttributeKeyEthereumBloom, string(bloom)),
),
)
}
Expand All @@ -160,69 +159,6 @@ func (k Keeper) GetAuthority() sdk.AccAddress {
return k.authority
}

// GetBlockBloomTransient returns bloom bytes for the current block height
func (k Keeper) GetBlockBloomTransient(ctx sdk.Context) *big.Int {
store := prefix.NewStore(ctx.TransientStore(k.transientKey), types.KeyPrefixTransientBloom)
heightBz := sdk.Uint64ToBigEndian(uint64(ctx.BlockHeight()))
bz := store.Get(heightBz)
if len(bz) == 0 {
return big.NewInt(0)
}

return new(big.Int).SetBytes(bz)
}

// SetBlockBloomTransient sets the given bloom bytes to the transient store. This value is reset on
// every block.
func (k Keeper) SetBlockBloomTransient(ctx sdk.Context, bloom *big.Int) {
store := prefix.NewStore(ctx.TransientStore(k.transientKey), types.KeyPrefixTransientBloom)
heightBz := sdk.Uint64ToBigEndian(uint64(ctx.BlockHeight()))
store.Set(heightBz, bloom.Bytes())
}

// ----------------------------------------------------------------------------
// Tx
// ----------------------------------------------------------------------------

// SetTxIndexTransient set the index of processing transaction
func (k Keeper) SetTxIndexTransient(ctx sdk.Context, index uint64) {
store := ctx.TransientStore(k.transientKey)
store.Set(types.KeyPrefixTransientTxIndex, sdk.Uint64ToBigEndian(index))
}

// GetTxIndexTransient returns EVM transaction index on the current block.
func (k Keeper) GetTxIndexTransient(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientTxIndex)
if len(bz) == 0 {
return 0
}

return sdk.BigEndianToUint64(bz)
}

// ----------------------------------------------------------------------------
// Log
// ----------------------------------------------------------------------------

// GetLogSizeTransient returns EVM log index on the current block.
func (k Keeper) GetLogSizeTransient(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientLogSize)
if len(bz) == 0 {
return 0
}

return sdk.BigEndianToUint64(bz)
}

// SetLogSizeTransient fetches the current EVM log index from the transient store, increases its
// value by one and then sets the new index back to the transient store.
func (k Keeper) SetLogSizeTransient(ctx sdk.Context, logSize uint64) {
store := ctx.TransientStore(k.transientKey)
store.Set(types.KeyPrefixTransientLogSize, sdk.Uint64ToBigEndian(logSize))
}

// ----------------------------------------------------------------------------
// Storage
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -349,16 +285,10 @@ func (k Keeper) getBaseFee(ctx sdk.Context, london bool) *big.Int {
return baseFee
}

// ResetTransientGasUsed reset gas used to prepare for execution of current cosmos tx, called in ante handler.
func (k Keeper) ResetTransientGasUsed(ctx sdk.Context) {
store := ctx.TransientStore(k.transientKey)
store.Delete(types.KeyPrefixTransientGasUsed)
}

// GetTransientGasUsed returns the gas used by current cosmos tx.
func (k Keeper) GetTransientGasUsed(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientGasUsed)
bz := store.Get(types.TransientGasUsedKey(ctx.TxIndex()))
if len(bz) == 0 {
return 0
}
Expand All @@ -369,7 +299,7 @@ func (k Keeper) GetTransientGasUsed(ctx sdk.Context) uint64 {
func (k Keeper) SetTransientGasUsed(ctx sdk.Context, gasUsed uint64) {
store := ctx.TransientStore(k.transientKey)
bz := sdk.Uint64ToBigEndian(gasUsed)
store.Set(types.KeyPrefixTransientGasUsed, bz)
store.Set(types.TransientGasUsedKey(ctx.TxIndex()), bz)
}

// AddTransientGasUsed accumulate gas used by each eth msgs included in current cosmos tx.
Expand Down
3 changes: 0 additions & 3 deletions x/evm/keeper/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (k *Keeper) EthereumTx(goCtx context.Context, msg *types.MsgEthereumTx) (*t
ctx := sdk.UnwrapSDKContext(goCtx)

tx := msg.AsTransaction()
txIndex := k.GetTxIndexTransient(ctx)

labels := []metrics.Label{
telemetry.NewLabel("tx_type", fmt.Sprintf("%d", tx.Type())),
Expand Down Expand Up @@ -92,8 +91,6 @@ func (k *Keeper) EthereumTx(goCtx context.Context, msg *types.MsgEthereumTx) (*t
sdk.NewAttribute(sdk.AttributeKeyAmount, tx.Value().String()),
// add event for ethereum transaction hash format
sdk.NewAttribute(types.AttributeKeyEthereumTxHash, response.Hash),
// add event for index of valid ethereum tx
sdk.NewAttribute(types.AttributeKeyTxIndex, strconv.FormatUint(txIndex, 10)),
// add event for eth tx gas used, we can't get it from cosmos tx result when it contains multiple eth tx msgs.
sdk.NewAttribute(types.AttributeKeyTxGasUsed, strconv.FormatUint(response.GasUsed, 10)),
}
Expand Down
21 changes: 2 additions & 19 deletions x/evm/keeper/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,6 @@ func (k Keeper) GetHashFn(ctx sdk.Context) vm.GetHashFunc {
//
// For relevant discussion see: https://github.com/cosmos/cosmos-sdk/discussions/9072
func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx) (*types.MsgEthereumTxResponse, error) {
var (
bloom *big.Int
bloomReceipt ethtypes.Bloom
)

ethTx := msgEth.AsTransaction()
cfg, err := k.EVMConfig(ctx, sdk.ConsAddress(ctx.BlockHeader().ProposerAddress), k.eip155ChainID, ethTx.Hash())
if err != nil {
Expand Down Expand Up @@ -205,9 +200,7 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)

// Compute block bloom filter
if len(logs) > 0 {
bloom = k.GetBlockBloomTransient(ctx)
bloom.Or(bloom, big.NewInt(0).SetBytes(ethtypes.LogsBloom(logs)))
bloomReceipt = ethtypes.BytesToBloom(bloom.Bytes())
k.SetTxBloom(tmpCtx, ethtypes.LogsBloom(logs))
}

cumulativeGasUsed := res.GasUsed
Expand All @@ -228,14 +221,12 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)
Type: ethTx.Type(),
PostState: nil, // TODO: intermediate state root
CumulativeGasUsed: cumulativeGasUsed,
Bloom: bloomReceipt,
Logs: logs,
TxHash: cfg.TxConfig.TxHash,
ContractAddress: contractAddr,
GasUsed: res.GasUsed,
BlockHash: cfg.TxConfig.BlockHash,
BlockNumber: big.NewInt(ctx.BlockHeight()),
TransactionIndex: cfg.TxConfig.TxIndex,
}

if !res.Failed() {
Expand All @@ -258,17 +249,9 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)

// refund gas in order to match the Ethereum gas consumption instead of the default SDK one.
if err = k.RefundGas(ctx, msg, msg.GasLimit-res.GasUsed, cfg.Params.EvmDenom); err != nil {
return nil, errorsmod.Wrapf(err, "failed to refund gas leftover gas to sender %s", msg.From)
return nil, errorsmod.Wrapf(err, "failed to refund leftover gas to sender %s", msg.From)
}

if len(receipt.Logs) > 0 {
// Update transient block bloom filter
k.SetBlockBloomTransient(ctx, receipt.Bloom.Big())
k.SetLogSizeTransient(ctx, uint64(cfg.TxConfig.LogIndex)+uint64(len(receipt.Logs)))
}

k.SetTxIndexTransient(ctx, uint64(cfg.TxConfig.TxIndex)+1)

totalGasUsed, err := k.AddTransientGasUsed(ctx, res.GasUsed)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to add transient gas used")
Expand Down
Loading

0 comments on commit 4e07a42

Please sign in to comment.