From 99dce469ea1b55d380fde45700000e2d94b3ae78 Mon Sep 17 00:00:00 2001 From: Goran Rojovic Date: Fri, 23 Jun 2023 14:47:24 +0200 Subject: [PATCH] Event tracker error --- consensus/polybft/state_event_getter.go | 31 ++++++++++++------- consensus/polybft/state_store_checkpoint.go | 21 +++++++------ consensus/polybft/state_sync_manager.go | 12 ++++--- .../statesyncrelayer/state_sync_relayer.go | 10 ++++-- tracker/event_tracker.go | 2 +- tracker/event_tracker_store.go | 16 ++++++---- tracker/event_tracker_test.go | 4 ++- 7 files changed, 59 insertions(+), 37 deletions(-) diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go index 7151be3564..1100a9a925 100644 --- a/consensus/polybft/state_event_getter.go +++ b/consensus/polybft/state_event_getter.go @@ -10,29 +10,36 @@ import ( // eventsGetter is a struct for getting missed and current events // of specified type from specified blocks type eventsGetter[T contractsapi.EventAbi] struct { - blockchain blockchainBackend + // blockchain is an abstraction of blockchain that provides necessary functions + // for querying blockchain data (blocks, receipts, etc.) + blockchain blockchainBackend + // saveEventsFn is a plugin function used to return gotten events + // and/or to save them to some db saveEventsFn func([]T) error + // parseEventFn is a plugin function used to parse the event from transaction log parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) + // isValidLogFn is a plugin function that validates the log + // for example: if it was sent from the desired address isValidLogFn func(*types.Log) bool } // getFromBlocks gets events of specified type from specified blocks // and saves them using the provided saveEventsFn -func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { +func (e *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { var missedEvents []T for i := fromBlock; i <= toBlock; i++ { - blockHeader, found := m.blockchain.GetHeaderByNumber(i) + blockHeader, found := e.blockchain.GetHeaderByNumber(i) if !found { return blockchain.ErrNoBlock } - receipts, err := m.blockchain.GetReceiptsByHash(blockHeader.Hash) + receipts, err := e.blockchain.GetReceiptsByHash(blockHeader.Hash) if err != nil { return err } - eventsFromBlock, err := m.getEventsFromReceipts(blockHeader, receipts) + eventsFromBlock, err := e.getEventsFromReceipts(blockHeader, receipts) if err != nil { return err } @@ -41,14 +48,14 @@ func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error { } if len(missedEvents) > 0 { - return m.saveEventsFn(missedEvents) + return e.saveEventsFn(missedEvents) } return nil } // getEventsFromReceipts returns events of specified type from block transaction receipts -func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, +func (e *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, receipts []*types.Receipt) ([]T, error) { var events []T @@ -58,11 +65,11 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, } for _, log := range receipt.Logs { - if m.isValidLogFn != nil && !m.isValidLogFn(log) { + if e.isValidLogFn != nil && !e.isValidLogFn(log) { continue } - event, doesMatch, err := m.parseEventFn(blockHeader, convertLog(log)) + event, doesMatch, err := e.parseEventFn(blockHeader, convertLog(log)) if err != nil { return nil, err } @@ -80,12 +87,12 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, // saveBlockEvents gets events of specified block from block receipts // and saves them using the provided saveEventsFn -func (m *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, +func (e *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header, receipts []*types.Receipt) error { - events, err := m.getEventsFromReceipts(blockHeader, receipts) + events, err := e.getEventsFromReceipts(blockHeader, receipts) if err != nil { return err } - return m.saveEventsFn(events) + return e.saveEventsFn(events) } diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index 04d74175b8..c5f0df8986 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -16,12 +16,12 @@ import ( var ( // bucket to store exit contract events - exitEventsBucket = []byte("exitEvent") - exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") - exitEventLastSavedBucket = []byte("lastSavedExitEvent") + exitEventsBucket = []byte("exitEvent") + exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") + exitEventLastProcessedBlockBucket = []byte("lastProcessedBlock") - lastSavedEventKey = []byte("lastSaved") - errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") + lastProcessedBlockKey = []byte("lastProcessedBlock") + errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") ) type exitEventNotFoundError struct { @@ -48,6 +48,7 @@ Bolt DB schema: exit events/ |--> (id+epoch+blockNumber) -> *ExitEvent (json marshalled) |--> (exitEventID) -> epochNumber +|--> (lastProcessedBlockKey) -> block number */ type CheckpointStore struct { db *bolt.DB @@ -63,11 +64,11 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error { return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventToEpochLookupBucket), err) } - if _, err := tx.CreateBucketIfNotExists(exitEventLastSavedBucket); err != nil { - return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastSavedBucket), err) + if _, err := tx.CreateBucketIfNotExists(exitEventLastProcessedBlockBucket); err != nil { + return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastProcessedBlockBucket), err) } - return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, common.EncodeUint64ToBytes(0)) + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(0)) } // insertExitEvents inserts a slice of exit events to exit event bucket in bolt db @@ -189,7 +190,7 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi // updateLastSaved saves the last block processed for exit events func (s *CheckpointStore) updateLastSaved(blockNumber uint64) error { return s.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(blockNumber)) }) } @@ -199,7 +200,7 @@ func (s *CheckpointStore) getLastSaved() (uint64, error) { var lastSavedBlock uint64 err := s.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(exitEventLastSavedBucket).Get(lastSavedEventKey) + v := tx.Bucket(exitEventLastProcessedBlockBucket).Get(lastProcessedBlockKey) if v == nil { return errNoLastSavedEntry } diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 75a036ec44..ad6cbc1cd4 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -231,12 +231,12 @@ func (s *stateSyncManager) verifyVoteSignature(valSet validator.ValidatorSet, si } // AddLog saves the received log from event tracker if it matches a state sync event ABI -func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { +func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) error { event := &contractsapi.StateSyncedEvent{} doesMatch, err := event.ParseLog(eventLog) if !doesMatch { - return + return nil } s.logger.Info( @@ -249,18 +249,22 @@ func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { if err != nil { s.logger.Error("could not decode state sync event", "err", err) - return + return err } if err := s.state.StateSyncStore.insertStateSyncEvent(event); err != nil { s.logger.Error("could not save state sync event to boltDb", "err", err) - return + return err } if err := s.buildCommitment(); err != nil { + // we don't return an error here. If state sync event is inserted in db, + // we will just try to build a commitment on next block or next event arrival s.logger.Error("could not build a commitment on arrival of new state sync", "err", err, "stateSyncID", event.ID) } + + return nil } // Commitment returns a commitment to be submitted if there is a pending commitment with quorum diff --git a/consensus/polybft/statesyncrelayer/state_sync_relayer.go b/consensus/polybft/statesyncrelayer/state_sync_relayer.go index 17a108581a..33232387ea 100644 --- a/consensus/polybft/statesyncrelayer/state_sync_relayer.go +++ b/consensus/polybft/statesyncrelayer/state_sync_relayer.go @@ -107,20 +107,20 @@ func (r *StateSyncRelayer) Stop() { close(r.closeCh) } -func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { +func (r *StateSyncRelayer) AddLog(log *ethgo.Log) error { r.logger.Debug("Received a log", "log", log) var commitEvent contractsapi.NewCommitmentEvent doesMatch, err := commitEvent.ParseLog(log) if !doesMatch { - return + return nil } if err != nil { r.logger.Error("Failed to parse log", "err", err) - return + return err } startID := commitEvent.StartID.Uint64() @@ -128,6 +128,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("Execute commitment", "Block", log.BlockNumber, "StartID", startID, "EndID", endID) + // we don't return errors if some client logic fails, + // only if event is not parsed for i := startID; i <= endID; i++ { // query the state sync proof stateSyncProof, err := r.queryStateSyncProof(fmt.Sprintf("0x%x", i)) @@ -145,6 +147,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("State sync executed", "ID", i) } + + return nil } // queryStateSyncProof queries the state sync proof diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 6ecca5c926..753c42aaae 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -15,7 +15,7 @@ import ( const minBlockMaxBacklog = 96 type eventSubscription interface { - AddLog(log *ethgo.Log) + AddLog(log *ethgo.Log) error } type EventTracker struct { diff --git a/tracker/event_tracker_store.go b/tracker/event_tracker_store.go index 151e59bc8c..12e4a3cfb0 100644 --- a/tracker/event_tracker_store.go +++ b/tracker/event_tracker_store.go @@ -124,6 +124,8 @@ func (b *EventTrackerStore) Set(k, v string) error { if strings.HasPrefix(k, dbLastBlockPrefix) { if err := b.onNewBlock(k[len(dbLastBlockPrefix):], v); err != nil { b.logger.Warn("new block error", "err", err) + + return err } } @@ -160,17 +162,19 @@ func (b *EventTrackerStore) onNewBlock(filterHash, blockData string) error { return nil // nothing to process } - nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 + // notify subscriber with logs + for _, log := range logs { + if err := b.subscriber.AddLog(log); err != nil { + return err + } + } + // save next to process only if every AddLog finished successfully + nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 if err := entry.saveNextToProcessIndx(nextToProcessIdx); err != nil { return err } - // notify subscriber with logs - for _, log := range logs { - b.subscriber.AddLog(log) - } - b.logger.Debug("Event logs have been notified to a subscriber", "len", len(logs), "next", nextToProcessIdx) return nil diff --git a/tracker/event_tracker_test.go b/tracker/event_tracker_test.go index 669569e5a1..bfc1c00e0a 100644 --- a/tracker/event_tracker_test.go +++ b/tracker/event_tracker_test.go @@ -20,7 +20,7 @@ type mockEventSubscriber struct { logs []*ethgo.Log } -func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { +func (m *mockEventSubscriber) AddLog(log *ethgo.Log) error { m.lock.Lock() defer m.lock.Unlock() @@ -29,6 +29,8 @@ func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { } m.logs = append(m.logs, log) + + return nil } func (m *mockEventSubscriber) len() int {