diff --git a/core/consensus/component.go b/core/consensus/component.go index 2affb687e..4221df9b8 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -322,6 +322,8 @@ func (c *Component) propose(ctx context.Context, duty core.Duty, value proto.Mes if !decided { consensusTimeout.WithLabelValues(duty.Type.String()).Inc() + + return errors.New("consensus timeout", z.Str("duty", duty.String())) } return nil diff --git a/core/interfaces.go b/core/interfaces.go index 7a99ca42c..464a199ca 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -178,6 +178,9 @@ type Tracker interface { // FetcherFetched sends Fetcher component's events to tracker. FetcherFetched(context.Context, Duty, DutyDefinitionSet, error) + // ConsensusProposed sends Consensus component's events to tracker. + ConsensusProposed(context.Context, Duty, UnsignedDataSet, error) + // DutyDBStored sends DutyDB component's store events to tracker. DutyDBStored(context.Context, Duty, UnsignedDataSet, error) @@ -187,6 +190,9 @@ type Tracker interface { // ParSigDBStoredExternal sends ParSigDB component's store external events to tracker. ParSigDBStoredExternal(context.Context, Duty, ParSignedDataSet, error) + // SigAggAggregated sends SigAgg component's events to tracker. + SigAggAggregated(context.Context, Duty, PubKey, []ParSignedData, error) + // BroadcasterBroadcast sends Broadcaster component's broadcast events to tracker. BroadcasterBroadcast(context.Context, Duty, PubKey, SignedData, error) } diff --git a/core/sigagg/sigagg.go b/core/sigagg/sigagg.go index 64aca8cdb..a471035a6 100644 --- a/core/sigagg/sigagg.go +++ b/core/sigagg/sigagg.go @@ -51,6 +51,8 @@ func (a *Aggregator) Subscribe(fn func(context.Context, core.Duty, core.PubKey, // Aggregate aggregates the partially signed duty data for the DV. 
func (a *Aggregator) Aggregate(ctx context.Context, duty core.Duty, pubkey core.PubKey, parSigs []core.ParSignedData) error { + ctx = log.WithTopic(ctx, "sigagg") + if len(parSigs) < a.threshold { return errors.New("require threshold signatures") } else if a.threshold == 0 { diff --git a/core/tracker/tracker2/metrics.go b/core/tracker/tracker2/metrics.go index 871189d31..69713aed6 100644 --- a/core/tracker/tracker2/metrics.go +++ b/core/tracker/tracker2/metrics.go @@ -56,11 +56,4 @@ var ( Name: "inconsistent_parsigs_total", Help: "Total number of duties that contained inconsistent partial signed data by duty type", }, []string{"duty"}) - - inclusionDelay = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: "core", - Subsystem: "tracker", - Name: "inclusion_delay", - Help: "Cluster's average attestation inclusion delay in slots", - }) ) diff --git a/core/tracker/tracker2/tracker.go b/core/tracker/tracker2/tracker.go index 26f2f5fb2..231924132 100644 --- a/core/tracker/tracker2/tracker.go +++ b/core/tracker/tracker2/tracker.go @@ -19,6 +19,10 @@ import ( "context" "encoding/json" "fmt" + "sort" + "strings" + + "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" @@ -28,10 +32,9 @@ import ( // Steps arranged in the order they are triggered in the core workflow. 
const ( zero step = iota - scheduler // Duty scheduled with definition fetcher // Duty data fetched consensus // Duty data consensus reached - dutyDBStore // Duty data stored in DutyDB + dutyDB // Duty data stored in DutyDB validatorAPI // Partial signed data from local VC submitted to vapi parSigDBInternal // Partial signed data from local VC stored in parsigdb parSigEx // Partial signed data from other VC received via parsigex @@ -39,15 +42,13 @@ const ( parSigDBThreshold // Partial signed data threshold reached; emitted from parsigdb sigAgg // Partial signed data aggregated; emitted from sigagg bcast // Aggregated data submitted to beacon node - sentinel ) var stepLabels = map[step]string{ zero: "unknown", - scheduler: "scheduler", fetcher: "fetcher", consensus: "consensus", - dutyDBStore: "duty_db_store", + dutyDB: "duty_db", validatorAPI: "validator_api", parSigDBInternal: "parsig_db_local", parSigEx: "parsig_exchange", @@ -93,6 +94,17 @@ const ( // This indicates the associated prepare aggregation duty failed. msgFetcherAggregatorFailedPrepare = "couldn't aggregate attestation due to failed prepare aggregator duty" + // msgFetcherAggregatorNoExternalPrepares indicates an attestation aggregation duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated beacon committee selections. + // This indicates the associated prepare aggregation duty failed due to no partial beacon committee selections received from peers. + msgFetcherAggregatorNoExternalPrepares = "couldn't aggregate attestation due to no partial beacon committee selections received from peers" + + // msgFetcherAggregatorFailedSigAggPrepare indicates an attestation aggregation duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated beacon committee selections. + // This indicates the associated prepare aggregation duty failed due to failure of threshold signature aggregation. + // This indicates a bug in charon as it is unexpected. 
+ msgFetcherAggregatorFailedSigAggPrepare = "couldn't aggregate attestation due to no aggregated beacon committee selection, this is unexpected" + // msgFetcherProposerFewRandaos indicates a block proposer duty failed in // the fetcher step since it couldn't fetch the prerequisite aggregated RANDAO. // This indicates the associated randao duty failed due to insufficient partial randao signatures @@ -110,6 +122,17 @@ const ( // This indicates the associated randao duty failed. msgFetcherProposerFailedRandao = "couldn't propose block due to failed randao duty" + // msgFetcherProposerNoExternalRandaos indicates a block proposer duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated RANDAO. + // This indicates the associated randao duty failed due to no partial randao signatures received from peers. + msgFetcherProposerNoExternalRandaos = "couldn't propose block due to no partial randao signatures received from peers" + + // msgFetcherProposerFailedSigAggRandao indicates a block proposer duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated RANDAO. + // This indicates the associated randao duty failed due to failure of threshold signature aggregation. + // This indicates a bug in charon as it is unexpected. + msgFetcherProposerFailedSigAggRandao = "couldn't propose block due to no aggregated randao signature, this is unexpected" + // msgFetcherSyncContributionNoSyncMsg indicates a sync contribution duty failed in // the fetcher step since it couldn't fetch the prerequisite sync message. This // indicates the associated sync message duty failed to obtain a cluster agreed upon value. @@ -132,11 +155,25 @@ const ( // This indicates the associated prepare sync contribution duty failed. 
msgFetcherSyncContributionFailedPrepare = "couldn't fetch sync contribution due to failed prepare sync contribution duty" + // msgFetcherSyncContributionNoExternalPrepares indicates a sync contribution duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated sync contribution selections. + // This indicates the associated prepare sync contribution duty failed due to no partial sync contribution selections received from peers. + msgFetcherSyncContributionNoExternalPrepares = "couldn't fetch sync contribution due to no partial sync contribution selections received from peers" + + // msgFetcherSyncContributionFailedSigAggPrepare indicates a sync contribution duty failed in + // the fetcher step since it couldn't fetch the prerequisite aggregated sync contribution selections. + // This indicates the associated prepare sync contribution duty failed due to failure of threshold signature aggregation. + // This indicates a bug in charon as it is unexpected. + msgFetcherSyncContributionFailedSigAggPrepare = "couldn't fetch sync contribution due to no aggregated sync contribution selection, this is unexpected" + // msgConsensus indicates a duty failed in consensus step. // This could indicate that insufficient honest peers participated in consensus or p2p network // connection problems. msgConsensus = "consensus algorithm didn't complete" + // msgDutyDB indicates a bug in the DutyDB database as it is unexpected. + msgDutyDB = "bug: failed to store duty data in DutyDB" + // msgValidatorAPI indicates that partial signature we never submitted by the local // validator client. This could indicate that the local validator client is offline, // or has connection problems with charon, or has some other problem. See validator client @@ -163,9 +200,15 @@ const ( // This indicates a bug in charon as it is unexpected (for non-sync-committee-duties). 
msgParSigDBInconsistent = "bug: inconsistent partial signatures received" + // msgParSigDBExternal indicates a bug in the partial signature database as it is unexpected. + msgParSigDBExternal = "bug: failed to store external partial signatures in parsigdb" + // msgSigAgg indicates that BLS threshold aggregation of sufficient partial signatures failed. This // indicates inconsistent signed data. This indicates a bug in charon as it is unexpected. msgSigAgg = "bug: threshold aggregation of partial signatures failed due to inconsistent signed data" + + // msgBcast indicates that the beacon node returned an error while the aggregated duty signature was being submitted to it. + msgBcast = "failed to broadcast duty to beacon node" ) // parsigsByMsg contains partial signatures grouped by message root grouped by pubkey. @@ -277,8 +320,30 @@ func (t *Tracker) Run(ctx context.Context) error { // If the duty didn't fail, it returns false and the zero step. // It assumes that all the input events are for a single duty. // If the input events slice is empty, it returns true and the zero step. -func dutyFailedStep(es []event) (bool, step) { - return false, zero +func dutyFailedStep(es []event) (bool, step, error) { + if len(es) == 0 { + return true, zero, errors.New("") // Duty failed since no events. + } + + // Copy and sort in ascending step order (see step order above), so the last element is the latest step reached. + clone := append([]event(nil), es...) + sort.Slice(clone, func(i, j int) bool { + return clone[i].step < clone[j].step + }) + + lastEvent := clone[len(clone)-1] + + // No failed step. + if lastEvent.step == bcast && lastEvent.stepErr == nil { + return false, zero, nil + } + + // Failed in last event. + if lastEvent.stepErr != nil { + return true, lastEvent.step, lastEvent.stepErr + } + + return true, lastEvent.step + 1, nil } // analyseDutyFailed detects if the given duty failed. 
@@ -287,9 +352,155 @@ func dutyFailedStep(es []event) (bool, step) { // where the duty got stuck and a human friendly error message explaining why. // // It returns false if the duty didn't fail, i.e., the duty -// didn't get stuck and completed the sigAgg step. -func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, msgRootConsistent bool) (bool, step, string) { - return false, 0, "" +// didn't get stuck and completed the bcast step. +func analyseDutyFailed(duty core.Duty, allEvents map[core.Duty][]event, msgRootConsistent bool) (failed bool, failedStep step, failureMsg string) { + failed, step, err := dutyFailedStep(allEvents[duty]) + if !failed { + return false, zero, "" + } + + defer func() { + if err != nil { + failureMsg = fmt.Sprintf("%s with error: %s", failureMsg, err.Error()) + } + }() + + var msg string + switch step { + case fetcher: + msg = analyseFetcherFailed(duty, allEvents, err) + case consensus: + return analyseConsensusFailed(duty, err) + case dutyDB: + msg = msgDutyDB + case validatorAPI: + msg = msgValidatorAPI + case parSigDBInternal: + msg = msgParSigDBInternal + case parSigEx: + msg = msgParSigEx + case parSigDBExternal: + msg = msgParSigDBExternal + case parSigDBThreshold: + if msgRootConsistent { + msg = msgParSigDBInsufficient + } else { + msg = msgParSigDBInconsistent + if expectInconsistentParSigs(duty.Type) { + msg = msgParSigDBInconsistentSync + } + } + case sigAgg: + msg = msgSigAgg + case bcast: + msg = msgBcast + case zero: + msg = fmt.Sprintf("no events for %s duty", duty.String()) // This should never happen. + default: + msg = fmt.Sprintf("%s duty failed at %s", duty.String(), step.String()) + } + + return failed, step, msg +} + +// analyseFetcherFailed returns the reason why the duty that got stuck in fetcher failed, +// which might actually be due to a pre-requisite duty that failed. 
+func analyseFetcherFailed(duty core.Duty, allEvents map[core.Duty][]event, fetchErr error) string { + // Check for beacon api errors. + if strings.Contains(fetchErr.Error(), "beacon api") { + return msgFetcher + } + + // Proposer duties depend on randao duty, so check if that was why it failed. + if duty.Type == core.DutyProposer || duty.Type == core.DutyBuilderProposer { + // Proposer duties will fail if core.DutyRandao fails. + // Ignoring error as it will be handled in DutyRandao analysis. + randaoFailed, randaoStep, _ := dutyFailedStep(allEvents[core.NewRandaoDuty(duty.Slot)]) + if randaoFailed { + switch randaoStep { + case parSigDBThreshold: + return msgFetcherProposerFewRandaos + case parSigEx, parSigDBExternal: + return msgFetcherProposerNoExternalRandaos + case sigAgg: + return msgFetcherProposerFailedSigAggRandao + case zero: + return msgFetcherProposerZeroRandaos + default: + return msgFetcherProposerFailedRandao + } + } + } + + // Aggregator duties depend on the prepare aggregator duty, so check if that was why it failed. + if duty.Type == core.DutyAggregator { + // Aggregator duties will fail if core.DutyPrepareAggregator fails. + // Ignoring error as it will be handled in DutyPrepareAggregator duty analysis. + prepAggFailed, prepAggStep, _ := dutyFailedStep(allEvents[core.NewPrepareAggregatorDuty(duty.Slot)]) + if prepAggFailed { + switch prepAggStep { + case parSigDBThreshold: + return msgFetcherAggregatorFewPrepares + case parSigEx, parSigDBExternal: + return msgFetcherAggregatorNoExternalPrepares + case sigAgg: + return msgFetcherAggregatorFailedSigAggPrepare + case zero: + return msgFetcherAggregatorZeroPrepares + default: + return msgFetcherAggregatorFailedPrepare + } + } + + // Aggregator duties will fail if no attestation data in DutyDB. + // Ignoring error as it will be handled in DutyAttester analysis. 
+ attFailed, attStep, _ := dutyFailedStep(allEvents[core.NewAttesterDuty(duty.Slot)]) + if attFailed && attStep <= dutyDB { + return msgFetcherAggregatorNoAttData + } + } + + // Sync contribution duties depend on the prepare sync contribution duty, so check if that was why it failed. + if duty.Type == core.DutySyncContribution { + // Sync contribution duties will fail if core.DutyPrepareSyncContribution fails. + // Ignoring error as it will be handled in DutyPrepareSyncContribution analysis. + prepSyncConFailed, prepSyncConStep, _ := dutyFailedStep(allEvents[core.NewPrepareSyncContributionDuty(duty.Slot)]) + if prepSyncConFailed { + switch prepSyncConStep { + case parSigDBThreshold: + return msgFetcherSyncContributionFewPrepares + case parSigEx, parSigDBExternal: + return msgFetcherSyncContributionNoExternalPrepares + case sigAgg: + return msgFetcherSyncContributionFailedSigAggPrepare + case zero: + return msgFetcherSyncContributionZeroPrepares + default: + return msgFetcherSyncContributionFailedPrepare + } + } + + // Sync contribution duties will fail if no sync message in AggSigDB. + // Ignoring error as it will be handled in DutySyncMessage analysis. + syncMsgFailed, syncMsgStep, _ := dutyFailedStep(allEvents[core.NewSyncMessageDuty(duty.Slot)]) + if syncMsgFailed && syncMsgStep <= sigAgg { + return msgFetcherSyncContributionNoSyncMsg + } + } + + return fmt.Sprintf("unknown error in fetcher: %s", fetchErr.Error()) +} + +// analyseConsensusFailed returns whether the duty that got stuck in consensus actually failed +// because of an error in the consensus component, along with the failed step and failure message. +func analyseConsensusFailed(duty core.Duty, consensusErr error) (bool, step, string) { + // No aggregators or sync committee contributors found in this slot. + // Fetcher sends an event with nil error in this case. 
+ if consensusErr == nil && (duty.Type == core.DutyAggregator || duty.Type == core.DutySyncContribution) { + return false, zero, "" + } + + return true, consensus, msgConsensus } // analyseParSigs returns a mapping of partial signed data messages by peers (share index) by validator (pubkey). @@ -376,8 +587,8 @@ func analyseParticipation(duty core.Duty, allEvents map[core.Duty][]event) (map[ for _, e := range allEvents[duty] { // If we get a parSigDBInternal event, then the current node participated successfully. - // If we get a parSigEx event, then the corresponding peer with e.shareIdx participated successfully. - if e.step == parSigEx || e.step == parSigDBInternal { + // If we get a parSigDBExternal event, then the corresponding peer with e.shareIdx participated successfully. + if e.step == parSigDBExternal || e.step == parSigDBInternal { if !isParSigEventExpected(duty, e.pubkey, allEvents) { unexpectedShares[e.parSig.ShareIdx] = true continue @@ -401,7 +612,7 @@ func isParSigEventExpected(duty core.Duty, pubkey core.PubKey, allEvents map[cor // scheduled returns true if the provided duty type was scheduled for the above slot and pubkey. scheduled := func(typ core.DutyType) bool { for _, e := range allEvents[core.Duty{Slot: duty.Slot, Type: typ}] { - if e.step == scheduler && e.pubkey == pubkey { + if e.step == fetcher && e.pubkey == pubkey { return true } } @@ -486,6 +697,24 @@ func (t *Tracker) FetcherFetched(ctx context.Context, duty core.Duty, set core.D } } +// ConsensusProposed implements core.Tracker interface. +func (t *Tracker) ConsensusProposed(ctx context.Context, duty core.Duty, set core.UnsignedDataSet, stepErr error) { + for pubkey := range set { + select { + case <-ctx.Done(): + return + case <-t.quit: + return + case t.input <- event{ + duty: duty, + step: consensus, + pubkey: pubkey, + stepErr: stepErr, + }: + } + } +} + // DutyDBStored implements core.Tracker interface. 
func (t *Tracker) DutyDBStored(ctx context.Context, duty core.Duty, set core.UnsignedDataSet, stepErr error) { for pubkey := range set { @@ -496,7 +725,7 @@ func (t *Tracker) DutyDBStored(ctx context.Context, duty core.Duty, set core.Uns return case t.input <- event{ duty: duty, - step: dutyDBStore, + step: dutyDB, pubkey: pubkey, stepErr: stepErr, }: @@ -544,6 +773,22 @@ func (t *Tracker) ParSigDBStoredExternal(ctx context.Context, duty core.Duty, se } } +// SigAggAggregated implements core.Tracker interface. +func (t *Tracker) SigAggAggregated(ctx context.Context, duty core.Duty, pubkey core.PubKey, _ []core.ParSignedData, stepErr error) { + select { + case <-ctx.Done(): + return + case <-t.quit: + return + case t.input <- event{ + duty: duty, + step: sigAgg, + pubkey: pubkey, + stepErr: stepErr, + }: + } +} + // BroadcasterBroadcast implements core.Tracker interface. func (t *Tracker) BroadcasterBroadcast(ctx context.Context, duty core.Duty, pubkey core.PubKey, _ core.SignedData, stepErr error) { select { diff --git a/core/tracking.go b/core/tracking.go index 08e4356fc..eb179ce7e 100644 --- a/core/tracking.go +++ b/core/tracking.go @@ -30,6 +30,12 @@ func WithTracking(tracker Tracker) WireOption { return err } + w.ConsensusPropose = func(ctx context.Context, duty Duty, set UnsignedDataSet) error { + err := clone.ConsensusPropose(ctx, duty, set) + tracker.ConsensusProposed(ctx, duty, set, err) + + return err + } w.DutyDBStore = func(ctx context.Context, duty Duty, set UnsignedDataSet) error { err := clone.DutyDBStore(ctx, duty, set) tracker.DutyDBStored(ctx, duty, set, err) @@ -48,6 +54,12 @@ func WithTracking(tracker Tracker) WireOption { return err } + w.SigAggAggregate = func(ctx context.Context, duty Duty, key PubKey, data []ParSignedData) error { + err := clone.SigAggAggregate(ctx, duty, key, data) + tracker.SigAggAggregated(ctx, duty, key, data, err) + + return err + } w.BroadcasterBroadcast = func(ctx context.Context, duty Duty, pubkey PubKey, data 
SignedData) error { err := clone.BroadcasterBroadcast(ctx, duty, pubkey, data) tracker.BroadcasterBroadcast(ctx, duty, pubkey, data, err)