Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/tracker: implement analyse cluster participation #846

Merged
merged 8 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/tracker/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package tracker

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var participationGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Namespace: core
Subsystem: tracker
Name: duty_participation (or just participation)

Align label to "duty", "peer" and "pubkey".

Namespace: "core",
Subsystem: "tracker",
Name: "participation",
Help: "Set to 1 if peer participated successfully for the given duty and Distributed Validator public key or else 0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove pubkey stuff from help

}, []string{"duty", "peer"})
87 changes: 71 additions & 16 deletions core/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"fmt"
"sort"

"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/p2p"
)

//go:generate stringer -type=component
Expand All @@ -47,6 +50,7 @@ type event struct {
duty core.Duty
component component
pubkey core.PubKey
shareIdx int // This is an optional field only set by parSigEx and parSigDBInternal events.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and validatorapi

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also maybe add: ShareIdx is 1 indexed, so 0 indicates unset.

}

// Tracker represents the component that listens to events from core workflow components.
Expand All @@ -61,16 +65,20 @@ type Tracker struct {

// failedDutyReporter instruments the duty. It ignores non-failed duties.
failedDutyReporter func(core.Duty, bool, string, string)

// participationReporter logs and instruments the participation.
participationReporter func(context.Context, core.Duty, map[int]bool)
}

// New returns a new Tracker.
func New(deadliner core.Deadliner) *Tracker {
func New(deadliner core.Deadliner, peers []p2p.Peer) *Tracker {
t := &Tracker{
input: make(chan event),
events: make(map[core.Duty][]event),
quit: make(chan struct{}),
deadliner: deadliner,
failedDutyReporter: failedDutyReporter,
input: make(chan event),
events: make(map[core.Duty][]event),
quit: make(chan struct{}),
deadliner: deadliner,
failedDutyReporter: failedDutyReporter,
participationReporter: newParticipationReporter(peers),
}

return t
Expand All @@ -94,11 +102,10 @@ func (t *Tracker) Run(ctx context.Context) error {
t.events[e.duty] = append(t.events[e.duty], e)
case duty := <-t.deadliner.C():
failed, failedComponent, failedMsg := analyseDutyFailed(duty, t.events[duty])

t.failedDutyReporter(duty, failed, failedComponent.String(), failedMsg)

// TODO(dhruv): Case of cluster participation
// analyseParticipation(duty, t.events[duty])
currentParticipation := analyseParticipation(t.events[duty])
Copy link
Contributor

@corverroos corverroos Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: participatedShares := ...

t.participationReporter(ctx, duty, currentParticipation)

delete(t.events, duty)
}
Expand Down Expand Up @@ -134,11 +141,57 @@ func analyseDutyFailed(duty core.Duty, es []event) (bool, component, string) {
// TODO(xenowits): Implement logic for reporting duties.
func failedDutyReporter(core.Duty, bool, string, string) {}

// analyseParticipation returns the share indexes of peers that participated in this duty.
// TODO(dhruv): implement logic to analyse participation.
//nolint:deadcode
func analyseParticipation(core.Duty, []event) []int {
return nil
// analyseParticipation returns a set of share indexes of participated peers.
func analyseParticipation(events []event) map[int]bool {
// Set of shareIdx of participated peers.
resp := make(map[int]bool)

for _, e := range events {
// If we get a parSigDBInternal event, then the current node participated successfully.
// If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully.
if e.shareIdx > 0 && (e.component == parSigEx || e.component == parSigDBInternal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather be defensive, error if shareIdx 0 for expected components.

resp[e.shareIdx] = true
}
}

return resp
}

// newParticipationReporter returns a new participation reporter function which logs and instruments peer participation.
func newParticipationReporter(peers []p2p.Peer) func(context.Context, core.Duty, map[int]bool) {
// lastParticipation is the set of peers who participated in the last duty.
lastParticipation := make(map[int]bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prevAbsent := make(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just make this []string and compare against fmt.Sprint(absent) != fmt.Sprint(prevAbsent)


return func(ctx context.Context, duty core.Duty, currentParticipation map[int]bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: participatedShares

var absentPeers []string
for _, peer := range peers {
if currentParticipation[peer.ShareIdx()] {
participationGauge.WithLabelValues(duty.String(), peer.Name).Set(1)
} else {
absentPeers = append(absentPeers, peer.Name)
participationGauge.WithLabelValues(duty.String(), peer.Name).Set(0)
}
}

uniqueParticipation := false
for k, v := range lastParticipation {
val, ok := currentParticipation[k]
if !ok || val != v {
uniqueParticipation = true
break
}
}

// Avoid identical logs if same set of peers didn't participated.
if len(absentPeers) > 0 && !uniqueParticipation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if fmt.Sprint(absent) != fmt.Sprint(prevAbsent) {
  if len(absent) == 0 {
    log.Info(ctx, "All peers participated in duty", z.Any("duty"))
  else {
    log.Info(ctx, "Not all peers participated in duty", z.Any("duty"), z.Any("absent", absentPeers))
   }
    
  }
}

log.Info(ctx, "Peers didn't participate",
z.Str("duty", duty.String()),
z.Any("peers", absentPeers),
)
}

lastParticipation = currentParticipation
}
}

// SchedulerEvent inputs event from core.Scheduler component.
Expand Down Expand Up @@ -223,7 +276,7 @@ func (t *Tracker) ValidatorAPIEvent(ctx context.Context, duty core.Duty, data co

// ParSigExEvent inputs event from core.ParSigEx component.
func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.Duty, data core.ParSignedDataSet) error {
for pubkey := range data {
for pubkey, pSig := range data {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -234,6 +287,7 @@ func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.Duty, data core.P
duty: duty,
component: parSigEx,
pubkey: pubkey,
shareIdx: pSig.ShareIdx,
}
}
}
Expand All @@ -243,7 +297,7 @@ func (t *Tracker) ParSigExEvent(ctx context.Context, duty core.Duty, data core.P

// ParSigDBInternalEvent inputs events from core.ParSigDB component for internal store event.
func (t *Tracker) ParSigDBInternalEvent(ctx context.Context, duty core.Duty, data core.ParSignedDataSet) error {
for pubkey := range data {
for pubkey, pSig := range data {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -254,6 +308,7 @@ func (t *Tracker) ParSigDBInternalEvent(ctx context.Context, duty core.Duty, dat
duty: duty,
component: parSigDBInternal,
pubkey: pubkey,
shareIdx: pSig.ShareIdx,
}
}
}
Expand Down
Loading