Skip to content

Commit

Permalink
Event tracker error
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jul 5, 2023
1 parent c73e22b commit 99dce46
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 37 deletions.
31 changes: 19 additions & 12 deletions consensus/polybft/state_event_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,36 @@ import (
// eventsGetter is a struct for getting missed and current events
// of specified type from specified blocks
type eventsGetter[T contractsapi.EventAbi] struct {
blockchain blockchainBackend
// blockchain is an abstraction of blockchain that provides necessary functions
// for querying blockchain data (blocks, receipts, etc.)
blockchain blockchainBackend
// saveEventsFn is a plugin function used to return gotten events
// and/or to save them to some db
saveEventsFn func([]T) error
// parseEventFn is a plugin function used to parse the event from transaction log
parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error)
// isValidLogFn is a plugin function that validates the log
// for example: if it was sent from the desired address
isValidLogFn func(*types.Log) bool
}

// getFromBlocks gets events of specified type from specified blocks
// and saves them using the provided saveEventsFn
func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error {
func (e *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error {
var missedEvents []T

for i := fromBlock; i <= toBlock; i++ {
blockHeader, found := m.blockchain.GetHeaderByNumber(i)
blockHeader, found := e.blockchain.GetHeaderByNumber(i)
if !found {
return blockchain.ErrNoBlock
}

receipts, err := m.blockchain.GetReceiptsByHash(blockHeader.Hash)
receipts, err := e.blockchain.GetReceiptsByHash(blockHeader.Hash)
if err != nil {
return err
}

eventsFromBlock, err := m.getEventsFromReceipts(blockHeader, receipts)
eventsFromBlock, err := e.getEventsFromReceipts(blockHeader, receipts)
if err != nil {
return err
}
Expand All @@ -41,14 +48,14 @@ func (m *eventsGetter[T]) getFromBlocks(fromBlock, toBlock uint64) error {
}

if len(missedEvents) > 0 {
return m.saveEventsFn(missedEvents)
return e.saveEventsFn(missedEvents)
}

return nil
}

// getEventsFromReceipts returns events of specified type from block transaction receipts
func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header,
func (e *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header,
receipts []*types.Receipt) ([]T, error) {
var events []T

Expand All @@ -58,11 +65,11 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header,
}

for _, log := range receipt.Logs {
if m.isValidLogFn != nil && !m.isValidLogFn(log) {
if e.isValidLogFn != nil && !e.isValidLogFn(log) {
continue
}

event, doesMatch, err := m.parseEventFn(blockHeader, convertLog(log))
event, doesMatch, err := e.parseEventFn(blockHeader, convertLog(log))
if err != nil {
return nil, err
}
Expand All @@ -80,12 +87,12 @@ func (m *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header,

// saveBlockEvents gets events of specified block from block receipts
// and saves them using the provided saveEventsFn
func (m *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header,
func (e *eventsGetter[T]) saveBlockEvents(blockHeader *types.Header,
receipts []*types.Receipt) error {
events, err := m.getEventsFromReceipts(blockHeader, receipts)
events, err := e.getEventsFromReceipts(blockHeader, receipts)
if err != nil {
return err
}

return m.saveEventsFn(events)
return e.saveEventsFn(events)
}
21 changes: 11 additions & 10 deletions consensus/polybft/state_store_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (

var (
// bucket to store exit contract events
exitEventsBucket = []byte("exitEvent")
exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup")
exitEventLastSavedBucket = []byte("lastSavedExitEvent")
exitEventsBucket = []byte("exitEvent")
exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup")
exitEventLastProcessedBlockBucket = []byte("lastProcessedBlock")

lastSavedEventKey = []byte("lastSaved")
errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket")
lastProcessedBlockKey = []byte("lastProcessedBlock")
errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket")
)

type exitEventNotFoundError struct {
Expand All @@ -48,6 +48,7 @@ Bolt DB schema:
exit events/
|--> (id+epoch+blockNumber) -> *ExitEvent (json marshalled)
|--> (exitEventID) -> epochNumber
|--> (lastProcessedBlockKey) -> block number
*/
type CheckpointStore struct {
db *bolt.DB
Expand All @@ -63,11 +64,11 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error {
return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventToEpochLookupBucket), err)
}

if _, err := tx.CreateBucketIfNotExists(exitEventLastSavedBucket); err != nil {
return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastSavedBucket), err)
if _, err := tx.CreateBucketIfNotExists(exitEventLastProcessedBlockBucket); err != nil {
return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastProcessedBlockBucket), err)
}

return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey, common.EncodeUint64ToBytes(0))
return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(0))
}

// insertExitEvents inserts a slice of exit events to exit event bucket in bolt db
Expand Down Expand Up @@ -189,7 +190,7 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi
// updateLastSaved saves the last block processed for exit events
func (s *CheckpointStore) updateLastSaved(blockNumber uint64) error {
return s.db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(exitEventLastSavedBucket).Put(lastSavedEventKey,
return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey,
common.EncodeUint64ToBytes(blockNumber))
})
}
Expand All @@ -199,7 +200,7 @@ func (s *CheckpointStore) getLastSaved() (uint64, error) {
var lastSavedBlock uint64

err := s.db.View(func(tx *bolt.Tx) error {
v := tx.Bucket(exitEventLastSavedBucket).Get(lastSavedEventKey)
v := tx.Bucket(exitEventLastProcessedBlockBucket).Get(lastProcessedBlockKey)
if v == nil {
return errNoLastSavedEntry
}
Expand Down
12 changes: 8 additions & 4 deletions consensus/polybft/state_sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,12 @@ func (s *stateSyncManager) verifyVoteSignature(valSet validator.ValidatorSet, si
}

// AddLog saves the received log from event tracker if it matches a state sync event ABI
func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) {
func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) error {
event := &contractsapi.StateSyncedEvent{}

doesMatch, err := event.ParseLog(eventLog)
if !doesMatch {
return
return nil
}

s.logger.Info(
Expand All @@ -249,18 +249,22 @@ func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) {
if err != nil {
s.logger.Error("could not decode state sync event", "err", err)

return
return err
}

if err := s.state.StateSyncStore.insertStateSyncEvent(event); err != nil {
s.logger.Error("could not save state sync event to boltDb", "err", err)

return
return err
}

if err := s.buildCommitment(); err != nil {
// we don't return an error here. If state sync event is inserted in db,
// we will just try to build a commitment on next block or next event arrival
s.logger.Error("could not build a commitment on arrival of new state sync", "err", err, "stateSyncID", event.ID)
}

return nil
}

// Commitment returns a commitment to be submitted if there is a pending commitment with quorum
Expand Down
10 changes: 7 additions & 3 deletions consensus/polybft/statesyncrelayer/state_sync_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,29 @@ func (r *StateSyncRelayer) Stop() {
close(r.closeCh)
}

func (r *StateSyncRelayer) AddLog(log *ethgo.Log) {
func (r *StateSyncRelayer) AddLog(log *ethgo.Log) error {
r.logger.Debug("Received a log", "log", log)

var commitEvent contractsapi.NewCommitmentEvent

doesMatch, err := commitEvent.ParseLog(log)
if !doesMatch {
return
return nil
}

if err != nil {
r.logger.Error("Failed to parse log", "err", err)

return
return err
}

startID := commitEvent.StartID.Uint64()
endID := commitEvent.EndID.Uint64()

r.logger.Info("Execute commitment", "Block", log.BlockNumber, "StartID", startID, "EndID", endID)

// we don't return errors if some client logic fails,
// only if event is not parsed
for i := startID; i <= endID; i++ {
// query the state sync proof
stateSyncProof, err := r.queryStateSyncProof(fmt.Sprintf("0x%x", i))
Expand All @@ -145,6 +147,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) {

r.logger.Info("State sync executed", "ID", i)
}

return nil
}

// queryStateSyncProof queries the state sync proof
Expand Down
2 changes: 1 addition & 1 deletion tracker/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const minBlockMaxBacklog = 96

type eventSubscription interface {
AddLog(log *ethgo.Log)
AddLog(log *ethgo.Log) error
}

type EventTracker struct {
Expand Down
16 changes: 10 additions & 6 deletions tracker/event_tracker_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func (b *EventTrackerStore) Set(k, v string) error {
if strings.HasPrefix(k, dbLastBlockPrefix) {
if err := b.onNewBlock(k[len(dbLastBlockPrefix):], v); err != nil {
b.logger.Warn("new block error", "err", err)

return err
}
}

Expand Down Expand Up @@ -160,17 +162,19 @@ func (b *EventTrackerStore) onNewBlock(filterHash, blockData string) error {
return nil // nothing to process
}

nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1
// notify subscriber with logs
for _, log := range logs {
if err := b.subscriber.AddLog(log); err != nil {
return err
}
}

// save next to process only if every AddLog finished successfully
nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1
if err := entry.saveNextToProcessIndx(nextToProcessIdx); err != nil {
return err
}

// notify subscriber with logs
for _, log := range logs {
b.subscriber.AddLog(log)
}

b.logger.Debug("Event logs have been notified to a subscriber", "len", len(logs), "next", nextToProcessIdx)

return nil
Expand Down
4 changes: 3 additions & 1 deletion tracker/event_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type mockEventSubscriber struct {
logs []*ethgo.Log
}

func (m *mockEventSubscriber) AddLog(log *ethgo.Log) {
func (m *mockEventSubscriber) AddLog(log *ethgo.Log) error {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -29,6 +29,8 @@ func (m *mockEventSubscriber) AddLog(log *ethgo.Log) {
}

m.logs = append(m.logs, log)

return nil
}

func (m *mockEventSubscriber) len() int {
Expand Down

0 comments on commit 99dce46

Please sign in to comment.