Skip to content

Commit

Permalink
Merge seiv2 into main (#232)
Browse files Browse the repository at this point in the history
* Make ReadMaxTxs atomic (#166)

* Support pending transaction in mempool (#169)

* fix unconfirmed tx to consider pending txs (#172)

* fix pending pop (#173)

* add TTL for pending txs (#174)

* [EVM] Fix evm pending nonce (#179)

* Perf: Increase buffer size for pubsub server to boost performance (#167)

* Increase buffer size for pubsub server

* Add more timeout for test failure

* Add more timeout

* Fix test split scripts

* Fix test split

* Fix unit test

* Unit test

* Unit test

* [P2P] Optimize block pool requester retry and peer pick up logic (#170)

* P2P Improvements: Fix block sync reactor and block pool retry logic

* Revert "Add event data to result event (#165)" (#176)

This reverts commit 72bb29c.

* Fix block sync auto restart not working as expected (#175)

* Fix edge case for blocksync (#178)

* fix evm pending nonce

* fix test

* deflake a test

* de-flake test

* Revert "merge main"

This reverts commit 58b9424, reversing
changes made to 02d1478.

* consider keep-in-cache logic when removing from cache

* undo test tweaks

---------

Co-authored-by: Yiming Zang <[email protected]>
Co-authored-by: Jeremy Wei <[email protected]>

* Fix bug when popping pending TXs (#188)

* Add mempool metrics for number of pending tx and expired txs (#189)

* Add metrics for mempool pending transaction size

* Add expired tx count metrics

* [EVM] Allow multiple txs from same account in a block (#190)

* add mempool prioritization with evm nonce

* fix priority stability

* index fixes

* replace with binary search insert

* impl binary search

* fix removeTx to push next queued evm tx (#191)

* fix expire metric (#193)

* [EVM] Fix duplicate evm txs from priority queue (#195)

* debug duplicate evm tx

* add more logs

* add some \ns

* more logs

* fix swap check

* add-lockable-reap-by-gas

* add invariant checks

* fix invariant parenthesis

* fix log

* remove invalid invariant

* fix nonce ordering pain

* handle ordering of insert

* fix remove

* cleanup

* fix imports

* cleanup

* avoid getTransactionByHash(hash) panic due to index

* use Key() to compare instead of pointer

* [EVM] prevent duplicate txs from getting inserted (#196)

* prevent duplicates in mempool

* use timestamp in priority queue

* [EVM] Add logging for expiration (#198)

* add logging for expired txs

* cleanup

* [EVM] Avoid returning nil transactions on ForEach (#197)

* remove heapIndex to avoid nil scenario

* avoid returning nil in loop (mimic Peek)

* call callback from mempool (#200)

* separate limit for pending tx (#202)

* Add EVM txs eviction logic (#204)

* Fix debug log (#205)

* EVM transaction replacement (#206) (#208)

* Add heapIndex with safety check (#213)

* add heapIndex with safety check

* cleanup

* comment out for perf test

* add back perf improvement

* fix nil test

* Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209)

ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim
> Transactions returned are not removed from the mempool transaction
> store or indexes.

However, they use a priority queue to accomplish the claim
> Transaction are retrieved in priority order.

This is accomplished by popping all items out of the whole heap, and
then pushing then back in sequentially. A copy of the heap cannot be
obtained otherwise. Both of the mentioned functions use a read-lock
(RLock) when doing this. This results in a potential scenario where
multiple executions of the ReapMax can be started in parallel, and
both would be popping items out of the priority queue.

In practice, this can be abused by executing the `unconfirmed_txs` RPC
call repeatedly. Based on our observations, running it multiple times
per millisecond results in multiple threads picking it up at the same
time. Such a scenario can be obtained via the WebSocket interface, and
spamming `unconfirmed_txs` calls there. The behavior that happens is a
`Panic in WSJSONRPC handler` when a queue item unexpectedly disappears
for `mempool.(*TxPriorityQueue).Swap`.
(`runtime error: index out of range [0] with length 0`)

This can additionally lead to a `CONSENSUS FAILURE!!!` if the race
condition occurs for `internal/consensus.(*State).finalizeCommit`
when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but
the ReapMax has already removed all elements from the underlying
heap. (`runtime error: index out of range [-1]`)

This commit switches the lock type to a write-lock (Lock) to ensure
no parallel modifications take place. This commit additionally updates
the tests to allow parallel execution of the func calls in testing,
as to prevent regressions (in case someone wants to downgrade the locks
without considering the implications from the underlying heap usage).

---------

Co-authored-by: Valters Jansons <[email protected]>

* Pending Txs Update Condition (#214)

* Add metrics for mempool size changes (#220)

* [EVM] Adjust locking for replacement (#224)

* Remove tx from cache when canAddPendingTx fails (#230)

* add tx hash to evm info proto (#231)

---------

Co-authored-by: codchen <[email protected]>
Co-authored-by: Steven Landers <[email protected]>
Co-authored-by: Yiming Zang <[email protected]>
Co-authored-by: Jeremy Wei <[email protected]>
Co-authored-by: Valters Jansons <[email protected]>
Co-authored-by: Kartik Bhat <[email protected]>
  • Loading branch information
7 people authored Apr 19, 2024
1 parent f49d0f6 commit 754c03a
Show file tree
Hide file tree
Showing 36 changed files with 2,263 additions and 440 deletions.
5 changes: 3 additions & 2 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err
}

func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ
return res.GetInfo(), nil
}

func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return res.GetCheckTx(), nil
return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil
}

func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil
}

func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil
func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil
}

func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) {
Expand Down
8 changes: 8 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (*
func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx)
}

func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
resV2, err := app.Application.CheckTx(ctx, req)
if err != nil {
return &types.ResponseCheckTx{}, err
}
return resV2.ResponseCheckTx, nil
}
6 changes: 3 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Application interface {
Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state

// Mempool Connection
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool

// Consensus Connection
InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo,
return &ResponseInfo{}, nil
}

func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
return &ResponseCheckTx{Code: CodeTypeOK}, nil
func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) {
return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil
}

func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response {
}
}

func ToResponseCheckTx(res *ResponseCheckTx) *Response {
func ToResponseCheckTx(res *ResponseCheckTxV2) *Response {
return &Response{
Value: &Response_CheckTx{res},
Value: &Response_CheckTx{res.ResponseCheckTx},
}
}

Expand Down
8 changes: 4 additions & 4 deletions abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,28 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) {
}
return s, nil
}

type PendingTxCheckerResponse int

const (
Accepted PendingTxCheckerResponse = iota
Rejected
Pending
)

type PendingTxChecker func() PendingTxCheckerResponse
type ExpireTxHandler func()

// ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler

// helper properties for prioritization in mempool
EVMNonce uint64
EVMSenderAddress string
IsEVM bool
}
Loading

0 comments on commit 754c03a

Please sign in to comment.