diff --git a/rpc/namespaces/ethereum/eth/filters/api.go b/rpc/namespaces/ethereum/eth/filters/api.go index 17483ea81b..11c8302b71 100644 --- a/rpc/namespaces/ethereum/eth/filters/api.go +++ b/rpc/namespaces/ethereum/eth/filters/api.go @@ -334,48 +334,42 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er api.events.WithContext(ctx) rpcSub := notifier.CreateSubscription() - headersSub, cancelSubs, err := api.events.SubscribeNewHeads() + blocksSub, cancelSubs, err := api.events.SubscribeNewBlocks() if err != nil { return &rpc.Subscription{}, err } - go func(headersCh <-chan coretypes.ResultEvent) { + go func(blocksCh <-chan coretypes.ResultEvent) { defer cancelSubs() for { select { - case ev, ok := <-headersCh: + case ev, ok := <-blocksCh: if !ok { - headersSub.Unsubscribe(api.events) + blocksSub.Unsubscribe(api.events) return } - data, ok := ev.Data.(tmtypes.EventDataNewBlockEvents) + data, ok := ev.Data.(tmtypes.EventDataNewBlock) if !ok { - api.logger.Debug("EventDataNewBlockEvents type mismatch", "type", fmt.Sprintf("%T", ev.Data)) + api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) continue } - baseFee := types.BaseFeeFromEvents(data.Events) - - data1, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("EventDataNewBlockHeader type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } + baseFee := types.BaseFeeFromEvents(data.ResultFinalizeBlock.Events) // TODO: fetch bloom from events - header := types.EthHeaderFromTendermint(data1.Header, ethtypes.Bloom{}, baseFee) + header := types.EthHeaderFromTendermint(data.Block.Header, ethtypes.Bloom{}, baseFee) _ = notifier.Notify(rpcSub.ID, header) case <-rpcSub.Err(): - headersSub.Unsubscribe(api.events) + blocksSub.Unsubscribe(api.events) return case <-notifier.Closed(): - headersSub.Unsubscribe(api.events) + blocksSub.Unsubscribe(api.events) return } } - }(headersSub.eventCh) + }(blocksSub.eventCh) return rpcSub, err } diff --git a/rpc/namespaces/ethereum/eth/filters/filter_system.go b/rpc/namespaces/ethereum/eth/filters/filter_system.go index 9f3f4a9cfd..e3fd9002ac 100644 --- a/rpc/namespaces/ethereum/eth/filters/filter_system.go +++ b/rpc/namespaces/ethereum/eth/filters/filter_system.go @@ -44,6 +44,7 @@ import ( var ( txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() + blockEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlock).String() evmEvents = tmquery.MustCompile( fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, @@ -222,6 +223,20 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc return es.subscribe(sub) } +// SubscribeNewHeads subscribes to new block headers events. +func (es EventSystem) SubscribeNewBlocks() (*Subscription, pubsub.UnsubscribeFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.BlocksSubscription, + event: blockEvents, + created: time.Now().UTC(), + headers: make(chan *ethtypes.Header), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + // SubscribePendingTxs subscribes to new pending transactions events from the mempool. func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{