From 2237124ada48a4f61e2ee99ddd42268e574d6c5e Mon Sep 17 00:00:00 2001 From: corverroos Date: Wed, 27 Jul 2022 14:44:35 +0200 Subject: [PATCH] app: wire tracker (#857) Wire tracker in core workflow. Implement failed duty reporter. Also fix concurrency deadlock between Tracker and Deadliner. category: feature ticket: #768 --- app/app.go | 28 ++++++ app/lifecycle/order.go | 3 +- app/lifecycle/orderstart_string.go | 21 ++--- app/simnet_test.go | 2 +- core/deadline.go | 33 ++++--- core/deadline_test.go | 2 +- core/tracker/metrics.go | 21 +++-- core/tracker/tracker.go | 125 +++++++++++++------------- core/tracker/tracker_internal_test.go | 10 +-- 9 files changed, 148 insertions(+), 97 deletions(-) diff --git a/app/app.go b/app/app.go index 9dc79fc78..6fc132633 100644 --- a/app/app.go +++ b/app/app.go @@ -57,6 +57,7 @@ import ( "github.com/obolnetwork/charon/core/parsigex" "github.com/obolnetwork/charon/core/scheduler" "github.com/obolnetwork/charon/core/sigagg" + "github.com/obolnetwork/charon/core/tracker" "github.com/obolnetwork/charon/core/validatorapi" "github.com/obolnetwork/charon/eth2util/keystore" "github.com/obolnetwork/charon/p2p" @@ -320,6 +321,11 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, pubshares = append(pubshares, eth2Share) } + peers, err := lock.Peers() + if err != nil { + return err + } + sender := new(p2p.Sender) sched, err := scheduler.New(corePubkeys, eth2Cl, conf.BuilderAPI) @@ -365,6 +371,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, if err != nil { return err } + deadliner := core.NewDeadliner(ctx, deadlineFunc) retryer, err := retry.New[core.Duty](deadlineFunc) if err != nil { @@ -376,6 +383,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } + wireTracker(life, deadliner, peers, sched, fetch, cons, vapi, parSigDB, parSigEx, sigAgg) + core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, core.WithTracing(), @@ -403,6 +412,25 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return nil } +// wireTracker creates a new tracker instance and wires it to the components with "output events". +func wireTracker(life *lifecycle.Manager, deadliner core.Deadliner, peers []p2p.Peer, + sched core.Scheduler, fetcher core.Fetcher, cons core.Consensus, vapi core.ValidatorAPI, + parSigDB core.ParSigDB, parSigEx core.ParSigEx, sigAgg core.SigAgg, +) { + trackr := tracker.New(deadliner, peers) + + sched.Subscribe(trackr.SchedulerEvent) + fetcher.Subscribe(trackr.FetcherEvent) + cons.Subscribe(trackr.ConsensusEvent) + vapi.Subscribe(trackr.ValidatorAPIEvent) + parSigDB.SubscribeInternal(trackr.ParSigDBInternalEvent) + parSigDB.SubscribeThreshold(trackr.ParSigDBThresholdEvent) + parSigEx.Subscribe(trackr.ParSigExEvent) + sigAgg.Subscribe(trackr.SigAggEvent) + + life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartTracker, lifecycle.HookFunc(trackr.Run)) +} + // eth2PubKeys returns a list of BLS pubkeys of validators in the cluster lock. func eth2PubKeys(validators []cluster.DistValidator) ([]eth2p0.BLSPubKey, error) { var pubkeys []eth2p0.BLSPubKey diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index 54b674aa2..c91627a4d 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -26,7 +26,8 @@ type OrderStop int // Global ordering of start hooks. const ( - StartAggSigDB OrderStart = iota + StartTracker OrderStart = iota + StartAggSigDB StartRelay StartMonitoringAPI StartValidatorAPI diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index 940d0078a..6c8d86cd0 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -23,19 +23,20 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} - _ = x[StartAggSigDB-0] - _ = x[StartRelay-1] - _ = x[StartMonitoringAPI-2] - _ = x[StartValidatorAPI-3] - _ = x[StartP2PPing-4] - _ = x[StartP2PConsensus-5] - _ = x[StartSimulator-6] - _ = x[StartScheduler-7] + _ = x[StartTracker-0] + _ = x[StartAggSigDB-1] + _ = x[StartRelay-2] + _ = x[StartMonitoringAPI-3] + _ = x[StartValidatorAPI-4] + _ = x[StartP2PPing-5] + _ = x[StartP2PConsensus-6] + _ = x[StartSimulator-7] + _ = x[StartScheduler-8] } -const _OrderStart_name = "AggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PConsensusSimulatorScheduler" +const _OrderStart_name = "TrackerAggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PConsensusSimulatorScheduler" -var _OrderStart_index = [...]uint8{0, 8, 13, 26, 38, 45, 57, 66, 75} +var _OrderStart_index = [...]uint8{0, 7, 15, 20, 33, 45, 52, 64, 73, 82} func (i OrderStart) String() string { if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { diff --git a/app/simnet_test.go b/app/simnet_test.go index 1788c30ac..6777904d9 100644 --- a/app/simnet_test.go +++ b/app/simnet_test.go @@ -47,7 +47,7 @@ import ( "github.com/obolnetwork/charon/testutil/beaconmock" ) -//go:generate go test . -run=TestSimnetNoNetwork_TekuVC -integration -v +//go:generate go test . -integration -v var integration = flag.Bool("integration", false, "Enable docker based integration test") func TestSimnetNoNetwork_WithAttesterTekuVC(t *testing.T) { diff --git a/core/deadline.go b/core/deadline.go index 8e316b24b..db81a89f2 100644 --- a/core/deadline.go +++ b/core/deadline.go @@ -24,6 +24,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/log" ) // lateFactor defines the number of slots duties may be late. @@ -56,8 +57,8 @@ type deadlineInput struct { success chan<- bool } -// Deadline implements the Deadliner interface. -type Deadline struct { +// deadliner implements the Deadliner interface. +type deadliner struct { inputChan chan deadlineInput deadlineChan chan Duty clock clockwork.Clock @@ -65,12 +66,16 @@ type Deadline struct { } // NewForT returns a Deadline for use in tests. -func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) *Deadline { +func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Time, clock clockwork.Clock) Deadliner { t.Helper() - d := &Deadline{ + // outputBuffer big enough to support all duty types, which can expire at the same time + // while external consumer is synchronously adding duties (so not reading output). + const outputBuffer = 10 + + d := &deadliner{ inputChan: make(chan deadlineInput), - deadlineChan: make(chan Duty), + deadlineChan: make(chan Duty, outputBuffer), clock: clock, quit: make(chan struct{}), } @@ -83,8 +88,8 @@ func NewForT(ctx context.Context, t *testing.T, deadlineFunc func(Duty) time.Tim // NewDeadliner returns a new instance of Deadline. // It runs a goroutine which is responsible for reading and storing duties, // and sending the deadlined duty to receiver's deadlineChan. -func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadline { - d := &Deadline{ +func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) Deadliner { + d := &deadliner{ inputChan: make(chan deadlineInput), deadlineChan: make(chan Duty), clock: clockwork.NewRealClock(), @@ -96,7 +101,7 @@ func NewDeadliner(ctx context.Context, deadlineFunc func(Duty) time.Time) *Deadl return d } -func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) { +func (d *deadliner) run(ctx context.Context, deadlineFunc func(Duty) time.Time) { duties := make(map[Duty]bool) currDuty, currDeadline := getCurrDuty(duties, deadlineFunc) currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now())) @@ -140,6 +145,8 @@ func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) { case <-ctx.Done(): return case d.deadlineChan <- currDuty: + default: + log.Warn(ctx, "Deadliner output channel full", nil) } delete(duties, currDuty) @@ -149,25 +156,25 @@ func (d *Deadline) run(ctx context.Context, deadlineFunc func(Duty) time.Time) { } // Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully. -func (d *Deadline) Add(duty Duty) bool { - res := make(chan bool) +func (d *deadliner) Add(duty Duty) bool { + success := make(chan bool) select { case <-d.quit: return false - case d.inputChan <- deadlineInput{duty: duty, success: res}: + case d.inputChan <- deadlineInput{duty: duty, success: success}: } select { case <-d.quit: return false - case ok := <-res: + case ok := <-success: return ok } } // C returns the deadline channel. -func (d *Deadline) C() <-chan Duty { +func (d *deadliner) C() <-chan Duty { return d.deadlineChan } diff --git a/core/deadline_test.go b/core/deadline_test.go index 472d211ca..364abda0a 100644 --- a/core/deadline_test.go +++ b/core/deadline_test.go @@ -77,7 +77,7 @@ func TestDeadliner(t *testing.T) { } // sendDuties runs a goroutine which adds the duties to the deadliner channel. -func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner *core.Deadline) { +func addDuties(t *testing.T, wg *sync.WaitGroup, duties []core.Duty, expected bool, deadliner core.Deadliner) { t.Helper() wg.Add(1) diff --git a/core/tracker/metrics.go b/core/tracker/metrics.go index 39d5f4a98..bf8a2c8f7 100644 --- a/core/tracker/metrics.go +++ b/core/tracker/metrics.go @@ -20,9 +20,18 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -var participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "core", - Subsystem: "tracker", - Name: "participation", - Help: "Set to 1 if peer participated successfully for the given duty or else 0", -}, []string{"duty", "peer"}) +var ( + participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "core", + Subsystem: "tracker", + Name: "participation", + Help: "Set to 1 if peer participated successfully for the given duty or else 0", + }, []string{"duty", "peer"}) + + failedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "core", + Subsystem: "tracker", + Name: "failed_duties_total", + Help: "Total number of failed duties by component", + }, []string{"duty", "component"}) +) diff --git a/core/tracker/tracker.go b/core/tracker/tracker.go index 5efbb7daf..a635ec6fe 100644 --- a/core/tracker/tracker.go +++ b/core/tracker/tracker.go @@ -67,11 +67,11 @@ type Tracker struct { deadliner core.Deadliner quit chan struct{} - // failedDutyReporter instruments the duty. It ignores non-failed duties. - failedDutyReporter func(core.Duty, bool, string, string) + // failedDutyReporter instruments duty failures. + failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string) - // participationReporter logs and instruments the participation. - participationReporter func(context.Context, core.Duty, map[int]bool) + // participationReporter instruments duty peer participation. + participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) } // New returns a new Tracker. @@ -106,9 +106,13 @@ func (t *Tracker) Run(ctx context.Context) error { t.events[e.duty] = append(t.events[e.duty], e) case duty := <-t.deadliner.C(): + ctx := log.WithCtx(ctx, z.Any("duty", duty)) + + // Analyse failed duties failed, failedComponent, failedMsg := analyseDutyFailed(duty, t.events[duty]) - t.failedDutyReporter(duty, failed, failedComponent.String(), failedMsg) + t.failedDutyReporter(ctx, duty, failed, failedComponent, failedMsg) + // Analyse peer participation participatedShares, err := analyseParticipation(t.events[duty]) if err != nil { log.Error(ctx, "Invalid participated shares", err) @@ -146,9 +150,18 @@ func analyseDutyFailed(duty core.Duty, es []event) (bool, component, string) { return true, events[0].component + 1, fmt.Sprintf("%s failed in %s component", duty.String(), (events[0].component + 1).String()) } -// failedDutyReporter instruments the duty. It ignores non-failed duties. -// TODO(xenowits): Implement logic for reporting duties. -func failedDutyReporter(core.Duty, bool, string, string) {} +// failedDutyReporter instruments failed duties. +func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, component component, reason string) { + if !failed { + return + } + + log.Warn(ctx, "Duty failed", nil, + z.Any("component", component), + z.Str("reason", reason)) + + failedCounter.WithLabelValues(duty.String(), component.String()).Inc() +} // analyseParticipation returns a set of share indexes of participated peers. func analyseParticipation(events []event) (map[int]bool, error) { @@ -187,9 +200,9 @@ func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, if fmt.Sprint(prevAbsent) != fmt.Sprint(absentPeers) { if len(absentPeers) == 0 { - log.Info(ctx, "All peers participated in duty", z.Str("duty", duty.String())) + log.Info(ctx, "All peers participated in duty") } else { - log.Info(ctx, "Not all peers participated in duty", z.Str("duty", duty.String()), z.Any("absent", absentPeers)) + log.Info(ctx, "Not all peers participated in duty", z.Any("absent", absentPeers)) } } @@ -205,12 +218,11 @@ func (t *Tracker) SchedulerEvent(ctx context.Context, duty core.Duty, defSet cor return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: scheduler, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: scheduler, + pubkey: pubkey, + }: } } @@ -225,12 +237,11 @@ func (t *Tracker) FetcherEvent(ctx context.Context, duty core.Duty, data core.Un return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: fetcher, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: fetcher, + pubkey: pubkey, + }: } } @@ -245,12 +256,11 @@ func (t *Tracker) ConsensusEvent(ctx context.Context, duty core.Duty, data core. return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: consensus, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: consensus, + pubkey: pubkey, + }: } } @@ -265,12 +275,11 @@ func (t *Tracker) ValidatorAPIEvent(ctx context.Context, duty core.Duty, data co return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: validatorAPI, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: validatorAPI, + pubkey: pubkey, + }: } } @@ -285,13 +294,12 @@ func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.Duty, data core.P return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: parSigEx, - pubkey: pubkey, - shareIdx: pSig.ShareIdx, - } + case t.input <- event{ + duty: duty, + component: parSigEx, + pubkey: pubkey, + shareIdx: pSig.ShareIdx, + }: } } @@ -306,13 +314,12 @@ func (t *Tracker) ParSigDBInternalEvent(ctx context.Context, duty core.Duty, dat return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: parSigDBInternal, - pubkey: pubkey, - shareIdx: pSig.ShareIdx, - } + case t.input <- event{ + duty: duty, + component: parSigDBInternal, + pubkey: pubkey, + shareIdx: pSig.ShareIdx, + }: } } @@ -326,12 +333,11 @@ func (t *Tracker) ParSigDBThresholdEvent(ctx context.Context, duty core.Duty, pu return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: parSigDBThreshold, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: parSigDBThreshold, + pubkey: pubkey, + }: } return nil @@ -344,12 +350,11 @@ func (t *Tracker) SigAggEvent(ctx context.Context, duty core.Duty, pubkey core.P return ctx.Err() case <-t.quit: return nil - default: - t.input <- event{ - duty: duty, - component: sigAgg, - pubkey: pubkey, - } + case t.input <- event{ + duty: duty, + component: sigAgg, + pubkey: pubkey, + }: } return nil diff --git a/core/tracker/tracker_internal_test.go b/core/tracker/tracker_internal_test.go index 3c926978a..fed67be65 100644 --- a/core/tracker/tracker_internal_test.go +++ b/core/tracker/tracker_internal_test.go @@ -40,10 +40,10 @@ func TestTrackerFailedDuty(t *testing.T) { } count := 0 - failedDutyReporter := func(failedDuty core.Duty, isFailed bool, component string, msg string) { + failedDutyReporter := func(_ context.Context, failedDuty core.Duty, isFailed bool, component component, msg string) { require.Equal(t, testData[0].duty, failedDuty) require.True(t, isFailed) - require.Equal(t, component, "consensus") + require.Equal(t, consensus, component) count++ if count == len(testData) { @@ -75,10 +75,10 @@ func TestTrackerFailedDuty(t *testing.T) { } count := 0 - failedDutyReporter := func(failedDuty core.Duty, isFailed bool, component string, msg string) { + failedDutyReporter := func(_ context.Context, failedDuty core.Duty, isFailed bool, component component, msg string) { require.Equal(t, testData[0].duty, failedDuty) require.False(t, isFailed) - require.Equal(t, "sigAgg", component) + require.Equal(t, sigAgg, component) count++ if count == len(testData) { @@ -191,7 +191,7 @@ func TestTrackerParticipation(t *testing.T) { } // Ignore failedDutyReporter part to isolate participation only. - tr.failedDutyReporter = func(core.Duty, bool, string, string) {} + tr.failedDutyReporter = func(context.Context, core.Duty, bool, component, string) {} go func() { for _, td := range testData {