Skip to content

Commit

Permalink
refactor(octane/evmengine): simplify event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Jan 9, 2025
1 parent e568a2d commit 211c878
Show file tree
Hide file tree
Showing 15 changed files with 299 additions and 101 deletions.
2 changes: 1 addition & 1 deletion e2e/manifests/devnet2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ multi_omni_evms = true
prometheus = true
deploy_solve = true

feature_flags = ["evm-staking-module"]
feature_flags = ["evm-staking-module","simple-evm-events"]

[node.validator01]
[node.validator02]
Expand Down
2 changes: 1 addition & 1 deletion e2e/manifests/staging.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ multi_omni_evms = true
prometheus = true
deploy_solve = true

feature_flags = ["evm-staking-module"]
feature_flags = ["evm-staking-module","simple-evm-events"]

[node.validator01]
[node.validator02]
Expand Down
9 changes: 6 additions & 3 deletions halo/app/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,12 @@ func testCProvider(t *testing.T, ctx context.Context, cprov cprovider.Provider)
xblock, ok, err := cprov.XBlock(ctx, 0, true)
tutil.RequireNoError(t, err)
require.True(t, ok)
require.Len(t, xblock.Msgs, 1)
require.Equal(t, xchain.ShardBroadcast0, xblock.Msgs[0].ShardID)
require.Equal(t, xchain.BroadcastChainID, xblock.Msgs[0].DestChainID)
require.Len(t, xblock.Msgs, 2)
for i, msg := range xblock.Msgs {
require.Equal(t, xchain.ShardBroadcast0, msg.ShardID)
require.Equal(t, xchain.BroadcastChainID, msg.DestChainID)
require.EqualValues(t, i, msg.LogIndex)
}

// Ensure getting latest xblock.
xblock2, ok, err := cprov.XBlock(ctx, xblock.BlockHeight, false)
Expand Down
19 changes: 12 additions & 7 deletions halo/evmstaking2/keeper/keeper_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ func TestDeliveryWithBrokenServer(t *testing.T) {

keeper, ctx := setupKeeper(t, deliverInterval, sServerMock)

var hash common.Hash
events, err := evmengkeeper.FetchProcEvents(ctx, ethClientMock, keeper, hash)
events, err := getStakingEvents(ctx, ethClientMock, keeper)
require.NoError(t, err)

expectDelegates := 1
Expand Down Expand Up @@ -151,8 +150,7 @@ func TestDeliveryOfInvalidEvents(t *testing.T) {

keeper, ctx := setupKeeper(t, deliverInterval, sServerMock)

var hash common.Hash
events, err := evmengkeeper.FetchProcEvents(ctx, ethClientMock, keeper, hash)
events, err := getStakingEvents(ctx, ethClientMock, keeper)
require.NoError(t, err)

expectDelegates := 1
Expand Down Expand Up @@ -231,8 +229,7 @@ func TestHappyPathDelivery(t *testing.T) {

keeper, ctx := setupKeeper(t, deliverInterval, sServerMock)

var hash common.Hash
events, err := evmengkeeper.FetchProcEvents(ctx, ethClientMock, keeper, hash)
events, err := getStakingEvents(ctx, ethClientMock, keeper)
require.NoError(t, err)

expectDelegates := 1
Expand All @@ -242,7 +239,7 @@ func TestHappyPathDelivery(t *testing.T) {
require.Len(t, events, expectTotalEvents)

for _, event := range events {
err := keeper.Deliver(ctx, hash, event)
err := keeper.Deliver(ctx, common.Hash{}, event)
require.NoError(t, err)
}

Expand Down Expand Up @@ -348,3 +345,11 @@ func setupKeeper(

return k, ctx
}

// getStakingEvents returns the staking events from the mock engine client.
func getStakingEvents(ctx context.Context, cl ethclient.EngineClient, keeper *Keeper) ([]etypes.EVMEvent, error) {
// Enable simple staking to ensure events are in correct order, since FetchProcEvents sorts either by index or address>topic>data.
ctx = feature.WithFlag(ctx, feature.FlagSimpleEVMEvents)

return evmengkeeper.FetchProcEvents(ctx, cl, common.Hash{}, keeper)
}
40 changes: 22 additions & 18 deletions lib/ethclient/enginemock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var (

var _ EngineClient = (*engineMock)(nil)

type filterQueryKey struct {
BlockHash common.Hash
Address common.Address
}

// engineMock mocks the Engine API for testing purposes.
type engineMock struct {
Client
Expand All @@ -57,7 +62,7 @@ type engineMock struct {
mu sync.Mutex
head *types.Block
pendingLogs map[common.Address][]types.Log
logs map[common.Hash][]types.Log
logs map[filterQueryKey][]types.Log
payloads map[engine.PayloadID]payloadArgs
}

Expand Down Expand Up @@ -85,7 +90,8 @@ func WithMockValidatorCreation(pubkey crypto.PubKey) func(*engineMock) {
createValidatorEvent.ID,
common.HexToHash(valAddr.Hex()), // validator
},
Data: data,
Data: data,
Index: 100,
}

mock.pendingLogs[contractAddr] = append(mock.pendingLogs[contractAddr], eventLog)
Expand Down Expand Up @@ -118,7 +124,8 @@ func WithMockSelfDelegation(pubkey crypto.PubKey, ether int64) func(*engineMock)
common.HexToHash(valAddr.Hex()), // delegator
common.HexToHash(valAddr.Hex()), // validator
},
Data: data,
Data: data,
Index: 300,
}

mock.pendingLogs[contractAddr] = append(mock.pendingLogs[contractAddr], eventLog)
Expand All @@ -133,7 +140,7 @@ func WithPortalRegister(network netconf.Network) func(*engineMock) {
contractAddr := common.HexToAddress(predeploys.PortalRegistry)

var eventLogs []types.Log
for _, chain := range network.EVMChains() {
for i, chain := range network.EVMChains() {
data, err := portalRegEvent.Inputs.NonIndexed().Pack(
chain.DeployHeight,
chain.AttestInterval,
Expand All @@ -157,7 +164,8 @@ func WithPortalRegister(network netconf.Network) func(*engineMock) {
topicChainID,
common.BytesToHash(chain.PortalAddress.Bytes()), //nolint:forbidigo // Explicit left padded
},
Data: data,
Data: data,
Index: 200 + uint(i),
}

eventLogs = append(eventLogs, eventLog)
Expand Down Expand Up @@ -246,7 +254,7 @@ func NewEngineMock(opts ...func(mock *engineMock)) (EngineClient, error) {
head: genesisBlock,
pendingLogs: make(map[common.Address][]types.Log),
payloads: make(map[engine.PayloadID]payloadArgs),
logs: make(map[common.Hash][]types.Log),
logs: make(map[filterQueryKey][]types.Log),
}

for _, opt := range opts {
Expand Down Expand Up @@ -285,25 +293,21 @@ func (m *engineMock) FilterLogs(_ context.Context, q ethereum.FilterQuery) ([]ty
}

addr := q.Addresses[0]

// Ensure we returns the same logs for the same query.
if eventLogs, ok := m.logs[*q.BlockHash]; ok {
var resp []types.Log
for _, eventLog := range eventLogs {
if eventLog.Address == addr {
resp = append(resp, eventLog)
}
}

return resp, nil
key := filterQueryKey{
BlockHash: *q.BlockHash,
Address: addr,
}
// Ensure we return the same logs for the same query.
if eventLogs, ok := m.logs[key]; ok {
return eventLogs, nil
}

eventLogs, ok := m.pendingLogs[addr]
if !ok {
return nil, nil
}

m.logs[*q.BlockHash] = eventLogs
m.logs[key] = eventLogs
delete(m.pendingLogs, addr)

return eventLogs, nil
Expand Down
3 changes: 3 additions & 0 deletions lib/feature/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
const (
// FlagEVMStakingModule enables the wip EVM Staking Module feature.
FlagEVMStakingModule Flag = "evm-staking-module"
// FlagSimpleEVMEvents enables the simplified EVM events refactor.
FlagSimpleEVMEvents Flag = "simple-evm-events"
)

// enabledFlags holds all globally enabled feature flags. The reason for having it is that
Expand All @@ -20,6 +22,7 @@ var enabledFlags sync.Map

var allFlags = map[Flag]bool{
FlagEVMStakingModule: true,
FlagSimpleEVMEvents: true,
}

// Flag is a feature flag.
Expand Down
12 changes: 8 additions & 4 deletions octane/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/omni-network/omni/lib/cast"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/feature"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/octane/evmengine/types"

Expand Down Expand Up @@ -133,10 +134,13 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
return nil, errors.Wrap(err, "prepare votes")
}

// Next, collect all prev payload evm event logs.
evmEvents, err := k.evmEvents(ctx, payloadResp.ExecutionPayload.ParentHash)
if err != nil {
return nil, errors.Wrap(err, "prepare evm event logs")
// Next, collect all prev payload evm event logs (only if simple-evm-events feature not enabled).
var evmEvents []types.EVMEvent
if !feature.FlagSimpleEVMEvents.Enabled(ctx) {
evmEvents, err = k.evmEvents(ctx, payloadResp.ExecutionPayload.ParentHash)
if err != nil {
return nil, errors.Wrap(err, "prepare evm event logs")
}
}

// Then construct the execution payload message.
Expand Down
65 changes: 59 additions & 6 deletions octane/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
attesttypes "github.com/omni-network/omni/halo/attest/types"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/feature"
"github.com/omni-network/omni/lib/k1util"
"github.com/omni-network/omni/lib/tutil"
etypes "github.com/omni-network/omni/octane/evmengine/types"
Expand Down Expand Up @@ -202,7 +203,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {

for _, msg := range tx.GetMsgs() {
if _, ok := msg.(*etypes.MsgExecutionPayload); ok {
assertExecutablePayload(t, msg, ts.Unix(), nextBlock.Hash(), frp, uint64(req.Height), 0)
assertExecutablePayload(t, msg, ts.Unix(), nextBlock.Hash(), frp, uint64(req.Height), 0, 1)
}
}
})
Expand Down Expand Up @@ -251,7 +252,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
// assert that the message is an executable payload
for _, msg := range tx.GetMsgs() {
if _, ok := msg.(*etypes.MsgExecutionPayload); ok {
assertExecutablePayload(t, msg, req.Time.Unix(), headHash, frp, head.GetBlockHeight()+1, 0)
assertExecutablePayload(t, msg, req.Time.Unix(), headHash, frp, head.GetBlockHeight()+1, 0, 1)
}
if msgDelegate, ok := msg.(*stypes.MsgDelegate); ok {
require.Equal(t, msgDelegate.Amount, sdk.NewInt64Coin("stake", 100))
Expand Down Expand Up @@ -329,10 +330,59 @@ func TestKeeper_PrepareProposal(t *testing.T) {
found = true
expected := [][]byte{commitment1, commitment2}
require.EqualValues(t, expected, payload.BlobCommitments)
assertExecutablePayload(t, msg, req.Time.Unix(), headHash, frp, head.GetBlockHeight()+1, len(expected))
assertExecutablePayload(t, msg, req.Time.Unix(), headHash, frp, head.GetBlockHeight()+1, len(expected), 1)
}
require.True(t, found)
})

t.Run("TestSimpleEvmEvents", func(t *testing.T) {
t.Parallel()
// setup dependencies
ctx, storeService := setupCtxStore(t, nil)
ctx = ctx.WithContext(feature.WithFlag(ctx, feature.FlagSimpleEVMEvents))

cdc := getCodec(t)
txConfig := authtx.NewTxConfig(cdc, nil)

mockEngine, err := newMockEngineAPI(0)
require.NoError(t, err)

ap := mockAddressProvider{
address: common.BytesToAddress([]byte("test")),
}
frp := newRandomFeeRecipientProvider()
keeper, err := NewKeeper(cdc, storeService, &mockEngine, txConfig, ap, frp, mockEventProc{})
require.NoError(t, err)
keeper.SetVoteProvider(mockVEProvider{})
populateGenesisHead(ctx, t, keeper)

// Get the parent block we will build on top of
head, err := keeper.getExecutionHead(ctx)
require.NoError(t, err)
headHash, err := head.Hash()
require.NoError(t, err)

req := &abci.RequestPrepareProposal{
Txs: nil,
Height: int64(2),
Time: time.Now(),
MaxTxBytes: cmttypes.MaxBlockSizeBytes,
}

resp, err := keeper.PrepareProposal(withRandomErrs(t, ctx), req)
tutil.RequireNoError(t, err)
require.NotNil(t, resp)

// decode the txn and get the messages
tx, err := txConfig.TxDecoder()(resp.Txs[0])
require.NoError(t, err)

for _, msg := range tx.GetMsgs() {
if _, ok := msg.(*etypes.MsgExecutionPayload); ok {
assertExecutablePayload(t, msg, req.Time.Unix(), headHash, frp, head.GetBlockHeight()+1, 0, 0)
}
}
})
}

func TestOptimistic(t *testing.T) {
Expand Down Expand Up @@ -402,6 +452,7 @@ func assertExecutablePayload(
frp etypes.FeeRecipientProvider,
height uint64,
blobs int,
events int,
) {
t.Helper()
executionPayload, ok := msg.(*etypes.MsgExecutionPayload)
Expand All @@ -417,9 +468,11 @@ func assertExecutablePayload(
require.Empty(t, payload.Withdrawals)
require.Equal(t, payload.Number, height)

require.Len(t, executionPayload.PrevPayloadEvents, 1)
evmLog := executionPayload.PrevPayloadEvents[0]
require.Equal(t, evmLog.Address, zeroAddr.Bytes())
require.Len(t, executionPayload.PrevPayloadEvents, events)
if events > 0 {
evmLog := executionPayload.PrevPayloadEvents[0]
require.Equal(t, evmLog.Address, zeroAddr.Bytes())
}

require.Len(t, executionPayload.BlobCommitments, blobs)
}
Expand Down
Loading

0 comments on commit 211c878

Please sign in to comment.