Skip to content

Commit

Permalink
Async to Sync processing ETH1 events (#540)
Browse files Browse the repository at this point in the history
* implementation
  • Loading branch information
olegshmuelov authored Feb 23, 2022
1 parent a3d272e commit 474b705
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
3 changes: 1 addition & 2 deletions eth1/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func SyncEth1Events(logger *zap.Logger, client Client, storage SyncOffsetStorage
if syncEndedEvent, ok = event.Data.(SyncEndedEvent); ok {
return
}
logger.Debug("got new event from eth1 sync",
zap.Uint64("BlockNumber", event.Log.BlockNumber))
logger.Debug("got new event from eth1 sync", zap.Uint64("BlockNumber", event.Log.BlockNumber))
if handler != nil {
queue(*event)
}
Expand Down
61 changes: 35 additions & 26 deletions validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,44 @@ func (c *controller) ListenToEth1Events(feed *event.Feed) {
defer sub.Unsubscribe()
for {
select {
case event := <-cn:
if err := c.ProcessEth1Event(*event); err != nil {
c.logger.Error("could not process event", zap.Error(err))
case e := <-cn:
if err := c.ProcessOngoingEth1Event(*e); err != nil {
c.logger.Error("could not process ongoing eth1 event", zap.Error(err))
}
case err := <-sub.Err():
c.logger.Error("event feed subscription error", zap.Error(err))
}
}
}

// ProcessEth1Event handles a single event, will be called in both sync and stream events from registry contract
// ProcessOngoingEth1Event handles a single event, will be called in stream events from registry contract
func (c *controller) ProcessOngoingEth1Event(e eth1.Event) error {
if validatorAddedEvent, ok := e.Data.(abiparser.ValidatorAddedEvent); ok {
pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey)
if _, ok := c.validatorsMap.GetValidator(pubKey); ok {
c.logger.Debug("validator was loaded already")
return nil
}
share, err := c.handleValidatorAddedEvent(validatorAddedEvent)
if err != nil {
c.logger.Error("could not handle validatorAdded event", zap.String("pubkey", pubKey), zap.Error(err))
return err
}
v := c.validatorsMap.GetOrCreateValidator(share)
if err := c.startValidator(v); err != nil {
c.logger.Warn("could not start validator", zap.Error(err))
}
}
return nil
}

// ProcessEth1Event handles a single event, will be called in sync events from registry contract
func (c *controller) ProcessEth1Event(e eth1.Event) error {
if validatorAddedEvent, ok := e.Data.(abiparser.ValidatorAddedEvent); ok {
pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey)
if err := c.handleValidatorAddedEvent(validatorAddedEvent); err != nil {
c.logger.Error("could not process validator",
zap.String("pubkey", pubKey), zap.Error(err))
_, err := c.handleValidatorAddedEvent(validatorAddedEvent)
if err != nil {
c.logger.Error("could not process validator", zap.String("pubkey", pubKey), zap.Error(err))
return err
}
}
Expand Down Expand Up @@ -285,40 +306,28 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex {
}

// handleValidatorAddedEvent handles registry contract event for validator added
func (c *controller) handleValidatorAddedEvent(validatorAddedEvent abiparser.ValidatorAddedEvent) error {
pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey[:])
func (c *controller) handleValidatorAddedEvent(validatorAddedEvent abiparser.ValidatorAddedEvent) (*validatorstorage.Share, error) {
pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey)
logger := c.logger.With(zap.String("pubKey", pubKey))
// if exist -> do nothing
if _, ok := c.validatorsMap.GetValidator(pubKey); ok {
logger.Debug("validator was loaded already")
// TODO: handle updateValidator in the future
return nil
}
logger.Debug("new validator, starting setup")
metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusInactive))
validatorShare, found, err := c.collection.GetValidatorShare(validatorAddedEvent.PublicKey[:])
validatorShare, found, err := c.collection.GetValidatorShare(validatorAddedEvent.PublicKey)
if err != nil {
return errors.Wrap(err, "could not check if validator share exits")
return nil, errors.Wrap(err, "could not check if validator share exits")
}
if !found {
newValShare, share, err := createShareWithOperatorKey(validatorAddedEvent, c.shareEncryptionKeyProvider)
if err != nil {
return errors.Wrap(err, "failed to create share")
return nil, errors.Wrap(err, "failed to create share")
}
if err := c.onNewShare(newValShare, share); err != nil {
metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusError))
return err
return nil, err
}
validatorShare = newValShare
logger.Debug("new validator share was created and saved")
}

v := c.validatorsMap.GetOrCreateValidator(validatorShare)
if err := c.startValidator(v); err != nil {
logger.Warn("could not start validator", zap.Error(err))
}

return nil
return validatorShare, nil
}

// onMetadataUpdated is called when validator's metadata was updated
Expand Down

0 comments on commit 474b705

Please sign in to comment.