From 708c3454c3fe05187b65302fc39e4c92ddb9037e Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Thu, 15 Dec 2022 22:39:20 +0530 Subject: [PATCH] app/monitoringapi: detect all validators queried by VC (#1566) Configures monitoringapi to detect all validators queried by VC. category: feature ticket: #1501 --- app/app.go | 36 ++++++++++++++++--- app/metrics.go | 4 +++ app/monitoringapi.go | 37 +++++++++++++++++-- app/monitoringapi_internal_test.go | 50 ++++++++++++++++++++++---- core/validatorapi/validatorapi.go | 6 +++- core/validatorapi/validatorapi_test.go | 38 ++++++++++---------- 6 files changed, 139 insertions(+), 32 deletions(-) diff --git a/app/app.go b/app/app.go index bdce71c40..027cfda92 100644 --- a/app/app.go +++ b/app/app.go @@ -225,11 +225,19 @@ func Run(ctx context.Context, conf Config) (err error) { qbftDebug := newQBFTDebugger() + // seenPubkeys channel to send seen public keys from validatorapi to monitoringapi. + seenPubkeys := make(chan core.PubKey) + + pubkeys, err := getDVPubkeys(lock) + if err != nil { + return err + } + wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs, - promRegistry, qbftDebug) + promRegistry, qbftDebug, pubkeys, seenPubkeys) err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl, - peerIDs, sender, qbftDebug.AddInstance) + peerIDs, sender, qbftDebug.AddInstance, seenPubkeys) if err != nil { return err } @@ -322,7 +330,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender, - qbftSniffer func(*pbv1.SniffedConsensusInstance), + qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeys chan core.PubKey, ) error { // Convert and prep public keys and public shares var ( @@ -405,7 +413,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb")) - vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, conf.BuilderAPI) + vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, + conf.BuilderAPI, seenPubkeys) if err != nil { return err } @@ -821,6 +830,25 @@ func setFeeRecipient(eth2Cl eth2wrap.Client, pubkeys []eth2p0.BLSPubKey, feeReci } } +// getDVPubkeys returns DV public keys from given cluster.Lock. +func getDVPubkeys(lock cluster.Lock) ([]core.PubKey, error) { + var pubkeys []core.PubKey + for _, dv := range lock.Validators { + pk, err := dv.PublicKey() + if err != nil { + return nil, err + } + + pubkey, err := tblsconv.KeyToCore(pk) + if err != nil { + return nil, err + } + pubkeys = append(pubkeys, pubkey) + } + + return pubkeys, nil +} + // httpServeHook wraps a http.Server.ListenAndServe function, swallowing http.ErrServerClosed. type httpServeHook func() error diff --git a/app/metrics.go b/app/metrics.go index 7dae93b5c..37a5785b5 100644 --- a/app/metrics.go +++ b/app/metrics.go @@ -32,6 +32,10 @@ const ( // readyzInsufficientPeers indicates the readyz is returning 500s since this node isn't connected // to quorum peers via the P2P network. readyzInsufficientPeers = 4 + // readyzVCNotConfigured indicates the readyz is returning 500s since VC is not configured for this node. + readyzVCNotConfigured = 5 + // readyVCMissingValidators indicates the ready is returning 500s since VC missing some validators. + readyzVCMissingValidators = 6 ) var ( diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 82cf7898a..a3e246f33 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -34,6 +34,7 @@ import ( "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/app/lifecycle" "github.com/obolnetwork/charon/cluster" + "github.com/obolnetwork/charon/core" ) var ( @@ -41,6 +42,8 @@ var ( errReadyInsufficientPeers = errors.New("quorum peers not connected") errReadyBeaconNodeSyncing = errors.New("beacon node not synced") errReadyBeaconNodeDown = errors.New("beacon node down") + errReadyVCNotConfigured = errors.New("vc not configured") + errReadyVCMissingVals = errors.New("vc missing some validators") ) // wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager. @@ -48,6 +51,7 @@ var ( func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Cl eth2wrap.Client, peerIDs []peer.ID, registry *prometheus.Registry, qbftDebug http.Handler, + pubkeys []core.PubKey, seenPubkeys chan core.PubKey, ) { mux := http.NewServeMux() @@ -66,7 +70,7 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string writeResponse(w, http.StatusOK, "ok") })) - readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock()) + readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock(), pubkeys, seenPubkeys) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { readyErr := readyErrFunc() if readyErr != nil { @@ -98,7 +102,9 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string } // startReadyChecker returns function which returns an error resulting from ready checks periodically. -func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error { +func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, + clock clockwork.Clock, pubkeys []core.PubKey, seenPubkeys chan core.PubKey, +) func() error { const minNotConnected = 6 // Require 6 rounds (1min) of too few connected var ( mu sync.Mutex @@ -107,11 +113,29 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client ) go func() { ticker := clock.NewTicker(10 * time.Second) + epochTicker := clock.NewTicker(32 * 12 * time.Second) // 32 slots * 12 second slot time + previous := make(map[core.PubKey]bool) + + // newCurrent returns a new current map, populated with all the pubkeys. + newCurrent := func() map[core.PubKey]bool { + current := make(map[core.PubKey]bool) + for _, pubkey := range pubkeys { + current[pubkey] = true + } + + return current + } + + // Initialise current. + current := newCurrent() for { select { case <-ctx.Done(): return + case <-epochTicker.Chan(): + // Copy current to previous and clear current. + previous, current = current, newCurrent() case <-ticker.Chan(): if quorumPeersConnected(peerIDs, tcpNode) { notConnectedRounds = 0 @@ -129,6 +153,12 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client } else if notConnectedRounds >= minNotConnected { err = errReadyInsufficientPeers readyzGauge.Set(readyzInsufficientPeers) + } else if len(previous) == len(pubkeys) { + err = errReadyVCNotConfigured + readyzGauge.Set(readyzVCNotConfigured) + } else if len(previous) > 0 { + err = errReadyVCMissingVals + readyzGauge.Set(readyzVCMissingValidators) } else { readyzGauge.Set(readyzReady) } @@ -136,6 +166,9 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client mu.Lock() readyErr = err mu.Unlock() + case pubkey := <-seenPubkeys: + // Delete pubkey if called by a VC. + delete(current, pubkey) } } }() diff --git a/app/monitoringapi_internal_test.go b/app/monitoringapi_internal_test.go index 510c9c8a0..c16437fec 100644 --- a/app/monitoringapi_internal_test.go +++ b/app/monitoringapi_internal_test.go @@ -27,16 +27,19 @@ import ( "github.com/stretchr/testify/require" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/testutil" "github.com/obolnetwork/charon/testutil/beaconmock" ) func TestStartChecker(t *testing.T) { + pubkeys := []core.PubKey{testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t)} tests := []struct { name string isSyncing bool numPeers int absentPeers int + seenPubkeys []core.PubKey err error }{ { @@ -44,12 +47,14 @@ func TestStartChecker(t *testing.T) { isSyncing: false, numPeers: 5, absentPeers: 0, + seenPubkeys: pubkeys, }, { name: "syncing", isSyncing: true, numPeers: 5, absentPeers: 0, + seenPubkeys: pubkeys, err: errReadyBeaconNodeSyncing, }, { @@ -57,13 +62,30 @@ func TestStartChecker(t *testing.T) { isSyncing: false, numPeers: 5, absentPeers: 3, + seenPubkeys: pubkeys, err: errReadyInsufficientPeers, }, + { + name: "vc not configured", + isSyncing: false, + numPeers: 4, + absentPeers: 0, + err: errReadyVCNotConfigured, + }, + { + name: "vc missing some validators", + isSyncing: false, + numPeers: 4, + absentPeers: 0, + seenPubkeys: []core.PubKey{pubkeys[0]}, + err: errReadyVCMissingVals, + }, { name: "success", isSyncing: false, numPeers: 4, absentPeers: 1, + seenPubkeys: pubkeys, }, } @@ -103,13 +125,21 @@ func TestStartChecker(t *testing.T) { } clock := clockwork.NewFakeClock() - readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock) + seenPubkeys := make(chan core.PubKey) + readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock, pubkeys, seenPubkeys) + + for _, pubkey := range tt.seenPubkeys { + seenPubkeys <- pubkey + } - // We wrap the Advance() calls with blockers to make sure that the ticker - // can go to sleep and produce ticks without time passing in parallel. - clock.BlockUntil(1) - clock.Advance(15 * time.Second) - clock.BlockUntil(1) + // Advance clock for first tick. + advanceClock(clock, 10*time.Second, 2) + + // Advance clock for first epoch tick. + advanceClock(clock, 384*time.Second, 2) + + // Advance clock for last tick. + advanceClock(clock, 10*time.Second, 2) if tt.err != nil { require.Eventually(t, func() bool { @@ -129,3 +159,11 @@ func TestStartChecker(t *testing.T) { }) } } + +func advanceClock(clock clockwork.FakeClock, duration time.Duration, numTickers int) { + // We wrap the Advance() calls with blockers to make sure that the ticker + // can go to sleep and produce ticks without time passing in parallel. + clock.BlockUntil(numTickers) + clock.Advance(duration) + clock.BlockUntil(numTickers) +} diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index 0baa097ec..6ffe55f55 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -56,7 +56,7 @@ func NewComponentInsecure(_ *testing.T, eth2Cl eth2wrap.Client, shareIdx int) (* // NewComponent returns a new instance of the validator API core workflow component. func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*bls_sig.PublicKey, - shareIdx int, feeRecipientAddress string, builderAPI bool, + shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeys chan core.PubKey, ) (*Component, error) { // Create pubkey mappings. var ( @@ -110,6 +110,10 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]* return eth2p0.BLSPubKey{}, errors.New("unknown public share") } + if seenPubkeys != nil { + seenPubkeys <- core.PubKeyFrom48Bytes(key) + } + return key, nil } diff --git a/core/validatorapi/validatorapi_test.go b/core/validatorapi/validatorapi_test.go index 845cac164..b439a2b31 100644 --- a/core/validatorapi/validatorapi_test.go +++ b/core/validatorapi/validatorapi_test.go @@ -191,7 +191,7 @@ func TestSubmitAttestations_Verify(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx int64) (core.PubKey, error) { @@ -291,7 +291,7 @@ func TestSignAndVerify(t *testing.T) { // Setup validatorapi component. vapi, err := validatorapi.NewComponent(bmock, map[*bls_sig.PublicKey]*bls_sig.PublicKey{ pubkey: pubkey, - }, 0, "", false) + }, 0, "", false, nil) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(context.Context, int64, int64, int64) (core.PubKey, error) { return tblsconv.KeyToCore(pubkey) @@ -413,7 +413,7 @@ func TestComponent_SubmitBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -491,7 +491,7 @@ func TestComponent_SubmitBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -549,7 +549,7 @@ func TestComponent_SubmitBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -707,7 +707,7 @@ func TestComponent_SubmitBlindedBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -781,7 +781,7 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -841,7 +841,7 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -911,7 +911,7 @@ func TestComponent_SubmitVoluntaryExit(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned voluntary exit @@ -973,7 +973,7 @@ func TestComponent_SubmitVoluntaryExitInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Register subscriber @@ -1030,7 +1030,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.ProposerDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1050,7 +1050,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.AttesterDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1070,7 +1070,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.SyncCommitteeDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1101,7 +1101,7 @@ func TestComponent_SubmitValidatorRegistration(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, nil) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1177,7 +1177,7 @@ func TestComponent_SubmitValidatorRegistrationInvalidSignature(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, nil) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1227,7 +1227,7 @@ func TestComponent_TekuProposerConfig(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI, nil) require.NoError(t, err) resp, err := vapi.TekuProposerConfig(ctx) @@ -1384,7 +1384,7 @@ func TestComponent_SubmitAggregateAttestationVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) done := make(chan struct{}) @@ -1511,7 +1511,7 @@ func TestComponent_SubmitSyncCommitteeContributionsVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct validatorapi component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) done := make(chan struct{}) @@ -1582,7 +1582,7 @@ func TestComponent_AggregateSyncCommitteeSelectionsVerify(t *testing.T) { selections := []*eth2exp.SyncCommitteeSelection{selection1, selection2} // Construct the validator api component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterAwaitAggSigDB(func(ctx context.Context, duty core.Duty, pubkey core.PubKey) (core.SignedData, error) {