Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: skip participation reporting #1236

Merged
merged 2 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type Tracker struct {
failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string)

// participationReporter instruments duty peer participation.
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool)
participationReporter func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool)
}

// New returns a new Tracker. The deleter deadliner must return well after analyser deadliner since duties of the same slot are often analysed together.
Expand Down Expand Up @@ -195,7 +195,7 @@ func (t *Tracker) Run(ctx context.Context) error {

// Analyse peer participation
participatedShares, unexpectedShares := analyseParticipation(duty, t.events)
t.participationReporter(ctx, duty, participatedShares, unexpectedShares)
t.participationReporter(ctx, duty, failed, participatedShares, unexpectedShares)
case duty := <-t.deleter.C():
delete(t.events, duty)
}
Expand Down Expand Up @@ -406,11 +406,16 @@ func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[cor

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation
// and unexpectedPeers.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool, map[int]bool) {
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, bool, map[int]bool, map[int]bool) {
// prevAbsent is the set of peers who didn't participate in the last duty per type.
prevAbsent := make(map[core.DutyType][]string)

return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedShares map[int]bool) {
return func(ctx context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedShares map[int]bool) {
if len(participatedShares) == 0 && !failed {
// Ignore participation metrics and log for noop duties (like DutyAggregator)
return
}

var absentPeers []string
for _, peer := range peers {
if participatedShares[peer.ShareIdx()] {
Expand Down
40 changes: 35 additions & 5 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(analyser, deleter, []p2p.Peer{}, 0)
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool, _ map[int]bool) {}
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) {
require.True(t, failed)
}

go func() {
for _, td := range testData {
Expand Down Expand Up @@ -91,6 +93,9 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(analyser, deleter, []p2p.Peer{}, 0)
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, failed bool, _ map[int]bool, _ map[int]bool) {
require.False(t, failed)
}

go func() {
for _, td := range testData {
Expand Down Expand Up @@ -353,9 +358,10 @@ func TestTrackerParticipation(t *testing.T) {
count int
lastParticipation map[int]bool
)
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool, _ map[int]bool) {
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, failed bool, actualParticipation map[int]bool, _ map[int]bool) {
require.Equal(t, testData[count].duty, actualDuty)
require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty]))
require.False(t, failed)

if count == 2 {
// For third duty, last Participation should be equal to that of second duty.
Expand Down Expand Up @@ -424,10 +430,11 @@ func TestUnexpectedParticipation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
require.Equal(t, d, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true}))
require.True(t, reflect.DeepEqual(participatedShares, participation))
require.True(t, failed)
cancel()
}

Expand Down Expand Up @@ -463,14 +470,15 @@ func TestDutyRandaoUnexpected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
if duty.Type == core.DutyProposer {
return
}

require.Equal(t, dutyRandao, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
require.True(t, reflect.DeepEqual(participatedShares, participation))
require.True(t, failed)

cancel()
}
Expand Down Expand Up @@ -509,12 +517,13 @@ func TestDutyRandaoExpected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(analyser, deleter, peers, 0)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
tr.participationReporter = func(_ context.Context, duty core.Duty, failed bool, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
if duty.Type == core.DutyProposer {
return
}

require.Equal(t, dutyRandao, duty)
require.True(t, failed)
require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
require.True(t, reflect.DeepEqual(participatedShares, participation))

Expand Down Expand Up @@ -739,4 +748,25 @@ func TestAnalyseDutyFailedAgg(t *testing.T) {
require.Equal(t, sigAgg, comp)
require.Equal(t, msgSigAgg, msg)
})

t.Run("no aggregator found", func(t *testing.T) {
allEvents := make(map[core.Duty][]event)
allEvents[dutyAgg] = append(allEvents[dutyAgg], event{
duty: dutyAgg,
component: scheduler,
})
allEvents[dutyPrepAgg] = append(allEvents[dutyPrepAgg], event{
duty: dutyPrepAgg,
component: sigAgg,
})
allEvents[dutyAtt] = append(allEvents[dutyAtt], event{
duty: dutyAtt,
component: sigAgg,
})

failed, comp, msg := analyseDutyFailed(dutyAgg, allEvents)
require.False(t, failed)
require.Equal(t, fetcher, comp)
require.Equal(t, "", msg)
})
}