diff --git a/core/tracker/metrics.go b/core/tracker/metrics.go index bf8a2c8f7..2356b7c0a 100644 --- a/core/tracker/metrics.go +++ b/core/tracker/metrics.go @@ -34,4 +34,11 @@ var ( Name: "failed_duties_total", Help: "Total number of failed duties by component", }, []string{"duty", "component"}) + + unexpectedEventsCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "core", + Subsystem: "tracker", + Name: "unexpected_events_total", + Help: "Total number of unexpected events by peer", + }, []string{"peer"}) ) diff --git a/core/tracker/tracker.go b/core/tracker/tracker.go index c5703cab0..b65ff31e1 100644 --- a/core/tracker/tracker.go +++ b/core/tracker/tracker.go @@ -20,7 +20,6 @@ import ( "fmt" "sort" - "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" @@ -71,7 +70,7 @@ type Tracker struct { failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string) // participationReporter instruments duty peer participation. - participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) + participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) } // New returns a new Tracker. 
@@ -113,11 +112,11 @@ func (t *Tracker) Run(ctx context.Context) error { t.failedDutyReporter(ctx, duty, failed, failedComponent, failedMsg) // Analyse peer participation - participatedShares, err := analyseParticipation(t.events[duty]) + participatedShares, unexpectedShares, err := analyseParticipation(duty, t.events) if err != nil { log.Error(ctx, "Invalid participated shares", err) } else { - t.participationReporter(ctx, duty, participatedShares) + t.participationReporter(ctx, duty, participatedShares, unexpectedShares) } delete(t.events, duty) @@ -164,34 +163,82 @@ func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, compon } // analyseParticipation returns a set of share indexes of participated peers. -func analyseParticipation(events []event) (map[int]bool, error) { +func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool, error) { // Set of shareIdx of participated peers. resp := make(map[int]bool) + unexpectedShares := make(map[int]bool) - for _, e := range events { + for _, e := range allEvents[duty] { // If we get a parSigDBInternal event, then the current node participated successfully. // If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully. - if e.component == parSigEx || e.component == parSigDBInternal { - if e.shareIdx == 0 { - return nil, errors.New("shareIdx empty", z.Any("component", e.component)) + if e.component == parSigEx { + if !isParSigEventExpected(duty, e.pubkey, allEvents) { + unexpectedShares[e.shareIdx] = true + continue + } + + resp[e.shareIdx] = true + } else if e.component == parSigDBInternal { // NOTE(review): this branch body is the same as the parSigEx branch above. + if !isParSigEventExpected(duty, e.pubkey, allEvents) { + unexpectedShares[e.shareIdx] = true + continue } + resp[e.shareIdx] = true } } - return resp, nil + return resp, unexpectedShares, nil // NOTE(review): the error result is always nil in this version. +} + +// isParSigEventExpected returns true if a partially signed data event is expected for the given duty and pubkey. 
+// Partially signed data events are generated by parsigex and parsigdb. +func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[core.Duty][]event) bool { + // Cannot validate validatorAPI triggered duties. + if duty.Type == core.DutyExit || duty.Type == core.DutyBuilderRegistration { + return true + } + + if duty.Type == core.DutyRandao { + // Check whether we got a DutyProposer event from the scheduler for the same slot. + for _, e := range allEvents[core.NewProposerDuty(duty.Slot)] { + if e.component == scheduler && e.pubkey == pubkey { + return true + } + } + + // Check whether we got a DutyBuilderProposer event from the scheduler for the same slot. + for _, e := range allEvents[core.NewBuilderProposerDuty(duty.Slot)] { + if e.component == scheduler && e.pubkey == pubkey { + return true + } + } + } + + // For all other duties (and as a fall-through for DutyRandao), check for a scheduler event of the duty itself. + for _, e := range allEvents[duty] { + if e.component == scheduler && e.pubkey == pubkey { + return true + } + } + + return false } -// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation. -func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) { +// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation +// and unexpected peers. +func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool, map[int]bool) { // prevAbsent is the set of peers who didn't participated in the last duty. 
var prevAbsent []string - return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) { + return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedShares map[int]bool) { var absentPeers []string for _, peer := range peers { if participatedShares[peer.ShareIdx()] { participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1) + } else if unexpectedShares[peer.ShareIdx()] { + log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String())) + unexpectedEventsCounter.WithLabelValues(peer.Name).Inc() } else { absentPeers = append(absentPeers, peer.Name) participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0) diff --git a/core/tracker/tracker_internal_test.go b/core/tracker/tracker_internal_test.go index fed67be65..1c8a105e5 100644 --- a/core/tracker/tracker_internal_test.go +++ b/core/tracker/tracker_internal_test.go @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) { tr := New(deadliner, []p2p.Peer{}) tr.failedDutyReporter = failedDutyReporter - tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool) {} + tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool, _ map[int]bool) {} go func() { for _, td := range testData { @@ -168,9 +168,11 @@ func TestTrackerParticipation(t *testing.T) { deadliner := testDeadliner{deadlineChan: make(chan core.Duty)} tr := New(deadliner, peers) - var count int - var lastParticipation map[int]bool - tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool) { + var ( + count int + lastParticipation map[int]bool + ) + tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool, _ map[int]bool) { require.Equal(t, testData[count].duty, actualDuty) require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty])) @@ -195,6 +197,7 @@ 
func TestTrackerParticipation(t *testing.T) { go func() { for _, td := range testData { + require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet)) require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet)) for _, data := range psigDataPerDutyPerPeer[td.duty] { require.NoError(t, tr.ParSigExEvent(ctx, td.duty, data)) @@ -212,6 +215,85 @@ func TestTrackerParticipation(t *testing.T) { require.ErrorIs(t, tr.Run(ctx), context.Canceled) } +func TestUnexpectedParticipation(t *testing.T) { + const ( + slot = 123 + unexpectedPeer = 2 + ) + + var peers []p2p.Peer + deadliner := testDeadliner{deadlineChan: make(chan core.Duty)} + data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer) + pubkey := testutil.RandomCorePubKey(t) + participation := make(map[int]bool) + + duties := []core.Duty{ + core.NewRandaoDuty(slot), + core.NewProposerDuty(slot), + core.NewAttesterDuty(slot), + core.NewBuilderProposerDuty(slot), + } + + for _, d := range duties { + t.Run(d.String(), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + tr := New(deadliner, peers) + + tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) { + require.Equal(t, d, duty) + require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true})) + require.True(t, reflect.DeepEqual(participatedShares, participation)) + cancel() + } + + go func(duty core.Duty) { // NOTE(review): require.* is called off the test goroutine here - confirm this is acceptable for FailNow. + require.NoError(t, tr.ParSigExEvent(ctx, duty, core.ParSignedDataSet{pubkey: data})) // No scheduler event is sent for this duty, so the test expects the share to be reported as unexpected. + deadliner.deadlineChan <- duty + }(d) + + require.ErrorIs(t, tr.Run(ctx), context.Canceled) + }) + } +} + +func TestDutyRandaoExpected(t *testing.T) { + const ( + slot = 123 + validPeer = 1 + ) + + dutyRandao := core.NewRandaoDuty(slot) + dutyProposer := core.NewProposerDuty(slot) + + var peers []p2p.Peer + deadliner := testDeadliner{deadlineChan: make(chan core.Duty)} + + data := 
core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer) + pubkey := testutil.RandomCorePubKey(t) + participation := map[int]bool{validPeer: true} + unexpected := make(map[int]bool) + + ctx, cancel := context.WithCancel(context.Background()) + tr := New(deadliner, peers) + + tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) { + require.Equal(t, dutyRandao, duty) + require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected)) + require.True(t, reflect.DeepEqual(participatedShares, participation)) + + cancel() + } + + go func() { + require.NoError(t, tr.SchedulerEvent(ctx, dutyProposer, core.DutyDefinitionSet{pubkey: core.NewProposerDefinition(testutil.RandomProposerDuty(t))})) // The test expects this proposer scheduler event (same slot, same pubkey) to make the randao partial signature count as expected. + require.NoError(t, tr.ParSigExEvent(ctx, dutyRandao, core.ParSignedDataSet{pubkey: data})) + + deadliner.deadlineChan <- dutyRandao + }() + + require.ErrorIs(t, tr.Run(ctx), context.Canceled) +} + // testDeadliner is a mock deadliner implementation. type testDeadliner struct { deadlineChan chan core.Duty