Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for newPendingTransactions (eth_subscribe) #1787

Merged
merged 3 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jsonrpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (d *Dispatcher) handleSubscribe(req Request, conn wsConn) (string, Error) {
return "", NewInternalError(err.Error())
}
filterID = d.filterManager.NewLogFilter(logQuery, conn)
} else if subscribeMethod == "newPendingTransactions" {
filterID = d.filterManager.NewPendingTxFilter(conn)
} else {
return "", NewSubscriptionNotFoundError(subscribeMethod)
}
Expand Down
54 changes: 38 additions & 16 deletions jsonrpc/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -60,29 +61,29 @@ func expectBatchJSONResult(data []byte, v interface{}) error {
func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
t.Parallel()

t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) {
store := newMockStore()
dispatcher := newTestDispatcher(t,
hclog.NewNullLogger(),
store,
&dispatcherParams{
chainID: 0,
priceLimit: 0,
jsonRPCBatchLengthLimit: 20,
blockRangeLimit: 1000,
},
)

t.Run("clients should be able to receive \"newHeads\" event through eth_subscribe", func(t *testing.T) {
t.Parallel()

store := newMockStore()
dispatcher := newTestDispatcher(t,
hclog.NewNullLogger(),
store,
&dispatcherParams{
chainID: 0,
priceLimit: 0,
jsonRPCBatchLengthLimit: 20,
blockRangeLimit: 1000,
},
)
mockConnection, msgCh := newMockWsConnWithMsgCh()

req := []byte(`{
"method": "eth_subscribe",
"params": ["newHeads"]
}`)
if _, err := dispatcher.HandleWs(req, mockConnection); err != nil {
t.Fatal(err)
}
_, err := dispatcher.HandleWs(req, mockConnection)
require.NoError(t, err)

store.emitEvent(&mockEvent{
NewChain: []*mockHeader{
Expand All @@ -100,6 +101,27 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
t.Fatal("\"newHeads\" event not received in 2 seconds")
}
})

t.Run("clients should be able to receive \"newPendingTransactions\" event through eth_subscribe", func(t *testing.T) {
t.Parallel()

mockConnection, msgCh := newMockWsConnWithMsgCh()

req := []byte(`{
"method": "eth_subscribe",
"params": ["newPendingTransactions"]
}`)
_, err := dispatcher.HandleWs(req, mockConnection)
require.NoError(t, err)

store.emitTxPoolEvent(proto.EventType_ADDED, "evt1")

select {
case <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("\"newPendingTransactions\" event not received in 2 seconds")
}
})
}

func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) {
Expand Down Expand Up @@ -289,7 +311,7 @@ func TestDispatcherFuncDecode(t *testing.T) {
for _, c := range cases {
res := handleReq(c.typ, c.msg)
if !reflect.DeepEqual(res, c.res) {
t.Fatal("bad")
t.Fatal("no tx pool events received in the predefined time slot")
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions jsonrpc/eth_blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/state/runtime"
"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -589,6 +590,10 @@ func (m *mockBlockStore) FilterExtra(extra []byte) ([]byte, error) {
return extra, nil
}

func (m *mockBlockStore) TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error) {
return nil, nil, nil
}

func newTestBlock(number uint64, hash types.Hash) *types.Block {
return &types.Block{
Header: &types.Header{
Expand Down
165 changes: 155 additions & 10 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/0xPolygon/polygon-edge/blockchain"
"github.com/0xPolygon/polygon-edge/txpool/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/google/uuid"
"github.com/gorilla/websocket"
Expand All @@ -25,6 +26,7 @@ var (
ErrIncorrectBlockRange = errors.New("incorrect range")
ErrBlockRangeTooHigh = errors.New("block range too high")
ErrNoWSConnection = errors.New("no websocket connection")
ErrUnknownSubscriptionType = errors.New("unknown subscription type")
)

// defaultTimeout is the timeout to remove the filters that don't have a web socket stream
Expand All @@ -35,6 +37,16 @@ const (
NoIndexInHeap = -1
)

// subscriptionType determines which event type the filter is subscribed to
type subscriptionType byte

const (
// Blocks represents subscription type for blockchain events
Blocks subscriptionType = iota
// PendingTransactions represents subscription type for tx pool events
PendingTransactions
)

// filter is an interface that BlockFilter and LogFilter implement
type filter interface {
// hasWSConn returns the flag indicating the filter has web socket stream
Expand All @@ -43,6 +55,9 @@ type filter interface {
// getFilterBase returns filterBase that has common fields
getFilterBase() *filterBase

// getSubscriptionType returns the type of the event the filter is subscribed to
getSubscriptionType() subscriptionType

// getUpdates returns stored data in a JSON serializable form
getUpdates() (interface{}, error)

Expand Down Expand Up @@ -159,6 +174,11 @@ func (f *blockFilter) sendUpdates() error {
return nil
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *blockFilter) getSubscriptionType() subscriptionType {
return Blocks
}

// logFilter is a filter to store logs that meet the conditions in query
type logFilter struct {
filterBase
Expand Down Expand Up @@ -212,6 +232,68 @@ func (f *logFilter) sendUpdates() error {
return nil
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *logFilter) getSubscriptionType() subscriptionType {
return Blocks
}

// pendingTxFilter is a filter to store pending tx
type pendingTxFilter struct {
filterBase
sync.Mutex

txHashes []string
}

// appendPendingTxHashes appends new pending tx hash to tx hashes
func (f *pendingTxFilter) appendPendingTxHashes(txHash string) {
f.Lock()
defer f.Unlock()

f.txHashes = append(f.txHashes, txHash)
}

// takePendingTxsUpdates returns all saved pending tx hashes in filter and sets a new slice
func (f *pendingTxFilter) takePendingTxsUpdates() []string {
f.Lock()
defer f.Unlock()

txHashes := f.txHashes
f.txHashes = []string{}

return txHashes
}

// getSubscriptionType returns the type of the event the filter is subscribed to
func (f *pendingTxFilter) getSubscriptionType() subscriptionType {
return PendingTransactions
}

// getUpdates returns stored pending tx hashes
func (f *pendingTxFilter) getUpdates() (interface{}, error) {
pendingTxHashes := f.takePendingTxsUpdates()

return pendingTxHashes, nil
}

// sendUpdates write the hashes for all pending transactions to web socket stream
func (f *pendingTxFilter) sendUpdates() error {
pendingTxHashes := f.takePendingTxsUpdates()

for _, txHash := range pendingTxHashes {
res, err := json.Marshal(txHash)
stana-miric marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

if err := f.writeMessageToWs(string(res)); err != nil {
return err
}
}

return nil
}

// filterManagerStore provides methods required by FilterManager
type filterManagerStore interface {
// Header returns the current header of the chain (genesis if empty)
Expand All @@ -228,6 +310,9 @@ type filterManagerStore interface {

// GetBlockByNumber returns a block using the provided number
GetBlockByNumber(num uint64, full bool) (*types.Block, bool)

// TxPoolSubscribe subscribes for tx pool events
TxPoolSubscribe(request *proto.SubscribeRequest) (<-chan *proto.TxPoolEvent, func(), error)
}

// FilterManager manages all running filters
Expand Down Expand Up @@ -279,18 +364,32 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore, blockRangeL
// Run starts worker process to handle events
func (f *FilterManager) Run() {
// watch for new events in the blockchain
watchCh := make(chan *blockchain.Event)
blockWatchCh := make(chan *blockchain.Event)

go func() {
for {
evnt := f.subscription.GetEvent()
if evnt == nil {
return
}
watchCh <- evnt
blockWatchCh <- evnt
}
}()

// watch for new events in the tx pool
txRequest := &proto.SubscribeRequest{
Types: []proto.EventType{proto.EventType_ADDED},
}

txWatchCh, txPoolUnsubscribe, err := f.store.TxPoolSubscribe(txRequest)
if err != nil {
f.logger.Error("Unable to subscribe to tx pool")

return
}

defer txPoolUnsubscribe()

var timeoutCh <-chan time.Time

for {
Expand All @@ -303,10 +402,16 @@ func (f *FilterManager) Run() {
}

select {
case evnt := <-watchCh:
case evnt := <-blockWatchCh:
// new blockchain event
if err := f.dispatchEvent(evnt); err != nil {
f.logger.Error("failed to dispatch event", "err", err)
f.logger.Error("failed to dispatch block event", "err", err)
}

case evnt := <-txWatchCh:
// new tx pool event
if err := f.dispatchEvent(evnt); err != nil {
f.logger.Error("failed to dispatch tx pool event", "err", err)
}

case <-timeoutCh:
Expand Down Expand Up @@ -359,6 +464,20 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string {
return f.addFilter(filter)
}

// NewPendingTxFilter adds new PendingTxFilter
func (f *FilterManager) NewPendingTxFilter(ws wsConn) string {
filter := &pendingTxFilter{
filterBase: newFilterBase(ws),
txHashes: []string{},
}

if filter.hasWSConn() {
ws.SetFilterID(filter.id)
}

return f.addFilter(filter)
}

// Exists checks the filter with given ID exists
func (f *FilterManager) Exists(id string) bool {
f.RLock()
Expand Down Expand Up @@ -621,20 +740,34 @@ func (f *FilterManager) nextTimeoutFilter() (string, time.Time) {
}

// dispatchEvent is an event handler for new block event
func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error {
func (f *FilterManager) dispatchEvent(evnt interface{}) error {
var subType subscriptionType

// store new event in each filters
f.processEvent(evnt)
switch evt := evnt.(type) {
case *blockchain.Event:
f.processBlockEvent(evt)

subType = Blocks
case *proto.TxPoolEvent:
f.processTxEvent(evt)

subType = PendingTransactions

default:
return ErrUnknownSubscriptionType
}

// send data to web socket stream
if err := f.flushWsFilters(); err != nil {
if err := f.flushWsFilters(subType); err != nil {
return err
}

return nil
}

// processEvent makes each filter append the new data that interests them
func (f *FilterManager) processEvent(evnt *blockchain.Event) {
func (f *FilterManager) processBlockEvent(evnt *blockchain.Event) {
f.RLock()
defer f.RUnlock()

Expand Down Expand Up @@ -710,15 +843,27 @@ func (f *FilterManager) appendLogsToFilters(header *block) error {
return nil
}

// processTxEvent makes each filter refresh the pending tx hashes
func (f *FilterManager) processTxEvent(evnt *proto.TxPoolEvent) {
f.RLock()
defer f.RUnlock()

for _, f := range f.filters {
if txFilter, ok := f.(*pendingTxFilter); ok {
txFilter.appendPendingTxHashes(evnt.TxHash)
}
}
}

// flushWsFilters make each filters with web socket connection write the updates to web socket stream
// flushWsFilters also removes the filters if flushWsFilters notices the connection is closed
func (f *FilterManager) flushWsFilters() error {
func (f *FilterManager) flushWsFilters(subType subscriptionType) error {
closedFilterIDs := make([]string, 0)

f.RLock()

for id, filter := range f.filters {
if !filter.hasWSConn() {
if !filter.hasWSConn() || filter.getSubscriptionType() != subType {
continue
}

Expand Down
Loading