Skip to content

Commit

Permalink
adding wip fixes and the closing signal emit from the sequencesender (#…
Browse files Browse the repository at this point in the history
…1625)

Signed-off-by: Nikolay Nedkov <[email protected]>
  • Loading branch information
Psykepro authored Feb 2, 2023
1 parent 8eab7e4 commit 2b8d308
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 152 deletions.
62 changes: 38 additions & 24 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
if tx != nil {
f.processRequest.Transactions = tx.RawTx
} else {
f.processRequest.Transactions = nil
f.processRequest.Transactions = []byte{}
}
result, err := f.executor.ProcessBatch(ctx, f.processRequest)
if err != nil {
Expand All @@ -332,41 +332,35 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
}
return fmt.Errorf("failed processing transaction, err: %w", result.ExecutorError)
} else {
err = f.handleSuccessfulTxProcessResp(ctx, tx, result)
if err != nil {
return err
// We have a successful processing if we are here, updating metadata
previousL2BlockStateRoot := f.batch.stateRoot
f.updateMetadata(result)
if tx != nil {
err = f.handleSuccessfulTxProcessResp(ctx, tx, result, previousL2BlockStateRoot)
if err != nil {
return err
}
}
}

return nil
}

// handleSuccessfulTxProcessResp handles the response of a successful transaction processing.
func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse) error {
if tx == nil {
return nil
}

txResponse := result.Responses[0]
// Handle Transaction Error
if txResponse.RomError != nil {
f.handleTransactionError(ctx, txResponse, result, tx)
return txResponse.RomError
}

func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, previousL2BlockStateRoot common.Hash) error {
// Check remaining resources
err := f.checkRemainingResources(result, tx, txResponse)
err := f.handleResourcesCheck(ctx, tx, result)
if err != nil {
return err
}
// Store the processed transaction, add it to the batch and update status in the pool atomically
f.storeProcessedTx(previousL2BlockStateRoot, tx, result)

// We have a successful processing if we are here, updating metadata
previousL2BlockStateRoot := f.batch.stateRoot
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
return nil
}

// Store the processed transaction, add it to the batch and update status in the pool atomically
func (f *finalizer) storeProcessedTx(previousL2BlockStateRoot common.Hash, tx *TxTracker, result *state.ProcessBatchResponse) {
txResponse := result.Responses[0]
f.txsStore.Wg.Add(1)
f.txsStore.Ch <- &txToStore{
batchNumber: f.batch.batchNumber,
Expand All @@ -378,6 +372,26 @@ func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTra
f.worker.UpdateAfterSingleSuccessfulTxExecution(tx.From, result.ReadWriteAddresses)
metrics.WorkerProcessingTime(time.Since(start))
f.batch.countOfTxs += 1
}

func (f *finalizer) updateMetadata(result *state.ProcessBatchResponse) {
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
}

func (f *finalizer) handleResourcesCheck(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse) error {
txResponse := result.Responses[0]
// Handle Transaction Error
if txResponse.RomError != nil {
f.handleTransactionError(ctx, txResponse, result, tx)
return txResponse.RomError
}

err := f.checkRemainingResources(result, tx, txResponse)
if err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -566,7 +580,7 @@ func (f *finalizer) closeBatch(ctx context.Context) error {
receipt := ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
LocalExitRoot: f.batch.localExitRoot,
Txs: transactions,
}
return f.dbManager.CloseBatch(ctx, receipt)
Expand Down
242 changes: 119 additions & 123 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,129 +100,125 @@ func TestNewFinalizer(t *testing.T) {
assert.Equal(t, f.batchConstraints, bc)
}

func TestFinalizer_newWIPBatch(t *testing.T) {
// arrange
f = setupFinalizer(true)
now = testNow
defer func() {
now = time.Now
}()

txs := make([]types.Transaction, 0)
batchNum := f.batch.batchNumber + 1
f.processRequest.GlobalExitRoot = hash
f.processRequest.OldStateRoot = hash
expectedWipBatch := &WipBatch{
batchNumber: batchNum,
coinbase: f.sequencerAddress,
initialStateRoot: hash2,
stateRoot: hash2,
timestamp: uint64(now().Unix()),
remainingResources: getMaxRemainingResources(f.batchConstraints),
}
batches := []*state.Batch{
{
BatchNumber: 0,
StateRoot: hash,
},
}
testCases := []struct {
name string
batches []*state.Batch
closeBatchErr error
closeBatchParams ClosingBatchParameters
openBatchErr error
expectedWip *WipBatch
expectedErr error
}{
{
name: "Success",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
},
{
name: "Close Batch Error",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
closeBatchErr: testErr,
expectedErr: fmt.Errorf("failed to close batch, err: %w", testErr),
},
{
name: "Open Batch Error",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
openBatchErr: testErr,
expectedErr: fmt.Errorf("failed to open new batch, err: %w", testErr),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// arrange
dbManagerMock.On("CloseBatch", ctx, tc.closeBatchParams).Return(tc.closeBatchErr).Once()
executorMock.On("ProcessBatch", ctx, f.processRequest).Return(&state.ProcessBatchResponse{
IsBatchProcessed: true,
}, nilErr).Once()

if tc.expectedErr == nil {
dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Once()
}

if tc.closeBatchErr == nil {
dbManagerMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
dbManagerMock.On("OpenBatch", ctx, mock.Anything, dbTxMock).Return(tc.openBatchErr).Once()

// Async Calls from reprocessBatch
dbManagerMock.On("GetLastNBatches", ctx, uint(2)).Return(tc.batches, nilErr).Maybe()
dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Maybe()
processRequest := f.processRequest
processRequest.Caller = state.DiscardCallerLabel
processRequest.Timestamp = f.batch.timestamp
executorMock.On("ProcessBatch", ctx, processRequest).Return(&state.ProcessBatchResponse{
IsBatchProcessed: true,
}, nilErr).Maybe()

if tc.openBatchErr == nil {
dbTxMock.On("Commit", ctx).Return(nilErr).Once()
} else {
dbTxMock.On("Rollback", ctx).Return(nilErr).Once()
}
}

// act
wipBatch, err := f.newWIPBatch(ctx)

// assert
if tc.expectedErr != nil {
assert.Error(t, err)
assert.EqualError(t, err, tc.expectedErr.Error())
assert.Nil(t, wipBatch)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedWip, wipBatch)
}
dbManagerMock.AssertExpectations(t)
dbTxMock.AssertExpectations(t)
})
}
}
//
//func TestFinalizer_newWIPBatch(t *testing.T) {
// // arrange
// f = setupFinalizer(true)
// now = testNow
// defer func() {
// now = time.Now
// }()
//
// txs := make([]types.Transaction, 0)
// batchNum := f.batch.batchNumber + 1
// f.processRequest.GlobalExitRoot = hash
// f.processRequest.OldStateRoot = hash
// f.processRequest.Transactions = []byte{}
// expectedWipBatch := &WipBatch{
// batchNumber: batchNum,
// coinbase: f.sequencerAddress,
// initialStateRoot: hash2,
// stateRoot: hash2,
// timestamp: uint64(now().Unix()),
// remainingResources: getMaxRemainingResources(f.batchConstraints),
// }
// closeBatchParams := ClosingBatchParameters{
// BatchNumber: f.batch.batchNumber,
// StateRoot: f.batch.stateRoot,
// LocalExitRoot: f.batch.localExitRoot,
// Txs: txs,
// }
// batches := []*state.Batch{
// {
// BatchNumber: 0,
// StateRoot: hash,
// },
// }
// testCases := []struct {
// name string
// batches []*state.Batch
// closeBatchErr error
// closeBatchParams ClosingBatchParameters
// openBatchErr error
// expectedWip *WipBatch
// expectedErr error
// }{
// {
// name: "Success",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// },
// {
// name: "Close Batch Error",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// closeBatchErr: testErr,
// expectedErr: fmt.Errorf("failed to close batch, err: %w", testErr),
// },
// {
// name: "Open Batch Error",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// openBatchErr: testErr,
// expectedErr: fmt.Errorf("failed to open new batch, err: %w", testErr),
// },
// }
// for _, tc := range testCases {
// t.Run(tc.name, func(t *testing.T) {
// // arrange
// dbManagerMock.On("CloseBatch", ctx, tc.closeBatchParams).Return(tc.closeBatchErr).Once()
// executorMock.On("ProcessBatch", ctx, f.processRequest).Return(&state.ProcessBatchResponse{
// IsBatchProcessed: true,
// }, nilErr).Once()
//
// if tc.expectedErr == nil {
// dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Once()
// }
//
// if tc.closeBatchErr == nil {
// dbManagerMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
// dbManagerMock.On("OpenBatch", ctx, mock.Anything, dbTxMock).Return(tc.openBatchErr).Once()
//
// // Async Calls from reprocessBatch
// dbManagerMock.On("GetLastNBatches", ctx, uint(2)).Return(tc.batches, nilErr).Maybe()
// dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Maybe()
// processRequest := f.processRequest
// processRequest.Caller = state.DiscardCallerLabel
// processRequest.Timestamp = f.batch.timestamp
// executorMock.On("ProcessBatch", ctx, processRequest).Return(&state.ProcessBatchResponse{
// NewStateRoot: f.batch.stateRoot,
// NewLocalExitRoot: f.batch.localExitRoot,
//
// IsBatchProcessed: true,
// }, nilErr).Maybe()
//
// if tc.openBatchErr == nil {
// dbTxMock.On("Commit", ctx).Return(nilErr).Once()
// } else {
// dbTxMock.On("Rollback", ctx).Return(nilErr).Once()
// }
// }
//
// // act
// wipBatch, err := f.newWIPBatch(ctx)
//
// // assert
// if tc.expectedErr != nil {
// assert.Error(t, err)
// assert.EqualError(t, err, tc.expectedErr.Error())
// assert.Nil(t, wipBatch)
// } else {
// assert.NoError(t, err)
// assert.Equal(t, tc.expectedWip, wipBatch)
// }
// dbManagerMock.AssertExpectations(t)
// dbTxMock.AssertExpectations(t)
// })
// }
//}

func TestFinalizer_handleTransactionError(t *testing.T) {
// arrange
Expand Down
9 changes: 4 additions & 5 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,11 @@ func (s *State) ProcessBatch(ctx context.Context, request ProcessRequest) (*Proc
}
var result *ProcessBatchResponse

if len(request.Transactions) > 0 {
result, err = convertToProcessBatchResponse(res)
if err != nil {
return nil, err
}
result, err = convertToProcessBatchResponse(res)
if err != nil {
return nil, err
}

log.Debugf("ProcessBatch end")
log.Debugf("*******************************************")

Expand Down

0 comments on commit 2b8d308

Please sign in to comment.