eth/filters,ethclient,node: install newSideHeads subscription #293

Closed

31 changes: 31 additions & 0 deletions accounts/abi/bind/backends/simulated.go
@@ -677,6 +677,33 @@ func (b *SimulatedBackend) SubscribeNewHead(ctx context.Context, ch chan<- *type
}), nil
}

// SubscribeNewSideHead returns an event subscription for headers imported with non-canonical (side) status.
func (b *SimulatedBackend) SubscribeNewSideHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
// subscribe to new side heads
sink := make(chan *types.Header)
sub := b.events.SubscribeNewSideHeads(sink)

return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case head := <-sink:
select {
case ch <- head:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}

// AdjustTime adds a time shift to the simulated clock.
// It can only be called on empty blocks.
func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
@@ -770,6 +797,10 @@ func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Su
return fb.bc.SubscribeChainEvent(ch)
}

func (fb *filterBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return fb.bc.SubscribeChainSideEvent(ch)
}

func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}
2 changes: 1 addition & 1 deletion core/blockchain.go
@@ -1694,7 +1694,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// In theory we should fire a ChainHeadEvent when we inject
// a canonical block, but sometimes we can insert a batch of
// canonicial blocks. Avoid firing too much ChainHeadEvents,
// canonical blocks. Avoid firing too much ChainHeadEvents,
// we will fire an accumulated ChainHeadEvent and disable fire
// event here.
if emitHeadEvent {
67 changes: 65 additions & 2 deletions eth/filters/api.go
@@ -203,6 +203,39 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
return headerSub.ID
}

// NewSideBlockFilter creates a filter that fetches hashes of blocks that are imported into the chain with non-canonical (side) status.
// It is part of the filter package because polling is done with eth_getFilterChanges.
func (api *PublicFilterAPI) NewSideBlockFilter() rpc.ID {
var (
headers = make(chan *types.Header)
headerSub = api.events.SubscribeNewSideHeads(headers)
)

api.filtersMu.Lock()
api.filters[headerSub.ID] = &filter{typ: SideBlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
api.filtersMu.Unlock()

go func() {
for {
select {
case h := <-headers:
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
}
api.filtersMu.Unlock()
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
api.filtersMu.Unlock()
return
}
}
}()

return headerSub.ID
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
@@ -233,7 +266,37 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
// NewSideHeads sends a notification each time a block is imported with non-canonical (side) status.
func (api *PublicFilterAPI) NewSideHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewSideHeads(headers)

for {
select {
case h := <-headers:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
headersSub.Unsubscribe()
return
case <-notifier.Closed():
headersSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// Logs creates a subscription that fires for all new logs that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
@@ -424,7 +487,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f.deadline.Reset(deadline)

switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
case PendingTransactionsSubscription, BlocksSubscription, SideBlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
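For reference, a minimal sketch of how a client could poll the new side-block filter over JSON-RPC. It assumes PublicFilterAPI.NewSideBlockFilter is exposed as eth_newSideBlockFilter (the usual eth-namespace mapping) and that a node is listening on a placeholder endpoint; neither of those names is guaranteed by this diff itself.

package main

import (
    "fmt"
    "log"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/rpc"
)

func main() {
    // Placeholder HTTP endpoint; adjust to the node under test.
    client, err := rpc.Dial("http://localhost:8545")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Install the side-block filter. The RPC name assumes the standard mapping
    // of PublicFilterAPI.NewSideBlockFilter -> eth_newSideBlockFilter.
    var filterID string
    if err := client.Call(&filterID, "eth_newSideBlockFilter"); err != nil {
        log.Fatal(err)
    }

    // Poll for hashes of blocks imported with side (non-canonical) status,
    // exactly as one would poll a regular eth_newBlockFilter.
    var hashes []common.Hash
    if err := client.Call(&hashes, "eth_getFilterChanges", filterID); err != nil {
        log.Fatal(err)
    }
    for _, h := range hashes {
        fmt.Println("side block:", h.Hex())
    }
}
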
1 change: 1 addition & 0 deletions eth/filters/filter.go
@@ -39,6 +39,7 @@ type Backend interface {

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
52 changes: 51 additions & 1 deletion eth/filters/filter_system.go
@@ -52,6 +52,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// SideBlocksSubscription queries hashes for blocks that are imported with non-canonical (side) status
SideBlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
@@ -93,6 +95,7 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
chainSideSub event.Subscription // Subscription for new side chain event

// Channels
install chan *subscription // install filter for event notification
@@ -102,6 +105,7 @@
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
chainSideCh chan core.ChainSideEvent // Channel to receive new side chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -121,17 +125,19 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainEvChanSize),
}

// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.chainSideSub = m.backend.SubscribeChainSideEvent(m.chainSideCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.chainSideSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}

@@ -290,6 +296,22 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub)
}

// SubscribeNewSideHeads creates a subscription that writes the header of each block that is
// imported onto a side (non-canonical) chain.
func (es *EventSystem) SubscribeNewSideHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: SideBlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
@@ -366,6 +388,25 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
}

func (es *EventSystem) handleChainSideEvent(filters filterIndex, ev core.ChainSideEvent) {
for _, f := range filters[SideBlocksSubscription] {
f.headers <- ev.Block.Header()
}
// Handle filtered log events similarly to the newHeads case, except that 'remove' is always set to true
// (indicating the logs come from a non-canonical block).
// When newHeads and newSideHeads are subscribed to at the same time, certain logs may be delivered
// more than once.
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewSideHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
@@ -399,6 +440,10 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func
}
}

func (es *EventSystem) lightFilterNewSideHead(header *types.Header, callBack func(*types.Header, bool)) {
callBack(header, true)
Review comment (Contributor Author): This is noteworthy; logs from side chain blocks will always be marked Removed: true. The newHeads handling uses Removed to indicate the same characteristic, but in that context the block is expected to have been canonical and has since been moved to the side. Although the reuse of this field (and vocabulary) is a little awkward, I think it's the best we can do without modifying the existing API data type.

}

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
if bloomFilter(header.Bloom, addresses, topics) {
@@ -448,6 +493,7 @@ func (es *EventSystem) eventLoop() {
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.chainSideSub.Unsubscribe()
}()

index := make(filterIndex)
@@ -467,6 +513,8 @@
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
case ev := <-es.chainSideCh:
es.handleChainSideEvent(index, ev)

case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
@@ -497,6 +545,8 @@
return
case <-es.chainSub.Err():
return
case <-es.chainSideSub.Err():
return
}
}
}
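
To illustrate the behavior described in the review comment above, here is a rough consumer-side sketch (not part of the diff): a client subscribed to logs via ethclient.SubscribeFilterLogs can inspect the Removed flag to recognize logs that did not come from a canonical block. The websocket endpoint and contract address below are placeholders.

package main

import (
    "context"
    "log"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

func main() {
    // Placeholder websocket endpoint; subscriptions need ws or ipc transport.
    client, err := ethclient.Dial("ws://localhost:8546")
    if err != nil {
        log.Fatal(err)
    }

    // Placeholder contract address.
    query := ethereum.FilterQuery{
        Addresses: []common.Address{common.HexToAddress("0x0000000000000000000000000000000000000000")},
    }
    logs := make(chan types.Log)
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    for {
        select {
        case err := <-sub.Err():
            log.Fatal(err)
        case l := <-logs:
            if l.Removed {
                // With this change (in light mode), Removed is also true for logs
                // emitted by side-chain blocks, not only for logs reorged out of
                // the canonical chain.
                log.Printf("non-canonical log: block=%s tx=%s", l.BlockHash.Hex(), l.TxHash.Hex())
                continue
            }
            log.Printf("canonical log: block=%s tx=%s", l.BlockHash.Hex(), l.TxHash.Hex())
        }
    }
}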
61 changes: 61 additions & 0 deletions eth/filters/filter_system_test.go
@@ -49,6 +49,7 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
}

func (b *testBackend) ChainDb() ethdb.Database {
@@ -123,6 +124,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.chainSideFeed.Subscribe(ch)
}

func (b *testBackend) BloomStatus() (uint64, uint64) {
return vars.BloomBitsBlocks, b.sections
}
@@ -210,6 +215,62 @@ func TestBlockSubscription(t *testing.T) {
<-sub1.Err()
}

// TestSideBlockSubscription tests whether side-head subscriptions return the headers of blocks posted as chain side events.
// It creates two subscriptions at the start; both should receive every posted chain side event.
func TestSideBlockSubscription(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
genesis = core.MustCommitGenesis(db, new(genesisT.Genesis))
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
chainSideEvents = []core.ChainSideEvent{}
)

for _, blk := range chain {
chainSideEvents = append(chainSideEvents, core.ChainSideEvent{Block: blk})
}

chan0 := make(chan *types.Header)
sub0 := api.events.SubscribeNewSideHeads(chan0)
chan1 := make(chan *types.Header)
sub1 := api.events.SubscribeNewSideHeads(chan1)

go func() { // simulate client
i1, i2 := 0, 0
for i1 != len(chainSideEvents) || i2 != len(chainSideEvents) {
select {
case header := <-chan0:
if chainSideEvents[i1].Block.Hash() != header.Hash() {
t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainSideEvents[i1].Block.Hash(), header.Hash())
}
i1++
case header := <-chan1:
if chainSideEvents[i2].Block.Hash() != header.Hash() {
t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainSideEvents[i2].Block.Hash(), header.Hash())
}
i2++
}
}

sub0.Unsubscribe()
sub1.Unsubscribe()
}()

time.Sleep(1 * time.Second)
for _, e := range chainSideEvents {
backend.chainSideFeed.Send(e)
}

<-sub0.Err()
<-sub1.Err()
}

// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
func TestPendingTxFilter(t *testing.T) {
t.Parallel()
6 changes: 6 additions & 0 deletions ethclient/ethclient.go
@@ -335,6 +335,12 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header)
return ec.c.EthSubscribe(ctx, ch, "newHeads")
}

// SubscribeNewSideHead subscribes to notifications about headers of blocks imported with
// non-canonical (side) status on the given channel.
func (ec *Client) SubscribeNewSideHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newSideHeads")
}

// State Access

// NetworkID returns the network ID (also known as the chain ID) for this chain.
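
Finally, a brief end-to-end sketch of using the ethclient helper added above: dial a node over websocket (the endpoint is a placeholder) and receive headers of blocks imported with side status through the newSideHeads subscription.

package main

import (
    "context"
    "log"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

func main() {
    // Placeholder websocket endpoint; subscriptions need ws or ipc transport.
    client, err := ethclient.Dial("ws://localhost:8546")
    if err != nil {
        log.Fatal(err)
    }

    heads := make(chan *types.Header)
    // SubscribeNewSideHead is the method introduced in this change; it opens
    // the "newSideHeads" RPC subscription.
    sub, err := client.SubscribeNewSideHead(context.Background(), heads)
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    for {
        select {
        case err := <-sub.Err():
            log.Fatal(err)
        case h := <-heads:
            log.Printf("side header: number=%v hash=%s", h.Number, h.Hash().Hex())
        }
    }
}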