Skip to content

Commit

Permalink
eth/filters, ethclient/gethclient: add fullTx option to pending tx fi…
Browse files Browse the repository at this point in the history
…lter (ethereum#25186)

This PR adds a way to subscribe to the _full_ pending transactions, as opposed to just being notified about hashes. 

In use cases where client subscribes to newPendingTransactions and gets txhashes only to then request the actual transaction, the caller can now shortcut that flow and obtain the transactions directly. 


Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
lmittmann and fjl authored Oct 12, 2022
1 parent 010f47f commit 5b1a04b
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 43 deletions.
38 changes: 24 additions & 14 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactive when deadline triggers
hashes []common.Hash
txs []*types.Transaction
crit FilterCriteria
logs []*types.Log
s *Subscription // associated subscription in event system
Expand Down Expand Up @@ -96,28 +97,28 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
}
}

// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// NewPendingTransactionFilter creates a filter that fetches pending transactions
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan []common.Hash)
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filtersMu.Unlock()

go func() {
for {
select {
case ph := <-pendingTxs:
case pTx := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph...)
f.txs = append(f.txs, pTx...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
Expand All @@ -132,9 +133,10 @@ func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
return pendingTxSub.ID
}

// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -143,16 +145,20 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscrip
rpcSub := notifier.CreateSubscription()

go func() {
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)

for {
select {
case hashes := <-txHashes:
case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
Expand Down Expand Up @@ -411,10 +417,14 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(api.timeout)

switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
case BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
Expand Down
28 changes: 12 additions & 16 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ const (
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
// PendingTransactionsSubscription queries for pending transactions entering
// the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
Expand All @@ -151,7 +151,7 @@ type subscription struct {
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
txs chan []*types.Transaction
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
Expand Down Expand Up @@ -244,7 +244,7 @@ func (sub *Subscription) Unsubscribe() {
case sub.es.uninstall <- sub.f:
break uninstallLoop
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.txs:
case <-sub.f.headers:
}
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -328,7 +328,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -345,7 +345,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -361,23 +361,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// SubscribePendingTxs creates a subscription that writes transactions for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
txs: txs,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand Down Expand Up @@ -421,12 +421,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
}

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
f.txs <- ev.Txs
}
}

Expand Down
22 changes: 11 additions & 11 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

hashes []common.Hash
txs []*types.Transaction
)

fid0 := api.NewPendingTransactionFilter()
Expand All @@ -255,9 +255,9 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
tx := results.([]*types.Transaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
break
}
// check timeout
Expand All @@ -268,13 +268,13 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

if len(hashes) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
if len(txs) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
return
}
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
}
}
}
Expand Down Expand Up @@ -705,11 +705,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
hashes, err := api.GetFilterChanges(fid)
txs, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(hashes.([]common.Hash)) > 0 {
if len(txs.([]*types.Transaction)) > 0 {
break
}
runtime.Gosched()
Expand Down
7 changes: 6 additions & 1 deletion ethclient/gethclient/gethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
return &result, err
}

// SubscribePendingTransactions subscribes to new pending transactions.
// SubscribeFullPendingTransactions subscribes to new pending transactions.
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
}

// SubscribePendingTransactions subscribes to new pending transaction hashes.
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
}
Expand Down
39 changes: 38 additions & 1 deletion ethclient/gethclient/gethclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ func TestGethClient(t *testing.T) {
"TestSetHead",
func(t *testing.T) { testSetHead(t, client) },
}, {
"TestSubscribePendingTxs",
"TestSubscribePendingTxHashes",
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
}, {
"TestSubscribePendingTxs",
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
}, {
"TestCallContract",
func(t *testing.T) { testCallContract(t, client) },
Expand Down Expand Up @@ -303,6 +306,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
}
}

func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
ec := New(client)
ethcl := ethclient.NewClient(client)
// Subscribe to Transactions
ch := make(chan *types.Transaction)
ec.SubscribeFullPendingTransactions(context.Background(), ch)
// Send a transaction
chainID, err := ethcl.ChainID(context.Background())
if err != nil {
t.Fatal(err)
}
// Create transaction
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
signer := types.LatestSignerForChainID(chainID)
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
if err != nil {
t.Fatal(err)
}
signedTx, err := tx.WithSignature(signer, signature)
if err != nil {
t.Fatal(err)
}
// Send transaction
err = ethcl.SendTransaction(context.Background(), signedTx)
if err != nil {
t.Fatal(err)
}
// Check that the transaction was send over the channel
tx = <-ch
if tx.Hash() != signedTx.Hash() {
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
}
}

func testCallContract(t *testing.T, client *rpc.Client) {
ec := New(client)
msg := ethereum.CallMsg{
Expand Down

0 comments on commit 5b1a04b

Please sign in to comment.