diff --git a/core/tracker/metrics.go b/core/tracker/metrics.go new file mode 100644 index 000000000..39d5f4a98 --- /dev/null +++ b/core/tracker/metrics.go @@ -0,0 +1,28 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package tracker + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "core", + Subsystem: "tracker", + Name: "participation", + Help: "Set to 1 if peer participated successfully for the given duty or else 0", +}, []string{"duty", "peer"}) diff --git a/core/tracker/tracker.go b/core/tracker/tracker.go index e5c05b85d..5efbb7daf 100644 --- a/core/tracker/tracker.go +++ b/core/tracker/tracker.go @@ -20,7 +20,11 @@ import ( "fmt" "sort" + "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/p2p" ) //go:generate stringer -type=component @@ -47,6 +51,10 @@ type event struct { duty core.Duty component component pubkey core.PubKey + + // This is an optional field only set by validatorAPI, parSigDBInternal and parSigEx events. + // shareidx is 1-indexed so 0 indicates unset. 
+ shareIdx int } // Tracker represents the component that listens to events from core workflow components. @@ -61,16 +69,20 @@ type Tracker struct { // failedDutyReporter instruments the duty. It ignores non-failed duties. failedDutyReporter func(core.Duty, bool, string, string) + + // participationReporter logs and instruments the participation. + participationReporter func(context.Context, core.Duty, map[int]bool) } // New returns a new Tracker. -func New(deadliner core.Deadliner) *Tracker { +func New(deadliner core.Deadliner, peers []p2p.Peer) *Tracker { t := &Tracker{ - input: make(chan event), - events: make(map[core.Duty][]event), - quit: make(chan struct{}), - deadliner: deadliner, - failedDutyReporter: failedDutyReporter, + input: make(chan event), + events: make(map[core.Duty][]event), + quit: make(chan struct{}), + deadliner: deadliner, + failedDutyReporter: failedDutyReporter, + participationReporter: newParticipationReporter(peers), } return t @@ -79,6 +91,7 @@ func New(deadliner core.Deadliner) *Tracker { // Run blocks and registers events from each component in tracker's input channel. // It also analyses and reports the duties whose deadline gets crossed. 
func (t *Tracker) Run(ctx context.Context) error { + ctx = log.WithTopic(ctx, "tracker") defer close(t.quit) for { @@ -94,11 +107,14 @@ func (t *Tracker) Run(ctx context.Context) error { t.events[e.duty] = append(t.events[e.duty], e) case duty := <-t.deadliner.C(): failed, failedComponent, failedMsg := analyseDutyFailed(duty, t.events[duty]) - t.failedDutyReporter(duty, failed, failedComponent.String(), failedMsg) - // TODO(dhruv): Case of cluster participation - // analyseParticipation(duty, t.events[duty]) + participatedShares, err := analyseParticipation(t.events[duty]) + if err != nil { + log.Error(ctx, "Invalid participated shares", err) + } else { + t.participationReporter(ctx, duty, participatedShares) + } delete(t.events, duty) } @@ -134,11 +150,51 @@ func analyseDutyFailed(duty core.Duty, es []event) (bool, component, string) { // TODO(xenowits): Implement logic for reporting duties. func failedDutyReporter(core.Duty, bool, string, string) {} -// analyseParticipation returns the share indexes of peers that participated in this duty. -// TODO(dhruv): implement logic to analyse participation. -//nolint:deadcode -func analyseParticipation(core.Duty, []event) []int { - return nil +// analyseParticipation returns a set of share indexes of participated peers. +func analyseParticipation(events []event) (map[int]bool, error) { + // Set of shareIdx of participated peers. + resp := make(map[int]bool) + + for _, e := range events { + // If we get a parSigDBInternal event, then the current node participated successfully. + // If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully. + if e.component == parSigEx || e.component == parSigDBInternal { + if e.shareIdx == 0 { + return nil, errors.New("shareIdx empty", z.Any("component", e.component)) + } + resp[e.shareIdx] = true + } + } + + return resp, nil +} + +// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation. 
+func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) { + // prevAbsent is the set of peers who didn't participate in the last duty. + var prevAbsent []string + + return func(ctx context.Context, duty core.Duty, participatedShares map[int]bool) { + var absentPeers []string + for _, peer := range peers { + if participatedShares[peer.ShareIdx()] { + participationGauge.WithLabelValues(duty.String(), peer.Name).Set(1) + } else { + absentPeers = append(absentPeers, peer.Name) + participationGauge.WithLabelValues(duty.String(), peer.Name).Set(0) + } + } + + if fmt.Sprint(prevAbsent) != fmt.Sprint(absentPeers) { + if len(absentPeers) == 0 { + log.Info(ctx, "All peers participated in duty", z.Str("duty", duty.String())) + } else { + log.Info(ctx, "Not all peers participated in duty", z.Str("duty", duty.String()), z.Any("absent", absentPeers)) + } + } + + prevAbsent = absentPeers + } } // SchedulerEvent inputs event from core.Scheduler component. @@ -223,7 +279,7 @@ func (t *Tracker) ValidatorAPIEvent(ctx context.Context, duty core.Duty, data co // ParSigExEvent inputs event from core.ParSigEx component. func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.Duty, data core.ParSignedDataSet) error { - for pubkey := range data { + for pubkey, pSig := range data { select { case <-ctx.Done(): return ctx.Err() @@ -234,6 +290,7 @@ func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.P duty: duty, component: parSigEx, pubkey: pubkey, + shareIdx: pSig.ShareIdx, } } } @@ -243,7 +300,7 @@ // ParSigDBInternalEvent inputs events from core.ParSigDB component for internal store event.
func (t *Tracker) ParSigDBInternalEvent(ctx context.Context, duty core.Duty, data core.ParSignedDataSet) error { - for pubkey := range data { + for pubkey, pSig := range data { select { case <-ctx.Done(): return ctx.Err() @@ -254,6 +311,7 @@ func (t *Tracker) ParSigDBInternalEvent(ctx context.Context, duty core.Duty, dat duty: duty, component: parSigDBInternal, pubkey: pubkey, + shareIdx: pSig.ShareIdx, } } } diff --git a/core/tracker/tracker_internal_test.go b/core/tracker/tracker_internal_test.go index 8aeb940ea..3c926978a 100644 --- a/core/tracker/tracker_internal_test.go +++ b/core/tracker/tracker_internal_test.go @@ -17,6 +17,7 @@ package tracker import ( "context" + "reflect" "testing" eth2v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -24,11 +25,13 @@ import ( "github.com/stretchr/testify/require" "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/testutil" ) -func TestTracker(t *testing.T) { - duty, defSet, pubkey, unsignedDataSet, parSignedDataSet := setupData(t) +func TestTrackerFailedDuty(t *testing.T) { + const slot = 1 + testData, pubkeys := setupData(t, []int{slot}) t.Run("FailAtConsensus", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -36,22 +39,30 @@ func TestTracker(t *testing.T) { deadlineChan: make(chan core.Duty), } + count := 0 failedDutyReporter := func(failedDuty core.Duty, isFailed bool, component string, msg string) { - require.Equal(t, duty, failedDuty) + require.Equal(t, testData[0].duty, failedDuty) require.True(t, isFailed) require.Equal(t, component, "consensus") - cancel() + count++ + + if count == len(testData) { + cancel() + } } - tr := New(deadliner) + tr := New(deadliner, []p2p.Peer{}) tr.failedDutyReporter = failedDutyReporter + tr.participationReporter = func(_ context.Context, _ core.Duty, _ map[int]bool) {} go func() { - require.NoError(t, tr.SchedulerEvent(ctx, duty, defSet)) - require.NoError(t, tr.FetcherEvent(ctx, duty, 
unsignedDataSet)) + for _, td := range testData { + require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet)) + require.NoError(t, tr.FetcherEvent(ctx, td.duty, td.unsignedDataSet)) - // Explicitly mark the current duty as deadlined. - deadliner.deadlineChan <- duty + // Explicitly mark the current duty as deadlined. + deadliner.deadlineChan <- td.duty + } }() require.ErrorIs(t, tr.Run(ctx), context.Canceled) @@ -63,34 +74,144 @@ func TestTracker(t *testing.T) { deadlineChan: make(chan core.Duty), } + count := 0 failedDutyReporter := func(failedDuty core.Duty, isFailed bool, component string, msg string) { - require.Equal(t, duty, failedDuty) + require.Equal(t, testData[0].duty, failedDuty) require.False(t, isFailed) require.Equal(t, "sigAgg", component) - cancel() + count++ + + if count == len(testData) { + cancel() + } } - tr := New(deadliner) + tr := New(deadliner, []p2p.Peer{}) tr.failedDutyReporter = failedDutyReporter go func() { - require.NoError(t, tr.SchedulerEvent(ctx, duty, defSet)) - require.NoError(t, tr.FetcherEvent(ctx, duty, unsignedDataSet)) - require.NoError(t, tr.ConsensusEvent(ctx, duty, unsignedDataSet)) - require.NoError(t, tr.ValidatorAPIEvent(ctx, duty, parSignedDataSet)) - require.NoError(t, tr.ParSigDBInternalEvent(ctx, duty, parSignedDataSet)) - require.NoError(t, tr.ParSigExEvent(ctx, duty, parSignedDataSet)) - require.NoError(t, tr.ParSigDBThresholdEvent(ctx, duty, pubkey, nil)) - require.NoError(t, tr.SigAggEvent(ctx, duty, pubkey, nil)) + for _, td := range testData { + require.NoError(t, tr.SchedulerEvent(ctx, td.duty, td.defSet)) + require.NoError(t, tr.FetcherEvent(ctx, td.duty, td.unsignedDataSet)) + require.NoError(t, tr.ConsensusEvent(ctx, td.duty, td.unsignedDataSet)) + require.NoError(t, tr.ValidatorAPIEvent(ctx, td.duty, td.parSignedDataSet)) + require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet)) + require.NoError(t, tr.ParSigExEvent(ctx, td.duty, td.parSignedDataSet)) + for _, pubkey := range 
pubkeys { + require.NoError(t, tr.ParSigDBThresholdEvent(ctx, td.duty, pubkey, nil)) + require.NoError(t, tr.SigAggEvent(ctx, td.duty, pubkey, nil)) + } - // Explicitly mark the current duty as deadlined. - deadliner.deadlineChan <- duty + // Explicitly mark the current duty as deadlined. + deadliner.deadlineChan <- td.duty + } }() require.ErrorIs(t, tr.Run(ctx), context.Canceled) }) } +func TestTrackerParticipation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + slots := []int{1, 2, 3} + testData, pubkeys := setupData(t, slots) + + // Assuming a DV with 4 nodes. + numPeers := 4 + var peers []p2p.Peer + for i := 0; i < numPeers; i++ { + peers = append(peers, p2p.Peer{Index: i}) + } + + // Participation set per duty for a cluster. + expectedParticipationPerDuty := map[core.Duty]map[int]bool{ + testData[0].duty: { + 1: true, + 2: true, + 3: true, + 4: true, + }, + testData[1].duty: { + 1: true, + 2: true, + 4: true, + }, + testData[2].duty: { + 1: true, + 2: true, + 4: true, + }, + } + + // ParSignedDataSet to be sent by ParSigExEvent per duty per peer for all the DVs. + psigDataPerDutyPerPeer := make(map[core.Duty][]core.ParSignedDataSet) + for _, td := range testData { + // ParSignedDataSet for each peer. + var data []core.ParSignedDataSet + for _, p := range peers { + set := make(core.ParSignedDataSet) + for _, pk := range pubkeys { + if !expectedParticipationPerDuty[td.duty][p.ShareIdx()] { + // This peer hasn't participated in this duty for this DV. 
+ continue + } + + set[pk] = core.ParSignedData{ShareIdx: p.ShareIdx()} + } + + data = append(data, set) + } + + psigDataPerDutyPerPeer[td.duty] = data + } + + deadliner := testDeadliner{deadlineChan: make(chan core.Duty)} + tr := New(deadliner, peers) + + var count int + var lastParticipation map[int]bool + tr.participationReporter = func(_ context.Context, actualDuty core.Duty, actualParticipation map[int]bool) { + require.Equal(t, testData[count].duty, actualDuty) + require.True(t, reflect.DeepEqual(actualParticipation, expectedParticipationPerDuty[testData[count].duty])) + + if count == 2 { + // For third duty, last Participation should be equal to that of second duty. + require.Equal(t, expectedParticipationPerDuty[testData[count].duty], lastParticipation) + } else { + require.NotEqual(t, expectedParticipationPerDuty[testData[count].duty], lastParticipation) + } + count++ + + if count == len(testData) { + // Signal exit to central go routine. + cancel() + } + + lastParticipation = actualParticipation + } + + // Ignore failedDutyReporter part to isolate participation only. + tr.failedDutyReporter = func(core.Duty, bool, string, string) {} + + go func() { + for _, td := range testData { + require.NoError(t, tr.ParSigDBInternalEvent(ctx, td.duty, td.parSignedDataSet)) + for _, data := range psigDataPerDutyPerPeer[td.duty] { + require.NoError(t, tr.ParSigExEvent(ctx, td.duty, data)) + } + for _, pk := range pubkeys { + require.NoError(t, tr.ParSigDBThresholdEvent(ctx, td.duty, pk, nil)) + require.NoError(t, tr.SigAggEvent(ctx, td.duty, pk, nil)) + } + + // Explicitly mark the current duty as deadlined. + deadliner.deadlineChan <- td.duty + } + }() + + require.ErrorIs(t, tr.Run(ctx), context.Canceled) +} + // testDeadliner is a mock deadliner implementation. type testDeadliner struct { deadlineChan chan core.Duty @@ -104,59 +225,70 @@ func (t testDeadliner) C() <-chan core.Duty { return t.deadlineChan } -// setupData returns the data required to test tracker. 
-func setupData(t *testing.T) (core.Duty, core.DutyDefinitionSet, core.PubKey, core.UnsignedDataSet, core.ParSignedDataSet) { +// testDutyData represents data for each duty. +type testDutyData struct { + duty core.Duty + defSet core.DutyDefinitionSet + unsignedDataSet core.UnsignedDataSet + parSignedDataSet core.ParSignedDataSet +} + +// setupData returns test duty data and pubkeys required to test tracker. +func setupData(t *testing.T, slots []int) ([]testDutyData, []core.PubKey) { t.Helper() const ( - slot = 1 - vIdxA = 2 - vIdxB = 3 + vIdxA = 1 + vIdxB = 2 notZero = 99 // Validation require non-zero values ) - pubkey := testutil.RandomCorePubKey(t) - pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{ - vIdxA: pubkey, - vIdxB: pubkey, - } - - dutyA := eth2v1.AttesterDuty{ - Slot: slot, - ValidatorIndex: vIdxA, - CommitteeIndex: vIdxA, - CommitteeLength: notZero, - CommitteesAtSlot: notZero, + vIdxA: testutil.RandomCorePubKey(t), + vIdxB: testutil.RandomCorePubKey(t), } - dutyB := eth2v1.AttesterDuty{ - Slot: slot, - ValidatorIndex: vIdxB, - CommitteeIndex: vIdxB, - CommitteeLength: notZero, - CommitteesAtSlot: notZero, - } + var data []testDutyData - defSet := core.DutyDefinitionSet{ - pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), - pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), - } + for _, slot := range slots { + duty := core.NewAttesterDuty(int64(slot)) - duty := core.Duty{Type: core.DutyAttester, Slot: slot} + dutyA := eth2v1.AttesterDuty{ + Slot: eth2p0.Slot(slot), + ValidatorIndex: vIdxA, + CommitteeIndex: vIdxA, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + } - unsignedDataSet := make(core.UnsignedDataSet) - for pubkey := range defSet { - unsignedDataSet[pubkey] = testutil.RandomCoreAttestationData(t) - } + dutyB := eth2v1.AttesterDuty{ + Slot: eth2p0.Slot(slot), + ValidatorIndex: vIdxB, + CommitteeIndex: vIdxB, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + } - parSignedDataSet := 
make(core.ParSignedDataSet) - for pubkey := range defSet { - parSignedDataSet[pubkey] = core.ParSignedData{ - SignedData: nil, - ShareIdx: 0, + defset := core.DutyDefinitionSet{ + pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), + pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), } + + unsignedset := make(core.UnsignedDataSet) + unsignedset[pubkeysByIdx[vIdxA]] = testutil.RandomCoreAttestationData(t) + unsignedset[pubkeysByIdx[vIdxB]] = testutil.RandomCoreAttestationData(t) + + parsignedset := make(core.ParSignedDataSet) + parsignedset[pubkeysByIdx[vIdxA]] = core.ParSignedData{ShareIdx: 1} + parsignedset[pubkeysByIdx[vIdxB]] = core.ParSignedData{ShareIdx: 1} + + data = append(data, testDutyData{ + duty: duty, + defSet: defset, + unsignedDataSet: unsignedset, + parSignedDataSet: parsignedset, + }) } - return duty, defSet, pubkey, unsignedDataSet, parSignedDataSet + return data, []core.PubKey{pubkeysByIdx[vIdxA], pubkeysByIdx[vIdxB]} } diff --git a/p2p/peer.go b/p2p/peer.go index cf2093d0c..32adba198 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -46,6 +46,11 @@ type Peer struct { Name string } +// ShareIdx returns share index of this Peer. ShareIdx is 1-indexed while peerIdx is 0-indexed. +func (p Peer) ShareIdx() int { + return p.Index + 1 +} + // NewPeer returns a new charon peer. func NewPeer(record enr.Record, index int) (Peer, error) { var enodePubkey enode.Secp256k1