diff --git a/rpc/args/send_tx.go b/rpc/args/send_tx.go index 5d9fe58a2f..7515dbb565 100644 --- a/rpc/args/send_tx.go +++ b/rpc/args/send_tx.go @@ -5,7 +5,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ) -// SendTxArgs represents the arguments to sumbit a new transaction into the transaction pool. +// SendTxArgs represents the arguments to submit a new transaction into the transaction pool. // Duplicate struct definition since geth struct is in internal package // Ref: https://github.com/ethereum/go-ethereum/blob/release/1.9/internal/ethapi/api.go#L1346 type SendTxArgs struct { diff --git a/rpc/filters.go b/rpc/filters.go index 936a691719..37280f3696 100644 --- a/rpc/filters.go +++ b/rpc/filters.go @@ -3,6 +3,7 @@ package rpc import ( "errors" "math/big" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -106,7 +107,7 @@ func (f *Filter) pollForBlocks() error { return errors.New("could not convert block hash to hexutil.Bytes") } - hash := common.BytesToHash([]byte(hashBytes)) + hash := common.BytesToHash(hashBytes) f.hashes = append(f.hashes, hash) prev = num @@ -115,11 +116,50 @@ func (f *Filter) pollForBlocks() error { } } +func (f *Filter) pollForTransactions() error { + for { + if f.stopped { + return nil + } + + txs, err := f.backend.PendingTransactions() + if err != nil { + return err + } + + for _, tx := range txs { + if !contains(f.hashes, tx.Hash) { + f.hashes = append(f.hashes, tx.Hash) + } + } + + <-time.After(1 * time.Second) + + } +} + +func contains(slice []common.Hash, item common.Hash) bool { + set := make(map[common.Hash]struct{}, len(slice)) + for _, s := range slice { + set[s] = struct{}{} + } + + _, ok := set[item] + return ok +} + // NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives. func NewPendingTransactionFilter(backend Backend) *Filter { - // TODO: finish - filter := NewFilter(backend, nil) + filter := NewFilter(backend, &filters.FilterCriteria{}) filter.typ = pendingTxFilter + + go func() { + err := filter.pollForTransactions() + if err != nil { + filter.err = err + } + }() + return filter } @@ -140,7 +180,14 @@ func (f *Filter) getFilterChanges() (interface{}, error) { return blocks, nil case pendingTxFilter: - // TODO + if f.err != nil { + return nil, f.err + } + + txs := make([]common.Hash, len(f.hashes)) + copy(txs, f.hashes) + f.hashes = []common.Hash{} + return txs, nil case logFilter: return f.getFilterLogs() } diff --git a/rpc/tester/tester_test.go b/rpc/tester/tester_test.go index 1b71d5950e..ebacfdb1ff 100644 --- a/rpc/tester/tester_test.go +++ b/rpc/tester/tester_test.go @@ -1,7 +1,7 @@ // This is a test utility for Ethermint's Web3 JSON-RPC services. // // To run these tests please first ensure you have the emintd running -// and have started the RPC service with `emintcl rest-server`. +// and have started the RPC service with `emintcli rest-server`. // // You can configure the desired port (or host) below. @@ -491,3 +491,31 @@ func TestEth_GetLogs_Topics_AB(t *testing.T) { require.Equal(t, 1, len(logs)) } + +func TestEth_NewPendingTransactionFilter(t *testing.T) { + rpcRes, err := call(t, "eth_newPendingTransactionFilter", []string{}) + require.NoError(t, err) + + var code hexutil.Bytes + err = code.UnmarshalJSON(rpcRes.Result) + require.NoError(t, err) + require.NotNil(t, code) + + for i := 0; i < 5; i++ { + deployTestContractWithFunction(t) + } + + time.Sleep(10 * time.Second) + + // get filter changes + changesRes, err := call(t, "eth_getFilterChanges", []string{code.String()}) + require.NoError(t, err) + require.NotNil(t, changesRes) + + var txs []*hexutil.Bytes + err = json.Unmarshal(changesRes.Result, &txs) + require.NoError(t, err, string(changesRes.Result)) + + require.True(t, len(txs) >= 2, "could not get any txs", "changesRes.Result", string(changesRes.Result)) + +}