Skip to content

Commit

Permalink
core/tracker: validate events from peers (#971)
Browse files Browse the repository at this point in the history
Validates parSigEx events in analyseParticipation.

category: feature
ticket: #855
  • Loading branch information
dB2510 authored Aug 16, 2022
1 parent b412b01 commit 263ae95
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 17 deletions.
7 changes: 7 additions & 0 deletions core/tracker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ var (
Name: "failed_duties_total",
Help: "Total number of failed duties by component",
}, []string{"duty", "component"})

unexpectedEventsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "core",
Subsystem: "tracker",
Name: "unexpected_events_total",
Help: "Total number of unexpected events by peer",
}, []string{"peer"})
)
73 changes: 60 additions & 13 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sort"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
Expand Down Expand Up @@ -71,7 +70,7 @@ type Tracker struct {
failedDutyReporter func(ctx context.Context, duty core.Duty, failed bool, component component, reason string)

// participationReporter instruments duty peer participation.
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool)
participationReporter func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool)
}

// New returns a new Tracker.
Expand Down Expand Up @@ -113,11 +112,11 @@ func (t *Tracker) Run(ctx context.Context) error {
t.failedDutyReporter(ctx, duty, failed, failedComponent, failedMsg)

// Analyse peer participation
participatedShares, err := analyseParticipation(t.events[duty])
participatedShares, unexpectedShares, err := analyseParticipation(duty, t.events)
if err != nil {
log.Error(ctx, "Invalid participated shares", err)
} else {
t.participationReporter(ctx, duty, participatedShares)
t.participationReporter(ctx, duty, participatedShares, unexpectedShares)
}

delete(t.events, duty)
Expand Down Expand Up @@ -164,34 +163,82 @@ func failedDutyReporter(ctx context.Context, duty core.Duty, failed bool, compon
}

// analyseParticipation returns a set of share indexes of participated peers.
func analyseParticipation(events []event) (map[int]bool, error) {
func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[int]bool, map[int]bool, error) {
// Set of shareIdx of participated peers.
resp := make(map[int]bool)
unexpectedShares := make(map[int]bool)

for _, e := range events {
for _, e := range allEvents[duty] {
// If we get a parSigDBInternal event, then the current node participated successfully.
// If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully.
if e.component == parSigEx || e.component == parSigDBInternal {
if e.shareIdx == 0 {
return nil, errors.New("shareIdx empty", z.Any("component", e.component))
if e.component == parSigEx {
if !isParSigEventExpected(duty, e.pubkey, allEvents) {
unexpectedShares[e.shareIdx] = true
continue
}

resp[e.shareIdx] = true
} else if e.component == parSigDBInternal {
if !isParSigEventExpected(duty, e.pubkey, allEvents) {
unexpectedShares[e.shareIdx] = true
continue
}

resp[e.shareIdx] = true
}
}

return resp, nil
return resp, unexpectedShares, nil
}

// isParSigEventExpected return true if partially signed data events is expected for the given duty and pubkey.
// Partially signed data events are generated by parsigex and parsigdb.
func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[core.Duty][]event) bool {
// Cannot validate validatorAPI triggered duties.
if duty.Type == core.DutyExit || duty.Type == core.DutyBuilderRegistration {
return true
}

if duty.Type == core.DutyRandao {
// Check that if we got DutyProposer event from scheduler.
for _, e := range allEvents[core.NewProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}

// Check that if we got DutyBuilderProposer event from scheduler.
for _, e := range allEvents[core.NewBuilderProposerDuty(duty.Slot)] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}
}

// For all other duties check for scheduler event.
for _, e := range allEvents[duty] {
if e.component == scheduler && e.pubkey == pubkey {
return true
}
}

return false
}

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) {
// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation
// and unexpectedPeers.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool, map[int]bool) {
// prevAbsent is the set of peers who didn't participated in the last duty.
var prevAbsent []string

return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) {
return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedShares map[int]bool) {
var absentPeers []string
for _, peer := range peers {
if participatedShares[peer.ShareIdx()] {
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(1)
} else if unexpectedShares[peer.ShareIdx()] {
log.Warn(ctx, "Unexpected event found", nil, z.Str("peer", peer.Name), z.Str("duty", duty.String()))
unexpectedEventsCounter.WithLabelValues(peer.Name).Inc()
} else {
absentPeers = append(absentPeers, peer.Name)
participationGauge.WithLabelValues(duty.Type.String(), peer.Name).Set(0)
Expand Down
90 changes: 86 additions & 4 deletions core/tracker/tracker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestTrackerFailedDuty(t *testing.T) {

tr := New(deadliner, []p2p.Peer{})
tr.failedDutyReporter = failedDutyReporter
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool) {}
tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool, _ map[int]bool) {}

go func() {
for _, td := range testData {
Expand Down Expand Up @@ -168,9 +168,11 @@ func TestTrackerParticipation(t *testing.T) {
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
tr := New(deadliner, peers)

var count int
var lastParticipation map[int]bool
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool) {
var (
count int
lastParticipation map[int]bool
)
tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool, _ map[int]bool) {
require.Equal(t, testData[count].duty, actualDuty)
require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty]))

Expand All @@ -195,6 +197,7 @@ func TestTrackerParticipation(t *testing.T) {

go func() {
for _, td := range testData {
require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet))
require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet))
for _, data := range psigDataPerDutyPerPeer[td.duty] {
require.NoError(t, tr.ParSigExEvent(ctx, td.duty, data))
Expand All @@ -212,6 +215,85 @@ func TestTrackerParticipation(t *testing.T) {
require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

func TestUnexpectedParticipation(t *testing.T) {
const (
slot = 123
unexpectedPeer = 2
)

var peers []p2p.Peer
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}
data := core.NewPartialSignature(testutil.RandomCoreSignature(), unexpectedPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := make(map[int]bool)

duties := []core.Duty{
core.NewRandaoDuty(slot),
core.NewProposerDuty(slot),
core.NewAttesterDuty(slot),
core.NewBuilderProposerDuty(slot),
}

for _, d := range duties {
t.Run(d.String(), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
tr := New(deadliner, peers)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
require.Equal(t, d, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, map[int]bool{unexpectedPeer: true}))
require.True(t, reflect.DeepEqual(participatedShares, participation))
cancel()
}

go func(duty core.Duty) {
require.NoError(t, tr.ParSigExEvent(ctx, duty, core.ParSignedDataSet{pubkey: data}))
deadliner.deadlineChan <- duty
}(d)

require.ErrorIs(t, tr.Run(ctx), context.Canceled)
})
}
}

func TestDutyRandaoExpected(t *testing.T) {
const (
slot = 123
validPeer = 1
)

dutyRandao := core.NewRandaoDuty(slot)
dutyProposer := core.NewProposerDuty(slot)

var peers []p2p.Peer
deadliner := testDeadliner{deadlineChan: make(chan core.Duty)}

data := core.NewPartialSignature(testutil.RandomCoreSignature(), validPeer)
pubkey := testutil.RandomCorePubKey(t)
participation := map[int]bool{validPeer: true}
unexpected := make(map[int]bool)

ctx, cancel := context.WithCancel(context.Background())
tr := New(deadliner, peers)

tr.participationReporter = func(_ context.Context, duty core.Duty, participatedShares map[int]bool, unexpectedPeers map[int]bool) {
require.Equal(t, dutyRandao, duty)
require.True(t, reflect.DeepEqual(unexpectedPeers, unexpected))
require.True(t, reflect.DeepEqual(participatedShares, participation))

cancel()
}

go func() {
require.NoError(t, tr.SchedulerEvent(ctx, dutyProposer, core.DutyDefinitionSet{pubkey: core.NewProposerDefinition(testutil.RandomProposerDuty(t))}))
require.NoError(t, tr.ParSigExEvent(ctx, dutyRandao, core.ParSignedDataSet{pubkey: data}))

deadliner.deadlineChan <- dutyRandao
}()

require.ErrorIs(t, tr.Run(ctx), context.Canceled)
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner struct {
deadlineChan chan core.Duty
Expand Down

0 comments on commit 263ae95

Please sign in to comment.