Skip to content

Commit

Permalink
add queue to ws subscription filters (#2769)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos authored Nov 10, 2023
1 parent 29ceae4 commit 987a1b7
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 68 deletions.
35 changes: 6 additions & 29 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,10 +1088,11 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block
return
}
for _, filter := range blockFilters {
e.sendSubscriptionResponse(filter, data)
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
}

func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
Expand Down Expand Up @@ -1172,35 +1173,11 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
e.sendSubscriptionResponse(filter, data)
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
}
}
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"

res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: filter.ID,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}

err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
}
80 changes: 72 additions & 8 deletions jsonrpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/gorilla/websocket"
Expand All @@ -31,6 +33,68 @@ type Filter struct {
Parameters interface{}
LastPoll time.Time
WsConn *atomic.Pointer[websocket.Conn]

wsDataQueue state.Queue[[]byte]
mutex sync.Mutex
isSending bool
}

// EnqueueSubscriptionDataToBeSent enqueues subscription data to be sent
// via web sockets connection
func (f *Filter) EnqueueSubscriptionDataToBeSent(data []byte) {
f.wsDataQueue.Push(data)
}

// SendEnqueuedSubscriptionData consumes all the enqueued subscription data
// and sends it via web sockets connection.
func (f *Filter) SendEnqueuedSubscriptionData() {
if f.isSending {
return
}

f.mutex.Lock()
defer f.mutex.Unlock()
f.isSending = true
for {
d, err := f.wsDataQueue.Pop()
if err == state.ErrQueueEmpty {
break
} else if err != nil {
log.Errorf("failed to pop subscription data from queue to be sent via web sockets to filter %v, %s", f.ID, err.Error())
break
}
f.sendSubscriptionResponse(d)
}
f.isSending = false
}

// sendSubscriptionResponse send data as subscription response via
// web sockets connection controlled by a mutex
func (f *Filter) sendSubscriptionResponse(data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"

start := time.Now()
res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: f.ID,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}

err = f.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start))
}

// FilterType express the type of the filter, block, logs, pending transactions
Expand Down Expand Up @@ -92,19 +156,19 @@ func (f *LogFilter) MarshalJSON() ([]byte, error) {
obj.BlockHash = f.BlockHash

if f.FromBlock != nil && (*f.FromBlock == types.LatestBlockNumber) {
fromblock := ""
obj.FromBlock = &fromblock
fromBlock := ""
obj.FromBlock = &fromBlock
} else if f.FromBlock != nil {
fromblock := hex.EncodeUint64(uint64(*f.FromBlock))
obj.FromBlock = &fromblock
fromBlock := hex.EncodeUint64(uint64(*f.FromBlock))
obj.FromBlock = &fromBlock
}

if f.ToBlock != nil && (*f.ToBlock == types.LatestBlockNumber) {
toblock := ""
obj.ToBlock = &toblock
toBlock := ""
obj.ToBlock = &toBlock
} else if f.ToBlock != nil {
toblock := hex.EncodeUint64(uint64(*f.ToBlock))
obj.ToBlock = &toblock
toBlock := hex.EncodeUint64(uint64(*f.ToBlock))
obj.ToBlock = &toBlock
}

if f.Addresses != nil {
Expand Down
62 changes: 32 additions & 30 deletions state/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

const newL2BlocksCheckInterval = 200 * time.Millisecond

// NewL2BlockEventHandler represent a func that will be called by the
// state when a NewL2BlockEvent is triggered
type NewL2BlockEventHandler func(e NewL2BlockEvent)
Expand Down Expand Up @@ -45,36 +47,9 @@ func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) {
s.newL2BlockEventHandlers = append(s.newL2BlockEventHandlers, h)
}

func (s *State) handleEvents() {
for newL2BlockEvent := range s.newL2BlockEvents {
log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
if len(s.newL2BlockEventHandlers) == 0 {
continue
}

wg := sync.WaitGroup{}
for _, handler := range s.newL2BlockEventHandlers {
wg.Add(1)
go func(h NewL2BlockEventHandler, e NewL2BlockEvent) {
defer func() {
wg.Done()
if r := recover(); r != nil {
log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r)
}
}()
log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
start := time.Now()
h(e)
log.Debugf("[handleEvents] new l2 block event handler for block %v took %vms to be executed", e.Block.NumberU64(), time.Since(start).Milliseconds())
}(handler, newL2BlockEvent)
}
wg.Wait()
}
}

func (s *State) monitorNewL2Blocks() {
waitNextCycle := func() {
time.Sleep(1 * time.Second)
time.Sleep(newL2BlocksCheckInterval)
}

for {
Expand Down Expand Up @@ -116,12 +91,39 @@ func (s *State) monitorNewL2Blocks() {
s.newL2BlockEvents <- NewL2BlockEvent{
Block: *block,
}
log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds())
log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String())
s.lastL2BlockSeen.Store(block)
log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start))
log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String())
}

// interval to check for new l2 blocks
waitNextCycle()
}
}

func (s *State) handleEvents() {
for newL2BlockEvent := range s.newL2BlockEvents {
log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
if len(s.newL2BlockEventHandlers) == 0 {
continue
}

wg := sync.WaitGroup{}
for _, handler := range s.newL2BlockEventHandlers {
wg.Add(1)
go func(h NewL2BlockEventHandler, e NewL2BlockEvent) {
defer func() {
wg.Done()
if r := recover(); r != nil {
log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r)
}
}()
log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
start := time.Now()
h(e)
log.Debugf("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start))
}(handler, newL2BlockEvent)
}
wg.Wait()
}
}
2 changes: 1 addition & 1 deletion state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2
}
}
}
log.Debugf("[AddL2Block] l2 block %v took %vms to be added", l2Block.NumberU64(), time.Since(start).Milliseconds())
log.Debugf("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start))
return nil
}

Expand Down
66 changes: 66 additions & 0 deletions state/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package state

import (
"fmt"
"sync"
)

// ErrQueueEmpty is returned when a queue operation
// depends on the queue to not be empty, but it is empty
var ErrQueueEmpty = fmt.Errorf("queue is empty")

// Queue is a generic queue implementation that implements FIFO
type Queue[T any] struct {
items []T
mutex sync.Mutex
}

// NewQueue creates a new instance of queue and initializes it
func NewQueue[T any]() *Queue[T] {
return &Queue[T]{
items: make([]T, 0),
}
}

// Push enqueue an item
func (q *Queue[T]) Push(item T) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.items = append(q.items, item)
}

// Top returns the top level item without removing it
func (q *Queue[T]) Top() (T, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
var v T
if len(q.items) == 0 {
return v, ErrQueueEmpty
}
return q.items[0], nil
}

// Pop returns the top level item and unqueues it
func (q *Queue[T]) Pop() (T, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
var v T
if len(q.items) == 0 {
return v, ErrQueueEmpty
}
v = q.items[0]
q.items = q.items[1:]
return v, nil
}

// Len returns the size of the queue
func (q *Queue[T]) Len() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return len(q.items)
}

// IsEmpty returns false if the queue has itens, otherwise true
func (q *Queue[T]) IsEmpty() bool {
return q.Len() == 0
}
52 changes: 52 additions & 0 deletions state/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package state

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
q := NewQueue[int]()

q.Push(10)
q.Push(20)
q.Push(30)

top, err := q.Top()
require.NoError(t, err)
assert.Equal(t, 10, top)
assert.Equal(t, 3, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err := q.Pop()
require.NoError(t, err)
assert.Equal(t, 10, pop)
assert.Equal(t, 2, q.Len())
assert.Equal(t, false, q.IsEmpty())

top, err = q.Top()
require.NoError(t, err)
assert.Equal(t, 20, top)
assert.Equal(t, 2, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err = q.Pop()
require.NoError(t, err)
assert.Equal(t, 20, pop)
assert.Equal(t, 1, q.Len())
assert.Equal(t, false, q.IsEmpty())

pop, err = q.Pop()
require.NoError(t, err)
assert.Equal(t, 30, pop)
assert.Equal(t, 0, q.Len())
assert.Equal(t, true, q.IsEmpty())

_, err = q.Top()
require.Error(t, ErrQueueEmpty, err)

_, err = q.Pop()
require.Error(t, ErrQueueEmpty, err)
}

0 comments on commit 987a1b7

Please sign in to comment.