Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: validate events from peers #971

Merged
merged 2 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/tracker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ var (
Name: "failed_duties_total",
Help: "Total number of failed duties by component",
}, []string{"duty", "component"})

unexpectedEventsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "unexpected_events_total",
Help: "Total number of unexpected events by peer",
}, []string{"peer"})
)
73 changes: 60 additions & 13 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sort"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
Expand Down Expand Up @@ -71,7 +70,7 @@ type Tracker struct {
failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string)

// participationReporter instruments duty peer participation.
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool)
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool)
}

// New returns a new Tracker.
Expand Down Expand Up @@ -113,11 +112,11 @@ func (t *Tracker) Run(ctx context.Context) error {
t.failedDutyReporter(ctx, duty, failed, failedComponent, failedMsg)

// Analyse peer participation
participatedShares, err := analyseParticipation(t.events[duty])
participatedShares, unexpectedShares, err := analyseParticipation(duty, t.events)
if err != nil {
log.Error(ctx, "Invalid participated shares", err)
} else {
t.participationReporter(ctx, duty, participatedShares)
t.participationReporter(ctx, duty, participatedShares, unexpectedShares)
}

delete(t.events, duty)
Expand Down Expand Up @@ -164,34 +163,82 @@ func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, compon
}

// analyseParticipation returns a set of share indexes of participated peers.
func analyseParticipation(events []event) (map[int]bool, error) {
func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool, error) {
// Set of shareIdx of participated peers.
resp := make(map[int]bool)
unexpectedShares := make(map[int]bool)

for _, e := range events {
for _, e := range allEvents[duty] {
// If we get a parSigDBInternal event, then the current node participated successfully.
// If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully.
if e.component == parSigEx || e.component == parSigDBInternal {
if e.shareIdx == 0 {
return nil, errors.New("shareIdx empty", z.Any("component", e.component))
if e.component == parSigEx {
if !isParSigEventExpected(duty, e.pubkey, allEvents) {
unexpectedShares[e.shareIdx] = true
continue
}

resp[e.shareIdx] = true
} else if e.component == parSigDBInternal {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you also need to check if parSigDBInternal is expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

if !isParSigEventExpected(duty, e.pubkey, allEvents) {
unexpectedShares[e.shareIdx] = true
continue
}

resp[e.shareIdx] = true
}
}

return resp, nil
return resp, unexpectedShares, nil
}

// isParSigEventExpected return true if partially signed data events is expected for the given duty and pubkey.
// Partially signed data events are generated by parsigex and parsigdb.
func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[core.Duty][]event) bool {
// Cannot validate validatorAPI triggered duties.
if duty.Type == core.DutyExit || duty.Type == core.DutyBuilderRegistration {
return true
}

if duty.Type == core.DutyRandao {
// Check that if we got DutyProposer event from scheduler.
for _, e := range allEvents[core.NewProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}

// Check that if we got DutyBuilderProposer event from scheduler.
for _, e := range allEvents[core.NewBuilderProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}
}

// For all other duties check for scheduler event.
for _, e := range allEvents[duty] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}

return false
}

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) {
// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation
// and unexpectedPeers.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool, map[int]bool) {
// prevAbsent is the set of peers who didn't participated in the last duty.
var prevAbsent []string

return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) {
return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedShares map[int]bool) {
var absentPeers []string
for _, peer := range peers {
if participatedShares[peer.ShareIdx()] {
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1)
} else if unexpectedShares[peer.ShareIdx()] {
log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String()))
unexpectedEventsCounter.WithLabelValues(peer.Name).Inc()
} else {
absentPeers = append(absentPeers, peer.Name)
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0)
Expand Down
90 changes: 86 additions & 4 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(deadliner, []p2p.Peer{})
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool) {}
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool, _ map[int]bool) {}

go func() {
for _, td := range testData {
Expand Down Expand Up @@ -168,9 +168,11 @@ func TestTrackerParticipation(t *testing.T) {
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
tr := New(deadliner, peers)

var count int
var lastParticipation map[int]bool
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool) {
var (
count int
lastParticipation map[int]bool
)
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool, _ map[int]bool) {
require.Equal(t, testData[count].duty, actualDuty)
require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty]))

Expand All @@ -195,6 +197,7 @@ func TestTrackerParticipation(t *testing.T) {

go func() {
for _, td := range testData {
require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet))
require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet))
for _, data := range psigDataPerDutyPerPeer[td.duty] {
require.NoError(t, tr.ParSigExEvent(ctx, td.duty, data))
Expand All @@ -212,6 +215,85 @@ func TestTrackerParticipation(t *testing.T) {
require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

func TestUnexpectedParticipation(t *testing.T) {
const (
slot = 123
unexpectedPeer = 2
)

var peers []p2p.Peer
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := make(map[int]bool)

duties := []core.Duty{
core.NewRandaoDuty(slot),
core.NewProposerDuty(slot),
core.NewAttesterDuty(slot),
core.NewBuilderProposerDuty(slot),
}

for _, d := range duties {
t.Run(d.String(), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(deadliner, peers)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
require.Equal(t, d, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true}))
require.True(t, reflect.DeepEqual(participatedShares, participation))
cancel()
}

go func(duty core.Duty) {
require.NoError(t, tr.ParSigExEvent(ctx, duty, core.ParSignedDataSet{pubkey: data}))
deadliner.deadlineChan <- duty
}(d)

require.ErrorIs(t, tr.Run(ctx), context.Canceled)
})
}
}

func TestDutyRandaoExpected(t *testing.T) {
const (
slot = 123
validPeer = 1
)

dutyRandao := core.NewRandaoDuty(slot)
dutyProposer := core.NewProposerDuty(slot)

var peers []p2p.Peer
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}

data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := map[int]bool{validPeer: true}
unexpected := make(map[int]bool)

ctx, cancel := context.WithCancel(context.Background())
tr := New(deadliner, peers)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
require.Equal(t, dutyRandao, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
require.True(t, reflect.DeepEqual(participatedShares, participation))

cancel()
}

go func() {
require.NoError(t, tr.SchedulerEvent(ctx, dutyProposer, core.DutyDefinitionSet{pubkey: core.NewProposerDefinition(testutil.RandomProposerDuty(t))}))
require.NoError(t, tr.ParSigExEvent(ctx, dutyRandao, core.ParSignedDataSet{pubkey: data}))

deadliner.deadlineChan <- dutyRandao
}()

require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner struct {
deadlineChan chan core.Duty
Expand Down