Skip to content

Commit

Permalink
chore: fix ethereum rpc PUblicFilterAPI.NewHeads
Browse files Browse the repository at this point in the history
  • Loading branch information
dudong2 committed Feb 4, 2024
1 parent 11d7b33 commit f996886
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
28 changes: 11 additions & 17 deletions rpc/namespaces/ethereum/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,48 +334,42 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
api.events.WithContext(ctx)
rpcSub := notifier.CreateSubscription()

headersSub, cancelSubs, err := api.events.SubscribeNewHeads()
blocksSub, cancelSubs, err := api.events.SubscribeNewBlocks()
if err != nil {
return &rpc.Subscription{}, err
}

go func(headersCh <-chan coretypes.ResultEvent) {
go func(blocksCh <-chan coretypes.ResultEvent) {
defer cancelSubs()

for {
select {
case ev, ok := <-headersCh:
case ev, ok := <-blocksCh:
if !ok {
headersSub.Unsubscribe(api.events)
blocksSub.Unsubscribe(api.events)
return
}

data, ok := ev.Data.(tmtypes.EventDataNewBlockEvents)
data, ok := ev.Data.(tmtypes.EventDataNewBlock)
if !ok {
api.logger.Debug("EventDataNewBlockEvents type mismatch", "type", fmt.Sprintf("%T", ev.Data))
api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}

baseFee := types.BaseFeeFromEvents(data.Events)

data1, ok := ev.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
api.logger.Debug("EventDataNewBlockHeader type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}
baseFee := types.BaseFeeFromEvents(data.ResultFinalizeBlock.Events)

// TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data1.Header, ethtypes.Bloom{}, baseFee)
header := types.EthHeaderFromTendermint(data.Block.Header, ethtypes.Bloom{}, baseFee)
_ = notifier.Notify(rpcSub.ID, header)
case <-rpcSub.Err():
headersSub.Unsubscribe(api.events)
blocksSub.Unsubscribe(api.events)
return
case <-notifier.Closed():
headersSub.Unsubscribe(api.events)
blocksSub.Unsubscribe(api.events)
return
}
}
}(headersSub.eventCh)
}(blocksSub.eventCh)

return rpcSub, err
}
Expand Down
15 changes: 15 additions & 0 deletions rpc/namespaces/ethereum/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
var (
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String()
blockEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlock).String()
evmEvents = tmquery.MustCompile(
fmt.Sprintf("%s='%s' AND %s.%s='%s'",
tmtypes.EventTypeKey,
Expand Down Expand Up @@ -222,6 +223,20 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc
return es.subscribe(sub)
}

// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewBlocks() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
event: blockEvents,
created: time.Now().UTC(),
headers: make(chan *ethtypes.Header),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}

// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
Expand Down

0 comments on commit f996886

Please sign in to comment.