Skip to content

Commit

Permalink
Problem: transient store usage not compatible with parallel tx execution
Browse files Browse the repository at this point in the history
Currently we use shared transient store keys to accumulate some states,
which cause issues when developing parallel tx execution

Solution:
- remove some transient stores.
- the others are used in a per-tx fasion.
  • Loading branch information
yihuang committed Apr 2, 2024
1 parent be00d15 commit ad7ff6d
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [#439](https://github.com/crypto-org-chain/ethermint/pull/439), [#441](https://github.com/crypto-org-chain/ethermint/pull/441) Align trace response for failed tx with go-ethereum.
* (statedb) [#446](https://github.com/crypto-org-chain/ethermint/pull/446) Re-use the cache store implementation with sdk.
* (evm) [#447](https://github.com/crypto-org-chain/ethermint/pull/447) Deduct fee through virtual bank transfer.
* (evm) [#]() Refactor transient stores to be compatible with parallel tx execution.

### State Machine Breaking

Expand Down
2 changes: 0 additions & 2 deletions app/ante/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type EVMKeeper interface {
ChainID() *big.Int

DeductTxCostsFromUserBalance(ctx sdk.Context, fees sdk.Coins, from common.Address) error
ResetTransientGasUsed(ctx sdk.Context)
GetTxIndexTransient(ctx sdk.Context) uint64
}

type protoTxProvider interface {
Expand Down
4 changes: 0 additions & 4 deletions app/ante/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func SetupEthContext(ctx sdk.Context, evmKeeper EVMKeeper) (newCtx sdk.Context,
WithKVGasConfig(storetypes.GasConfig{}).
WithTransientKVGasConfig(storetypes.GasConfig{})

// Reset transient gas used to prepare the execution of current cosmos tx.
// Transient gas-used is necessary to sum the gas-used of cosmos tx, when it contains multiple eth msgs.
evmKeeper.ResetTransientGasUsed(ctx)

return newCtx, nil
}

Expand Down
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func NewEthermintApp(
bApp.SetVersion(version.Version)
bApp.SetInterfaceRegistry(interfaceRegistry)
bApp.SetTxEncoder(txConfig.TxEncoder())
bApp.SetTxExecutor(DefaultTxExecutor)

keys := storetypes.NewKVStoreKeys(
// SDK keys
Expand Down
21 changes: 21 additions & 0 deletions app/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package app

import (
"context"

storetypes "cosmossdk.io/store/types"
abci "github.com/cometbft/cometbft/abci/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
)

func DefaultTxExecutor(_ context.Context,
blockSize int,
ms storetypes.MultiStore,
deliverTxWithMultiStore func(int, storetypes.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error) {
results := make([]*abci.ExecTxResult, blockSize)
for i := 0; i < blockSize; i++ {
results[i] = deliverTxWithMultiStore(i, ms)
}

Check warning on line 19 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L18-L19

Added lines #L18 - L19 were not covered by tests
return evmtypes.PatchTxResponses(results), nil
}
7 changes: 1 addition & 6 deletions x/evm/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"

ethtypes "github.com/ethereum/go-ethereum/core/types"
)

// BeginBlock sets the sdk Context and EIP155 chain id to the Keeper.
Expand All @@ -35,9 +33,6 @@ func (k *Keeper) BeginBlock(ctx sdk.Context) error {
func (k *Keeper) EndBlock(ctx sdk.Context) error {
// Gas costs are handled within msg handler so costs should be ignored
infCtx := ctx.WithGasMeter(types.NewInfiniteGasMeter())

bloom := ethtypes.BytesToBloom(k.GetBlockBloomTransient(infCtx).Bytes())
k.EmitBlockBloomEvent(infCtx, bloom)

k.CollectTxBloom(infCtx)
return nil
}
27 changes: 27 additions & 0 deletions x/evm/keeper/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package keeper

import (
"math/big"

"cosmossdk.io/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/evmos/ethermint/x/evm/types"
)

func (k Keeper) SetTxBloom(ctx sdk.Context, bloom []byte) {
store := ctx.KVStore(k.transientKey)
store.Set(types.TransientBloomKey(ctx.TxIndex(), ctx.MsgIndex()), bloom)
}

func (k Keeper) CollectTxBloom(ctx sdk.Context) {
store := prefix.NewStore(ctx.KVStore(k.transientKey), types.KeyPrefixTransientBloom)
it := store.Iterator(nil, nil)
defer it.Close()

bloom := new(big.Int)
for ; it.Valid(); it.Next() {
bloom.Or(bloom, big.NewInt(0).SetBytes(it.Value()))
}

k.EmitBlockBloomEvent(ctx, bloom.Bytes())
}
3 changes: 1 addition & 2 deletions x/evm/keeper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func (k *Keeper) TxConfig(ctx sdk.Context, txHash common.Hash) statedb.TxConfig
return statedb.NewTxConfig(
common.BytesToHash(ctx.HeaderHash()), // BlockHash
txHash, // TxHash
uint(k.GetTxIndexTransient(ctx)), // TxIndex
uint(k.GetLogSizeTransient(ctx)), // LogIndex
0, 0,
)
}

Expand Down
57 changes: 4 additions & 53 deletions x/evm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func (k Keeper) ChainID() *big.Int {
// ----------------------------------------------------------------------------

// EmitBlockBloomEvent emit block bloom events
func (k Keeper) EmitBlockBloomEvent(ctx sdk.Context, bloom ethtypes.Bloom) {
func (k Keeper) EmitBlockBloomEvent(ctx sdk.Context, bloom []byte) {
ctx.EventManager().EmitEvent(
sdk.NewEvent(
types.EventTypeBlockBloom,
sdk.NewAttribute(types.AttributeKeyEthereumBloom, string(bloom.Bytes())),
sdk.NewAttribute(types.AttributeKeyEthereumBloom, string(bloom)),
),
)
}
Expand Down Expand Up @@ -180,49 +180,6 @@ func (k Keeper) SetBlockBloomTransient(ctx sdk.Context, bloom *big.Int) {
store.Set(heightBz, bloom.Bytes())
}

// ----------------------------------------------------------------------------
// Tx
// ----------------------------------------------------------------------------

// SetTxIndexTransient set the index of processing transaction
func (k Keeper) SetTxIndexTransient(ctx sdk.Context, index uint64) {
store := ctx.TransientStore(k.transientKey)
store.Set(types.KeyPrefixTransientTxIndex, sdk.Uint64ToBigEndian(index))
}

// GetTxIndexTransient returns EVM transaction index on the current block.
func (k Keeper) GetTxIndexTransient(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientTxIndex)
if len(bz) == 0 {
return 0
}

return sdk.BigEndianToUint64(bz)
}

// ----------------------------------------------------------------------------
// Log
// ----------------------------------------------------------------------------

// GetLogSizeTransient returns EVM log index on the current block.
func (k Keeper) GetLogSizeTransient(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientLogSize)
if len(bz) == 0 {
return 0
}

return sdk.BigEndianToUint64(bz)
}

// SetLogSizeTransient fetches the current EVM log index from the transient store, increases its
// value by one and then sets the new index back to the transient store.
func (k Keeper) SetLogSizeTransient(ctx sdk.Context, logSize uint64) {
store := ctx.TransientStore(k.transientKey)
store.Set(types.KeyPrefixTransientLogSize, sdk.Uint64ToBigEndian(logSize))
}

// ----------------------------------------------------------------------------
// Storage
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -349,16 +306,10 @@ func (k Keeper) getBaseFee(ctx sdk.Context, london bool) *big.Int {
return baseFee
}

// ResetTransientGasUsed reset gas used to prepare for execution of current cosmos tx, called in ante handler.
func (k Keeper) ResetTransientGasUsed(ctx sdk.Context) {
store := ctx.TransientStore(k.transientKey)
store.Delete(types.KeyPrefixTransientGasUsed)
}

// GetTransientGasUsed returns the gas used by current cosmos tx.
func (k Keeper) GetTransientGasUsed(ctx sdk.Context) uint64 {
store := ctx.TransientStore(k.transientKey)
bz := store.Get(types.KeyPrefixTransientGasUsed)
bz := store.Get(types.TransientGasUsedKey(ctx.TxIndex()))
if len(bz) == 0 {
return 0
}
Expand All @@ -369,7 +320,7 @@ func (k Keeper) GetTransientGasUsed(ctx sdk.Context) uint64 {
func (k Keeper) SetTransientGasUsed(ctx sdk.Context, gasUsed uint64) {
store := ctx.TransientStore(k.transientKey)
bz := sdk.Uint64ToBigEndian(gasUsed)
store.Set(types.KeyPrefixTransientGasUsed, bz)
store.Set(types.TransientGasUsedKey(ctx.TxIndex()), bz)
}

// AddTransientGasUsed accumulate gas used by each eth msgs included in current cosmos tx.
Expand Down
3 changes: 0 additions & 3 deletions x/evm/keeper/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (k *Keeper) EthereumTx(goCtx context.Context, msg *types.MsgEthereumTx) (*t
ctx := sdk.UnwrapSDKContext(goCtx)

tx := msg.AsTransaction()
txIndex := k.GetTxIndexTransient(ctx)

labels := []metrics.Label{
telemetry.NewLabel("tx_type", fmt.Sprintf("%d", tx.Type())),
Expand Down Expand Up @@ -92,8 +91,6 @@ func (k *Keeper) EthereumTx(goCtx context.Context, msg *types.MsgEthereumTx) (*t
sdk.NewAttribute(sdk.AttributeKeyAmount, tx.Value().String()),
// add event for ethereum transaction hash format
sdk.NewAttribute(types.AttributeKeyEthereumTxHash, response.Hash),
// add event for index of valid ethereum tx
sdk.NewAttribute(types.AttributeKeyTxIndex, strconv.FormatUint(txIndex, 10)),
// add event for eth tx gas used, we can't get it from cosmos tx result when it contains multiple eth tx msgs.
sdk.NewAttribute(types.AttributeKeyTxGasUsed, strconv.FormatUint(response.GasUsed, 10)),
}
Expand Down
21 changes: 2 additions & 19 deletions x/evm/keeper/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,6 @@ func (k Keeper) GetHashFn(ctx sdk.Context) vm.GetHashFunc {
//
// For relevant discussion see: https://github.com/cosmos/cosmos-sdk/discussions/9072
func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx) (*types.MsgEthereumTxResponse, error) {
var (
bloom *big.Int
bloomReceipt ethtypes.Bloom
)

ethTx := msgEth.AsTransaction()
cfg, err := k.EVMConfig(ctx, sdk.ConsAddress(ctx.BlockHeader().ProposerAddress), k.eip155ChainID, ethTx.Hash())
if err != nil {
Expand Down Expand Up @@ -205,9 +200,7 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)

// Compute block bloom filter
if len(logs) > 0 {
bloom = k.GetBlockBloomTransient(ctx)
bloom.Or(bloom, big.NewInt(0).SetBytes(ethtypes.LogsBloom(logs)))
bloomReceipt = ethtypes.BytesToBloom(bloom.Bytes())
k.SetTxBloom(tmpCtx, ethtypes.LogsBloom(logs))
}

cumulativeGasUsed := res.GasUsed
Expand All @@ -228,14 +221,12 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)
Type: ethTx.Type(),
PostState: nil, // TODO: intermediate state root
CumulativeGasUsed: cumulativeGasUsed,
Bloom: bloomReceipt,
Logs: logs,
TxHash: cfg.TxConfig.TxHash,
ContractAddress: contractAddr,
GasUsed: res.GasUsed,
BlockHash: cfg.TxConfig.BlockHash,
BlockNumber: big.NewInt(ctx.BlockHeight()),
TransactionIndex: cfg.TxConfig.TxIndex,
}

if !res.Failed() {
Expand All @@ -258,17 +249,9 @@ func (k *Keeper) ApplyTransaction(ctx sdk.Context, msgEth *types.MsgEthereumTx)

// refund gas in order to match the Ethereum gas consumption instead of the default SDK one.
if err = k.RefundGas(ctx, msg, msg.GasLimit-res.GasUsed, cfg.Params.EvmDenom); err != nil {
return nil, errorsmod.Wrapf(err, "failed to refund gas leftover gas to sender %s", msg.From)
return nil, errorsmod.Wrapf(err, "failed to refund leftover gas to sender %s", msg.From)

Check warning on line 252 in x/evm/keeper/state_transition.go

View check run for this annotation

Codecov / codecov/patch

x/evm/keeper/state_transition.go#L252

Added line #L252 was not covered by tests
}

if len(receipt.Logs) > 0 {
// Update transient block bloom filter
k.SetBlockBloomTransient(ctx, receipt.Bloom.Big())
k.SetLogSizeTransient(ctx, uint64(cfg.TxConfig.LogIndex)+uint64(len(receipt.Logs)))
}

k.SetTxIndexTransient(ctx, uint64(cfg.TxConfig.TxIndex)+1)

totalGasUsed, err := k.AddTransientGasUsed(ctx, res.GasUsed)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to add transient gas used")
Expand Down
21 changes: 17 additions & 4 deletions x/evm/types/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package types

import (
"encoding/binary"

"github.com/ethereum/go-ethereum/common"
)

Expand Down Expand Up @@ -46,8 +48,6 @@ const (
// prefix bytes for the EVM transient store
const (
prefixTransientBloom = iota + 1
prefixTransientTxIndex
prefixTransientLogSize
prefixTransientGasUsed
)

Expand All @@ -61,8 +61,6 @@ var (
// Transient Store key prefixes
var (
KeyPrefixTransientBloom = []byte{prefixTransientBloom}
KeyPrefixTransientTxIndex = []byte{prefixTransientTxIndex}
KeyPrefixTransientLogSize = []byte{prefixTransientLogSize}
KeyPrefixTransientGasUsed = []byte{prefixTransientGasUsed}
)

Expand All @@ -75,3 +73,18 @@ func AddressStoragePrefix(address common.Address) []byte {
func StateKey(address common.Address, key []byte) []byte {
return append(AddressStoragePrefix(address), key...)
}

func TransientGasUsedKey(txIndex int) []byte {
var key [9]byte
key[0] = prefixTransientGasUsed
binary.BigEndian.PutUint64(key[1:], uint64(txIndex))
return key[:]

Check warning on line 81 in x/evm/types/key.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/key.go#L77-L81

Added lines #L77 - L81 were not covered by tests
}

func TransientBloomKey(txIndex, msgIndex int) []byte {
var key [1 + 8 + 8]byte
key[0] = prefixTransientBloom
binary.BigEndian.PutUint64(key[1:], uint64(txIndex))
binary.BigEndian.PutUint64(key[9:], uint64(msgIndex))
return key[:]

Check warning on line 89 in x/evm/types/key.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/key.go#L84-L89

Added lines #L84 - L89 were not covered by tests
}
65 changes: 65 additions & 0 deletions x/evm/types/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package types

import (
"strconv"

abci "github.com/cometbft/cometbft/abci/types"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
proto "github.com/cosmos/gogoproto/proto"
)

// PatchTxResponses fills the evm tx index and log indexes in the tx result
func PatchTxResponses(input []*abci.ExecTxResult) []*abci.ExecTxResult {
var (
txIndex uint64
logIndex uint64
)
for _, res := range input {
// assume no error result in msg handler
if res.Code != 0 {
continue

Check warning on line 21 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L13-L21

Added lines #L13 - L21 were not covered by tests
}

var txMsgData sdk.TxMsgData
if err := proto.Unmarshal(res.Data, &txMsgData); err != nil {
continue

Check warning on line 26 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L24-L26

Added lines #L24 - L26 were not covered by tests
}

var dirty bool
for i, rsp := range txMsgData.MsgResponses {
var response MsgEthereumTxResponse
if rsp.TypeUrl != "/"+proto.MessageName(&response) {
continue

Check warning on line 33 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L29-L33

Added lines #L29 - L33 were not covered by tests
}
if err := proto.Unmarshal(rsp.Value, &response); err != nil {
continue

Check warning on line 36 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}

res.Events = append(res.Events, abci.Event(sdk.NewEvent(
EventTypeEthereumTx,
sdk.NewAttribute(AttributeKeyTxIndex, strconv.FormatUint(txIndex, 10)),
)))
for _, log := range response.Logs {
log.TxIndex = txIndex
log.Index = logIndex
logIndex++
}
txIndex++

dirty = true
anyRsp, err := codectypes.NewAnyWithValue(&response)
if err != nil {
panic(err)

Check warning on line 53 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L39-L53

Added lines #L39 - L53 were not covered by tests
}
txMsgData.MsgResponses[i] = anyRsp

Check warning on line 55 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L55

Added line #L55 was not covered by tests
}

if dirty {
if data, err := proto.Marshal(&txMsgData); err != nil {
res.Data = data
}

Check warning on line 61 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L58-L61

Added lines #L58 - L61 were not covered by tests
}
}
return input

Check warning on line 64 in x/evm/types/response.go

View check run for this annotation

Codecov / codecov/patch

x/evm/types/response.go#L64

Added line #L64 was not covered by tests
}
Loading

0 comments on commit ad7ff6d

Please sign in to comment.