Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: validate events from peers #971

Merged
merged 2 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/tracker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ var (
Name: "failed_duties_total",
Help: "Total number of failed duties by component",
}, []string{"duty", "component"})

unexpectedEventsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "unexpected_events_total",
Help: "Total number of unexpected events by peer",
}, []string{"peer"})
)
76 changes: 65 additions & 11 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Tracker struct {
failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string)

// participationReporter instruments duty peer participation.
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool)
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool)
}

// New returns a new Tracker.
Expand Down Expand Up @@ -113,11 +113,11 @@ func (t *Tracker) Run(ctx context.Context) error {
t.failedDutyReporter(ctx, duty, failed, failedComponent, failedMsg)

// Analyse peer participation
participatedShares, err := analyseParticipation(t.events[duty])
participatedShares, unexpectedPeers, err := analyseParticipation(ctx, duty, t.events)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unexpectedShares

if err != nil {
log.Error(ctx, "Invalid participated shares", err)
} else {
t.participationReporter(ctx, duty, participatedShares)
t.participationReporter(ctx, duty, participatedShares, unexpectedPeers)
}

delete(t.events, duty)
Expand Down Expand Up @@ -164,34 +164,88 @@ func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, compon
}

// analyseParticipation returns the set of share indexes of participated peers and
// the set of share indexes of peers that sent unexpected events for the duty.
func analyseParticipation(events []event) (map[int]bool, error) {
func analyseParticipation(ctx context.Context, duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool, error) {
// Set of shareIdx of participated peers.
resp := make(map[int]bool)
unexpectedPeers := make(map[int]bool)

for _, e := range events {
for _, e := range allEvents[duty] {
// If we get a parSigDBInternal event, then the current node participated successfully.
// If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully.
if e.component == parSigEx || e.component == parSigDBInternal {
if e.component == parSigEx {
if err := validateParticipation(duty, e.pubkey, allEvents); err != nil {
log.Warn(ctx, "Unexpected event found", err, z.Int("ShareIdx", e.shareIdx))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving logging to reporter

unexpectedPeers[e.shareIdx] = true

continue
}

if e.shareIdx == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can assume shareIdx is valid since we verify partial signatures when received in parsigex

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah right

return nil, nil, errors.New("shareIdx empty", z.Any("component", e.component))
}
resp[e.shareIdx] = true
} else if e.component == parSigDBInternal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you also need to check if parSigDBInternal is expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

if e.shareIdx == 0 {
return nil, errors.New("shareIdx empty", z.Any("component", e.component))
return nil, nil, errors.New("shareIdx empty", z.Any("component", e.component))
}
resp[e.shareIdx] = true
}
}

return resp, nil
return resp, unexpectedPeers, nil
}

// validateParticipation validates events from peers for a given duty.
func validateParticipation(duty core.Duty, pubkey core.PubKey, allEvents map[core.Duty][]event) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isParSigEventExpected returns true if partially signed data events are expected for the given duty and pubkey. Partially signed data events are generated by parsigex and parsigdb.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if len(allEvents[duty]) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure this optimisation is worth it, it will never be true in practice

return errors.New("no events to validate")
}

// Cannot validate validatorAPI triggered duties.
if duty.Type == core.DutyExit || duty.Type == core.DutyBuilderRegistration {
return nil
}

if duty.Type == core.DutyRandao {
// Check whether we got a DutyProposer event from the scheduler.
for _, e := range allEvents[core.NewProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return nil
}
}

// Check whether we got a DutyBuilderProposer event from the scheduler.
for _, e := range allEvents[core.NewBuilderProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return nil
}
}
}

// For all other duties check for scheduler event.
for _, e := range allEvents[duty] {
if e.component == scheduler && e.pubkey == pubkey {
return nil
}
}

return errors.New("unexpected event", z.Str("duty", duty.String()),
z.Str("pubkey", pubkey.String()))
}

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) {
// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation
// and unexpectedPeers.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool, map[int]bool) {
// prevAbsent is the set of peers who didn't participate in the last duty.
var prevAbsent []string

return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) {
return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
var absentPeers []string
for _, peer := range peers {
if participatedShares[peer.ShareIdx()] {
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1)
} else if unexpectedPeers[peer.ShareIdx()] {
unexpectedEventsCounter.WithLabelValues(peer.Name).Inc()
} else {
absentPeers = append(absentPeers, peer.Name)
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0)
Expand Down
90 changes: 86 additions & 4 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(deadliner, []p2p.Peer{})
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool) {}
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool, _ map[int]bool) {}

go func() {
for _, td := range testData {
Expand Down Expand Up @@ -168,9 +168,11 @@ func TestTrackerParticipation(t *testing.T) {
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
tr := New(deadliner, peers)

var count int
var lastParticipation map[int]bool
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool) {
var (
count int
lastParticipation map[int]bool
)
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool, _ map[int]bool) {
require.Equal(t, testData[count].duty, actualDuty)
require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty]))

Expand All @@ -195,6 +197,7 @@ func TestTrackerParticipation(t *testing.T) {

go func() {
for _, td := range testData {
require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet))
require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet))
for _, data := range psigDataPerDutyPerPeer[td.duty] {
require.NoError(t, tr.ParSigExEvent(ctx, td.duty, data))
Expand All @@ -212,6 +215,85 @@ func TestTrackerParticipation(t *testing.T) {
require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

// TestUnexpectedParticipation verifies that a parsigex event received for a duty
// that has no scheduler event is reported under the sender's share index as
// unexpected, and is not counted as participation.
func TestUnexpectedParticipation(t *testing.T) {
	const (
		slot           = 123
		unexpectedPeer = 2
	)

	var peers []p2p.Peer
	deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
	data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer)
	pubkey := testutil.RandomCorePubKey(t)
	participation := make(map[int]bool)

	duties := []core.Duty{
		core.NewRandaoDuty(slot),
		core.NewProposerDuty(slot),
		core.NewAttesterDuty(slot),
		core.NewBuilderProposerDuty(slot),
	}

	for _, d := range duties {
		t.Run(d.String(), func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			// Always release the context, even if the reporter is never invoked.
			defer cancel()

			tr := New(deadliner, peers)

			// The reporter is invoked from tr.Run, i.e. on the test goroutine,
			// so using require here is safe.
			tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
				require.Equal(t, d, duty)
				require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true}))
				require.True(t, reflect.DeepEqual(participatedShares, participation))
				cancel()
			}

			go func(duty core.Duty) {
				// require calls t.FailNow, which must only run on the test goroutine.
				// Report with t.Errorf instead and cancel so that tr.Run unblocks.
				if err := tr.ParSigExEvent(ctx, duty, core.ParSignedDataSet{pubkey: data}); err != nil {
					t.Errorf("parsigex event: %v", err)
					cancel()

					return
				}

				// Do not block forever on the unbuffered channel if the test already ended.
				select {
				case deadliner.deadlineChan <- duty:
				case <-ctx.Done():
				}
			}(d)

			require.ErrorIs(t, tr.Run(ctx), context.Canceled)
		})
	}
}

// TestDutyRandaoExpected verifies that a parsigex event for a randao duty counts
// as participation, not as unexpected, when the scheduler emitted a proposer
// duty for the same slot and pubkey.
func TestDutyRandaoExpected(t *testing.T) {
	const (
		slot      = 123
		validPeer = 1
	)

	dutyRandao := core.NewRandaoDuty(slot)
	dutyProposer := core.NewProposerDuty(slot)

	var peers []p2p.Peer
	deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}

	data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer)
	pubkey := testutil.RandomCorePubKey(t)
	participation := map[int]bool{validPeer: true}
	unexpected := make(map[int]bool)

	ctx, cancel := context.WithCancel(context.Background())
	// Always release the context, even if the reporter is never invoked.
	defer cancel()

	tr := New(deadliner, peers)

	// The reporter is invoked from tr.Run, i.e. on the test goroutine,
	// so using require here is safe.
	tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
		require.Equal(t, dutyRandao, duty)
		require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
		require.True(t, reflect.DeepEqual(participatedShares, participation))

		cancel()
	}

	go func() {
		// require calls t.FailNow, which must only run on the test goroutine.
		// Report with t.Errorf instead and cancel so that tr.Run unblocks.
		defSet := core.DutyDefinitionSet{pubkey: core.NewProposerDefinition(testutil.RandomProposerDuty(t))}
		if err := tr.SchedulerEvent(ctx, dutyProposer, defSet); err != nil {
			t.Errorf("scheduler event: %v", err)
			cancel()

			return
		}

		if err := tr.ParSigExEvent(ctx, dutyRandao, core.ParSignedDataSet{pubkey: data}); err != nil {
			t.Errorf("parsigex event: %v", err)
			cancel()

			return
		}

		// Do not block forever on the unbuffered channel if the test already ended.
		select {
		case deadliner.deadlineChan <- dutyRandao:
		case <-ctx.Done():
		}
	}()

	require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner struct {
deadlineChan chan core.Duty
Expand Down