-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: optimistic execution consensus v2 #22560
base: main
Are you sure you want to change the base?
Changes from 7 commits
0b80553
4807332
907bcd1
a58e20d
082ddec
266d53f
857f14c
f78dbe0
5718170
6791357
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,6 +2,7 @@ package cometbft | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"cosmossdk.io/server/v2/cometbft/oe" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"crypto/sha256" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -12,7 +13,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
abci "github.com/cometbft/cometbft/abci/types" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
gogoproto "github.com/cosmos/gogoproto/proto" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"google.golang.org/protobuf/reflect/protoreflect" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"google.golang.org/protobuf/reflect/protoregistry" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"cosmossdk.io/collections" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -67,6 +68,11 @@ type Consensus[T transaction.Tx] struct { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
extendVote handlers.ExtendVoteHandler | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
checkTxHandler handlers.CheckTxHandler[T] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// optimisticExec contains the context required for Optimistic Execution, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// including the goroutine handling.This is experimental and must be enabled | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
julienrbrt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// by developers. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
optimisticExec *oe.OptimisticExecution | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider initializing The |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
addrPeerFilter types.PeerFilter // filter peers by address and port | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
idPeerFilter types.PeerFilter // filter peers by node ID | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -117,6 +123,10 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.streaming = sm | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (c *Consensus[T]) SetOptimisticExecution(oe *oe.OptimisticExecution) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.optimisticExec = oe | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// It allows additional snapshotter implementations to be used for creating and restoring snapshots. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -385,6 +395,14 @@ func (c *Consensus[T]) PrepareProposal( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("no prepare proposal function was set") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// in-memory structs depending on application implementation. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// No-op if OE is not enabled. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Similar call to Abort() is done in `ProcessProposal`. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.optimisticExec.Abort() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+394
to
+401
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for Abort operation The -c.optimisticExec.Abort()
+if err := c.optimisticExec.Abort(); err != nil {
+ c.logger.Error("failed to abort optimistic execution", "err", err)
+ // Continue execution as the abort error shouldn't block proposal preparation
+}
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ciCtx := contextWithCometInfo(ctx, comet.Info{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Evidence: toCoreEvidence(req.Misbehavior), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ValidatorsHash: req.NextValidatorsHash, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -421,6 +439,17 @@ func (c *Consensus[T]) ProcessProposal( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("no process proposal function was set") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Since the application can get access to FinalizeBlock state and write to it, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// we must be sure to reset it in case ProcessProposal timeouts and is called | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// again in a subsequent round. However, we only want to do this after we've | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// processed the first block, as we want to avoid overwriting the finalizeState | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// after state changes during InitChain. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if req.Height > int64(c.initialHeight) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// abort any running OE | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.optimisticExec.Abort() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
//c.setState(execModeFinalize, header) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove comment |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+438
to
+448
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential nil pointer dereference in In Apply this diff to add a nil check before calling if req.Height > int64(c.initialHeight) {
// abort any running OE
- c.optimisticExec.Abort()
+ if c.optimisticExec != nil {
+ c.optimisticExec.Abort()
+ }
//c.setState(execModeFinalize, header)
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ciCtx := contextWithCometInfo(ctx, comet.Info{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Evidence: toCoreEvidence(req.Misbehavior), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ValidatorsHash: req.NextValidatorsHash, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -436,6 +465,17 @@ func (c *Consensus[T]) ProcessProposal( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Only execute optimistic execution if the proposal is accepted, OE is | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// enabled and the block height is greater than the initial height. During | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// the first block we'll be carrying state from InitChain, so it would be | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// impossible for us to easily revert. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// After the first block has been processed, the next blocks will get executed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// can have a response ready. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.optimisticExec.Execute(req) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+464
to
+474
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential nil pointer dereference when checking In Apply this diff to add a nil check: if c.optimisticExec != nil && c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return &abciproto.ProcessProposalResponse{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -446,6 +486,29 @@ func (c *Consensus[T]) ProcessProposal( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (c *Consensus[T]) FinalizeBlock( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
req *abciproto.FinalizeBlockRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) (*abciproto.FinalizeBlockResponse, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if c.optimisticExec.Initialized() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// check if the hash we got is the same as the one we are executing | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
aborted := c.optimisticExec.AbortIfNeeded(req.Hash) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Wait for the OE to finish, regardless of whether it was aborted or not | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
res, err := c.optimisticExec.WaitResult() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Check warning Code scanning / CodeQL Useless assignment to local variable Warning
This definition of err is never used.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// only return if we are not aborting | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !aborted { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return res, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if it was aborted, we need to reset the state | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove comment, as we are always resetting |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
c.optimisticExec.Reset() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential nil pointer dereference in In Apply this diff to add a nil check before calling if c.optimisticExec != nil && c.optimisticExec.Initialized() {
// Existing code...
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return c.internalFinalizeBlock(ctx, req) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't comment on the right line below, but |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (c *Consensus[T]) internalFinalizeBlock( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
req *abciproto.FinalizeBlockRequest, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) (*abciproto.FinalizeBlockResponse, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := c.validateFinalizeBlockHeight(req); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,8 +2,10 @@ package cometbft | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||
"cosmossdk.io/server/v2/cometbft/oe" | ||||||||||||||||||||||||||||
"crypto/sha256" | ||||||||||||||||||||||||||||
"encoding/json" | ||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||
"io" | ||||||||||||||||||||||||||||
"strings" | ||||||||||||||||||||||||||||
"testing" | ||||||||||||||||||||||||||||
|
@@ -55,10 +57,10 @@ func getQueryRouterBuilder[T any, PT interface { | |||||||||||||||||||||||||||
*T | ||||||||||||||||||||||||||||
proto.Message | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
U any, UT interface { | ||||||||||||||||||||||||||||
*U | ||||||||||||||||||||||||||||
proto.Message | ||||||||||||||||||||||||||||
}]( | ||||||||||||||||||||||||||||
U any, UT interface { | ||||||||||||||||||||||||||||
*U | ||||||||||||||||||||||||||||
proto.Message | ||||||||||||||||||||||||||||
}]( | ||||||||||||||||||||||||||||
t *testing.T, | ||||||||||||||||||||||||||||
handler func(ctx context.Context, msg PT) (UT, error), | ||||||||||||||||||||||||||||
) *stf.MsgRouterBuilder { | ||||||||||||||||||||||||||||
|
@@ -85,10 +87,10 @@ func getMsgRouterBuilder[T any, PT interface { | |||||||||||||||||||||||||||
*T | ||||||||||||||||||||||||||||
transaction.Msg | ||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||
U any, UT interface { | ||||||||||||||||||||||||||||
*U | ||||||||||||||||||||||||||||
transaction.Msg | ||||||||||||||||||||||||||||
}]( | ||||||||||||||||||||||||||||
U any, UT interface { | ||||||||||||||||||||||||||||
*U | ||||||||||||||||||||||||||||
transaction.Msg | ||||||||||||||||||||||||||||
}]( | ||||||||||||||||||||||||||||
t *testing.T, | ||||||||||||||||||||||||||||
handler func(ctx context.Context, msg PT) (UT, error), | ||||||||||||||||||||||||||||
) *stf.MsgRouterBuilder { | ||||||||||||||||||||||||||||
|
@@ -715,3 +717,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) { | |||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
require.Equal(t, target, commitInfo.Version) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
func TestOptimisticExecution(t *testing.T) { | ||||||||||||||||||||||||||||
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{}) | ||||||||||||||||||||||||||||
// Set up handlers | ||||||||||||||||||||||||||||
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// mock optimistic execution | ||||||||||||||||||||||||||||
calledTimes := 0 | ||||||||||||||||||||||||||||
optimisticMockFunc := func(_ context.Context, _ *abciproto.FinalizeBlockRequest) (*abciproto.FinalizeBlockResponse, error) { | ||||||||||||||||||||||||||||
calledTimes++ | ||||||||||||||||||||||||||||
return nil, errors.New("test error") | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
c.SetOptimisticExecution(oe.NewOptimisticExecution(log.NewNopLogger(), optimisticMockFunc)) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{ | ||||||||||||||||||||||||||||
Time: time.Now(), | ||||||||||||||||||||||||||||
ChainId: "test", | ||||||||||||||||||||||||||||
InitialHeight: 1, | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{ | ||||||||||||||||||||||||||||
Time: time.Now(), | ||||||||||||||||||||||||||||
Height: 1, | ||||||||||||||||||||||||||||
Txs: [][]byte{mockTx.Bytes()}, | ||||||||||||||||||||||||||||
Hash: emptyHash[:], | ||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
theHash := sha256.Sum256([]byte("test")) | ||||||||||||||||||||||||||||
ppReq := &abciproto.ProcessProposalRequest{ | ||||||||||||||||||||||||||||
Height: 2, | ||||||||||||||||||||||||||||
Hash: theHash[:], | ||||||||||||||||||||||||||||
Time: time.Now(), | ||||||||||||||||||||||||||||
Txs: [][]byte{mockTx.Bytes()}, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Start optimistic execution | ||||||||||||||||||||||||||||
resp, err := c.ProcessProposal(context.Background(), ppReq) | ||||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Initialize FinalizeBlock with correct hash - should use optimistic result | ||||||||||||||||||||||||||||
theHash = sha256.Sum256([]byte("test")) | ||||||||||||||||||||||||||||
fbReq := &abciproto.FinalizeBlockRequest{ | ||||||||||||||||||||||||||||
Height: 2, | ||||||||||||||||||||||||||||
Hash: theHash[:], | ||||||||||||||||||||||||||||
Time: ppReq.Time, | ||||||||||||||||||||||||||||
Txs: ppReq.Txs, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
fbResp, err := c.FinalizeBlock(context.Background(), fbReq) | ||||||||||||||||||||||||||||
require.Error(t, err) | ||||||||||||||||||||||||||||
require.ErrorContains(t, err, "test error") // from optimisticMockFunc | ||||||||||||||||||||||||||||
require.Equal(t, 1, calledTimes) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Comment on lines
+780
to
+784
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix ineffectual assignment The - fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
+ _, err = c.FinalizeBlock(context.Background(), fbReq) 📝 Committable suggestion
Suggested change
🧰 Tools🪛 golangci-lint (1.62.2)780-780: ineffectual assignment to fbResp (ineffassign) |
||||||||||||||||||||||||||||
resp, err = c.ProcessProposal(context.Background(), ppReq) | ||||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
theWrongHash := sha256.Sum256([]byte("wrong_hash")) | ||||||||||||||||||||||||||||
fbReq.Hash = theWrongHash[:] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution | ||||||||||||||||||||||||||||
// Because is aborted, the result comes from the normal execution | ||||||||||||||||||||||||||||
fbResp, err = c.FinalizeBlock(context.Background(), fbReq) | ||||||||||||||||||||||||||||
require.NotNil(t, fbResp) | ||||||||||||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||||||||||||
require.Equal(t, 2, calledTimes) | ||||||||||||||||||||||||||||
Comment on lines
+792
to
+797
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add assertions for optimistic execution state The test should verify the optimistic execution state immediately after the wrong hash is processed, before the final assertion. // Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
+require.False(t, c.optimisticExec.Initialized(), "optimistic execution should be reset immediately after wrong hash") 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Verify optimistic execution was reset | ||||||||||||||||||||||||||||
require.False(t, c.optimisticExec.Initialized()) | ||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,160 @@ | ||||||||||||||||||||||||||||
package oe | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||
"bytes" | ||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||
"encoding/hex" | ||||||||||||||||||||||||||||
"math/rand" | ||||||||||||||||||||||||||||
"sync" | ||||||||||||||||||||||||||||
"time" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
"cosmossdk.io/log" | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// FinalizeBlockFunc is the function that is called by the OE to finalize the | ||||||||||||||||||||||||||||
// block. It is the same as the one in the ABCI app. | ||||||||||||||||||||||||||||
type FinalizeBlockFunc func(context.Context, *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// OptimisticExecution is a struct that contains the OE context. It is used to | ||||||||||||||||||||||||||||
// run the FinalizeBlock function in a goroutine, and to abort it if needed. | ||||||||||||||||||||||||||||
type OptimisticExecution struct { | ||||||||||||||||||||||||||||
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context | ||||||||||||||||||||||||||||
logger log.Logger | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
mtx sync.Mutex | ||||||||||||||||||||||||||||
stopCh chan struct{} | ||||||||||||||||||||||||||||
request *abci.FinalizeBlockRequest | ||||||||||||||||||||||||||||
response *abci.FinalizeBlockResponse | ||||||||||||||||||||||||||||
err error | ||||||||||||||||||||||||||||
cancelFunc func() // cancel function for the context | ||||||||||||||||||||||||||||
initialized bool // A boolean value indicating whether the struct has been initialized | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// debugging/testing options | ||||||||||||||||||||||||||||
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// NewOptimisticExecution initializes the Optimistic Execution context but does not start it. | ||||||||||||||||||||||||||||
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution { | ||||||||||||||||||||||||||||
logger = logger.With(log.ModuleKey, "oe") | ||||||||||||||||||||||||||||
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn} | ||||||||||||||||||||||||||||
for _, opt := range opts { | ||||||||||||||||||||||||||||
opt(oe) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return oe | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// WithAbortRate sets the abort rate for the OE. The abort rate is a number from | ||||||||||||||||||||||||||||
// 0 to 100 that determines the percentage of OE that should be aborted. | ||||||||||||||||||||||||||||
// This is for testing purposes only and must not be used in production. | ||||||||||||||||||||||||||||
func WithAbortRate(rate int) func(*OptimisticExecution) { | ||||||||||||||||||||||||||||
return func(oe *OptimisticExecution) { | ||||||||||||||||||||||||||||
oe.abortRate = rate | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Reset resets the OE context. Must be called whenever we want to invalidate | ||||||||||||||||||||||||||||
// the current OE. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) Reset() { | ||||||||||||||||||||||||||||
oe.mtx.Lock() | ||||||||||||||||||||||||||||
defer oe.mtx.Unlock() | ||||||||||||||||||||||||||||
oe.request = nil | ||||||||||||||||||||||||||||
oe.response = nil | ||||||||||||||||||||||||||||
oe.err = nil | ||||||||||||||||||||||||||||
oe.initialized = false | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
func (oe *OptimisticExecution) Enabled() bool { | ||||||||||||||||||||||||||||
return oe != nil | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Initialized returns true if the OE was initialized, meaning that it contains | ||||||||||||||||||||||||||||
// a request and it was run or it is running. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) Initialized() bool { | ||||||||||||||||||||||||||||
if oe == nil { | ||||||||||||||||||||||||||||
return false | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
oe.mtx.Lock() | ||||||||||||||||||||||||||||
defer oe.mtx.Unlock() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return oe.initialized | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Execute initializes the OE and starts it in a goroutine. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) Execute(req *abci.ProcessProposalRequest) { | ||||||||||||||||||||||||||||
oe.mtx.Lock() | ||||||||||||||||||||||||||||
defer oe.mtx.Unlock() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
oe.stopCh = make(chan struct{}) | ||||||||||||||||||||||||||||
oe.request = &abci.FinalizeBlockRequest{ | ||||||||||||||||||||||||||||
Txs: req.Txs, | ||||||||||||||||||||||||||||
DecidedLastCommit: req.ProposedLastCommit, | ||||||||||||||||||||||||||||
Misbehavior: req.Misbehavior, | ||||||||||||||||||||||||||||
Hash: req.Hash, | ||||||||||||||||||||||||||||
Height: req.Height, | ||||||||||||||||||||||||||||
Time: req.Time, | ||||||||||||||||||||||||||||
NextValidatorsHash: req.NextValidatorsHash, | ||||||||||||||||||||||||||||
ProposerAddress: req.ProposerAddress, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String()) | ||||||||||||||||||||||||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||||||||||||||||||||||||
oe.cancelFunc = cancel | ||||||||||||||||||||||||||||
oe.initialized = true | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||
start := time.Now() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
resp, err := oe.finalizeBlockFunc(ctx, oe.request) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
oe.mtx.Lock() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
executionTime := time.Since(start) | ||||||||||||||||||||||||||||
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", oe.request.Height, "hash", hex.EncodeToString(oe.request.Hash)) | ||||||||||||||||||||||||||||
oe.response, oe.err = resp, err | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
close(oe.stopCh) | ||||||||||||||||||||||||||||
oe.mtx.Unlock() | ||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// AbortIfNeeded aborts the OE if the request hash is not the same as the one in | ||||||||||||||||||||||||||||
// the running OE. Returns true if the OE was aborted. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool { | ||||||||||||||||||||||||||||
if oe == nil { | ||||||||||||||||||||||||||||
return false | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
oe.mtx.Lock() | ||||||||||||||||||||||||||||
defer oe.mtx.Unlock() | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if !bytes.Equal(oe.request.Hash, reqHash) { | ||||||||||||||||||||||||||||
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct the logging of In the error log within Adjust the log statement to use the correct height for oe.logger.Error("OE aborted due to hash mismatch",
"oe_hash", hex.EncodeToString(oe.request.Hash),
"req_hash", hex.EncodeToString(reqHash),
"oe_height", oe.request.Height,
- "req_height", oe.request.Height)
+ "req_height", currentRequestHeight) If
|
||||||||||||||||||||||||||||
oe.cancelFunc() | ||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||
Comment on lines
+145
to
+148
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent potential nil pointer dereference when accessing In the Consider adding a nil check for + if oe.request == nil {
+ oe.logger.Error("OE aborted due to missing request")
+ oe.cancelFunc()
+ return true
+ }
if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate { | ||||||||||||||||||||||||||||
// this is for test purposes only, we can emulate a certain percentage of | ||||||||||||||||||||||||||||
// OE needed to be aborted. | ||||||||||||||||||||||||||||
oe.cancelFunc() | ||||||||||||||||||||||||||||
oe.logger.Error("OE aborted due to test abort rate") | ||||||||||||||||||||||||||||
return true | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
return false | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Abort aborts the OE unconditionally and waits for it to finish. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) Abort() { | ||||||||||||||||||||||||||||
if oe == nil || oe.cancelFunc == nil { | ||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
oe.cancelFunc() | ||||||||||||||||||||||||||||
<-oe.stopCh | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// WaitResult waits for the OE to finish and returns the result. | ||||||||||||||||||||||||||||
func (oe *OptimisticExecution) WaitResult() (*abci.FinalizeBlockResponse, error) { | ||||||||||||||||||||||||||||
<-oe.stopCh | ||||||||||||||||||||||||||||
return oe.response, oe.err | ||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will need to move lower, otherwise linting will complain