diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index e94f14a1f6..5604661989 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1088,10 +1088,11 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block return } for _, filter := range blockFilters { - e.sendSubscriptionResponse(filter, data) + filter.EnqueueSubscriptionDataToBeSent(data) + go filter.SendEnqueuedSubscriptionData() } } - log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) + log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) } func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) { @@ -1172,35 +1173,11 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE if err != nil { log.Errorf("failed to marshal ethLog response to subscription: %v", err) } - e.sendSubscriptionResponse(filter, data) + filter.EnqueueSubscriptionDataToBeSent(data) + go filter.SendEnqueuedSubscriptionData() } } } } - log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) -} - -func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) { - const errMessage = "Unable to write WS message to filter %v, %s" - - res := types.SubscriptionResponse{ - JSONRPC: "2.0", - Method: "eth_subscription", - Params: types.SubscriptionResponseParams{ - Subscription: filter.ID, - Result: data, - }, - } - message, err := json.Marshal(res) - if err != nil { - log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) - return - } - - err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message) - if err != nil { - log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) - return - } - log.Debugf("WS message sent: %v", string(message)) + log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) } diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 30d1a50053..319ddb4dbe 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -4,11 +4,13 @@ import ( "context" "encoding/json" "fmt" + "sync" "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" + "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" "github.com/gorilla/websocket" @@ -31,6 +33,68 @@ type Filter struct { Parameters interface{} LastPoll time.Time WsConn *atomic.Pointer[websocket.Conn] + + wsDataQueue state.Queue[[]byte] + mutex sync.Mutex + isSending bool +} + +// EnqueueSubscriptionDataToBeSent enqueues subscription data to be sent +// via web sockets connection +func (f *Filter) EnqueueSubscriptionDataToBeSent(data []byte) { + f.wsDataQueue.Push(data) +} + +// SendEnqueuedSubscriptionData consumes all the enqueued subscription data +// and sends it via web sockets connection. +func (f *Filter) SendEnqueuedSubscriptionData() { + if f.isSending { + return + } + + f.mutex.Lock() + defer f.mutex.Unlock() + f.isSending = true + for { + d, err := f.wsDataQueue.Pop() + if err == state.ErrQueueEmpty { + break + } else if err != nil { + log.Errorf("failed to pop subscription data from queue to be sent via web sockets to filter %v, %s", f.ID, err.Error()) + break + } + f.sendSubscriptionResponse(d) + } + f.isSending = false +} + +// sendSubscriptionResponse send data as subscription response via +// web sockets connection controlled by a mutex +func (f *Filter) sendSubscriptionResponse(data []byte) { + const errMessage = "Unable to write WS message to filter %v, %s" + + start := time.Now() + res := types.SubscriptionResponse{ + JSONRPC: "2.0", + Method: "eth_subscription", + Params: types.SubscriptionResponseParams{ + Subscription: f.ID, + Result: data, + }, + } + message, err := json.Marshal(res) + if err != nil { + log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error())) + return + } + + err = f.WsConn.Load().WriteMessage(websocket.TextMessage, message) + if err != nil { + log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error())) + return + } + log.Debugf("WS message sent: %v", string(message)) + log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start)) } // FilterType express the type of the filter, block, logs, pending transactions @@ -92,19 +156,19 @@ func (f *LogFilter) MarshalJSON() ([]byte, error) { obj.BlockHash = f.BlockHash if f.FromBlock != nil && (*f.FromBlock == types.LatestBlockNumber) { - fromblock := "" - obj.FromBlock = &fromblock + fromBlock := "" + obj.FromBlock = &fromBlock } else if f.FromBlock != nil { - fromblock := hex.EncodeUint64(uint64(*f.FromBlock)) - obj.FromBlock = &fromblock + fromBlock := hex.EncodeUint64(uint64(*f.FromBlock)) + obj.FromBlock = &fromBlock } if f.ToBlock != nil && (*f.ToBlock == types.LatestBlockNumber) { - toblock := "" - obj.ToBlock = &toblock + toBlock := "" + obj.ToBlock = &toBlock } else if f.ToBlock != nil { - toblock := hex.EncodeUint64(uint64(*f.ToBlock)) - obj.ToBlock = &toblock + toBlock := hex.EncodeUint64(uint64(*f.ToBlock)) + obj.ToBlock = &toBlock } if f.Addresses != nil { diff --git a/state/l2block.go b/state/l2block.go index f2ed7ba3d1..0979504817 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -11,6 +11,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +const newL2BlocksCheckInterval = 200 * time.Millisecond + // NewL2BlockEventHandler represent a func that will be called by the // state when a NewL2BlockEvent is triggered type NewL2BlockEventHandler func(e NewL2BlockEvent) @@ -45,36 +47,9 @@ func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) { s.newL2BlockEventHandlers = append(s.newL2BlockEventHandlers, h) } -func (s *State) handleEvents() { - for newL2BlockEvent := range s.newL2BlockEvents { - log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) - if len(s.newL2BlockEventHandlers) == 0 { - continue - } - - wg := sync.WaitGroup{} - for _, handler := range s.newL2BlockEventHandlers { - wg.Add(1) - go func(h NewL2BlockEventHandler, e NewL2BlockEvent) { - defer func() { - wg.Done() - if r := recover(); r != nil { - log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r) - } - }() - log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) - start := time.Now() - h(e) - log.Debugf("[handleEvents] new l2 block event handler for block %v took %vms to be executed", e.Block.NumberU64(), time.Since(start).Milliseconds()) - }(handler, newL2BlockEvent) - } - wg.Wait() - } -} - func (s *State) monitorNewL2Blocks() { waitNextCycle := func() { - time.Sleep(1 * time.Second) + time.Sleep(newL2BlocksCheckInterval) } for { @@ -116,12 +91,39 @@ func (s *State) monitorNewL2Blocks() { s.newL2BlockEvents <- NewL2BlockEvent{ Block: *block, } - log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds()) - log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) s.lastL2BlockSeen.Store(block) + log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start)) + log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) } // interval to check for new l2 blocks waitNextCycle() } } + +func (s *State) handleEvents() { + for newL2BlockEvent := range s.newL2BlockEvents { + log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) + if len(s.newL2BlockEventHandlers) == 0 { + continue + } + + wg := sync.WaitGroup{} + for _, handler := range s.newL2BlockEventHandlers { + wg.Add(1) + go func(h NewL2BlockEventHandler, e NewL2BlockEvent) { + defer func() { + wg.Done() + if r := recover(); r != nil { + log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r) + } + }() + log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) + start := time.Now() + h(e) + log.Debugf("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start)) + }(handler, newL2BlockEvent) + } + wg.Wait() + } +} diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index feb59888f4..0f1fedb24f 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -1562,7 +1562,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2 } } } - log.Debugf("[AddL2Block] l2 block %v took %vms to be added", l2Block.NumberU64(), time.Since(start).Milliseconds()) + log.Debugf("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start)) return nil } diff --git a/state/queue.go b/state/queue.go new file mode 100644 index 0000000000..b91f11d4c7 --- /dev/null +++ b/state/queue.go @@ -0,0 +1,66 @@ +package state + +import ( + "fmt" + "sync" +) + +// ErrQueueEmpty is returned when a queue operation +// depends on the queue to not be empty, but it is empty +var ErrQueueEmpty = fmt.Errorf("queue is empty") + +// Queue is a generic queue implementation that implements FIFO +type Queue[T any] struct { + items []T + mutex sync.Mutex +} + +// NewQueue creates a new instance of queue and initializes it +func NewQueue[T any]() *Queue[T] { + return &Queue[T]{ + items: make([]T, 0), + } +} + +// Push enqueue an item +func (q *Queue[T]) Push(item T) { + q.mutex.Lock() + defer q.mutex.Unlock() + q.items = append(q.items, item) +} + +// Top returns the top level item without removing it +func (q *Queue[T]) Top() (T, error) { + q.mutex.Lock() + defer q.mutex.Unlock() + var v T + if len(q.items) == 0 { + return v, ErrQueueEmpty + } + return q.items[0], nil +} + +// Pop returns the top level item and unqueues it +func (q *Queue[T]) Pop() (T, error) { + q.mutex.Lock() + defer q.mutex.Unlock() + var v T + if len(q.items) == 0 { + return v, ErrQueueEmpty + } + v = q.items[0] + q.items = q.items[1:] + return v, nil +} + +// Len returns the size of the queue +func (q *Queue[T]) Len() int { + q.mutex.Lock() + defer q.mutex.Unlock() + return len(q.items) +} + +// IsEmpty returns false if the queue has itens, otherwise true +func (q *Queue[T]) IsEmpty() bool { + return q.Len() == 0 +} diff --git a/state/queue_test.go b/state/queue_test.go new file mode 100644 index 0000000000..240c1a0fba --- /dev/null +++ b/state/queue_test.go @@ -0,0 +1,52 @@ +package state + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueue(t *testing.T) { + q := NewQueue[int]() + + q.Push(10) + q.Push(20) + q.Push(30) + + top, err := q.Top() + require.NoError(t, err) + assert.Equal(t, 10, top) + assert.Equal(t, 3, q.Len()) + assert.Equal(t, false, q.IsEmpty()) + + pop, err := q.Pop() + require.NoError(t, err) + assert.Equal(t, 10, pop) + assert.Equal(t, 2, q.Len()) + assert.Equal(t, false, q.IsEmpty()) + + top, err = q.Top() + require.NoError(t, err) + assert.Equal(t, 20, top) + assert.Equal(t, 2, q.Len()) + assert.Equal(t, false, q.IsEmpty()) + + pop, err = q.Pop() + require.NoError(t, err) + assert.Equal(t, 20, pop) + assert.Equal(t, 1, q.Len()) + assert.Equal(t, false, q.IsEmpty()) + + pop, err = q.Pop() + require.NoError(t, err) + assert.Equal(t, 30, pop) + assert.Equal(t, 0, q.Len()) + assert.Equal(t, true, q.IsEmpty()) + + _, err = q.Top() + require.Error(t, ErrQueueEmpty, err) + + _, err = q.Pop() + require.Error(t, ErrQueueEmpty, err) +}