Skip to content

Commit

Permalink
Cherry-pick #3650: Do fatal when datastream channel is full (workarou…
Browse files Browse the repository at this point in the history
…nd to fix datastream blocking issue) (#3654)

* Do fatal when datastream channel is full (workaround to fix datastream blocking issue) (#3650)

* Do fatal when datastream channel is full (this will restart sequencer automatically)

* update datastream library (more ds-debug logs)

* fix decrease DataToStreamChannelCount
  • Loading branch information
agnusmor authored May 27, 2024
1 parent 6023dca commit 0233b40
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
6 changes: 3 additions & 3 deletions sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat
}

// Send batch bookmark to the datastream
f.DSSendBatchBookmark(batchNumber)
f.DSSendBatchBookmark(ctx, batchNumber)
// Send batch start to the datastream
f.DSSendBatchStart(batchNumber, false)
f.DSSendBatchStart(ctx, batchNumber, false)

// Check if synchronizer is up-to-date
//TODO: review if this is needed
Expand Down Expand Up @@ -406,7 +406,7 @@ func (f *finalizer) closeSIPBatch(ctx context.Context, dbTx pgx.Tx) error {
}

// Sent batch to DS
f.DSSendBatchEnd(f.sipBatch.batchNumber, f.sipBatch.finalStateRoot, f.sipBatch.finalLocalExitRoot)
f.DSSendBatchEnd(ctx, f.sipBatch.batchNumber, f.sipBatch.finalStateRoot, f.sipBatch.finalLocalExitRoot)

log.Infof("sip batch %d closed in statedb, closing reason: %s", f.sipBatch.batchNumber, f.sipBatch.closingReason)

Expand Down
35 changes: 29 additions & 6 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package sequencer

import (
"github.com/0xPolygonHermez/zkevm-node/log"
"context"
"fmt"

"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ethereum/go-ethereum/common"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64) error {
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
Expand Down Expand Up @@ -52,28 +54,35 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
l2Transactions = append(l2Transactions, l2Transaction)
}

log.Infof("[ds-debug] sending l2block %d to datastream channel", blockResponse.BlockNumber)
f.checkDSBufferIsFull(ctx)

f.dataToStream <- state.DSL2FullBlock{
DSL2Block: l2Block,
Txs: l2Transactions,
}

f.dataToStreamCount.Add(1)
}

return nil
}

func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)

// Send batch bookmark to the streamer
f.dataToStream <- datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: batchNumber,
}

f.dataToStreamCount.Add(1)
}
}

func (f *finalizer) DSSendBatchStart(batchNumber uint64, isForced bool) {
func (f *finalizer) DSSendBatchStart(ctx context.Context, batchNumber uint64, isForced bool) {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

batchStart := datastream.BatchStart{
Expand All @@ -88,18 +97,32 @@ func (f *finalizer) DSSendBatchStart(batchNumber uint64, isForced bool) {
}

if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)

// Send batch start to the streamer
f.dataToStream <- batchStart

f.dataToStreamCount.Add(1)
}
}

func (f *finalizer) DSSendBatchEnd(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
func (f *finalizer) DSSendBatchEnd(ctx context.Context, batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)

// Send batch end to the streamer
f.dataToStream <- datastream.BatchEnd{
Number: batchNumber,
StateRoot: stateRoot.Bytes(),
LocalExitRoot: localExitRoot.Bytes(),
}

f.dataToStreamCount.Add(1)
}
}

func (f *finalizer) checkDSBufferIsFull(ctx context.Context) {
if f.dataToStreamCount.Load() == datastreamChannelBufferSize {
f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true)
}
}
10 changes: 8 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type finalizer struct {
// interval metrics
metrics *intervalMetrics
// stream server
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
dataToStreamCount atomic.Int32
}

// newFinalizer returns a new instance of Finalizer.
Expand Down Expand Up @@ -885,6 +886,11 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
counters.Binaries, counters.Sha256Hashes_V2, counters.Steps)
}

// Decrease datastreamChannelCount variable
func (f *finalizer) DataToStreamChannelCountAdd(ct int32) {
f.dataToStreamCount.Add(ct)
}

// Halt halts the finalizer
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0, forcedL2BlockResponse.Timestamp)
err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0, forcedL2BlockResponse.Timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
11 changes: 1 addition & 10 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
return err
}

//TODO: remove this Log
log.Infof("[ds-debug] l2 block %d [%d] stored in statedb", blockResponse.BlockNumber, l2Block.trackingNum)

// Update txs status in the pool
for _, txResponse := range blockResponse.TransactionResponses {
// Change Tx status to selected
Expand All @@ -492,19 +489,13 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)

// Send L2 block to data streamer
err = f.DSSendL2Block(l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex(), l2Block.timestamp)
err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex(), l2Block.timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] sent to datastream", blockResponse.BlockNumber, l2Block.trackingNum)

for _, tx := range l2Block.transactions {
// Delete the tx from the pending list in the worker (addrQueue)
f.workerIntf.DeleteTxPendingToStore(tx.Hash, tx.From)
Expand Down
24 changes: 4 additions & 20 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const (
datastreamChannelMultiplier = 2
datastreamChannelBufferSize = 20
)

// Sequencer represents a sequencer
Expand Down Expand Up @@ -52,9 +52,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
eventLog: eventLog,
}

// TODO: Make configurable
channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
sequencer.dataToStream = make(chan interface{}, channelBufferSize)
sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)

return sequencer, nil
}
Expand Down Expand Up @@ -257,14 +255,14 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
// Read data from channel
dataStream := <-s.dataToStream

s.finalizer.DataToStreamChannelCountAdd(-1)

if s.streamServer != nil {
switch data := dataStream.(type) {
// Stream a complete L2 block with its transactions
case state.DSL2FullBlock:
l2Block := data

//TODO: remove this log
log.Infof("[ds-debug] start atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.StartAtomicOp()
if err != nil {
log.Errorf("failed to start atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
Expand All @@ -276,8 +274,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber,
}

//TODO: remove this log
log.Infof("[ds-debug] add stream bookmark for l2block %d", l2Block.L2BlockNumber)
marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
log.Errorf("failed to marshal bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err)
Expand All @@ -298,8 +294,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber - 1,
}

//TODO: remove this log
log.Infof("[ds-debug] get previous l2block %d", l2Block.L2BlockNumber-1)
marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
log.Errorf("failed to marshal bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err)
Expand Down Expand Up @@ -339,16 +333,12 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
continue
}

//TODO: remove this log
log.Infof("[ds-debug] add l2blockStart stream entry for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK), marshalledL2Block)
if err != nil {
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] adding l2tx stream entries for l2block %d", l2Block.L2BlockNumber)
for _, l2Transaction := range l2Block.Txs {
streamL2Transaction := &datastream.Transaction{
L2BlockNumber: l2Transaction.L2BlockNumber,
Expand All @@ -371,17 +361,11 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
}
}

//TODO: remove this log
log.Infof("[ds-debug] commit atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] l2block %d sent to datastream", l2Block.L2BlockNumber)

// Stream a bookmark
case datastream.BookMark:
err = s.streamServer.StartAtomicOp()
Expand Down

0 comments on commit 0233b40

Please sign in to comment.