Skip to content

Commit

Permalink
app: wire tracker (#857)
Browse files Browse the repository at this point in the history
Wire tracker in core workflow. Implement failed duty reporter.

Also fix concurrency deadlock between Tracker and Deadliner.

category: feature
ticket: #768
  • Loading branch information
corverroos authored Jul 27, 2022
1 parent 1adae1b commit 2237124
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 97 deletions.
28 changes: 28 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/obolnetwork/charon/core/parsigex"
"github.com/obolnetwork/charon/core/scheduler"
"github.com/obolnetwork/charon/core/sigagg"
"github.com/obolnetwork/charon/core/tracker"
"github.com/obolnetwork/charon/core/validatorapi"
"github.com/obolnetwork/charon/eth2util/keystore"
"github.com/obolnetwork/charon/p2p"
Expand Down Expand Up @@ -320,6 +321,11 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
pubshares = append(pubshares, eth2Share)
}

peers, err := lock.Peers()
if err != nil {
return err
}

sender := new(p2p.Sender)

sched, err := scheduler.New(corePubkeys, eth2Cl, conf.BuilderAPI)
Expand Down Expand Up @@ -365,6 +371,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
if err != nil {
return err
}
deadliner := core.NewDeadliner(ctx, deadlineFunc)

retryer, err := retry.New[core.Duty](deadlineFunc)
if err != nil {
Expand All @@ -376,6 +383,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

wireTracker(life, deadliner, peers, sched, fetch, cons, vapi, parSigDB, parSigEx, sigAgg)

core.Wire(sched, fetch, cons, dutyDB, vapi,
parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster,
core.WithTracing(),
Expand Down Expand Up @@ -403,6 +412,25 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return nil
}

// wireTracker creates a new tracker instance and wires it to the components with "output events".
func wireTracker(life *lifecycle.Manager, deadliner core.Deadliner, peers []p2p.Peer,
sched core.Scheduler, fetcher core.Fetcher, cons core.Consensus, vapi core.ValidatorAPI,
parSigDB core.ParSigDB, parSigEx core.ParSigEx, sigAgg core.SigAgg,
) {
trackr := tracker.New(deadliner, peers)

sched.Subscribe(trackr.SchedulerEvent)
fetcher.Subscribe(trackr.FetcherEvent)
cons.Subscribe(trackr.ConsensusEvent)
vapi.Subscribe(trackr.ValidatorAPIEvent)
parSigDB.SubscribeInternal(trackr.ParSigDBInternalEvent)
parSigDB.SubscribeThreshold(trackr.ParSigDBThresholdEvent)
parSigEx.Subscribe(trackr.ParSigExEvent)
sigAgg.Subscribe(trackr.SigAggEvent)

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(trackr.Run))
}

// eth2PubKeys returns a list of BLS pubkeys of validators in the cluster lock.
func eth2PubKeys(validators []cluster.DistValidator) ([]eth2p0.BLSPubKey, error) {
var pubkeys []eth2p0.BLSPubKey
Expand Down
3 changes: 2 additions & 1 deletion app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type OrderStop int

// Global ordering of start hooks.
const (
StartAggSigDB OrderStart = iota
StartTracker OrderStart = iota
StartAggSigDB
StartRelay
StartMonitoringAPI
StartValidatorAPI
Expand Down
21 changes: 11 additions & 10 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/simnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
"github.com/obolnetwork/charon/testutil/beaconmock"
)

//go:generate go test . -run=TestSimnetNoNetwork_TekuVC -integration -v
//go:generate go test . -integration -v
var integration = flag.Bool("integration", false, "Enable docker based integration test")

func TestSimnetNoNetwork_WithAttesterTekuVC(t *testing.T) {
Expand Down
33 changes: 20 additions & 13 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jonboulle/clockwork"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
)

// lateFactor defines the number of slots duties may be late.
Expand Down Expand Up @@ -56,21 +57,25 @@ type deadlineInput struct {
success chan<- bool
}

// Deadline implements the Deadliner interface.
type Deadline struct {
// deadliner implements the Deadliner interface.
type deadliner struct {
inputChan chan deadlineInput
deadlineChan chan Duty
clock clockwork.Clock
quit chan struct{}
}

// NewForT returns a Deadline for use in tests.
func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) *Deadline {
func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) Deadliner {
t.Helper()

d := &Deadline{
// outputBuffer big enough to support all duty types, which can expire at the same time
// while external consumer is synchronously adding duties (so not reading output).
const outputBuffer = 10

d := &deadliner{
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
deadlineChan: make(chan Duty, outputBuffer),
clock: clock,
quit: make(chan struct{}),
}
Expand All @@ -83,8 +88,8 @@ func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Tim
// NewDeadliner returns a new instance of Deadline.
// It runs a goroutine which is responsible for reading and storing duties,
// and sending the deadlined duty to receiver's deadlineChan.
func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadline {
d := &Deadline{
func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) Deadliner {
d := &deadliner{
inputChan: make(chan deadlineInput),
deadlineChan: make(chan Duty),
clock: clockwork.NewRealClock(),
Expand All @@ -96,7 +101,7 @@ func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadl
return d
}

func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
func (d *deadliner) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
Expand Down Expand Up @@ -140,6 +145,8 @@ func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
case <-ctx.Done():
return
case d.deadlineChan <- currDuty:
default:
log.Warn(ctx, "Deadliner output channel full", nil)
}

delete(duties, currDuty)
Expand All @@ -149,25 +156,25 @@ func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) {
}

// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *Deadline) Add(duty Duty) bool {
res := make(chan bool)
func (d *deadliner) Add(duty Duty) bool {
success := make(chan bool)

select {
case <-d.quit:
return false
case d.inputChan <- deadlineInput{duty: duty, success: res}:
case d.inputChan <- deadlineInput{duty: duty, success: success}:
}

select {
case <-d.quit:
return false
case ok := <-res:
case ok := <-success:
return ok
}
}

// C returns the deadline channel.
func (d *Deadline) C() <-chan Duty {
func (d *deadliner) C() <-chan Duty {
return d.deadlineChan
}

Expand Down
2 changes: 1 addition & 1 deletion core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestDeadliner(t *testing.T) {
}

// sendDuties runs a goroutine which adds the duties to the deadliner channel.
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner *core.Deadline) {
func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner core.Deadliner) {
t.Helper()

wg.Add(1)
Expand Down
21 changes: 15 additions & 6 deletions core/tracker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "participation",
Help: "Set to 1 if peer participated successfully for the given duty or else 0",
}, []string{"duty", "peer"})
var (
participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "participation",
Help: "Set to 1 if peer participated successfully for the given duty or else 0",
}, []string{"duty", "peer"})

failedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "failed_duties_total",
Help: "Total number of failed duties by component",
}, []string{"duty", "component"})
)
Loading

0 comments on commit 2237124

Please sign in to comment.