Skip to content

Commit

Permalink
eth/filters,ethclient,node: install newSideHeads subscription (ethere…
Browse files Browse the repository at this point in the history
…um#293)

etclabscore/core-geth#293

commit 85e4d0b644e4ec692d199899dd6e4e34c41ff36c
Author: meows <[email protected]>
Date:   Tue Jan 19 07:07:02 2021 -0600

    ethclient: install missing eth_newSideBlockFilter method for rpc.discover test

    Date: 2021-01-19 07:07:02-06:00
    Signed-off-by: meows <[email protected]>

commit c8cf61df8b9014620508d71f745b316569fec5e1
Author: meows <[email protected]>
Date:   Tue Jan 12 09:54:37 2021 -0600

    filters: (lint) fix comment

    Resolves https://github.com/etclabscore/core-geth/pull/293/files/bf5bd1290fdb0ad59e8d4255100934d053be6e5d#r555805816

    Date: 2021-01-12 09:54:37-06:00
    Signed-off-by: meows <[email protected]>

commit bf5bd1290fdb0ad59e8d4255100934d053be6e5d
Author: meows <[email protected]>
Date:   Tue Jan 12 08:05:16 2021 -0600

    filters: (lint) goimports

    Date: 2021-01-12 08:05:16-06:00
    Signed-off-by: meows <[email protected]>

commit 6ae8387e1a45a407fdb12a2c30dc368fa001b20e
Author: meows <[email protected]>
Date:   Mon Jan 11 18:13:48 2021 -0600

    eth/filters,ethclient,node: install newSideHeads subscription

    The newSideHeads subscription work very similarly to
    the newHeads subscription; instead, non-canonical blocks are channeled.
  • Loading branch information
meowsbits committed Jan 19, 2021
1 parent c354dd4 commit 9422e73
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 4 deletions.
31 changes: 31 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,33 @@ func (b *SimulatedBackend) SubscribeNewHead(ctx context.Context, ch chan<- *type
}), nil
}

// SubscribeNewHead returns an event subscription for a new header imported as non-canonical (side status).
func (b *SimulatedBackend) SubscribeNewSideHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
// subscribe to a new head
sink := make(chan *types.Header)
sub := b.events.SubscribeNewSideHeads(sink)

return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case head := <-sink:
select {
case ch <- head:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}

// AdjustTime adds a time shift to the simulated clock.
// It can only be called on empty blocks.
func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
Expand Down Expand Up @@ -770,6 +797,10 @@ func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Su
return fb.bc.SubscribeChainEvent(ch)
}

func (fb *filterBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return fb.bc.SubscribeChainSideEvent(ch)
}

func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// In theory we should fire a ChainHeadEvent when we inject
// a canonical block, but sometimes we can insert a batch of
// canonicial blocks. Avoid firing too much ChainHeadEvents,
// canonical blocks. Avoid firing too much ChainHeadEvents,
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
Expand Down
67 changes: 65 additions & 2 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,39 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return headerSub.ID
}

// NewSideBlockFilter creates a filter that fetches blocks that are imported into the chain with a non-canonical status.
// It is part of the filter package since polling goes with eth_getFilterChanges.
func (api *PublicFilterAPI) NewSideBlockFilter() rpc.ID {
var (
headers = make(chan *types.Header)
headerSub = api.events.SubscribeNewSideHeads(headers)
)

api.filtersMu.Lock()
api.filters[headerSub.ID] = &filter{typ: SideBlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
api.filtersMu.Unlock()

go func() {
for {
select {
case h := <-headers:
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
}
api.filtersMu.Unlock()
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
api.filtersMu.Unlock()
return
}
}
}()

return headerSub.ID
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down Expand Up @@ -233,7 +266,37 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
// NewSideHeads send a notification each time a new non-canonical (header) block is written to the database.
func (api *PublicFilterAPI) NewSideHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewSideHeads(headers)

for {
select {
case h := <-headers:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
headersSub.Unsubscribe()
return
case <-notifier.Closed():
headersSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// Logs creates a subscription that fires for all new logs that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
Expand Down Expand Up @@ -424,7 +487,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(deadline)

switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
case PendingTransactionsSubscription, BlocksSubscription, SideBlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
Expand Down
1 change: 1 addition & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Backend interface {

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
Expand Down
52 changes: 51 additions & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// SideBlocksSubscription queries blocks that are imported non-canonically
SideBlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand Down Expand Up @@ -93,6 +95,7 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
chainSideSub event.Subscription // Subscription for new side chain event

// Channels
install chan *subscription // install filter for event notification
Expand All @@ -102,6 +105,7 @@ type EventSystem struct {
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
chainSideCh chan core.ChainSideEvent // Channel to receive new side chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -121,17 +125,19 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainEvChanSize),
}

// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.chainSideSub = m.backend.SubscribeChainSideEvent(m.chainSideCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.chainSideSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}

Expand Down Expand Up @@ -290,6 +296,22 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub)
}

// SubscribeNewSideHeads creates a subscription that writes the header of a block that is
// imported as a side chain.
func (es *EventSystem) SubscribeNewSideHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: SideBlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
Expand Down Expand Up @@ -366,6 +388,25 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
}

func (es *EventSystem) handleChainSideEvent(filters filterIndex, ev core.ChainSideEvent) {
for _, f := range filters[SideBlocksSubscription] {
f.headers <- ev.Block.Header()
}
// Handle filtered log eventing similarly to the newHead event, except that 'remove' will always be set to true
// (indicating the logs come from a non-canonical block).
// When newHeads and newSideHeads are subscribed to at the same time, this can result in certain logs being broadcast
// repetitiously.
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewSideHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
Expand Down Expand Up @@ -399,6 +440,10 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func
}
}

func (es *EventSystem) lightFilterNewSideHead(header *types.Header, callBack func(*types.Header, bool)) {
callBack(header, true)
}

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
Expand Down Expand Up @@ -448,6 +493,7 @@ func (es *EventSystem) eventLoop() {
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.chainSideSub.Unsubscribe()
}()

index := make(filterIndex)
Expand All @@ -467,6 +513,8 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
case ev := <-es.chainSideCh:
es.handleChainSideEvent(index, ev)

case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
Expand Down Expand Up @@ -497,6 +545,8 @@ func (es *EventSystem) eventLoop() {
return
case <-es.chainSub.Err():
return
case <-es.chainSideSub.Err():
return
}
}
}
61 changes: 61 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
}

func (b *testBackend) ChainDb() ethdb.Database {
Expand Down Expand Up @@ -123,6 +124,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.chainSideFeed.Subscribe(ch)
}

func (b *testBackend) BloomStatus() (uint64, uint64) {
return vars.BloomBitsBlocks, b.sections
}
Expand Down Expand Up @@ -210,6 +215,62 @@ func TestBlockSubscription(t *testing.T) {
<-sub1.Err()
}

// TestSideBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
// - one that is created after the second cutoff moment (blockHashes[cutoff2:])
func TestSideBlockSubscription(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
genesis = core.MustCommitGenesis(db, new(genesisT.Genesis))
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainSideEvents = []core.ChainSideEvent{}
)

for _, blk := range chain {
chainSideEvents = append(chainSideEvents, core.ChainSideEvent{Block: blk})
}

chan0 := make(chan *types.Header)
sub0 := api.events.SubscribeNewSideHeads(chan0)
chan1 := make(chan *types.Header)
sub1 := api.events.SubscribeNewSideHeads(chan1)

go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainSideEvents) || i2 != len(chainSideEvents) {
select {
case header := <-chan0:
if chainSideEvents[i1].Block.Hash() != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainSideEvents[i1].Block.Hash(), header.Hash())
}
i1++
case header := <-chan1:
if chainSideEvents[i2].Block.Hash() != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainSideEvents[i2].Block.Hash(), header.Hash())
}
i2++
}
}

sub0.Unsubscribe()
sub1.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range chainSideEvents {
backend.chainSideFeed.Send(e)
}

<-sub0.Err()
<-sub1.Err()
}

// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilter(t *testing.T) {
t.Parallel()
Expand Down
6 changes: 6 additions & 0 deletions ethclient/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header)
return ec.c.EthSubscribe(ctx, ch, "newHeads")
}

// SubscribeNewSideHead subscribes to notifications about the current blockchain head
// on the given channel.
func (ec *Client) SubscribeNewSideHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newSideHeads")
}

// State Access

// NetworkID returns the network ID (also known as the chain ID) for this chain.
Expand Down
Loading

0 comments on commit 9422e73

Please sign in to comment.