Skip to content

Commit

Permalink
feat: add SubscribeFullPendingTransactions
Browse files Browse the repository at this point in the history
This enables the full transaction body subscription, which was already
implemented in go-ethererum with this
PR(ethereum/go-ethereum#25186)
  • Loading branch information
yuichiroaoki committed Jun 1, 2023
1 parent c404b3f commit a6ba555
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 45 deletions.
38 changes: 24 additions & 14 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
txs []*types.Transaction
crit FilterCriteria
logs []*types.Log
s *Subscription // associated subscription in event system
Expand Down Expand Up @@ -104,7 +105,7 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
}
}

// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// NewPendingTransactionFilter creates a filter that fetches pending transactions
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
Expand All @@ -113,21 +114,21 @@ func (api *PublicFilterAPI) timeoutLoop(timeout time.Duration) {
// https://eth.wiki/json-rpc/API#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan []common.Hash)
pendingTxs = make(chan []*types.Transaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filtersMu.Unlock()

go func() {
for {
select {
case ph := <-pendingTxs:
case pTx := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph...)
f.txs = append(f.txs, pTx...)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
Expand All @@ -142,9 +143,10 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
return pendingTxSub.ID
}

// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
// NewPendingTransactions creates a subscription that is triggered each time a
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -153,16 +155,20 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su
rpcSub := notifier.CreateSubscription()

go func() {
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)

for {
select {
case hashes := <-txHashes:
case txs := <-txs:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
for _, tx := range txs {
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
}
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
Expand Down Expand Up @@ -486,10 +492,14 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(api.timeout)

switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
case BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
txs := f.txs
f.txs = nil
return txs, nil
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
Expand Down
3 changes: 1 addition & 2 deletions eth/filters/bor_filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package filters
import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -22,7 +21,7 @@ func (es *EventSystem) SubscribeNewDeposits(data chan *types.StateSyncData) *Sub
typ: StateSyncSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
stateSyncData: data,
installed: make(chan struct{}),
Expand Down
28 changes: 12 additions & 16 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ const (
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
// PendingTransactionsSubscription queries for pending transactions entering
// the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
Expand Down Expand Up @@ -78,7 +78,7 @@ type subscription struct {
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
txs chan []*types.Transaction
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
Expand Down Expand Up @@ -177,7 +177,7 @@ func (sub *Subscription) Unsubscribe() {
case sub.es.uninstall <- sub.f:
break uninstallLoop
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.txs:
case <-sub.f.headers:
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -261,7 +261,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -278,7 +278,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -294,23 +294,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
txs: make(chan []*types.Transaction),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// SubscribePendingTxs creates a subscription that writes transactions for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
txs: txs,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand Down Expand Up @@ -354,12 +354,8 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog
}

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
f.txs <- ev.Txs
}
}

Expand Down
22 changes: 11 additions & 11 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

hashes []common.Hash
txs []*types.Transaction
)

fid0 := api.NewPendingTransactionFilter()
Expand All @@ -128,9 +128,9 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
tx := results.([]*types.Transaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
break
}
// check timeout
Expand All @@ -141,13 +141,13 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

if len(hashes) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
if len(txs) != len(transactions) {
t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(txs))
return
}
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
}
}
}
Expand Down Expand Up @@ -575,11 +575,11 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
fids[i] = fid
// Wait for at least one tx to arrive in filter
for {
hashes, err := api.GetFilterChanges(fid)
txs, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(hashes.([]common.Hash)) > 0 {
if len(txs.([]*types.Transaction)) > 0 {
break
}
runtime.Gosched()
Expand Down
7 changes: 6 additions & 1 deletion ethclient/gethclient/gethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ func (ec *Client) GetNodeInfo(ctx context.Context) (*p2p.NodeInfo, error) {
return &result, err
}

// SubscribePendingTransactions subscribes to new pending transactions.
// SubscribeFullPendingTransactions subscribes to new pending transactions.
func (ec *Client) SubscribeFullPendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true)
}

// SubscribePendingTransactions subscribes to new pending transaction hashes.
func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (*rpc.ClientSubscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions")
}
Expand Down
39 changes: 38 additions & 1 deletion ethclient/gethclient/gethclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ func TestGethClient(t *testing.T) {
"TestSetHead",
func(t *testing.T) { testSetHead(t, client) },
}, {
"TestSubscribePendingTxs",
"TestSubscribePendingTxHashes",
func(t *testing.T) { testSubscribePendingTransactions(t, client) },
}, {
"TestSubscribePendingTxs",
func(t *testing.T) { testSubscribeFullPendingTransactions(t, client) },
}, {
"TestCallContract",
func(t *testing.T) { testCallContract(t, client) },
Expand Down Expand Up @@ -285,6 +288,40 @@ func testSubscribePendingTransactions(t *testing.T, client *rpc.Client) {
}
}

func testSubscribeFullPendingTransactions(t *testing.T, client *rpc.Client) {
ec := New(client)
ethcl := ethclient.NewClient(client)
// Subscribe to Transactions
ch := make(chan *types.Transaction)
ec.SubscribeFullPendingTransactions(context.Background(), ch)
// Send a transaction
chainID, err := ethcl.ChainID(context.Background())
if err != nil {
t.Fatal(err)
}
// Create transaction
tx := types.NewTransaction(1, common.Address{1}, big.NewInt(1), 22000, big.NewInt(1), nil)
signer := types.LatestSignerForChainID(chainID)
signature, err := crypto.Sign(signer.Hash(tx).Bytes(), testKey)
if err != nil {
t.Fatal(err)
}
signedTx, err := tx.WithSignature(signer, signature)
if err != nil {
t.Fatal(err)
}
// Send transaction
err = ethcl.SendTransaction(context.Background(), signedTx)
if err != nil {
t.Fatal(err)
}
// Check that the transaction was send over the channel
tx = <-ch
if tx.Hash() != signedTx.Hash() {
t.Fatalf("Invalid tx hash received, got %v, want %v", tx.Hash(), signedTx.Hash())
}
}

func testCallContract(t *testing.T, client *rpc.Client) {
ec := New(client)
msg := ethereum.CallMsg{
Expand Down

0 comments on commit a6ba555

Please sign in to comment.