diff --git a/packages/eventindexer/indexer/subscribe.go b/packages/eventindexer/indexer/subscribe.go index 6662233da43..0ba8af09cc9 100644 --- a/packages/eventindexer/indexer/subscribe.go +++ b/packages/eventindexer/indexer/subscribe.go @@ -19,6 +19,7 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { errChan := make(chan error) go svc.subscribeBlockProven(ctx, chainID, errChan) + go svc.subscribeBlockProposed(ctx, chainID, errChan) // nolint: gosimple for { @@ -61,6 +62,74 @@ func (svc *Service) subscribeBlockProven(ctx context.Context, chainID *big.Int, log.Infof("blockProvenEvent from subscription for prover %v", event.Prover.Hex()) if err := svc.saveBlockProvenEvent(ctx, chainID, event); err != nil { + eventindexer.BlockProvenEventsProcessedError.Inc() + + log.Errorf("svc.subscribe, svc.saveBlockProvenEvent: %v", err) + } + + block, err := svc.blockRepo.GetLatestBlockProcessed(chainID) + if err != nil { + log.Errorf("svc.subscribe, blockRepo.GetLatestBlockProcessed: %v", err) + continue + } + + if block.Height < event.Raw.BlockNumber { + err = svc.blockRepo.Save(eventindexer.SaveBlockOpts{ + Height: event.Raw.BlockNumber, + Hash: event.Raw.BlockHash, + ChainID: chainID, + }) + if err != nil { + log.Errorf("svc.subscribe, svc.blockRepo.Save: %v", err) + } + + eventindexer.BlocksProcessed.Inc() + } + } + } +} + +func (svc *Service) subscribeBlockProposed(ctx context.Context, chainID *big.Int, errChan chan error) { + sink := make(chan *taikol1.TaikoL1BlockProposed) + + sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + log.Errorf("svc.taikoL1.WatchBlockProposed: %v", err) + } + log.Info("resubscribing to BlockProposed events") + + return svc.taikol1.WatchBlockProposed(&bind.WatchOpts{ + Context: ctx, + }, sink, nil) + }) + + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + log.Info("context finished") + return + case err := <-sub.Err(): + errChan <- errors.Wrap(err, "sub.Err()") + case event := <-sink: + log.Infof("blockProposedEvent from subscription") + + tx, _, err := svc.ethClient.TransactionByHash(ctx, event.Raw.TxHash) + if err != nil { + log.Errorf("svc.ethClient.TransactionByHash: %v", err) + } + + sender, err := svc.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex) + if err != nil { + log.Errorf("svc.ethClient.TransactionSender: %v", err) + } + + log.Infof("blockProposed by: %v", sender.Hex()) + + if err := svc.saveBlockProposedEvent(ctx, chainID, event, sender); err != nil { + eventindexer.BlockProposedEventsProcessedError.Inc() + log.Errorf("svc.subscribe, svc.saveBlockProvenEvent: %v", err) }