Skip to content

Commit

Permalink
chore(events): improve perf for parallel event filter matching
Browse files Browse the repository at this point in the history
1. Cache address look-ups for the given tipset across filters
2. Run the filters in an errgroup
  • Loading branch information
rvagg committed Nov 4, 2024
1 parent 773efae commit e01d209
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Implement new `lotus f3` CLI commands to list F3 participants, dump manifest, get/list finality certificates and check the F3 status. ([filecoin-project/lotus#12617](https://github.com/filecoin-project/lotus/pull/12617), [filecoin-project/lotus#12627](https://github.com/filecoin-project/lotus/pull/12627))
- Return a `"data"` field on the `"error"` returned from RPC when `eth_call` and `eth_estimateGas` APIs encounter `execution reverted` errors. ([filecoin-project/lotus#12553](https://github.com/filecoin-project/lotus/pull/12553))
- Implement `EthGetTransactionByBlockNumberAndIndex` (`eth_getTransactionByBlockNumberAndIndex`) and `EthGetTransactionByBlockHashAndIndex` (`eth_getTransactionByBlockHashAndIndex`) methods. ([filecoin-project/lotus#12618](https://github.com/filecoin-project/lotus/pull/12618))
- Improve eth filter performance for nodes serving many clients. ([filecoin-project/lotus#12603](https://github.com/filecoin-project/lotus/pull/12603))

## Bug Fixes
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
Expand Down
73 changes: 50 additions & 23 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -28,7 +29,10 @@ func isIndexedValue(b uint8) bool {
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)
// AddressResolver is a function that resolves an actor ID to an address. If the
// actor ID cannot be resolved to an address, the function should return
// address.Undef.
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) address.Address

type EventFilter interface {
Filter
Expand Down Expand Up @@ -77,9 +81,6 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)

ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
Expand All @@ -89,16 +90,10 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever

for msgIdx, em := range ems {
for _, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[ev.Emitter] = addr
addr := resolver(ctx, ev.Emitter, te.rctTs)
if addr == address.Undef {
// not an address we will be able to match against
continue
}

if !f.matchAddress(addr) {
Expand Down Expand Up @@ -295,7 +290,7 @@ func (e *executedMessage) Events() []*types.Event {

type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
AddressResolver AddressResolver
MaxFilterResults int
ChainIndexer index.Indexer

Expand All @@ -319,11 +314,18 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

// TODO: could run this loop in parallel with errgroup if there are many filters
tsAddressResolver := m.createTipSetCachedAddressResolver()

g, ctx := errgroup.WithContext(ctx)
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
f := f
g.Go(func() error {
return f.CollectEvents(ctx, tse, false, tsAddressResolver)
})
}

if err := g.Wait(); err != nil {
return err
}

return nil
Expand All @@ -344,16 +346,41 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

// TODO: could run this loop in parallel with errgroup if there are many filters
tsAddressResolver := m.createTipSetCachedAddressResolver()

g, ctx := errgroup.WithContext(ctx)
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
f := f
g.Go(func() error {
return f.CollectEvents(ctx, tse, true, tsAddressResolver)
})
}

if err := g.Wait(); err != nil {
return err
}

return nil
}

func (m *EventFilterManager) createTipSetCachedAddressResolver() func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
addressLookups := make(map[abi.ActorID]address.Address)
var addressLookupsLk sync.Mutex

return func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
addressLookupsLk.Lock()
defer addressLookupsLk.Unlock()

addr, ok := addressLookups[emitter]
if !ok {
addr = m.AddressResolver(ctx, emitter, ts)
addressLookups[emitter] = addr
}

return addr
}
}

func (m *EventFilterManager) Fill(
ctx context.Context,
minHeight,
Expand Down
7 changes: 5 additions & 2 deletions chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,10 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) {
a[actorID] = addr
}

func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
ra, ok := a[emitter]
return ra, ok
if ok {
return ra
}
return address.Undef
}
10 changes: 4 additions & 6 deletions node/modules/actorevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,18 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L
fm := &filter.EventFilterManager{
ChainStore: cs,
ChainIndexer: ci,
// TODO:
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
return address.Undef
}

actor, err := sm.LoadActor(ctx, idAddr, ts)
if err != nil || actor.DelegatedAddress == nil {
return idAddr, true
return idAddr
}

return *actor.DelegatedAddress, true
return *actor.DelegatedAddress
},

MaxFilterResults: cfg.MaxFilterResults,
Expand Down

0 comments on commit e01d209

Please sign in to comment.