From d0d583300f3f156297211af9c3a747c4979bb5ed Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 21 Jun 2023 09:13:15 +0200 Subject: [PATCH 01/11] Initial changes --- consensus/polybft/contractsapi/helper.go | 8 ++++ consensus/polybft/state_retry.go | 60 ++++++++++++++++++++++++ consensus/polybft/state_retry_test.go | 49 +++++++++++++++++++ 3 files changed, 117 insertions(+) create mode 100644 consensus/polybft/state_retry.go create mode 100644 consensus/polybft/state_retry_test.go diff --git a/consensus/polybft/contractsapi/helper.go b/consensus/polybft/contractsapi/helper.go index 9eb38707aa..3a364e7bdd 100644 --- a/consensus/polybft/contractsapi/helper.go +++ b/consensus/polybft/contractsapi/helper.go @@ -2,6 +2,7 @@ package contractsapi import ( "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/abi" ) @@ -13,6 +14,13 @@ type StateTransactionInput interface { DecodeAbi(b []byte) error } +// EventAbi is an interface representing an event generated in contractsapi +type EventAbi interface { + Sig() ethgo.Hash + Encode(inputs interface{}) ([]byte, error) + ParseLog(log *ethgo.Log) (bool, error) +} + var ( // stateSyncABIType is a specific case where we need to encode state sync event as a tuple of tuple stateSyncABIType = abi.MustNewType( diff --git a/consensus/polybft/state_retry.go b/consensus/polybft/state_retry.go new file mode 100644 index 0000000000..b6fa60206f --- /dev/null +++ b/consensus/polybft/state_retry.go @@ -0,0 +1,60 @@ +package polybft + +import ( + "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/ethgo" +) + +type eventDBInsertRetry[T contractsapi.EventAbi] struct { + blockchain blockchainBackend + saveEventsFn func([]T) error + parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) + isValidLogFn func(*types.Log) bool +} + +func (r *eventDBInsertRetry[T]) insertRetry(fromBlock, toBlock uint64) error { + events := make([]T, 0) + + for i := fromBlock; i <= toBlock; i++ { + blockHeader, found := r.blockchain.GetHeaderByNumber(i) + if !found { + return blockchain.ErrNoBlock + } + + receipts, err := r.blockchain.GetReceiptsByHash(blockHeader.Hash) + if err != nil { + return err + } + + for _, receipt := range receipts { + if receipt.Status == nil || *receipt.Status != types.ReceiptSuccess { + continue + } + + for _, log := range receipt.Logs { + if r.isValidLogFn != nil && !r.isValidLogFn(log) { + continue + } + + event, doesMatch, err := r.parseEventFn(blockHeader, convertLog(log)) + if err != nil { + return err + } + + if !doesMatch { + continue + } + + events = append(events, event) + } + } + } + + if len(events) > 0 { + return r.saveEventsFn(events) + } + + return nil +} diff --git a/consensus/polybft/state_retry_test.go b/consensus/polybft/state_retry_test.go new file mode 100644 index 0000000000..2dcf370760 --- /dev/null +++ b/consensus/polybft/state_retry_test.go @@ -0,0 +1,49 @@ +package polybft + +import ( + "testing" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/contracts" + "github.com/0xPolygon/polygon-edge/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +func TestEventDBInsertRetry_TransferEvents(t *testing.T) { + receipt := &types.Receipt{ + Logs: []*types.Log{ + createTestLogForTransferEvent(t, contracts.ValidatorSetContract, types.ZeroAddress, types.ZeroAddress, 10), + }, + } + receipt.SetStatus(types.ReceiptSuccess) + + backend := new(blockchainMock) + backend.On("GetHeaderByNumber", mock.Anything).Return(&types.Header{ + Hash: types.BytesToHash([]byte{0, 1, 2, 3}), + }, true) + backend.On("GetReceiptsByHash", mock.Anything).Return([]*types.Receipt{receipt}, nil) + + saveEventsFn := func(events []*contractsapi.TransferEvent) error { + require.NotEmpty(t, events) + + return nil + } + + retryManager := &eventDBInsertRetry[*contractsapi.TransferEvent]{ + blockchain: backend, + saveEventsFn: saveEventsFn, + isValidLogFn: func(l *types.Log) bool { + return l.Address == contracts.ValidatorSetContract + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { + var e contractsapi.TransferEvent + doesMatch, err := e.ParseLog(l) + + return &e, doesMatch, err + }, + } + + require.NoError(t, retryManager.insertRetry(1, 1)) +} From 2b54bfbb43b8574904be2dbfe6f6fb3a34be7297 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 21 Jun 2023 15:11:04 +0200 Subject: [PATCH 02/11] Exit events update --- command/bridge/exit/exit.go | 2 +- consensus/polybft/checkpoint_manager.go | 7 +++--- consensus/polybft/consensus_runtime_test.go | 2 +- consensus/polybft/sc_integration_test.go | 22 +++++++++++-------- consensus/polybft/state.go | 18 --------------- consensus/polybft/state_store_checkpoint.go | 22 ++++++++++++------- .../polybft/state_store_checkpoint_test.go | 18 +++++++++------ 7 files changed, 44 insertions(+), 47 deletions(-) diff --git a/command/bridge/exit/exit.go b/command/bridge/exit/exit.go index dc74eef0d9..6116788b91 100644 --- a/command/bridge/exit/exit.go +++ b/command/bridge/exit/exit.go @@ -161,7 +161,7 @@ func run(cmd *cobra.Command, _ []string) { } outputter.SetCommandResult(&exitResult{ - ID: strconv.FormatUint(exitEvent.ID, 10), + ID: strconv.FormatUint(exitEvent.ID.Uint64(), 10), Sender: exitEvent.Sender.String(), Receiver: exitEvent.Receiver.String(), }) diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 3544e6d4ee..4440982f43 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -75,6 +75,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer, blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger, state *State) *checkpointManager { + return &checkpointManager{ key: key, blockchain: blockchain, @@ -399,7 +400,7 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error var exitEventAPI contractsapi.L2StateSyncedEvent - e, err := exitEventAPI.Encode(exitEvent) + e, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent) if err != nil { return types.Proof{}, err } @@ -466,7 +467,7 @@ func getExitEventsFromReceipts(epoch, block uint64, receipts []*types.Receipt) ( // enforce sequential order sort.Slice(events, func(i, j int) bool { - return events[i].ID < events[j].ID + return events[i].ID.Cmp(events[j].ID) < 0 }) return events, nil @@ -479,7 +480,7 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { var exitEventAPI contractsapi.L2StateSyncedEvent for i := 0; i < numOfEvents; i++ { - b, err := exitEventAPI.Encode(exitEvents[i]) + b, err := exitEventAPI.Encode(exitEvents[i].L2StateSyncedEvent) if err != nil { return nil, err } diff --git a/consensus/polybft/consensus_runtime_test.go b/consensus/polybft/consensus_runtime_test.go index 34a7cfa3e7..8b15f7f0f1 100644 --- a/consensus/polybft/consensus_runtime_test.go +++ b/consensus/polybft/consensus_runtime_test.go @@ -1074,7 +1074,7 @@ func encodeExitEvents(t *testing.T, exitEvents []*ExitEvent) [][]byte { var exitEventAPI contractsapi.L2StateSyncedEvent for i, e := range exitEvents { - encodedEvent, err := exitEventAPI.Encode(e) + encodedEvent, err := exitEventAPI.Encode(e.L2StateSyncedEvent) require.NoError(t, err) encodedEvents[i] = encodedEvent diff --git a/consensus/polybft/sc_integration_test.go b/consensus/polybft/sc_integration_test.go index c850705553..2274aed396 100644 --- a/consensus/polybft/sc_integration_test.go +++ b/consensus/polybft/sc_integration_test.go @@ -161,16 +161,20 @@ func TestIntegratoin_PerformExit(t *testing.T) { exits := []*ExitEvent{ { - ID: 1, - Sender: ethgo.Address(contracts.ChildERC20PredicateContract), - Receiver: ethgo.Address(rootERC20PredicateAddr), - Data: exitData1, + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: big.NewInt(1), + Sender: types.Address(contracts.ChildERC20PredicateContract), + Receiver: types.Address(rootERC20PredicateAddr), + Data: exitData1, + }, }, { - ID: 2, - Sender: ethgo.Address(contracts.ChildERC20PredicateContract), - Receiver: ethgo.Address(rootERC20PredicateAddr), - Data: exitData2, + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: big.NewInt(2), + Sender: types.Address(contracts.ChildERC20PredicateContract), + Receiver: types.Address(rootERC20PredicateAddr), + Data: exitData2, + }, }, } exitTree, err := createExitTree(exits) @@ -226,7 +230,7 @@ func TestIntegratoin_PerformExit(t *testing.T) { require.Equal(t, 0, int(res[31])) var exitEventAPI contractsapi.L2StateSyncedEvent - proofExitEvent, err := exitEventAPI.Encode(exits[0]) + proofExitEvent, err := exitEventAPI.Encode(exits[0].L2StateSyncedEvent) require.NoError(t, err) proof, err := exitTree.GenerateProof(proofExitEvent) diff --git a/consensus/polybft/state.go b/consensus/polybft/state.go index 3e36e1ee01..2627b91ba7 100644 --- a/consensus/polybft/state.go +++ b/consensus/polybft/state.go @@ -5,26 +5,8 @@ import ( "github.com/hashicorp/go-hclog" bolt "go.etcd.io/bbolt" - - "github.com/umbracle/ethgo" ) -// ExitEvent is an event emitted by Exit contract -type ExitEvent struct { - // ID is the decoded 'index' field from the event - ID uint64 `abi:"id"` - // Sender is the decoded 'sender' field from the event - Sender ethgo.Address `abi:"sender"` - // Receiver is the decoded 'receiver' field from the event - Receiver ethgo.Address `abi:"receiver"` - // Data is the decoded 'data' field from the event - Data []byte `abi:"data"` - // EpochNumber is the epoch number in which exit event was added - EpochNumber uint64 `abi:"-"` - // BlockNumber is the block in which exit event was added - BlockNumber uint64 `abi:"-"` -} - // MessageSignature encapsulates sender identifier and its signature type MessageSignature struct { // Signer of the vote diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index 4f56a36af8..d7a5507f21 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -28,6 +28,15 @@ func (e *exitEventNotFoundError) Error() string { return fmt.Sprintf("could not find any exit event that has an id: %v and epoch: %v", e.exitID, e.epoch) } +// ExitEvent is an event emitted by Exit contract +type ExitEvent struct { + *contractsapi.L2StateSyncedEvent + // EpochNumber is the epoch number in which exit event was added + EpochNumber uint64 `abi:"-"` + // BlockNumber is the block in which exit event was added + BlockNumber uint64 `abi:"-"` +} + /* Bolt DB schema: @@ -80,7 +89,7 @@ func insertExitEventToBucket(exitEventBucket, lookupBucket *bolt.Bucket, exitEve } epochBytes := common.EncodeUint64ToBytes(exitEvent.EpochNumber) - exitIDBytes := common.EncodeUint64ToBytes(exitEvent.ID) + exitIDBytes := common.EncodeUint64ToBytes(exitEvent.ID.Uint64()) err = exitEventBucket.Put(bytes.Join([][]byte{epochBytes, exitIDBytes, common.EncodeUint64ToBytes(exitEvent.BlockNumber)}, nil), raw) @@ -162,7 +171,7 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi // enforce sequential order sort.Slice(events, func(i, j int) bool { - return events[i].ID < events[j].ID + return events[i].ID.Cmp(events[j].ID) < 0 }) return events, err @@ -182,12 +191,9 @@ func decodeExitEvent(log *ethgo.Log, epoch, block uint64) (*ExitEvent, error) { } return &ExitEvent{ - ID: l2StateSyncedEvent.ID.Uint64(), - Sender: ethgo.Address(l2StateSyncedEvent.Sender), - Receiver: ethgo.Address(l2StateSyncedEvent.Receiver), - Data: l2StateSyncedEvent.Data, - EpochNumber: epoch, - BlockNumber: block, + L2StateSyncedEvent: &l2StateSyncedEvent, + EpochNumber: epoch, + BlockNumber: block, }, nil } diff --git a/consensus/polybft/state_store_checkpoint_test.go b/consensus/polybft/state_store_checkpoint_test.go index 7ee66d88e0..6123986a6a 100644 --- a/consensus/polybft/state_store_checkpoint_test.go +++ b/consensus/polybft/state_store_checkpoint_test.go @@ -1,10 +1,12 @@ package polybft import ( + "math/big" "testing" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/0xPolygon/polygon-edge/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/umbracle/ethgo" @@ -96,13 +98,13 @@ func TestState_NoEpochForExitEventInLookup(t *testing.T) { exitEventFromDB, err := state.CheckpointStore.getExitEvent(exitToTest) require.NoError(t, err) - require.Equal(t, exitToTest, exitEventFromDB.ID) + require.Equal(t, exitToTest, exitEventFromDB.ID.Uint64()) require.Equal(t, epochToMatch, exitEventFromDB.EpochNumber) require.Equal(t, blockNumberToMatch, exitEventFromDB.BlockNumber) // simulate invalid case (for some reason lookup table doesn't have epoch for given exit) err = state.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(exitEventToEpochLookupBucket).Delete(common.EncodeUint64ToBytes(exitEventFromDB.ID)) + return tx.Bucket(exitEventToEpochLookupBucket).Delete(common.EncodeUint64ToBytes(exitEventFromDB.ID.Uint64())) }) require.NoError(t, err) @@ -141,7 +143,7 @@ func TestState_decodeExitEvent(t *testing.T) { event, err := decodeExitEvent(log, epoch, blockNumber) require.NoError(t, err) - require.Equal(t, uint64(exitID), event.ID) + require.Equal(t, uint64(exitID), event.ID.Uint64()) require.Equal(t, uint64(epoch), event.EpochNumber) require.Equal(t, uint64(blockNumber), event.BlockNumber) @@ -181,10 +183,12 @@ func insertTestExitEvents(t *testing.T, state *State, for k := 1; k <= numOfEventsPerBlock; k++ { exitEvents[index] = &ExitEvent{ - ID: index, - Sender: ethgo.ZeroAddress, - Receiver: ethgo.ZeroAddress, - Data: generateRandomBytes(t), + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: new(big.Int).SetUint64(index), + Sender: types.ZeroAddress, + Receiver: types.ZeroAddress, + Data: generateRandomBytes(t), + }, EpochNumber: i, BlockNumber: block, } From e340952a0b4dcc98d7a352b6cc7d711b573b7ced Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 22 Jun 2023 10:49:10 +0200 Subject: [PATCH 03/11] Checkpoint manager change --- command/bridge/exit/exit.go | 2 +- consensus/polybft/checkpoint_manager.go | 113 ++++++++++-------- consensus/polybft/checkpoint_manager_test.go | 95 +++++++++++++-- consensus/polybft/extra.go | 4 - consensus/polybft/fsm.go | 13 +- consensus/polybft/fsm_test.go | 6 +- consensus/polybft/runtime_helpers.go | 5 + consensus/polybft/sc_integration_test.go | 8 +- consensus/polybft/stake_manager.go | 104 +++++++--------- consensus/polybft/stake_manager_test.go | 12 +- consensus/polybft/state_event_getter.go | 91 ++++++++++++++ ...try_test.go => state_event_getter_test.go} | 6 +- consensus/polybft/state_retry.go | 60 ---------- consensus/polybft/state_store_checkpoint.go | 37 +++++- 14 files changed, 342 insertions(+), 214 deletions(-) create mode 100644 consensus/polybft/state_event_getter.go rename consensus/polybft/{state_retry_test.go => state_event_getter_test.go} (87%) delete mode 100644 consensus/polybft/state_retry.go diff --git a/command/bridge/exit/exit.go b/command/bridge/exit/exit.go index 6116788b91..047d415a40 100644 --- a/command/bridge/exit/exit.go +++ b/command/bridge/exit/exit.go @@ -186,7 +186,7 @@ func createExitTxn(sender ethgo.Address, proof types.Proof) (*ethgo.Transaction, var exitEventAPI contractsapi.L2StateSyncedEvent - exitEventEncoded, err := exitEventAPI.Encode(exitEvent) + exitEventEncoded, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent) if err != nil { return nil, nil, fmt.Errorf("failed to encode exit event: %w", err) } diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 4440982f43..05984f6031 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -68,6 +68,8 @@ type checkpointManager struct { logger hclog.Logger // state boltDb instance state *State + // eventGetter gets exit events (missed or current) from blocks + eventGetter *eventsGetter[*ExitEvent] } // newCheckpointManager creates a new instance of checkpointManager @@ -75,6 +77,43 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer, blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger, state *State) *checkpointManager { + retry := &eventsGetter[*ExitEvent]{ + blockchain: blockchain, + isValidLogFn: func(l *types.Log) bool { + return l.Address == contracts.L2StateSenderContract + }, + saveEventsFn: func(events []*ExitEvent) error { + // enforce sequential order + sort.Slice(events, func(i, j int) bool { + return events[i].ID.Cmp(events[j].ID) < 0 + }) + + return state.CheckpointStore.insertExitEvents(events) + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) { + extra, err := GetIbftExtra(h.ExtraData) + if err != nil { + return nil, false, + fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err) + } + + epoch := extra.Checkpoint.EpochNumber + block := h.Number + if extra.Validators != nil { + // exit events that happened in epoch ending blocks, + // should be added to the tree of the next epoch + epoch++ + block++ + } + + event, err := decodeExitEvent(l, epoch, block) + if err != nil { + return nil, false, err + } + + return event, true, nil + }, + } return &checkpointManager{ key: key, @@ -85,6 +124,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, checkpointManagerAddr: checkpointManagerSC, logger: logger, state: state, + eventGetter: retry, } } @@ -274,30 +314,33 @@ func (c *checkpointManager) isCheckpointBlock(blockNumber uint64, isEpochEndingB // PostBlock is called on every insert of finalized block (either from consensus or syncer) // It will read any exit event that happened in block and insert it to state boltDb func (c *checkpointManager) PostBlock(req *PostBlockRequest) error { - var ( - epoch = req.Epoch - block = req.FullBlock.Block.Number() - ) + block := req.FullBlock.Block.Number() - if req.IsEpochEndingBlock { - // exit events that happened in epoch ending blocks, - // should be added to the tree of the next epoch - epoch++ - block++ + lastBlock, err := c.state.CheckpointStore.getLastSaved() + if err != nil { + return fmt.Errorf("could not get last processed block for exit events. Error: %w", err) } - // commit exit events only when we finalize a block - events, err := getExitEventsFromReceipts(epoch, block, req.FullBlock.Receipts) - if err != nil { - return err + // get any missed events + if lastBlock+1 < block { + // since we save exit events from epoch ending blocks, + // to next epoch and next block, check that block as well + // to not miss any events that might happen in it + if err := c.eventGetter.getFromBlocks(lastBlock+1, block-1); err != nil { + return fmt.Errorf("could not get exit events from missed blocks. Error: %w", err) + } + + if err := c.state.CheckpointStore.updateLastSaved(block - 1); err != nil { + return err + } } - if len(events) > 0 { - c.logger.Debug("Gotten exit events from logs on block", - "eventsNum", len(events), "block", req.FullBlock.Block.Number()) + // get exit events from current block + if err := c.eventGetter.saveBlockEvents(req.FullBlock.Block.Header, req.FullBlock.Receipts); err != nil { + return err } - if err := c.state.CheckpointStore.insertExitEvents(events); err != nil { + if err := c.state.CheckpointStore.updateLastSaved(block); err != nil { return err } @@ -437,42 +480,6 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error }, nil } -// getExitEventsFromReceipts parses logs from receipts to find exit events -func getExitEventsFromReceipts(epoch, block uint64, receipts []*types.Receipt) ([]*ExitEvent, error) { - events := make([]*ExitEvent, 0) - - for i := 0; i < len(receipts); i++ { - if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess { - continue - } - - for _, log := range receipts[i].Logs { - if log.Address != contracts.L2StateSenderContract { - continue - } - - event, err := decodeExitEvent(convertLog(log), epoch, block) - if err != nil { - return nil, err - } - - if event == nil { - // valid case, not an exit event - continue - } - - events = append(events, event) - } - } - - // enforce sequential order - sort.Slice(events, func(i, j int) bool { - return events[i].ID.Cmp(events[j].ID) < 0 - }) - - return events, nil -} - // createExitTree creates an exit event merkle tree from provided exit events func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { numOfEvents := len(exitEvents) diff --git a/consensus/polybft/checkpoint_manager_test.go b/consensus/polybft/checkpoint_manager_test.go index 7648eba1b0..7b323b7211 100644 --- a/consensus/polybft/checkpoint_manager_test.go +++ b/consensus/polybft/checkpoint_manager_test.go @@ -296,22 +296,37 @@ func TestCheckpointManager_PostBlock(t *testing.T) { state := newTestState(t) - receipts := make([]*types.Receipt, numOfReceipts) - for i := 0; i < numOfReceipts; i++ { - receipts[i] = &types.Receipt{Logs: []*types.Log{ - createTestLogForExitEvent(t, uint64(i)), - }} - receipts[i].SetStatus(types.ReceiptSuccess) + createReceipts := func(startID, endID uint64) []*types.Receipt { + receipts := make([]*types.Receipt, endID-startID) + for i := startID; i < endID; i++ { + receipts[i-startID] = &types.Receipt{Logs: []*types.Log{ + createTestLogForExitEvent(t, i), + }} + receipts[i-startID].SetStatus(types.ReceiptSuccess) + } + + return receipts } - req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}, Receipts: receipts}, + extra := &Extra{ + Checkpoint: &CheckpointData{ + EpochNumber: epoch, + }, + } + + req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}}, Epoch: epoch} + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) + + blockchain := new(blockchainMock) checkpointManager := newCheckpointManager(wallet.NewEcdsaSigner(createTestKey(t)), 5, types.ZeroAddress, - nil, nil, nil, hclog.NewNullLogger(), state) + nil, blockchain, nil, hclog.NewNullLogger(), state) t.Run("PostBlock - not epoch ending block", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block-1)) // we got everything till the current block req.IsEpochEndingBlock = false + req.FullBlock.Receipts = createReceipts(0, 5) require.NoError(t, checkpointManager.PostBlock(req)) exitEvents, err := state.CheckpointStore.getExitEvents(epoch, func(exitEvent *ExitEvent) bool { @@ -319,23 +334,79 @@ func TestCheckpointManager_PostBlock(t *testing.T) { }) require.NoError(t, err) - require.Len(t, exitEvents, numOfReceipts) + require.Len(t, exitEvents, 5) require.Equal(t, uint64(epoch), exitEvents[0].EpochNumber) }) t.Run("PostBlock - epoch ending block (exit events are saved to the next epoch)", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we got everything till the current block req.IsEpochEndingBlock = true + req.FullBlock.Receipts = createReceipts(5, 10) + extra.Validators = &validator.ValidatorSetDelta{} + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) + req.FullBlock.Block.Header.Number = block + 1 + require.NoError(t, checkpointManager.PostBlock(req)) exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { - return exitEvent.BlockNumber == block+1 + return exitEvent.BlockNumber == block+2 // they should be saved in the next epoch and its first block }) require.NoError(t, err) - require.Len(t, exitEvents, numOfReceipts) - require.Equal(t, uint64(block+1), exitEvents[0].BlockNumber) + require.Len(t, exitEvents, 5) + require.Equal(t, uint64(block+2), exitEvents[0].BlockNumber) require.Equal(t, uint64(epoch+1), exitEvents[0].EpochNumber) }) + + t.Run("PostBlock - there are missing events", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we are missing one block + + missedReceipts := createReceipts(10, 13) + newReceipts := createReceipts(13, 15) + + extra := &Extra{ + Checkpoint: &CheckpointData{ + EpochNumber: epoch + 1, + }, + } + + blockchain.On("GetHeaderByNumber", uint64(block+1)).Return(&types.Header{ + Number: block + 1, + ExtraData: extra.MarshalRLPTo(nil), + Hash: types.BytesToHash([]byte{0, 1, 2, 3}), + }, true) + blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{0, 1, 2, 3})).Return([]*types.Receipt{}, nil) + blockchain.On("GetHeaderByNumber", uint64(block+2)).Return(&types.Header{ + Number: block + 2, + ExtraData: extra.MarshalRLPTo(nil), + Hash: types.BytesToHash([]byte{4, 5, 6, 7}), + }, true) + blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{4, 5, 6, 7})).Return(missedReceipts, nil) + + req.IsEpochEndingBlock = false + req.FullBlock.Block.Header.Number = block + 3 // new block + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) // same epoch + req.FullBlock.Receipts = newReceipts + require.NoError(t, checkpointManager.PostBlock(req)) + + exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { + return exitEvent.BlockNumber == block+2 + }) + + require.NoError(t, err) + // receipts from missed block + events from previous test case that were saved in the next epoch + // since they were in epoch ending block + require.Len(t, exitEvents, len(missedReceipts)+5) + require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber) + + exitEvents, err = state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { + return exitEvent.BlockNumber == block+3 + }) + + require.NoError(t, err) + require.Len(t, exitEvents, len(newReceipts)) + require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber) + }) } func TestCheckpointManager_BuildEventRoot(t *testing.T) { diff --git a/consensus/polybft/extra.go b/consensus/polybft/extra.go index 42ba463738..3db1917712 100644 --- a/consensus/polybft/extra.go +++ b/consensus/polybft/extra.go @@ -499,9 +499,5 @@ func GetIbftExtra(extraRaw []byte) (*Extra, error) { return nil, err } - if extra.Validators == nil { - extra.Validators = &validator.ValidatorSetDelta{} - } - return extra, nil } diff --git a/consensus/polybft/fsm.go b/consensus/polybft/fsm.go index 90fe2eb5b8..8e858bd306 100644 --- a/consensus/polybft/fsm.go +++ b/consensus/polybft/fsm.go @@ -43,8 +43,9 @@ var ( "allowed in an epoch ending block") errProposalDontMatch = errors.New("failed to insert proposal, because the validated proposal " + "is either nil or it does not match the received one") - errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch") - errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block") + errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch") + errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block") + errValidatorDeltaNilInEpochEndingBlock = errors.New("validator set delta is nil in epoch ending block") ) type fsm struct { @@ -337,11 +338,15 @@ func (f *fsm) Validate(proposal []byte) error { // validate validators delta if f.isEndOfEpoch { + if extra.Validators == nil { + return errValidatorDeltaNilInEpochEndingBlock + } + if !extra.Validators.Equals(f.newValidatorsDelta) { return errValidatorSetDeltaMismatch } - } else if !extra.Validators.IsEmpty() { - // delta should be empty in non epoch ending blocks + } else if extra.Validators != nil { + // delta should be nil in non epoch ending blocks return errValidatorsUpdateInNonEpochEnding } diff --git a/consensus/polybft/fsm_test.go b/consensus/polybft/fsm_test.go index 0d71d15413..f439c841e3 100644 --- a/consensus/polybft/fsm_test.go +++ b/consensus/polybft/fsm_test.go @@ -365,7 +365,7 @@ func TestFSM_BuildProposal_EpochEndingBlock_ValidatorsDeltaExists(t *testing.T) blockChainMock.AssertExpectations(t) } -func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaEmpty(t *testing.T) { +func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaNil(t *testing.T) { t.Parallel() const ( @@ -396,7 +396,7 @@ func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaEmpty(t *testing.T blockExtra, err := GetIbftExtra(stateBlock.Block.Header.ExtraData) assert.NoError(t, err) - assert.True(t, blockExtra.Validators.IsEmpty()) + assert.Nil(t, blockExtra.Validators) blockBuilderMock.AssertExpectations(t) } @@ -799,7 +799,7 @@ func TestFSM_Validate_EpochEndingBlock_MismatchInDeltas(t *testing.T) { parentCheckpointHash, err := extra.Checkpoint.Hash(0, parentBlockNumber, parent.Hash) require.NoError(t, err) - extra.Validators = nil // this will cause test to fail + extra.Validators = &validator.ValidatorSetDelta{} // this will cause test to fail extra.Parent = createSignature(t, validators.GetPrivateIdentities(), parentCheckpointHash, bls.DomainCheckpointManager) stateBlock := createDummyStateBlock(parent.Number+1, types.Hash{100, 15}, extra.MarshalRLPTo(nil)) diff --git a/consensus/polybft/runtime_helpers.go b/consensus/polybft/runtime_helpers.go index f2ffa2a49a..1fc6671f01 100644 --- a/consensus/polybft/runtime_helpers.go +++ b/consensus/polybft/runtime_helpers.go @@ -28,6 +28,11 @@ func getBlockData(blockNumber uint64, blockchainBackend blockchainBackend) (*typ // isEpochEndingBlock checks if given block is an epoch ending block func isEpochEndingBlock(blockNumber uint64, extra *Extra, blockchain blockchainBackend) (bool, error) { + if extra.Validators == nil { + // non epoch ending blocks have validator set delta as nil + return false, nil + } + if !extra.Validators.IsEmpty() { // if validator set delta is not empty, the validator set was changed in this block // meaning the epoch changed as well diff --git a/consensus/polybft/sc_integration_test.go b/consensus/polybft/sc_integration_test.go index 2274aed396..6762ed8095 100644 --- a/consensus/polybft/sc_integration_test.go +++ b/consensus/polybft/sc_integration_test.go @@ -163,16 +163,16 @@ func TestIntegratoin_PerformExit(t *testing.T) { { L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ ID: big.NewInt(1), - Sender: types.Address(contracts.ChildERC20PredicateContract), - Receiver: types.Address(rootERC20PredicateAddr), + Sender: contracts.ChildERC20PredicateContract, + Receiver: rootERC20PredicateAddr, Data: exitData1, }, }, { L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ ID: big.NewInt(2), - Sender: types.Address(contracts.ChildERC20PredicateContract), - Receiver: types.Address(rootERC20PredicateAddr), + Sender: contracts.ChildERC20PredicateContract, + Receiver: rootERC20PredicateAddr, Data: exitData2, }, }, diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index e140ca6eee..a6764622aa 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -8,7 +8,6 @@ import ( "sort" "strings" - "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/consensus/polybft/bitmap" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" @@ -108,25 +107,7 @@ func (s *stakeManager) PostBlock(req *PostBlockRequest) error { s.logger.Debug("Stake manager on post block", "block", req.FullBlock.Block.Number(), "last saved", fullValidatorSet.BlockNumber, "last updated", fullValidatorSet.UpdatedAtBlockNumber) - // update with missing blocks - for i := fullValidatorSet.BlockNumber + 1; i < req.FullBlock.Block.Number(); i++ { - blockHeader, found := s.blockchain.GetHeaderByNumber(i) - if !found { - return blockchain.ErrNoBlock - } - - receipts, err := s.blockchain.GetReceiptsByHash(blockHeader.Hash) - if err != nil { - return err - } - - if err := s.updateWithReceipts(&fullValidatorSet, receipts, i); err != nil { - return err - } - } - - // finally update with received block - err = s.updateWithReceipts(&fullValidatorSet, req.FullBlock.Receipts, req.FullBlock.Block.Number()) + err = s.updateWithReceipts(&fullValidatorSet, req.FullBlock) if err != nil { return err } @@ -138,20 +119,47 @@ func (s *stakeManager) PostBlock(req *PostBlockRequest) error { } func (s *stakeManager) updateWithReceipts( - fullValidatorSet *validatorSetState, receipts []*types.Receipt, block uint64) error { - events, err := s.getTransferEventsFromReceipts(receipts) - if err != nil { - return err + fullValidatorSet *validatorSetState, fullBlock *types.FullBlock) error { + var transferEvents []*contractsapi.TransferEvent + + eventsGetter := &eventsGetter[*contractsapi.TransferEvent]{ + blockchain: s.blockchain, + saveEventsFn: func(te []*contractsapi.TransferEvent) error { + transferEvents = append(transferEvents, te...) + + return nil + }, + isValidLogFn: func(l *types.Log) bool { + return l.Address == s.validatorSetContract + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { + var transferEvent contractsapi.TransferEvent + doesMatch, err := transferEvent.ParseLog(l) + + return &transferEvent, doesMatch, err + }, + } + + if fullValidatorSet.BlockNumber+1 < fullBlock.Block.Number() { + // get transfer events we missed until current block + if err := eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber+1, + fullBlock.Block.Number()-1); err != nil { + return fmt.Errorf("could not get transfer events from missed blocks. Error: %w", err) + } } - s.logger.Debug("Full validator set before", - "block", block-1, "evnts", len(events), "data", fullValidatorSet.Validators) + // get transfer currentBlockEvents from current block + currentBlockEvents, err := eventsGetter.getEventsFromReceipts(fullBlock.Block.Header, fullBlock.Receipts) + if err != nil { + return fmt.Errorf("could not get transfer events from current block. Error: %w", err) + } - if len(events) == 0 { + transferEvents = append(transferEvents, currentBlockEvents...) + if len(transferEvents) == 0 { return nil } - for _, event := range events { + for _, event := range transferEvents { if event.IsStake() { s.logger.Debug("Stake transfer event", "to", event.To, "value", event.Value) @@ -172,16 +180,18 @@ func (s *stakeManager) updateWithReceipts( if data.BlsKey == nil { data.BlsKey, err = s.getBlsKey(data.Address) if err != nil { - s.logger.Warn("Could not get info for new validator", "block", block, "address", addr) + s.logger.Warn("Could not get info for new validator", + "block", fullBlock.Block.Number(), "address", addr) } } data.IsActive = data.VotingPower.Cmp(bigZero) > 0 } - fullValidatorSet.UpdatedAtBlockNumber = block // mark on which block validator set has been updated + // mark on which block validator set has been updated + fullValidatorSet.UpdatedAtBlockNumber = fullValidatorSet.BlockNumber - s.logger.Debug("Full validator set after", "block", block, "data", fullValidatorSet.Validators) + s.logger.Debug("Full validator set after", "block", fullBlock.Block, "data", fullValidatorSet.Validators) return nil } @@ -261,38 +271,6 @@ func (s *stakeManager) UpdateValidatorSet( return delta, nil } -// getTransferEventsFromReceipts parses logs from receipts to find transfer events -func (s *stakeManager) getTransferEventsFromReceipts(receipts []*types.Receipt) ([]*contractsapi.TransferEvent, error) { - events := make([]*contractsapi.TransferEvent, 0) - - for i := 0; i < len(receipts); i++ { - if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess { - continue - } - - for _, log := range receipts[i].Logs { - if log.Address != s.validatorSetContract { - continue - } - - var transferEvent contractsapi.TransferEvent - - doesMatch, err := transferEvent.ParseLog(convertLog(log)) - if err != nil { - return nil, err - } - - if !doesMatch { - continue - } - - events = append(events, &transferEvent) - } - } - - return events, nil -} - // getBlsKey returns bls key for validator from the supernet contract func (s *stakeManager) getBlsKey(address types.Address) (*bls.PublicKey, error) { getValidatorFn := &contractsapi.GetValidatorCustomSupernetManagerFn{ diff --git a/consensus/polybft/stake_manager_test.go b/consensus/polybft/stake_manager_test.go index 5546216450..25125c5b59 100644 --- a/consensus/polybft/stake_manager_test.go +++ b/consensus/polybft/stake_manager_test.go @@ -291,16 +291,16 @@ func TestStakeManager_PostBlock(t *testing.T) { fullValidatorSet, err := state.StakeStore.getFullValidatorSet() require.NoError(t, err) - var firstValidaotor *validator.ValidatorMetadata - firstValidaotor = nil + var updatedValidator *validator.ValidatorMetadata + updatedValidator = nil for _, validator := range fullValidatorSet.Validators { if validator.Address.String() == validators.GetValidator(initialSetAliases[secondValidator]).Address().String() { - firstValidaotor = validator + updatedValidator = validator } } - require.NotNil(t, firstValidaotor) - require.Equal(t, big.NewInt(501), firstValidaotor.VotingPower) // 250 + 250 + initial 1 - require.True(t, firstValidaotor.IsActive) + require.NotNil(t, updatedValidator) + require.Equal(t, big.NewInt(501), updatedValidator.VotingPower) // 250 + 250 + initial 1 + require.True(t, updatedValidator.IsActive) bcMock.AssertExpectations(t) }) diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go new file mode 100644 index 0000000000..7151be3564 --- /dev/null +++ b/consensus/polybft/state_event_getter.go @@ -0,0 +1,91 @@ +package polybft + +import ( + "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/ethgo" +) + +// eventsGetter is a struct for getting missed and current events +// of specified type from specified blocks +type eventsGetter[T contractsapi.EventAbi] struct { + blockchain blockchainBackend + saveEventsFn func([]T) error + parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) + isValidLogFn func(*types.Log) bool +} + +// getFromBlocks gets events of specified type from specified blocks +// and saves them using the provided saveEventsFn +func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { + var missedEvents []T + + for i := fromBlock; i <= toBlock; i++ { + blockHeader, found := m.blockchain.GetHeaderByNumber(i) + if !found { + return blockchain.ErrNoBlock + } + + receipts, err := m.blockchain.GetReceiptsByHash(blockHeader.Hash) + if err != nil { + return err + } + + eventsFromBlock, err := m.getEventsFromReceipts(blockHeader, receipts) + if err != nil { + return err + } + + missedEvents = append(missedEvents, eventsFromBlock...) + } + + if len(missedEvents) > 0 { + return m.saveEventsFn(missedEvents) + } + + return nil +} + +// getEventsFromReceipts returns events of specified type from block transaction receipts +func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, + receipts []*types.Receipt) ([]T, error) { + var events []T + + for _, receipt := range receipts { + if receipt.Status == nil || *receipt.Status != types.ReceiptSuccess { + continue + } + + for _, log := range receipt.Logs { + if m.isValidLogFn != nil && !m.isValidLogFn(log) { + continue + } + + event, doesMatch, err := m.parseEventFn(blockHeader, convertLog(log)) + if err != nil { + return nil, err + } + + if !doesMatch { + continue + } + + events = append(events, event) + } + } + + return events, nil +} + +// saveBlockEvents gets events of specified block from block receipts +// and saves them using the provided saveEventsFn +func (m *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, + receipts []*types.Receipt) error { + events, err := m.getEventsFromReceipts(blockHeader, receipts) + if err != nil { + return err + } + + return m.saveEventsFn(events) +} diff --git a/consensus/polybft/state_retry_test.go b/consensus/polybft/state_event_getter_test.go similarity index 87% rename from consensus/polybft/state_retry_test.go rename to consensus/polybft/state_event_getter_test.go index 2dcf370760..3a2cc06508 100644 --- a/consensus/polybft/state_retry_test.go +++ b/consensus/polybft/state_event_getter_test.go @@ -11,7 +11,7 @@ import ( "github.com/umbracle/ethgo" ) -func TestEventDBInsertRetry_TransferEvents(t *testing.T) { +func TestEventDBInsertRetry_GetEvents(t *testing.T) { receipt := &types.Receipt{ Logs: []*types.Log{ createTestLogForTransferEvent(t, contracts.ValidatorSetContract, types.ZeroAddress, types.ZeroAddress, 10), @@ -31,7 +31,7 @@ func TestEventDBInsertRetry_TransferEvents(t *testing.T) { return nil } - retryManager := &eventDBInsertRetry[*contractsapi.TransferEvent]{ + retryManager := &eventsGetter[*contractsapi.TransferEvent]{ blockchain: backend, saveEventsFn: saveEventsFn, isValidLogFn: func(l *types.Log) bool { @@ -45,5 +45,5 @@ func TestEventDBInsertRetry_TransferEvents(t *testing.T) { }, } - require.NoError(t, retryManager.insertRetry(1, 1)) + require.NoError(t, retryManager.getFromBlocks(1, 1)) } diff --git a/consensus/polybft/state_retry.go b/consensus/polybft/state_retry.go deleted file mode 100644 index b6fa60206f..0000000000 --- a/consensus/polybft/state_retry.go +++ /dev/null @@ -1,60 +0,0 @@ -package polybft - -import ( - "github.com/0xPolygon/polygon-edge/blockchain" - "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" - "github.com/0xPolygon/polygon-edge/types" - "github.com/umbracle/ethgo" -) - -type eventDBInsertRetry[T contractsapi.EventAbi] struct { - blockchain blockchainBackend - saveEventsFn func([]T) error - parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) - isValidLogFn func(*types.Log) bool -} - -func (r *eventDBInsertRetry[T]) insertRetry(fromBlock, toBlock uint64) error { - events := make([]T, 0) - - for i := fromBlock; i <= toBlock; i++ { - blockHeader, found := r.blockchain.GetHeaderByNumber(i) - if !found { - return blockchain.ErrNoBlock - } - - receipts, err := r.blockchain.GetReceiptsByHash(blockHeader.Hash) - if err != nil { - return err - } - - for _, receipt := range receipts { - if receipt.Status == nil || *receipt.Status != types.ReceiptSuccess { - continue - } - - for _, log := range receipt.Logs { - if r.isValidLogFn != nil && !r.isValidLogFn(log) { - continue - } - - event, doesMatch, err := r.parseEventFn(blockHeader, convertLog(log)) - if err != nil { - return err - } - - if !doesMatch { - continue - } - - events = append(events, event) - } - } - } - - if len(events) > 0 { - return r.saveEventsFn(events) - } - - return nil -} diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index d7a5507f21..04d74175b8 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -3,6 +3,7 @@ package polybft import ( "bytes" "encoding/json" + "errors" "fmt" "sort" @@ -17,6 +18,10 @@ var ( // bucket to store exit contract events exitEventsBucket = []byte("exitEvent") exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") + exitEventLastSavedBucket = []byte("lastSavedExitEvent") + + lastSavedEventKey = []byte("lastSaved") + errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") ) type exitEventNotFoundError struct { @@ -58,7 +63,11 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error { return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventToEpochLookupBucket), err) } - return nil + if _, err := tx.CreateBucketIfNotExists(exitEventLastSavedBucket); err != nil { + return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastSavedBucket), err) + } + + return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, common.EncodeUint64ToBytes(0)) } // insertExitEvents inserts a slice of exit events to exit event bucket in bolt db @@ -177,6 +186,32 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi return events, err } +// updateLastSaved saves the last block processed for exit events +func (s *CheckpointStore) updateLastSaved(blockNumber uint64) error { + return s.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, + common.EncodeUint64ToBytes(blockNumber)) + }) +} + +// updateLastSaved saves the last block processed for exit events +func (s *CheckpointStore) getLastSaved() (uint64, error) { + var lastSavedBlock uint64 + + err := s.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(exitEventLastSavedBucket).Get(lastSavedEventKey) + if v == nil { + return errNoLastSavedEntry + } + + lastSavedBlock = common.EncodeBytesToUint64(v) + + return nil + }) + + return lastSavedBlock, err +} + // decodeExitEvent tries to decode exit event from the provided log func decodeExitEvent(log *ethgo.Log, epoch, block uint64) (*ExitEvent, error) { var l2StateSyncedEvent contractsapi.L2StateSyncedEvent From 80cc365424449ffc036f1cdc157f2365ea13c8ab Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Fri, 23 Jun 2023 14:47:24 +0200 Subject: [PATCH 04/11] Event tracker error --- consensus/polybft/state_event_getter.go | 31 ++++++++++++------- consensus/polybft/state_store_checkpoint.go | 21 +++++++------ consensus/polybft/state_sync_manager.go | 12 ++++--- .../statesyncrelayer/state_sync_relayer.go | 10 ++++-- tracker/event_tracker.go | 2 +- tracker/event_tracker_store.go | 16 ++++++---- tracker/event_tracker_test.go | 4 ++- 7 files changed, 59 insertions(+), 37 deletions(-) diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go index 7151be3564..1100a9a925 100644 --- a/consensus/polybft/state_event_getter.go +++ b/consensus/polybft/state_event_getter.go @@ -10,29 +10,36 @@ import ( // eventsGetter is a struct for getting missed and current events // of specified type from specified blocks type eventsGetter[T contractsapi.EventAbi] struct { - blockchain blockchainBackend + // blockchain is an abstraction of blockchain that provides necessary functions + // for querying blockchain data (blocks, receipts, etc.) + blockchain blockchainBackend + // saveEventsFn is a plugin function used to return gotten events + // and/or to save them to some db saveEventsFn func([]T) error + // parseEventFn is a plugin function used to parse the event from transaction log parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) + // isValidLogFn is a plugin function that validates the log + // for example: if it was sent from the desired address isValidLogFn func(*types.Log) bool } // getFromBlocks gets events of specified type from specified blocks // and saves them using the provided saveEventsFn -func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { +func (e *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { var missedEvents []T for i := fromBlock; i <= toBlock; i++ { - blockHeader, found := m.blockchain.GetHeaderByNumber(i) + blockHeader, found := e.blockchain.GetHeaderByNumber(i) if !found { return blockchain.ErrNoBlock } - receipts, err := m.blockchain.GetReceiptsByHash(blockHeader.Hash) + receipts, err := e.blockchain.GetReceiptsByHash(blockHeader.Hash) if err != nil { return err } - eventsFromBlock, err := m.getEventsFromReceipts(blockHeader, receipts) + eventsFromBlock, err := e.getEventsFromReceipts(blockHeader, receipts) if err != nil { return err } @@ -41,14 +48,14 @@ func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { } if len(missedEvents) > 0 { - return m.saveEventsFn(missedEvents) + return e.saveEventsFn(missedEvents) } return nil } // getEventsFromReceipts returns events of specified type from block transaction receipts -func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, +func (e *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, receipts []*types.Receipt) ([]T, error) { var events []T @@ -58,11 +65,11 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, } for _, log := range receipt.Logs { - if m.isValidLogFn != nil && !m.isValidLogFn(log) { + if e.isValidLogFn != nil && !e.isValidLogFn(log) { continue } - event, doesMatch, err := m.parseEventFn(blockHeader, convertLog(log)) + event, doesMatch, err := e.parseEventFn(blockHeader, convertLog(log)) if err != nil { return nil, err } @@ -80,12 +87,12 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, // saveBlockEvents gets events of specified block from block receipts // and saves them using the provided saveEventsFn -func (m *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, +func (e *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, receipts []*types.Receipt) error { - events, err := m.getEventsFromReceipts(blockHeader, receipts) + events, err := e.getEventsFromReceipts(blockHeader, receipts) if err != nil { return err } - return m.saveEventsFn(events) + return e.saveEventsFn(events) } diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index 04d74175b8..c5f0df8986 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -16,12 +16,12 @@ import ( var ( // bucket to store exit contract events - exitEventsBucket = []byte("exitEvent") - exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") - exitEventLastSavedBucket = []byte("lastSavedExitEvent") + exitEventsBucket = []byte("exitEvent") + exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") + exitEventLastProcessedBlockBucket = []byte("lastProcessedBlock") - lastSavedEventKey = []byte("lastSaved") - errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") + lastProcessedBlockKey = []byte("lastProcessedBlock") + errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") ) type exitEventNotFoundError struct { @@ -48,6 +48,7 @@ Bolt DB schema: exit events/ |--> (id+epoch+blockNumber) -> *ExitEvent (json marshalled) |--> (exitEventID) -> epochNumber +|--> (lastProcessedBlockKey) -> block number */ type CheckpointStore struct { db *bolt.DB @@ -63,11 +64,11 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error { return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventToEpochLookupBucket), err) } - if _, err := tx.CreateBucketIfNotExists(exitEventLastSavedBucket); err != nil { - return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastSavedBucket), err) + if _, err := tx.CreateBucketIfNotExists(exitEventLastProcessedBlockBucket); err != nil { + return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastProcessedBlockBucket), err) } - return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, common.EncodeUint64ToBytes(0)) + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(0)) } // insertExitEvents inserts a slice of exit events to exit event bucket in bolt db @@ -189,7 +190,7 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi // updateLastSaved saves the last block processed for exit events func (s *CheckpointStore) updateLastSaved(blockNumber uint64) error { return s.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(blockNumber)) }) } @@ -199,7 +200,7 @@ func (s *CheckpointStore) getLastSaved() (uint64, error) { var lastSavedBlock uint64 err := s.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(exitEventLastSavedBucket).Get(lastSavedEventKey) + v := tx.Bucket(exitEventLastProcessedBlockBucket).Get(lastProcessedBlockKey) if v == nil { return errNoLastSavedEntry } diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 75a036ec44..ad6cbc1cd4 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -231,12 +231,12 @@ func (s *stateSyncManager) verifyVoteSignature(valSet validator.ValidatorSet, si } // AddLog saves the received log from event tracker if it matches a state sync event ABI -func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { +func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) error { event := &contractsapi.StateSyncedEvent{} doesMatch, err := event.ParseLog(eventLog) if !doesMatch { - return + return nil } s.logger.Info( @@ -249,18 +249,22 @@ func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { if err != nil { s.logger.Error("could not decode state sync event", "err", err) - return + return err } if err := s.state.StateSyncStore.insertStateSyncEvent(event); err != nil { s.logger.Error("could not save state sync event to boltDb", "err", err) - return + return err } if err := s.buildCommitment(); err != nil { + // we don't return an error here. If state sync event is inserted in db, + // we will just try to build a commitment on next block or next event arrival s.logger.Error("could not build a commitment on arrival of new state sync", "err", err, "stateSyncID", event.ID) } + + return nil } // Commitment returns a commitment to be submitted if there is a pending commitment with quorum diff --git a/consensus/polybft/statesyncrelayer/state_sync_relayer.go b/consensus/polybft/statesyncrelayer/state_sync_relayer.go index 17a108581a..33232387ea 100644 --- a/consensus/polybft/statesyncrelayer/state_sync_relayer.go +++ b/consensus/polybft/statesyncrelayer/state_sync_relayer.go @@ -107,20 +107,20 @@ func (r *StateSyncRelayer) Stop() { close(r.closeCh) } -func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { +func (r *StateSyncRelayer) AddLog(log *ethgo.Log) error { r.logger.Debug("Received a log", "log", log) var commitEvent contractsapi.NewCommitmentEvent doesMatch, err := commitEvent.ParseLog(log) if !doesMatch { - return + return nil } if err != nil { r.logger.Error("Failed to parse log", "err", err) - return + return err } startID := commitEvent.StartID.Uint64() @@ -128,6 +128,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("Execute commitment", "Block", log.BlockNumber, "StartID", startID, "EndID", endID) + // we don't return errors if some client logic fails, + // only if event is not parsed for i := startID; i <= endID; i++ { // query the state sync proof stateSyncProof, err := r.queryStateSyncProof(fmt.Sprintf("0x%x", i)) @@ -145,6 +147,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("State sync executed", "ID", i) } + + return nil } // queryStateSyncProof queries the state sync proof diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 6ecca5c926..753c42aaae 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -15,7 +15,7 @@ import ( const minBlockMaxBacklog = 96 type eventSubscription interface { - AddLog(log *ethgo.Log) + AddLog(log *ethgo.Log) error } type EventTracker struct { diff --git a/tracker/event_tracker_store.go b/tracker/event_tracker_store.go index 151e59bc8c..12e4a3cfb0 100644 --- a/tracker/event_tracker_store.go +++ b/tracker/event_tracker_store.go @@ -124,6 +124,8 @@ func (b *EventTrackerStore) Set(k, v string) error { if strings.HasPrefix(k, dbLastBlockPrefix) { if err := b.onNewBlock(k[len(dbLastBlockPrefix):], v); err != nil { b.logger.Warn("new block error", "err", err) + + return err } } @@ -160,17 +162,19 @@ func (b *EventTrackerStore) onNewBlock(filterHash, blockData string) error { return nil // nothing to process } - nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 + // notify subscriber with logs + for _, log := range logs { + if err := b.subscriber.AddLog(log); err != nil { + return err + } + } + // save next to process only if every AddLog finished successfully + nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 if err := entry.saveNextToProcessIndx(nextToProcessIdx); err != nil { return err } - // notify subscriber with logs - for _, log := range logs { - b.subscriber.AddLog(log) - } - b.logger.Debug("Event logs have been notified to a subscriber", "len", len(logs), "next", nextToProcessIdx) return nil diff --git a/tracker/event_tracker_test.go b/tracker/event_tracker_test.go index 669569e5a1..bfc1c00e0a 100644 --- a/tracker/event_tracker_test.go +++ b/tracker/event_tracker_test.go @@ -20,7 +20,7 @@ type mockEventSubscriber struct { logs []*ethgo.Log } -func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { +func (m *mockEventSubscriber) AddLog(log *ethgo.Log) error { m.lock.Lock() defer m.lock.Unlock() @@ -29,6 +29,8 @@ func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { } m.logs = append(m.logs, log) + + return nil } func (m *mockEventSubscriber) len() int { From bb8ee43f78e7734d6f7041420ba2756d640712ad Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 5 Jul 2023 11:40:26 +0200 Subject: [PATCH 05/11] Lint fix --- consensus/polybft/state_sync_manager_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/consensus/polybft/state_sync_manager_test.go b/consensus/polybft/state_sync_manager_test.go index c9defef33e..b04588bd76 100644 --- a/consensus/polybft/state_sync_manager_test.go +++ b/consensus/polybft/state_sync_manager_test.go @@ -335,7 +335,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true}) // empty log which is not an state sync - s.AddLog(ðgo.Log{}) + require.NoError(t, s.AddLog(ðgo.Log{})) stateSyncs, err := s.state.StateSyncStore.list() require.NoError(t, err) @@ -346,7 +346,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { stateSyncEventID := stateSyncedEvent.Sig() // log with the state sync topic but incorrect content - s.AddLog(ðgo.Log{Topics: []ethgo.Hash{stateSyncEventID}}) + require.Error(t, s.AddLog(ðgo.Log{Topics: []ethgo.Hash{stateSyncEventID}})) stateSyncs, err = s.state.StateSyncStore.list() require.NoError(t, err) @@ -366,7 +366,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { Data: data, } - s.AddLog(goodLog) + require.NoError(t, s.AddLog(goodLog)) stateSyncs, err = s.state.StateSyncStore.getStateSyncEventsForCommitment(0, 0) require.NoError(t, err) @@ -378,7 +378,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { // add one more log to have a minimum commitment goodLog2 := goodLog.Copy() goodLog2.Topics[1] = ethgo.BytesToHash([]byte{0x1}) // state sync index 1 - s.AddLog(goodLog2) + require.NoError(t, s.AddLog(goodLog2)) require.Len(t, s.pendingCommitments, 2) require.Equal(t, uint64(0), s.pendingCommitments[1].StartID.Uint64()) @@ -387,11 +387,11 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { // add two more logs to have larger commitments goodLog3 := goodLog.Copy() goodLog3.Topics[1] = ethgo.BytesToHash([]byte{0x2}) // state sync index 2 - s.AddLog(goodLog3) + require.NoError(t, s.AddLog(goodLog3)) goodLog4 := goodLog.Copy() goodLog4.Topics[1] = ethgo.BytesToHash([]byte{0x3}) // state sync index 3 - s.AddLog(goodLog4) + require.NoError(t, s.AddLog(goodLog4)) require.Len(t, s.pendingCommitments, 4) require.Equal(t, uint64(0), s.pendingCommitments[3].StartID.Uint64()) @@ -419,7 +419,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { Data: data, } - s.AddLog(goodLog) + require.NoError(t, s.AddLog(goodLog)) // node should have inserted given state sync event, but it shouldn't build any commitment stateSyncs, err := s.state.StateSyncStore.getStateSyncEventsForCommitment(0, 0) From 438276dc1f3d1f6bff0dc8a14313b410a5f9862c Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 5 Jul 2023 11:40:38 +0200 Subject: [PATCH 06/11] Comments fix --- consensus/polybft/contractsapi/helper.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consensus/polybft/contractsapi/helper.go b/consensus/polybft/contractsapi/helper.go index 3a364e7bdd..6332104a6f 100644 --- a/consensus/polybft/contractsapi/helper.go +++ b/consensus/polybft/contractsapi/helper.go @@ -16,8 +16,11 @@ type StateTransactionInput interface { // EventAbi is an interface representing an event generated in contractsapi type EventAbi interface { + // Sig returns the event ABI signature or ID (which is unique for all event types) Sig() ethgo.Hash + // Encode does abi encoding of given event Encode(inputs interface{}) ([]byte, error) + // ParseLog parses the provided receipt log to given event type ParseLog(log *ethgo.Log) (bool, error) } From e2abe2656c00ffd57be8f4a353e2d251676ba9a7 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 5 Jul 2023 12:33:57 +0200 Subject: [PATCH 07/11] Comments fix --- consensus/polybft/checkpoint_manager.go | 49 +++++++++++++------------ 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 05984f6031..dc3e240ee4 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -90,29 +90,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, return state.CheckpointStore.insertExitEvents(events) }, - parseEventFn: func(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) { - extra, err := GetIbftExtra(h.ExtraData) - if err != nil { - return nil, false, - fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err) - } - - epoch := extra.Checkpoint.EpochNumber - block := h.Number - if extra.Validators != nil { - // exit events that happened in epoch ending blocks, - // should be added to the tree of the next epoch - epoch++ - block++ - } - - event, err := decodeExitEvent(l, epoch, block) - if err != nil { - return nil, false, err - } - - return event, true, nil - }, + parseEventFn: parseExitEvent, } return &checkpointManager{ @@ -497,3 +475,28 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { return merkle.NewMerkleTree(data) } + +// parseExitEvent parses exit event from provided log +func parseExitEvent(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) { + extra, err := GetIbftExtra(h.ExtraData) + if err != nil { + return nil, false, + fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err) + } + + epoch := extra.Checkpoint.EpochNumber + block := h.Number + if extra.Validators != nil { + // exit events that happened in epoch ending blocks, + // should be added to the tree of the next epoch + epoch++ + block++ + } + + event, err := decodeExitEvent(l, epoch, block) + if err != nil { + return nil, false, err + } + + return event, true, nil +} From 2d01560373f7426410c9a06ed66e2e1005949874 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Wed, 5 Jul 2023 12:35:54 +0200 Subject: [PATCH 08/11] Lint fix --- consensus/polybft/checkpoint_manager.go | 1 + 1 file changed, 1 insertion(+) diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index dc3e240ee4..40733a626d 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -486,6 +486,7 @@ func parseExitEvent(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) { epoch := extra.Checkpoint.EpochNumber block := h.Number + if extra.Validators != nil { // exit events that happened in epoch ending blocks, // should be added to the tree of the next epoch From a81c825b664158501a50db948e0e37258ceec3b8 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 6 Jul 2023 15:23:27 +0200 Subject: [PATCH 09/11] Comments fix --- consensus/polybft/checkpoint_manager.go | 16 +--------- consensus/polybft/stake_manager.go | 16 +++------- consensus/polybft/state_event_getter.go | 31 ++++++++------------ consensus/polybft/state_event_getter_test.go | 5 +++- 4 files changed, 22 insertions(+), 46 deletions(-) diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 40733a626d..6705b7dc6a 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -299,22 +299,8 @@ func (c *checkpointManager) PostBlock(req *PostBlockRequest) error { return fmt.Errorf("could not get last processed block for exit events. Error: %w", err) } - // get any missed events - if lastBlock+1 < block { - // since we save exit events from epoch ending blocks, - // to next epoch and next block, check that block as well - // to not miss any events that might happen in it - if err := c.eventGetter.getFromBlocks(lastBlock+1, block-1); err != nil { - return fmt.Errorf("could not get exit events from missed blocks. Error: %w", err) - } - - if err := c.state.CheckpointStore.updateLastSaved(block - 1); err != nil { - return err - } - } - // get exit events from current block - if err := c.eventGetter.saveBlockEvents(req.FullBlock.Block.Header, req.FullBlock.Receipts); err != nil { + if err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock); err != nil { return err } diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index a6764622aa..e4a5030f1e 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -140,21 +140,11 @@ func (s *stakeManager) updateWithReceipts( }, } - if fullValidatorSet.BlockNumber+1 < fullBlock.Block.Number() { - // get transfer events we missed until current block - if err := eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber+1, - fullBlock.Block.Number()-1); err != nil { - return fmt.Errorf("could not get transfer events from missed blocks. Error: %w", err) - } - } - // get transfer currentBlockEvents from current block - currentBlockEvents, err := eventsGetter.getEventsFromReceipts(fullBlock.Block.Header, fullBlock.Receipts) - if err != nil { + if err := eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber, fullBlock); err != nil { return fmt.Errorf("could not get transfer events from current block. Error: %w", err) } - transferEvents = append(transferEvents, currentBlockEvents...) if len(transferEvents) == 0 { return nil } @@ -178,11 +168,13 @@ func (s *stakeManager) updateWithReceipts( for addr, data := range fullValidatorSet.Validators { if data.BlsKey == nil { - data.BlsKey, err = s.getBlsKey(data.Address) + blsKey, err := s.getBlsKey(data.Address) if err != nil { s.logger.Warn("Could not get info for new validator", "block", fullBlock.Block.Number(), "address", addr) } + + data.BlsKey = blsKey } data.IsActive = data.VotingPower.Cmp(bigZero) > 0 diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go index 1100a9a925..69fe081f24 100644 --- a/consensus/polybft/state_event_getter.go +++ b/consensus/polybft/state_event_getter.go @@ -25,10 +25,10 @@ type eventsGetter[T contractsapi.EventAbi] struct { // getFromBlocks gets events of specified type from specified blocks // and saves them using the provided saveEventsFn -func (e *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { - var missedEvents []T +func (e *eventsGetter[T]) getFromBlocks(lastProcessedBlock uint64, currentBlock *types.FullBlock) error { + var allEvents []T - for i := fromBlock; i <= toBlock; i++ { + for i := lastProcessedBlock + 1; i < currentBlock.Block.Number(); i++ { blockHeader, found := e.blockchain.GetHeaderByNumber(i) if !found { return blockchain.ErrNoBlock @@ -44,11 +44,18 @@ func (e *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { return err } - missedEvents = append(missedEvents, eventsFromBlock...) + allEvents = append(allEvents, eventsFromBlock...) } - if len(missedEvents) > 0 { - return e.saveEventsFn(missedEvents) + currentEvents, err := e.getEventsFromReceipts(currentBlock.Block.Header, currentBlock.Receipts) + if err != nil { + return err + } + + allEvents = append(allEvents, currentEvents...) + + if len(allEvents) > 0 { + return e.saveEventsFn(allEvents) } return nil @@ -84,15 +91,3 @@ func (e *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, return events, nil } - -// saveBlockEvents gets events of specified block from block receipts -// and saves them using the provided saveEventsFn -func (e *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, - receipts []*types.Receipt) error { - events, err := e.getEventsFromReceipts(blockHeader, receipts) - if err != nil { - return err - } - - return e.saveEventsFn(events) -} diff --git a/consensus/polybft/state_event_getter_test.go b/consensus/polybft/state_event_getter_test.go index 3a2cc06508..9b739671eb 100644 --- a/consensus/polybft/state_event_getter_test.go +++ b/consensus/polybft/state_event_getter_test.go @@ -45,5 +45,8 @@ func TestEventDBInsertRetry_GetEvents(t *testing.T) { }, } - require.NoError(t, retryManager.getFromBlocks(1, 1)) + require.NoError(t, retryManager.getFromBlocks(0, &types.FullBlock{ + Block: &types.Block{Header: &types.Header{Number: 2}}, + Receipts: []*types.Receipt{}, + })) } From fbe95a3aaa03547258e9b8825842079045f49b43 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 6 Jul 2023 18:56:15 +0200 Subject: [PATCH 10/11] Comments fix --- consensus/polybft/checkpoint_manager.go | 21 ++++++------ consensus/polybft/stake_manager.go | 36 +++++++++----------- consensus/polybft/state_event_getter.go | 20 ++++------- consensus/polybft/state_event_getter_test.go | 16 ++++----- 4 files changed, 41 insertions(+), 52 deletions(-) diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 6705b7dc6a..bf69f93d05 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -82,14 +82,6 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, isValidLogFn: func(l *types.Log) bool { return l.Address == contracts.L2StateSenderContract }, - saveEventsFn: func(events []*ExitEvent) error { - // enforce sequential order - sort.Slice(events, func(i, j int) bool { - return events[i].ID.Cmp(events[j].ID) < 0 - }) - - return state.CheckpointStore.insertExitEvents(events) - }, parseEventFn: parseExitEvent, } @@ -299,8 +291,17 @@ func (c *checkpointManager) PostBlock(req *PostBlockRequest) error { return fmt.Errorf("could not get last processed block for exit events. Error: %w", err) } - // get exit events from current block - if err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock); err != nil { + exitEvents, err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock) + if err != nil { + return err + } + + sort.Slice(exitEvents, func(i, j int) bool { + // keep events in sequential order + return exitEvents[i].ID.Cmp(exitEvents[j].ID) < 0 + }) + + if err := c.state.CheckpointStore.insertExitEvents(exitEvents); err != nil { return err } diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index e4a5030f1e..042bbbe662 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -57,6 +57,7 @@ type stakeManager struct { supernetManagerContract types.Address maxValidatorSetSize int blockchain blockchainBackend + eventsGetter *eventsGetter[*contractsapi.TransferEvent] } // newStakeManager returns a new instance of stake manager @@ -69,6 +70,19 @@ func newStakeManager( blockchain blockchainBackend, maxValidatorSetSize int, ) *stakeManager { + eventsGetter := &eventsGetter[*contractsapi.TransferEvent]{ + blockchain: blockchain, + isValidLogFn: func(l *types.Log) bool { + return l.Address == validatorSetAddr + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { + var transferEvent contractsapi.TransferEvent + doesMatch, err := transferEvent.ParseLog(l) + + return &transferEvent, doesMatch, err + }, + } + return &stakeManager{ logger: logger, state: state, @@ -78,6 +92,7 @@ func newStakeManager( supernetManagerContract: supernetManagerAddr, maxValidatorSetSize: maxValidatorSetSize, blockchain: blockchain, + eventsGetter: eventsGetter, } } @@ -122,26 +137,9 @@ func (s *stakeManager) updateWithReceipts( fullValidatorSet *validatorSetState, fullBlock *types.FullBlock) error { var transferEvents []*contractsapi.TransferEvent - eventsGetter := &eventsGetter[*contractsapi.TransferEvent]{ - blockchain: s.blockchain, - saveEventsFn: func(te []*contractsapi.TransferEvent) error { - transferEvents = append(transferEvents, te...) - - return nil - }, - isValidLogFn: func(l *types.Log) bool { - return l.Address == s.validatorSetContract - }, - parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { - var transferEvent contractsapi.TransferEvent - doesMatch, err := transferEvent.ParseLog(l) - - return &transferEvent, doesMatch, err - }, - } - // get transfer currentBlockEvents from current block - if err := eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber, fullBlock); err != nil { + transferEvents, err := s.eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber, fullBlock) + if err != nil { return fmt.Errorf("could not get transfer events from current block. Error: %w", err) } diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go index 69fe081f24..46912d3d28 100644 --- a/consensus/polybft/state_event_getter.go +++ b/consensus/polybft/state_event_getter.go @@ -13,9 +13,6 @@ type eventsGetter[T contractsapi.EventAbi] struct { // blockchain is an abstraction of blockchain that provides necessary functions // for querying blockchain data (blocks, receipts, etc.) blockchain blockchainBackend - // saveEventsFn is a plugin function used to return gotten events - // and/or to save them to some db - saveEventsFn func([]T) error // parseEventFn is a plugin function used to parse the event from transaction log parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) // isValidLogFn is a plugin function that validates the log @@ -25,23 +22,24 @@ type eventsGetter[T contractsapi.EventAbi] struct { // getFromBlocks gets events of specified type from specified blocks // and saves them using the provided saveEventsFn -func (e *eventsGetter[T]) getFromBlocks(lastProcessedBlock uint64, currentBlock *types.FullBlock) error { +func (e *eventsGetter[T]) getFromBlocks(lastProcessedBlock uint64, + currentBlock *types.FullBlock) ([]T, error) { var allEvents []T for i := lastProcessedBlock + 1; i < currentBlock.Block.Number(); i++ { blockHeader, found := e.blockchain.GetHeaderByNumber(i) if !found { - return blockchain.ErrNoBlock + return nil, blockchain.ErrNoBlock } receipts, err := e.blockchain.GetReceiptsByHash(blockHeader.Hash) if err != nil { - return err + return nil, err } eventsFromBlock, err := e.getEventsFromReceipts(blockHeader, receipts) if err != nil { - return err + return nil, err } allEvents = append(allEvents, eventsFromBlock...) @@ -49,16 +47,12 @@ func (e *eventsGetter[T]) getFromBlocks(lastProcessedBlock uint64, currentBlock currentEvents, err := e.getEventsFromReceipts(currentBlock.Block.Header, currentBlock.Receipts) if err != nil { - return err + return nil, err } allEvents = append(allEvents, currentEvents...) - if len(allEvents) > 0 { - return e.saveEventsFn(allEvents) - } - - return nil + return allEvents, nil } // getEventsFromReceipts returns events of specified type from block transaction receipts diff --git a/consensus/polybft/state_event_getter_test.go b/consensus/polybft/state_event_getter_test.go index 9b739671eb..92eaf77b25 100644 --- a/consensus/polybft/state_event_getter_test.go +++ b/consensus/polybft/state_event_getter_test.go @@ -25,15 +25,8 @@ func TestEventDBInsertRetry_GetEvents(t *testing.T) { }, true) backend.On("GetReceiptsByHash", mock.Anything).Return([]*types.Receipt{receipt}, nil) - saveEventsFn := func(events []*contractsapi.TransferEvent) error { - require.NotEmpty(t, events) - - return nil - } - retryManager := &eventsGetter[*contractsapi.TransferEvent]{ - blockchain: backend, - saveEventsFn: saveEventsFn, + blockchain: backend, isValidLogFn: func(l *types.Log) bool { return l.Address == contracts.ValidatorSetContract }, @@ -45,8 +38,11 @@ func TestEventDBInsertRetry_GetEvents(t *testing.T) { }, } - require.NoError(t, retryManager.getFromBlocks(0, &types.FullBlock{ + events, err := retryManager.getFromBlocks(0, &types.FullBlock{ Block: &types.Block{Header: &types.Header{Number: 2}}, Receipts: []*types.Receipt{}, - })) + }) + + require.NoError(t, err) + require.Len(t, events, 1) } From c6a737a26f8dbc5bd6b7041194bc435ecdfba0ff Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Thu, 6 Jul 2023 19:04:09 +0200 Subject: [PATCH 11/11] Comments fix --- consensus/polybft/stake_manager.go | 4 ---- consensus/polybft/stake_manager_fuzz_test.go | 6 ++++-- consensus/polybft/stake_manager_test.go | 11 ++++++----- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index 042bbbe662..0f332cf7f2 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -53,10 +53,8 @@ type stakeManager struct { state *State rootChainRelayer txrelayer.TxRelayer key ethgo.Key - validatorSetContract types.Address supernetManagerContract types.Address maxValidatorSetSize int - blockchain blockchainBackend eventsGetter *eventsGetter[*contractsapi.TransferEvent] } @@ -88,10 +86,8 @@ func newStakeManager( state: state, rootChainRelayer: rootchainRelayer, key: key, - validatorSetContract: validatorSetAddr, supernetManagerContract: supernetManagerAddr, maxValidatorSetSize: maxValidatorSetSize, - blockchain: blockchain, eventsGetter: eventsGetter, } } diff --git a/consensus/polybft/stake_manager_fuzz_test.go b/consensus/polybft/stake_manager_fuzz_test.go index 9f49f0f2e3..532c2f6bab 100644 --- a/consensus/polybft/stake_manager_fuzz_test.go +++ b/consensus/polybft/stake_manager_fuzz_test.go @@ -147,12 +147,14 @@ func FuzzTestStakeManagerPostBlock(f *testing.F) { t.Skip() } + validatorSetAddr := types.StringToAddress("0x0001") + stakeManager := newStakeManager( hclog.NewNullLogger(), state, nil, wallet.NewEcdsaSigner(validators.GetValidator("A").Key()), - types.StringToAddress("0x0001"), + validatorSetAddr, types.StringToAddress("0x0002"), nil, 5, @@ -167,7 +169,7 @@ func FuzzTestStakeManagerPostBlock(f *testing.F) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, validators.GetValidator(initialSetAliases[data.ValidatorID]).Address(), types.ZeroAddress, data.StakeValue, diff --git a/consensus/polybft/stake_manager_test.go b/consensus/polybft/stake_manager_test.go index 25125c5b59..30940bd67e 100644 --- a/consensus/polybft/stake_manager_test.go +++ b/consensus/polybft/stake_manager_test.go @@ -63,6 +63,7 @@ func TestStakeManager_PostBlock(t *testing.T) { newStake = uint64(100) firstValidator = uint64(0) secondValidator = uint64(1) + validatorSetAddr = types.StringToAddress("0x0001") ) state := newTestState(t) @@ -75,7 +76,7 @@ func TestStakeManager_PostBlock(t *testing.T) { state, nil, wallet.NewEcdsaSigner(validators.GetValidator("A").Key()), - types.StringToAddress("0x0001"), types.StringToAddress("0x0002"), + validatorSetAddr, types.StringToAddress("0x0002"), nil, 5, ) @@ -90,7 +91,7 @@ func TestStakeManager_PostBlock(t *testing.T) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, validators.GetValidator(initialSetAliases[firstValidator]).Address(), types.ZeroAddress, 1, // initial validator stake was 1 @@ -146,7 +147,7 @@ func TestStakeManager_PostBlock(t *testing.T) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(initialSetAliases[secondValidator]).Address(), 250, @@ -213,7 +214,7 @@ func TestStakeManager_PostBlock(t *testing.T) { receipts[i] = &types.Receipt{Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(allAliases[i]).Address(), newStake, @@ -272,7 +273,7 @@ func TestStakeManager_PostBlock(t *testing.T) { receipt.Logs = []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(initialSetAliases[secondValidator]).Address(), 250,