Skip to content

Commit

Permalink
fix(evmengine): get next proposer via comet algo (#409)
Browse files Browse the repository at this point in the history
use comet's exposed validators functions to check if the local node is
the next (predicted) proposer

issue: closes #388
  • Loading branch information
jdubpark authored Dec 10, 2024
1 parent 3bb4ad6 commit d7f9160
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 206 deletions.
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ linters-settings:
nolintlint:
require-explanation: true
require-specific: true
paralleltest:
# Ignore missing calls to `t.Parallel()` and only report incorrect uses of it.
# Default: false
ignore-missing: true
# Ignore missing calls to `t.Parallel()` in subtests. Top-level tests are
# still required to have `t.Parallel`, but subtests are allowed to skip it.
# Default: false
ignore-missing-subtests: true
revive:
enable-all-rules: true
severity: warning
Expand Down
2 changes: 1 addition & 1 deletion client/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Start(ctx context.Context, cfg Config) (func(context.Context) error, error)
}

rpcClient := rpclocal.New(cmtNode)
cmtAPI := comet.NewAPI(rpcClient)
cmtAPI := comet.NewAPI(rpcClient, app.ChainID())
app.SetCometAPI(cmtAPI)

log.Info(ctx, "Starting CometBFT", "listeners", cmtNode.Listeners())
Expand Down
109 changes: 22 additions & 87 deletions client/comet/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,118 +3,53 @@ package comet
import (
"context"

lightprovider "github.com/cometbft/cometbft/light/provider"
lighthttp "github.com/cometbft/cometbft/light/provider/http"
rpcclient "github.com/cometbft/cometbft/rpc/client"
cmttypes "github.com/cometbft/cometbft/types"
"github.com/ethereum/go-ethereum/common"

"github.com/piplabs/story/lib/errors"
"github.com/piplabs/story/lib/k1util"
"github.com/piplabs/story/lib/tracer"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const perPageConst = 100

var _ API = adapter{}

type API interface {
// Validators returns the cometBFT validators at the given height or false if not
// available (probably due to snapshot sync after height).
Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error)

// IsValidator returns true if the given address is a validator at the latest height.
// It is best-effort, so returns false on any error.
IsValidator(ctx context.Context, valAddress common.Address) bool
// Validators returns the cometBFT validators at the given height.
Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error)
}

func NewAPI(cl rpcclient.Client) API {
return adapter{cl: cl}
func NewAPI(cl rpcclient.Client, chainID string) API {
return adapter{
cl: lighthttp.NewWithClient(chainID, remoteCl{cl}),
}
}

type adapter struct {
cl rpcclient.Client
}

// IsValidator returns true if the given address is a validator at the latest height.
// It is best-effort, so returns false on any error.
func (a adapter) IsValidator(ctx context.Context, valAddress common.Address) bool {
ctx, span := tracer.Start(ctx, "comet/is_validator")
defer span.End()

status, err := a.cl.Status(ctx)
if err != nil || status.SyncInfo.CatchingUp {
return false // Best effort
}

valset, ok, err := a.Validators(ctx, status.SyncInfo.LatestBlockHeight)
if !ok || err != nil {
return false // Best effort
}

for _, val := range valset.Validators {
addr, err := k1util.PubKeyToAddress(val.PubKey)
if err != nil {
continue // Best effort
}

if addr == valAddress {
return true
}
}

return false
cl lightprovider.Provider
}

// Validators returns the cometBFT validators at the given height or false if not
// available (probably due to snapshot sync after height).
func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, bool, error) {
// Validators returns the cometBFT validators at the given height.
func (a adapter) Validators(ctx context.Context, height int64) (*cmttypes.ValidatorSet, error) {
ctx, span := tracer.Start(ctx, "comet/validators", trace.WithAttributes(attribute.Int64("height", height)))
defer span.End()

perPage := perPageConst // Can't take a pointer to a const directly.

var vals []*cmttypes.Validator
for page := 1; ; page++ { // Pages are 1-indexed.
if page > 10 { // Sanity check.
return nil, false, errors.New("too many validators [BUG]")
}

status, err := a.cl.Status(ctx)
if err != nil {
return nil, false, errors.Wrap(err, "fetch status")
} else if height < status.SyncInfo.EarliestBlockHeight {
// This can happen if height is before snapshot restore.
return nil, false, nil
}

valResp, err := a.cl.Validators(ctx, &height, &page, &perPage)
if err != nil {
return nil, false, errors.Wrap(err, "fetch validators")
}

for _, v := range valResp.Validators {
vals = append(vals, cmttypes.NewValidator(v.PubKey, v.VotingPower))
}

if len(vals) == valResp.Total {
break
}
block, err := a.cl.LightBlock(ctx, height) // LightBlock does all the heavy lifting to query the validator set.
if err != nil {
return nil, errors.Wrap(err, "fetch light block")
}

// cmttypes.NewValidatorSet() panics on error, so manually construct it for proper error handling.
valset := new(cmttypes.ValidatorSet)
if err := valset.UpdateWithChangeSet(vals); err != nil {
return nil, false, errors.Wrap(err, "update with change set")
}
if len(vals) > 0 {
valset.IncrementProposerPriority(1) // See cmttypes.NewValidatorSet
}
return block.ValidatorSet, nil
}

if err := valset.ValidateBasic(); err != nil {
return nil, false, errors.Wrap(err, "validate basic")
}
// remoteCl is a wrapper around rpcclient.Client to implement rpcclient.RemoteClient.
type remoteCl struct {
rpcclient.Client
}

return valset, true, nil
func (remoteCl) Remote() string {
return ""
}
4 changes: 2 additions & 2 deletions client/x/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
return nil, err
}
triggeredAt = time.Now()
log.Debug(ctx, "Started non-optimistic payload", "height", req.Height, "payload", payloadID.String())
} else {
log.Info(ctx, "Using optimistic payload", "height", height, "payload", payloadID.String())
}
Expand Down Expand Up @@ -183,12 +184,11 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {

// Extract context values
height := ctx.BlockHeight()
proposer := ctx.BlockHeader().ProposerAddress
timestamp := ctx.BlockTime()
appHash := common.BytesToHash(ctx.BlockHeader().AppHash) // This is the app hash after the block is finalized.

// Maybe start building the next block if we are the next proposer.
isNext, err := k.isNextProposer(ctx, proposer, height)
isNext, err := k.isNextProposer(ctx, height)
if err != nil {
// IsNextProposer does non-deterministic cometBFT queries, don't stall node due to errors.
log.Warn(ctx, "Next proposer failed, skipping optimistic EVM payload build", err)
Expand Down
50 changes: 36 additions & 14 deletions client/x/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
// such as when no transactions are provided, when errors occur while fetching block information,
// or when errors occur during fork choice update.
t.Run("TestRunErrScenarios", func(t *testing.T) {
t.Parallel()
// t.Parallel() // disable parallel testing for now
tests := []struct {
name string
mockEngine mockEngineAPI
Expand All @@ -70,7 +70,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
setupMocks func(esk *moduletestutil.MockEvmStakingKeeper)
}{
{
name: "no transactions",
name: "pass: no transactions",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
req: &abci.RequestPrepareProposal{
Expand All @@ -82,14 +82,14 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: false,
},
{
name: "max bytes is less than 9/10 of max block size",
name: "fail: max bytes is less than 9/10 of max block size",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
req: &abci.RequestPrepareProposal{MaxTxBytes: cmttypes.MaxBlockSizeBytes * 1 / 10},
wantErr: true,
},
{
name: "with transactions",
name: "fail: with transactions",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
req: &abci.RequestPrepareProposal{
Expand All @@ -101,7 +101,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: true,
},
{
name: "failed to peek eligible withdrawals",
name: "fail: peek eligible withdrawals",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
Expand All @@ -117,7 +117,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: true,
},
{
name: "forkchoiceUpdateV3 not valid",
name: "fail: forkchoiceUpdateV3 not valid",
mockEngine: mockEngineAPI{
headerByTypeFunc: func(context.Context, ethclient.HeadType) (*types.Header, error) {
fuzzer := ethclient.NewFuzzer(0)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
},
},
{
name: "unknown payload",
name: "fail: unknown payload",
mockEngine: mockEngineAPI{
forkchoiceUpdatedV3Func: func(ctx context.Context, update eengine.ForkchoiceStateV1,
payloadAttributes *eengine.PayloadAttributes) (eengine.ForkChoiceResponse, error) {
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
},
},
{
name: "optimistic payload exists but unknown payload is returned by EL",
name: "fail: optimistic payload exists but unknown payload is returned by EL",
mockEngine: mockEngineAPI{
forkchoiceUpdatedV3Func: func(ctx context.Context, update eengine.ForkchoiceStateV1,
payloadAttributes *eengine.PayloadAttributes) (eengine.ForkChoiceResponse, error) {
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
// t.Parallel()
ctx, storeKey, storeService := setupCtxStore(t, nil)
cdc := getCodec(t)
txConfig := authtx.NewTxConfig(cdc, nil)
Expand Down Expand Up @@ -316,7 +316,6 @@ func TestKeeper_PrepareProposal(t *testing.T) {
})
}

//nolint:paralleltest // no parallel test for now
func TestKeeper_PostFinalize(t *testing.T) {
payloadID := eengine.PayloadID{0x1}
payloadFailedToSet := func(k *Keeper) {
Expand All @@ -334,15 +333,26 @@ func TestKeeper_PostFinalize(t *testing.T) {
mockClient mock.MockClient
wantErr bool
enableOptimistic bool
isNextProposer bool
setupMocks func(esk *moduletestutil.MockEvmStakingKeeper)
postStateCheck func(k *Keeper)
}{
{
name: "nothing happens when enableOptimistic is false",
name: "pass: nothing happens when enableOptimistic is false",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: false,
isNextProposer: false,
postStateCheck: payloadFailedToSet,
},
{
name: "pass: node is not next proposer",
mockEngine: mockEngineAPI{},
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
isNextProposer: false,
postStateCheck: payloadFailedToSet,
},
{
Expand All @@ -351,6 +361,7 @@ func TestKeeper_PostFinalize(t *testing.T) {
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
isNextProposer: true,
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
esk.EXPECT().MaxWithdrawalPerBlock(gomock.Any()).Return(uint32(0), nil)
esk.EXPECT().PeekEligibleWithdrawals(gomock.Any(), gomock.Any()).Return(nil, errors.New("failed to peek eligible withdrawals"))
Expand All @@ -375,6 +386,7 @@ func TestKeeper_PostFinalize(t *testing.T) {
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
isNextProposer: true,
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
esk.EXPECT().MaxWithdrawalPerBlock(gomock.Any()).Return(uint32(0), nil)
esk.EXPECT().PeekEligibleWithdrawals(gomock.Any(), gomock.Any()).Return(nil, nil)
Expand All @@ -400,6 +412,7 @@ func TestKeeper_PostFinalize(t *testing.T) {
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
isNextProposer: true,
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
esk.EXPECT().MaxWithdrawalPerBlock(gomock.Any()).Return(uint32(0), nil)
esk.EXPECT().PeekEligibleWithdrawals(gomock.Any(), gomock.Any()).Return(nil, nil)
Expand All @@ -408,7 +421,7 @@ func TestKeeper_PostFinalize(t *testing.T) {
postStateCheck: payloadFailedToSet,
},
{
name: "pass",
name: "pass: optimistic build payload",
mockEngine: mockEngineAPI{
forkchoiceUpdatedV3Func: func(ctx context.Context, update eengine.ForkchoiceStateV1,
payloadAttributes *eengine.PayloadAttributes) (eengine.ForkChoiceResponse, error) {
Expand All @@ -425,6 +438,7 @@ func TestKeeper_PostFinalize(t *testing.T) {
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
isNextProposer: true,
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
esk.EXPECT().MaxWithdrawalPerBlock(gomock.Any()).Return(uint32(0), nil)
esk.EXPECT().PeekEligibleWithdrawals(gomock.Any(), gomock.Any()).Return(nil, nil)
Expand Down Expand Up @@ -453,11 +467,19 @@ func TestKeeper_PostFinalize(t *testing.T) {
var err error

cmtAPI := newMockCometAPI(t, nil)

// set the header and proposer so we have the correct next proposer
header := cmtproto.Header{Height: 1, AppHash: tutil.RandomHash().Bytes()}
header.ProposerAddress = cmtAPI.validatorSet.Validators[0].Address
nxtAddr, err := k1util.PubKeyToAddress(cmtAPI.validatorSet.Validators[1].PubKey)
header.ProposerAddress = cmtAPI.validatorSet.CopyIncrementProposerPriority(1).Proposer.Address

var nxtAddr common.Address
if tt.isNextProposer {
nxtAddr, err = k1util.PubKeyToAddress(cmtAPI.validatorSet.CopyIncrementProposerPriority(1).Proposer.PubKey)
} else {
nxtAddr = common.HexToAddress("0x0000000000000000000000000000000000000000")
}
require.NoError(t, err)

ctx, storeKey, storeService := setupCtxStore(t, &header)
ctx = ctx.WithExecMode(sdk.ExecModeFinalize)
tt.mockEngine.EngineClient, err = ethclient.NewEngineMock(storeKey)
Expand Down
Loading

0 comments on commit d7f9160

Please sign in to comment.