Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor events subsystem #7000

Merged
merged 9 commits into from
Aug 31, 2021
1 change: 1 addition & 0 deletions api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Gateway interface {
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*HeadChange, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
Expand Down
13 changes: 13 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
1 change: 1 addition & 0 deletions build/params_2k.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build debug || 2k
// +build debug 2k

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_butterfly.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build butterflynet
// +build butterflynet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_calibnet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build calibnet
// +build calibnet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_debug.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build debug
// +build debug

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_interop.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build interopnet
// +build interopnet

package build
Expand Down
9 changes: 2 additions & 7 deletions build/params_mainnet.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
// +build !debug
// +build !2k
// +build !testground
// +build !calibnet
// +build !nerpanet
// +build !butterflynet
// +build !interopnet
//go:build !debug && !2k && !testground && !calibnet && !nerpanet && !butterflynet && !interopnet
// +build !debug,!2k,!testground,!calibnet,!nerpanet,!butterflynet,!interopnet

package build

Expand Down
1 change: 1 addition & 0 deletions build/params_nerpanet.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build nerpanet
// +build nerpanet

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_shared_vals.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !testground
// +build !testground

package build
Expand Down
1 change: 1 addition & 0 deletions build/params_testground.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build testground
// +build testground

// This file makes hardcoded parameters (const) configurable as vars.
Expand Down
3 changes: 2 additions & 1 deletion build/tools.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//+build tools
//go:build tools
// +build tools

package build

Expand Down
33 changes: 33 additions & 0 deletions chain/events/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package events

import (
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)

type uncachedAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)

StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}

type cache struct {
*tipSetCache
*messageCache
uncachedAPI
}

func newCache(api EventAPI, gcConfidence abi.ChainEpoch) *cache {
return &cache{
newTSCache(api, gcConfidence),
newMessageCache(api),
api,
}
}
201 changes: 17 additions & 184 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package events

import (
"context"
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

Expand All @@ -25,209 +21,46 @@ type (
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)

type heightHandler struct {
confidence int
called bool

handle HeightHandler
revert RevertHandler
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, from, to *types.TipSet) error
Revert(ctx context.Context, from, to *types.TipSet) error
}

type EventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)

StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
}

type Events struct {
api EventAPI

tsc *tipSetCache
lk sync.Mutex

ready chan struct{}
readyOnce sync.Once

heightEvents
*observer
*heightEvents
*hcEvents

observers []TipSetObserver
}

func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
cache := newCache(api, gcConfidence)

e := &Events{
api: api,

tsc: tsc,

heightEvents: heightEvents{
tsc: tsc,
ctx: ctx,
gcConfidence: gcConfidence,

heightTriggers: map[uint64]*heightHandler{},
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
ob := newObserver(cache, gcConfidence)
if err := ob.start(ctx); err != nil {
return nil, err
}

go e.listenHeadChanges(ctx)
he := newHeightEvents(cache, ob, gcConfidence)
headChange := newHCEvents(cache, ob)

// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}

return e
return &Events{ob, he, headChange}, nil
}

func NewEvents(ctx context.Context, api EventAPI) *Events {
func NewEvents(ctx context.Context, api EventAPI) (*Events, error) {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}

func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}

log.Info("restarting listenHeadChanges")
}
}

func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}

var cur []*api.HeadChange
var ok bool

// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}

if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}

if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}

if err := e.tsc.add(cur[0].Val); err != nil {
log.Warnf("tsc.add: adding current tipset failed: %v", err)
}

e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
// Signal that we have seen first tipset
close(e.ready)
})

for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}

if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
}

return nil
}

func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}

e.lk.Lock()
defer e.lk.Unlock()

if err := e.headChangeAt(rev, app); err != nil {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
Loading