Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize WS newhead #1615

Merged
merged 2 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions evmrpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@
func (a *FilterAPI) NewFilter(
_ context.Context,
crit filters.FilterCriteria,
) (ethrpc.ID, error) {
) (id ethrpc.ID, err error) {
defer recordMetrics("eth_newFilter", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
a.filtersMu.Lock()
defer a.filtersMu.Unlock()
curFilterID := ethrpc.NewID()
Expand All @@ -113,7 +114,8 @@

func (a *FilterAPI) NewBlockFilter(
_ context.Context,
) (ethrpc.ID, error) {
) (id ethrpc.ID, err error) {
defer recordMetrics("eth_newBlockFilter", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
a.filtersMu.Lock()
defer a.filtersMu.Unlock()
curFilterID := ethrpc.NewID()
Expand All @@ -128,7 +130,8 @@
func (a *FilterAPI) GetFilterChanges(
ctx context.Context,
filterID ethrpc.ID,
) (interface{}, error) {
) (res interface{}, err error) {
defer recordMetrics("eth_getFilterChanges", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
a.filtersMu.Lock()
defer a.filtersMu.Unlock()
filter, ok := a.filters[filterID]
Expand Down Expand Up @@ -178,7 +181,8 @@
func (a *FilterAPI) GetFilterLogs(
ctx context.Context,
filterID ethrpc.ID,
) ([]*ethtypes.Log, error) {
) (res []*ethtypes.Log, err error) {
defer recordMetrics("eth_getFilterLogs", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
a.filtersMu.Lock()
defer a.filtersMu.Unlock()
filter, ok := a.filters[filterID]
Expand Down Expand Up @@ -206,7 +210,8 @@
func (a *FilterAPI) GetLogs(
ctx context.Context,
crit filters.FilterCriteria,
) ([]*ethtypes.Log, error) {
) (res []*ethtypes.Log, err error) {
defer recordMetrics("eth_getLogs", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
logs, _, err := a.logFetcher.GetLogsByFilters(ctx, crit, 0)
return logs, err
}
Expand Down Expand Up @@ -252,7 +257,8 @@
func (a *FilterAPI) UninstallFilter(
_ context.Context,
filterID ethrpc.ID,
) bool {
) (res bool) {
defer recordMetrics("eth_uninstallFilter", time.Now(), res)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
a.filtersMu.Lock()
defer a.filtersMu.Unlock()
_, found := a.filters[filterID]
Expand Down
2 changes: 1 addition & 1 deletion evmrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func NewEVMWebSocketServer(
},
{
Namespace: "eth",
Service: NewSubscriptionAPI(tmClient, &LogFetcher{tmClient: tmClient, k: k, ctxProvider: ctxProvider}, &SubscriptionConfig{subscriptionCapacity: 100}),
Service: NewSubscriptionAPI(tmClient, &LogFetcher{tmClient: tmClient, k: k, ctxProvider: ctxProvider}, &SubscriptionConfig{subscriptionCapacity: 100}, &FilterConfig{timeout: config.FilterTimeout, maxLog: config.MaxLogNoBlock, maxBlock: config.MaxBlocksForLog}),
},
{
Namespace: "web3",
Expand Down
3 changes: 3 additions & 0 deletions evmrpc/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var MockBlockID = tmtypes.BlockID{
Hash: bytes.HexBytes(mustHexToBytes("0000000000000000000000000000000000000000000000000000000000000001")),
}

var NewHeadsCalled = make(chan struct{})

type MockClient struct {
mock.Client
}
Expand Down Expand Up @@ -271,6 +273,7 @@ func (c *MockClient) Subscribe(ctx context.Context, subscriber string, query str
if query == "tm.event = 'NewBlockHeader'" {
resCh := make(chan coretypes.ResultEvent, 5)
go func() {
<-NewHeadsCalled
for i := uint64(0); i < 5; i++ {
resCh <- coretypes.ResultEvent{
SubscriptionID: subscriber,
Expand Down
81 changes: 59 additions & 22 deletions evmrpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,61 +25,98 @@
subscriptionManager *SubscriptionManager
subscriptonConfig *SubscriptionConfig

logFetcher *LogFetcher
logFetcher *LogFetcher
newHeadListenersMtx *sync.Mutex
newHeadListeners map[rpc.ID]chan map[string]interface{}
}

type SubscriptionConfig struct {
subscriptionCapacity int
}

func NewSubscriptionAPI(tmClient rpcclient.Client, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig) *SubscriptionAPI {
logFetcher.filterConfig = &FilterConfig{}
return &SubscriptionAPI{
func NewSubscriptionAPI(tmClient rpcclient.Client, logFetcher *LogFetcher, subscriptionConfig *SubscriptionConfig, filterConfig *FilterConfig) *SubscriptionAPI {
logFetcher.filterConfig = filterConfig
api := &SubscriptionAPI{
tmClient: tmClient,
subscriptionManager: NewSubscriptionManager(tmClient),
subscriptonConfig: subscriptionConfig,
logFetcher: logFetcher,
newHeadListenersMtx: &sync.Mutex{},
newHeadListeners: make(map[rpc.ID]chan map[string]interface{}),
}
id, subCh, err := api.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), api.subscriptonConfig.subscriptionCapacity)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need supervision or some mechanism to retry/reconnect (can the subscription die?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the subscription manager has a retry mechanism (that's what the subscription was canceled, resubscribing log we saw)

if err != nil {
panic(err)
}
go func() {
defer func() {
_ = api.subscriptionManager.Unsubscribe(context.Background(), id)
}()
for {
res := <-subCh
ethHeader, err := encodeTmHeader(res.Data.(tmtypes.EventDataNewBlockHeader))
if err != nil {
fmt.Printf("error encoding new head event %#v due to %s\n", res.Data, err)
continue
}
api.newHeadListenersMtx.Lock()
for _, c := range api.newHeadListeners {
c := c
go func() {
defer func() {
// if the channel is already closed, sending to it will panic
if err := recover(); err != nil {
return
}
}()
c <- ethHeader
}()
Dismissed Show dismissed Hide dismissed
}
Dismissed Show dismissed Hide dismissed
api.newHeadListenersMtx.Unlock()
}
}()
Dismissed Show dismissed Hide dismissed
return api
}

func (a *SubscriptionAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
func (a *SubscriptionAPI) NewHeads(ctx context.Context) (s *rpc.Subscription, err error) {
defer recordMetrics("eth_newHeads", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
subscriberID, subCh, err := a.subscriptionManager.Subscribe(context.Background(), NewHeadQueryBuilder(), a.subscriptonConfig.subscriptionCapacity)
if err != nil {
return nil, err
}
listener := make(chan map[string]interface{})

go func() {
defer func() {
_ = a.subscriptionManager.Unsubscribe(context.Background(), subscriberID)
}()
OUTER:
for {
select {
case res := <-subCh:
ethHeader, err := encodeTmHeader(res.Data.(tmtypes.EventDataNewBlockHeader))
case res := <-listener:
err = notifier.Notify(rpcSub.ID, res)
if err != nil {
return
}
err = notifier.Notify(rpcSub.ID, ethHeader)
if err != nil {
return
break OUTER
}
case <-rpcSub.Err():
return
break OUTER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the same as continue here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no we want to break out of the outer loop (instead of the select clause)

case <-notifier.Closed():
return
break OUTER
}
}
a.newHeadListenersMtx.Lock()
defer a.newHeadListenersMtx.Unlock()
delete(a.newHeadListeners, rpcSub.ID)
close(listener)
}()
a.newHeadListenersMtx.Lock()
defer a.newHeadListenersMtx.Unlock()
a.newHeadListeners[rpcSub.ID] = listener

return rpcSub, nil
}

func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriteria) (*rpc.Subscription, error) {
func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriteria) (s *rpc.Subscription, err error) {
defer recordMetrics("eth_logs", time.Now(), err == nil)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand Down
1 change: 1 addition & 0 deletions evmrpc/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()
recvCh, done := sendWSRequestGood(t, "subscribe", "newHeads")
NewHeadsCalled <- struct{}{}
defer func() { done <- struct{}{} }()

receivedSubMsg := false
Expand Down
Loading