From 806bfc7f673d40f06e650488621f1948573b676c Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Wed, 14 Dec 2022 16:23:10 +0530 Subject: [PATCH 1/6] detect VC not configured in monitoringapi --- app/app.go | 29 +++++++-- app/metrics.go | 2 + app/monitoringapi.go | 27 ++++++++- app/monitoringapi_internal_test.go | 23 ++++++- core/validatorapi/validatorapi.go | 3 +- core/validatorapi/validatorapi_test.go | 84 ++++++++++++++++++++------ 6 files changed, 140 insertions(+), 28 deletions(-) diff --git a/app/app.go b/app/app.go index bdce71c40..381835f7f 100644 --- a/app/app.go +++ b/app/app.go @@ -219,17 +219,37 @@ func Run(ctx context.Context, conf Config) (err error) { return err } + var pubkeys []core.PubKey + for _, dv := range lock.Validators { + pk, err := dv.PublicKey() + if err != nil { + return err + } + + pubkey, err := tblsconv.KeyToCore(pk) + if err != nil { + return err + } + pubkeys = append(pubkeys, pubkey) + } + sender := new(p2p.Sender) wirePeerInfo(life, tcpNode, peerIDs, lock.LockHash, sender) qbftDebug := newQBFTDebugger() + // Non-blocking channel to send seen public keys from validatorapi to monitoringapi. + seenPubkeys := make(chan core.PubKey, 1) + seenPubkeysFunc := func(pubkey core.PubKey) { + seenPubkeys <- pubkey + } + wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs, - promRegistry, qbftDebug) + promRegistry, qbftDebug, pubkeys, seenPubkeys) err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl, - peerIDs, sender, qbftDebug.AddInstance) + peerIDs, sender, qbftDebug.AddInstance, seenPubkeysFunc) if err != nil { return err } @@ -322,7 +342,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender, - qbftSniffer func(*pbv1.SniffedConsensusInstance), + qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeysFunc func(key core.PubKey), ) error { // Convert and prep public keys and public shares var ( @@ -405,7 +425,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb")) - vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, conf.BuilderAPI) + vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, + conf.BuilderAPI, seenPubkeysFunc) if err != nil { return err } diff --git a/app/metrics.go b/app/metrics.go index 7dae93b5c..71a0017e9 100644 --- a/app/metrics.go +++ b/app/metrics.go @@ -32,6 +32,8 @@ const ( // readyzInsufficientPeers indicates the readyz is returning 500s since this node isn't connected // to quorum peers via the P2P network. readyzInsufficientPeers = 4 + // readyzVCNotConfigured indicated the readyz is returning 500s since VC is not configured for this node. + readyzVCNotConfigured = 5 ) var ( diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 82cf7898a..a5cbe74c8 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -34,6 +34,7 @@ import ( "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/app/lifecycle" "github.com/obolnetwork/charon/cluster" + "github.com/obolnetwork/charon/core" ) var ( @@ -41,6 +42,7 @@ var ( errReadyInsufficientPeers = errors.New("quorum peers not connected") errReadyBeaconNodeSyncing = errors.New("beacon node not synced") errReadyBeaconNodeDown = errors.New("beacon node down") + errReadyVCNotConfigured = errors.New("vc not configured") ) // wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager. @@ -48,6 +50,7 @@ var ( func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Cl eth2wrap.Client, peerIDs []peer.ID, registry *prometheus.Registry, qbftDebug http.Handler, + pubkeys []core.PubKey, seenPubkeys chan core.PubKey, ) { mux := http.NewServeMux() @@ -66,7 +69,7 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string writeResponse(w, http.StatusOK, "ok") })) - readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock()) + readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock(), pubkeys, seenPubkeys) mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { readyErr := readyErrFunc() if readyErr != nil { @@ -98,7 +101,9 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string } // startReadyChecker returns function which returns an error resulting from ready checks periodically. -func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error { +func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, + clock clockwork.Clock, pubkeys []core.PubKey, seenPubkeys chan core.PubKey, +) func() error { const minNotConnected = 6 // Require 6 rounds (1min) of too few connected var ( mu sync.Mutex @@ -107,6 +112,13 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client ) go func() { ticker := clock.NewTicker(10 * time.Second) + epochTicker := clock.NewTicker(384 * time.Second) // 32 slots * 12 second slot time + current := make(map[core.PubKey]bool) + previous := make(map[core.PubKey]bool) + + for _, pubkey := range pubkeys { + current[pubkey] = true + } for { select { @@ -129,6 +141,9 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client } else if notConnectedRounds >= minNotConnected { err = errReadyInsufficientPeers readyzGauge.Set(readyzInsufficientPeers) + } else if len(previous) > 0 { + err = errReadyVCNotConfigured + readyzGauge.Set(readyzVCNotConfigured) } else { readyzGauge.Set(readyzReady) } @@ -136,6 +151,14 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client mu.Lock() readyErr = err mu.Unlock() + case <-epochTicker.Chan(): + previous = current + for _, pubkey := range pubkeys { + current[pubkey] = true + } + + case pubkey := <-seenPubkeys: + delete(current, pubkey) } } }() diff --git a/app/monitoringapi_internal_test.go b/app/monitoringapi_internal_test.go index 510c9c8a0..7958501e2 100644 --- a/app/monitoringapi_internal_test.go +++ b/app/monitoringapi_internal_test.go @@ -27,16 +27,19 @@ import ( "github.com/stretchr/testify/require" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/testutil" "github.com/obolnetwork/charon/testutil/beaconmock" ) func TestStartChecker(t *testing.T) { + pubkeys := []core.PubKey{testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t)} tests := []struct { name string isSyncing bool numPeers int absentPeers int + seenPubkeys []core.PubKey err error }{ { @@ -44,12 +47,14 @@ func TestStartChecker(t *testing.T) { isSyncing: false, numPeers: 5, absentPeers: 0, + seenPubkeys: pubkeys, }, { name: "syncing", isSyncing: true, numPeers: 5, absentPeers: 0, + seenPubkeys: pubkeys, err: errReadyBeaconNodeSyncing, }, { @@ -57,13 +62,22 @@ func TestStartChecker(t *testing.T) { isSyncing: false, numPeers: 5, absentPeers: 3, + seenPubkeys: pubkeys, err: errReadyInsufficientPeers, }, + { + name: "vc not configured", + isSyncing: false, + numPeers: 4, + absentPeers: 0, + err: errReadyVCNotConfigured, + }, { name: "success", isSyncing: false, numPeers: 4, absentPeers: 1, + seenPubkeys: pubkeys, }, } @@ -103,12 +117,17 @@ func TestStartChecker(t *testing.T) { } clock := clockwork.NewFakeClock() - readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock) + seenPubkeys := make(chan core.PubKey) + readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock, pubkeys, seenPubkeys) + + for _, pubkey := range tt.seenPubkeys { + seenPubkeys <- pubkey + } // We wrap the Advance() calls with blockers to make sure that the ticker // can go to sleep and produce ticks without time passing in parallel. clock.BlockUntil(1) - clock.Advance(15 * time.Second) + clock.Advance(400 * time.Second) // Advance clock more than an epoch's time. clock.BlockUntil(1) if tt.err != nil { diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index 0baa097ec..5361432af 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -56,7 +56,7 @@ func NewComponentInsecure(_ *testing.T, eth2Cl eth2wrap.Client, shareIdx int) (* // NewComponent returns a new instance of the validator API core workflow component. func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*bls_sig.PublicKey, - shareIdx int, feeRecipientAddress string, builderAPI bool, + shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeysFunc func(key core.PubKey), ) (*Component, error) { // Create pubkey mappings. var ( @@ -99,6 +99,7 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]* } getPubShareFunc := func(pubkey eth2p0.BLSPubKey) (eth2p0.BLSPubKey, bool) { + seenPubkeysFunc(core.PubKeyFrom48Bytes(pubkey)) share, ok := sharesByKey[pubkey] return share, ok diff --git a/core/validatorapi/validatorapi_test.go b/core/validatorapi/validatorapi_test.go index 845cac164..0ef343eac 100644 --- a/core/validatorapi/validatorapi_test.go +++ b/core/validatorapi/validatorapi_test.go @@ -191,7 +191,9 @@ func TestSubmitAttestations_Verify(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, key, corePubKey) + }) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx int64) (core.PubKey, error) { @@ -291,7 +293,11 @@ func TestSignAndVerify(t *testing.T) { // Setup validatorapi component. vapi, err := validatorapi.NewComponent(bmock, map[*bls_sig.PublicKey]*bls_sig.PublicKey{ pubkey: pubkey, - }, 0, "", false) + }, 0, "", false, func(key core.PubKey) { + pk, err := tblsconv.KeyToCore(pubkey) + require.NoError(t, err) + require.Equal(t, pk, key) + }) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(context.Context, int64, int64, int64) (core.PubKey, error) { return tblsconv.KeyToCore(pubkey) @@ -413,7 +419,9 @@ func TestComponent_SubmitBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, corePubKey, key) + }) require.NoError(t, err) // Prepare unsigned beacon block @@ -491,7 +499,9 @@ func TestComponent_SubmitBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, corePubKey, key) + }) require.NoError(t, err) // Prepare unsigned beacon block @@ -549,7 +559,9 @@ func TestComponent_SubmitBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, pubkey, key) + }) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -707,7 +719,9 @@ func TestComponent_SubmitBlindedBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { + require.Equal(t, corePubKey, key) + }) require.NoError(t, err) // Prepare unsigned beacon block @@ -781,7 +795,9 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { + require.Equal(t, corePubKey, key) + }) require.NoError(t, err) // Prepare unsigned beacon block @@ -841,7 +857,9 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { + require.Equal(t, pubkey, key) + }) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -911,7 +929,9 @@ func TestComponent_SubmitVoluntaryExit(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, corePubKey, key) + }) require.NoError(t, err) // Prepare unsigned voluntary exit @@ -973,7 +993,11 @@ func TestComponent_SubmitVoluntaryExitInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + pk, err := tblsconv.KeyToCore(pubkey) + require.NoError(t, err) + require.Equal(t, pk, key) + }) require.NoError(t, err) // Register subscriber @@ -1030,7 +1054,9 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) + }) require.NoError(t, err) duties, err := vapi.ProposerDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1050,7 +1076,9 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) + }) require.NoError(t, err) duties, err := vapi.AttesterDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1070,7 +1098,9 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) + }) require.NoError(t, err) duties, err := vapi.SyncCommitteeDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1101,7 +1131,9 @@ func TestComponent_SubmitValidatorRegistration(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, func(key core.PubKey) { + require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) + }) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1177,7 +1209,9 @@ func TestComponent_SubmitValidatorRegistrationInvalidSignature(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, func(key core.PubKey) { + require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) + }) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1227,7 +1261,11 @@ func TestComponent_TekuProposerConfig(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI, func(key core.PubKey) { + pk, err := tblsconv.KeyToCore(pubkey) + require.NoError(t, err) + require.Equal(t, pk, key) + }) require.NoError(t, err) resp, err := vapi.TekuProposerConfig(ctx) @@ -1384,7 +1422,11 @@ func TestComponent_SubmitAggregateAttestationVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + pk, err := tblsconv.KeyToCore(pubkey) + require.NoError(t, err) + require.Equal(t, pk, key) + }) require.NoError(t, err) done := make(chan struct{}) @@ -1511,7 +1553,11 @@ func TestComponent_SubmitSyncCommitteeContributionsVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct validatorapi component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { + pk, err := tblsconv.KeyToCore(pubkey) + require.NoError(t, err) + require.Equal(t, pk, key) + }) require.NoError(t, err) done := make(chan struct{}) @@ -1582,7 +1628,7 @@ func TestComponent_AggregateSyncCommitteeSelectionsVerify(t *testing.T) { selections := []*eth2exp.SyncCommitteeSelection{selection1, selection2} // Construct the validator api component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(core.PubKey) {}) require.NoError(t, err) vapi.RegisterAwaitAggSigDB(func(ctx context.Context, duty core.Duty, pubkey core.PubKey) (core.SignedData, error) { From 42a56150e60f4c37bfad79cf74d327a175374c9b Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Wed, 14 Dec 2022 18:17:07 +0530 Subject: [PATCH 2/6] cleanup --- app/monitoringapi.go | 2 ++ core/validatorapi/validatorapi.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/app/monitoringapi.go b/app/monitoringapi.go index a5cbe74c8..62d265b59 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -116,6 +116,7 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client current := make(map[core.PubKey]bool) previous := make(map[core.PubKey]bool) + // Initialise current with given pubkeys. for _, pubkey := range pubkeys { current[pubkey] = true } @@ -158,6 +159,7 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client } case pubkey := <-seenPubkeys: + // Delete pubkey if called by a VC. delete(current, pubkey) } } diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index 5361432af..86b785ef5 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -99,7 +99,6 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]* } getPubShareFunc := func(pubkey eth2p0.BLSPubKey) (eth2p0.BLSPubKey, bool) { - seenPubkeysFunc(core.PubKeyFrom48Bytes(pubkey)) share, ok := sharesByKey[pubkey] return share, ok @@ -111,6 +110,8 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]* return eth2p0.BLSPubKey{}, errors.New("unknown public share") } + seenPubkeysFunc(core.PubKeyFrom48Bytes(key)) + return key, nil } From 1a72d8490cb11f8283ad629848f3f3495ce0f087 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Thu, 15 Dec 2022 15:06:45 +0530 Subject: [PATCH 3/6] copy current map to previous --- app/monitoringapi.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 62d265b59..d73966904 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -125,6 +125,13 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client select { case <-ctx.Done(): return + case <-epochTicker.Chan(): + for k, v := range current { + previous[k] = v + } + for _, pubkey := range pubkeys { + current[pubkey] = true + } case <-ticker.Chan(): if quorumPeersConnected(peerIDs, tcpNode) { notConnectedRounds = 0 @@ -152,12 +159,6 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client mu.Lock() readyErr = err mu.Unlock() - case <-epochTicker.Chan(): - previous = current - for _, pubkey := range pubkeys { - current[pubkey] = true - } - case pubkey := <-seenPubkeys: // Delete pubkey if called by a VC. delete(current, pubkey) From 9242d816c8716a48a374fe5eac8350bed2b4b0e8 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Thu, 15 Dec 2022 17:54:35 +0530 Subject: [PATCH 4/6] cleanup --- app/monitoringapi.go | 5 +++++ app/monitoringapi_internal_test.go | 22 +++++++++++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/app/monitoringapi.go b/app/monitoringapi.go index d73966904..379163d77 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -126,9 +126,14 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client case <-ctx.Done(): return case <-epochTicker.Chan(): + // Copy current to previous. + previous = make(map[core.PubKey]bool) for k, v := range current { previous[k] = v } + + // Initialise current with given pubkeys for next iteration. + current = make(map[core.PubKey]bool) for _, pubkey := range pubkeys { current[pubkey] = true } diff --git a/app/monitoringapi_internal_test.go b/app/monitoringapi_internal_test.go index 7958501e2..17819a7f8 100644 --- a/app/monitoringapi_internal_test.go +++ b/app/monitoringapi_internal_test.go @@ -124,11 +124,15 @@ func TestStartChecker(t *testing.T) { seenPubkeys <- pubkey } - // We wrap the Advance() calls with blockers to make sure that the ticker - // can go to sleep and produce ticks without time passing in parallel. - clock.BlockUntil(1) - clock.Advance(400 * time.Second) // Advance clock more than an epoch's time. - clock.BlockUntil(1) + // Advance clock for first tick. + advanceClock(clock, 10*time.Second) + + // Advance clock for first epoch tick. + advanceClock(clock, 384*time.Second) + + // Wait for epoch tick to happen before last 10s periodic tick. + time.Sleep(100 * time.Millisecond) + advanceClock(clock, 10*time.Second) if tt.err != nil { require.Eventually(t, func() bool { @@ -148,3 +152,11 @@ func TestStartChecker(t *testing.T) { }) } } + +func advanceClock(clock clockwork.FakeClock, duration time.Duration) { + // We wrap the Advance() calls with blockers to make sure that the ticker + // can go to sleep and produce ticks without time passing in parallel. + clock.BlockUntil(1) + clock.Advance(duration) + clock.BlockUntil(1) +} From 3958e66412329356abc816f20446d1a5dacf4398 Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Thu, 15 Dec 2022 21:24:26 +0530 Subject: [PATCH 5/6] review comments --- app/app.go | 45 ++++++++------ app/metrics.go | 4 +- app/monitoringapi.go | 36 +++++------ app/monitoringapi_internal_test.go | 23 ++++--- core/validatorapi/validatorapi.go | 6 +- core/validatorapi/validatorapi_test.go | 84 ++++++-------------------- 6 files changed, 86 insertions(+), 112 deletions(-) diff --git a/app/app.go b/app/app.go index 381835f7f..9d89dc1f6 100644 --- a/app/app.go +++ b/app/app.go @@ -219,20 +219,6 @@ func Run(ctx context.Context, conf Config) (err error) { return err } - var pubkeys []core.PubKey - for _, dv := range lock.Validators { - pk, err := dv.PublicKey() - if err != nil { - return err - } - - pubkey, err := tblsconv.KeyToCore(pk) - if err != nil { - return err - } - pubkeys = append(pubkeys, pubkey) - } - sender := new(p2p.Sender) wirePeerInfo(life, tcpNode, peerIDs, lock.LockHash, sender) @@ -241,15 +227,17 @@ func Run(ctx context.Context, conf Config) (err error) { // Non-blocking channel to send seen public keys from validatorapi to monitoringapi. seenPubkeys := make(chan core.PubKey, 1) - seenPubkeysFunc := func(pubkey core.PubKey) { - seenPubkeys <- pubkey + + pubkeys, err := getDVPubkeys(lock) + if err != nil { + return err } wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs, promRegistry, qbftDebug, pubkeys, seenPubkeys) err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl, - peerIDs, sender, qbftDebug.AddInstance, seenPubkeysFunc) + peerIDs, sender, qbftDebug.AddInstance, seenPubkeys) if err != nil { return err } @@ -342,7 +330,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender, - qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeysFunc func(key core.PubKey), + qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeys chan core.PubKey, ) error { // Convert and prep public keys and public shares var ( @@ -426,7 +414,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb")) vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, - conf.BuilderAPI, seenPubkeysFunc) + conf.BuilderAPI, seenPubkeys) if err != nil { return err } @@ -842,6 +830,25 @@ func setFeeRecipient(eth2Cl eth2wrap.Client, pubkeys []eth2p0.BLSPubKey, feeReci } } +// getDVPubkeys returns DV public keys from given cluster.Lock. +func getDVPubkeys(lock cluster.Lock) ([]core.PubKey, error) { + var pubkeys []core.PubKey + for _, dv := range lock.Validators { + pk, err := dv.PublicKey() + if err != nil { + return nil, err + } + + pubkey, err := tblsconv.KeyToCore(pk) + if err != nil { + return nil, err + } + pubkeys = append(pubkeys, pubkey) + } + + return pubkeys, nil +} + // httpServeHook wraps a http.Server.ListenAndServe function, swallowing http.ErrServerClosed. type httpServeHook func() error diff --git a/app/metrics.go b/app/metrics.go index 71a0017e9..37a5785b5 100644 --- a/app/metrics.go +++ b/app/metrics.go @@ -32,8 +32,10 @@ const ( // readyzInsufficientPeers indicates the readyz is returning 500s since this node isn't connected // to quorum peers via the P2P network. readyzInsufficientPeers = 4 - // readyzVCNotConfigured indicated the readyz is returning 500s since VC is not configured for this node. + // readyzVCNotConfigured indicates the readyz is returning 500s since VC is not configured for this node. readyzVCNotConfigured = 5 + // readyVCMissingValidators indicates the ready is returning 500s since VC missing some validators. + readyzVCMissingValidators = 6 ) var ( diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 379163d77..a3e246f33 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -43,6 +43,7 @@ var ( errReadyBeaconNodeSyncing = errors.New("beacon node not synced") errReadyBeaconNodeDown = errors.New("beacon node down") errReadyVCNotConfigured = errors.New("vc not configured") + errReadyVCMissingVals = errors.New("vc missing some validators") ) // wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager. @@ -112,31 +113,29 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client ) go func() { ticker := clock.NewTicker(10 * time.Second) - epochTicker := clock.NewTicker(384 * time.Second) // 32 slots * 12 second slot time - current := make(map[core.PubKey]bool) + epochTicker := clock.NewTicker(32 * 12 * time.Second) // 32 slots * 12 second slot time previous := make(map[core.PubKey]bool) - // Initialise current with given pubkeys. - for _, pubkey := range pubkeys { - current[pubkey] = true + // newCurrent returns a new current map, populated with all the pubkeys. + newCurrent := func() map[core.PubKey]bool { + current := make(map[core.PubKey]bool) + for _, pubkey := range pubkeys { + current[pubkey] = true + } + + return current } + // Initialise current. + current := newCurrent() + for { select { case <-ctx.Done(): return case <-epochTicker.Chan(): - // Copy current to previous. - previous = make(map[core.PubKey]bool) - for k, v := range current { - previous[k] = v - } - - // Initialise current with given pubkeys for next iteration. - current = make(map[core.PubKey]bool) - for _, pubkey := range pubkeys { - current[pubkey] = true - } + // Copy current to previous and clear current. + previous, current = current, newCurrent() case <-ticker.Chan(): if quorumPeersConnected(peerIDs, tcpNode) { notConnectedRounds = 0 @@ -154,9 +153,12 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client } else if notConnectedRounds >= minNotConnected { err = errReadyInsufficientPeers readyzGauge.Set(readyzInsufficientPeers) - } else if len(previous) > 0 { + } else if len(previous) == len(pubkeys) { err = errReadyVCNotConfigured readyzGauge.Set(readyzVCNotConfigured) + } else if len(previous) > 0 { + err = errReadyVCMissingVals + readyzGauge.Set(readyzVCMissingValidators) } else { readyzGauge.Set(readyzReady) } diff --git a/app/monitoringapi_internal_test.go b/app/monitoringapi_internal_test.go index 17819a7f8..c16437fec 100644 --- a/app/monitoringapi_internal_test.go +++ b/app/monitoringapi_internal_test.go @@ -72,6 +72,14 @@ func TestStartChecker(t *testing.T) { absentPeers: 0, err: errReadyVCNotConfigured, }, + { + name: "vc missing some validators", + isSyncing: false, + numPeers: 4, + absentPeers: 0, + seenPubkeys: []core.PubKey{pubkeys[0]}, + err: errReadyVCMissingVals, + }, { name: "success", isSyncing: false, @@ -125,14 +133,13 @@ func TestStartChecker(t *testing.T) { } // Advance clock for first tick. - advanceClock(clock, 10*time.Second) + advanceClock(clock, 10*time.Second, 2) // Advance clock for first epoch tick. - advanceClock(clock, 384*time.Second) + advanceClock(clock, 384*time.Second, 2) - // Wait for epoch tick to happen before last 10s periodic tick. - time.Sleep(100 * time.Millisecond) - advanceClock(clock, 10*time.Second) + // Advance clock for last tick. + advanceClock(clock, 10*time.Second, 2) if tt.err != nil { require.Eventually(t, func() bool { @@ -153,10 +160,10 @@ func TestStartChecker(t *testing.T) { } } -func advanceClock(clock clockwork.FakeClock, duration time.Duration) { +func advanceClock(clock clockwork.FakeClock, duration time.Duration, numTickers int) { // We wrap the Advance() calls with blockers to make sure that the ticker // can go to sleep and produce ticks without time passing in parallel. - clock.BlockUntil(1) + clock.BlockUntil(numTickers) clock.Advance(duration) - clock.BlockUntil(1) + clock.BlockUntil(numTickers) } diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index 86b785ef5..6ffe55f55 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -56,7 +56,7 @@ func NewComponentInsecure(_ *testing.T, eth2Cl eth2wrap.Client, shareIdx int) (* // NewComponent returns a new instance of the validator API core workflow component. func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*bls_sig.PublicKey, - shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeysFunc func(key core.PubKey), + shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeys chan core.PubKey, ) (*Component, error) { // Create pubkey mappings. var ( @@ -110,7 +110,9 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]* return eth2p0.BLSPubKey{}, errors.New("unknown public share") } - seenPubkeysFunc(core.PubKeyFrom48Bytes(key)) + if seenPubkeys != nil { + seenPubkeys <- core.PubKeyFrom48Bytes(key) + } return key, nil } diff --git a/core/validatorapi/validatorapi_test.go b/core/validatorapi/validatorapi_test.go index 0ef343eac..b439a2b31 100644 --- a/core/validatorapi/validatorapi_test.go +++ b/core/validatorapi/validatorapi_test.go @@ -191,9 +191,7 @@ func TestSubmitAttestations_Verify(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, key, corePubKey) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx int64) (core.PubKey, error) { @@ -293,11 +291,7 @@ func TestSignAndVerify(t *testing.T) { // Setup validatorapi component. vapi, err := validatorapi.NewComponent(bmock, map[*bls_sig.PublicKey]*bls_sig.PublicKey{ pubkey: pubkey, - }, 0, "", false, func(key core.PubKey) { - pk, err := tblsconv.KeyToCore(pubkey) - require.NoError(t, err) - require.Equal(t, pk, key) - }) + }, 0, "", false, nil) require.NoError(t, err) vapi.RegisterPubKeyByAttestation(func(context.Context, int64, int64, int64) (core.PubKey, error) { return tblsconv.KeyToCore(pubkey) @@ -419,9 +413,7 @@ func TestComponent_SubmitBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, corePubKey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -499,9 +491,7 @@ func TestComponent_SubmitBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, corePubKey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -559,9 +549,7 @@ func TestComponent_SubmitBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, pubkey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -719,9 +707,7 @@ func TestComponent_SubmitBlindedBeaconBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { - require.Equal(t, corePubKey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -795,9 +781,7 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { - require.Equal(t, corePubKey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) // Prepare unsigned beacon block @@ -857,9 +841,7 @@ func TestComponent_SubmitBlindedBeaconBlockInvalidBlock(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, func(key core.PubKey) { - require.Equal(t, pubkey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", true, nil) require.NoError(t, err) vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { @@ -929,9 +911,7 @@ func TestComponent_SubmitVoluntaryExit(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, corePubKey, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Prepare unsigned voluntary exit @@ -993,11 +973,7 @@ func TestComponent_SubmitVoluntaryExitInvalidSignature(t *testing.T) { require.NoError(t, err) // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - pk, err := tblsconv.KeyToCore(pubkey) - require.NoError(t, err) - require.Equal(t, pk, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) // Register subscriber @@ -1054,9 +1030,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.ProposerDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1076,9 +1050,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.AttesterDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1098,9 +1070,7 @@ func TestComponent_Duties(t *testing.T) { } // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) duties, err := vapi.SyncCommitteeDuties(ctx, eth2p0.Epoch(epch), []eth2p0.ValidatorIndex{eth2p0.ValidatorIndex(vIdx)}) require.NoError(t, err) @@ -1131,9 +1101,7 @@ func TestComponent_SubmitValidatorRegistration(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, func(key core.PubKey) { - require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, nil) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1209,9 +1177,7 @@ func TestComponent_SubmitValidatorRegistrationInvalidSignature(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, func(key core.PubKey) { - require.Equal(t, core.PubKeyFrom48Bytes(eth2Pubkey), key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", builderAPI, nil) require.NoError(t, err) unsigned := testutil.RandomValidatorRegistration(t) @@ -1261,11 +1227,7 @@ func TestComponent_TekuProposerConfig(t *testing.T) { builderAPI := true // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI, func(key core.PubKey) { - pk, err := tblsconv.KeyToCore(pubkey) - require.NoError(t, err) - require.Equal(t, pk, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, feeRecipient, builderAPI, nil) require.NoError(t, err) resp, err := vapi.TekuProposerConfig(ctx) @@ -1422,11 +1384,7 @@ func TestComponent_SubmitAggregateAttestationVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct the validator api component - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - pk, err := tblsconv.KeyToCore(pubkey) - require.NoError(t, err) - require.Equal(t, pk, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) done := make(chan struct{}) @@ -1553,11 +1511,7 @@ func TestComponent_SubmitSyncCommitteeContributionsVerify(t *testing.T) { pubShareByKey := map[*bls_sig.PublicKey]*bls_sig.PublicKey{pubkey: pubkey} // Maps self to self since not tbls // Construct validatorapi component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(key core.PubKey) { - pk, err := tblsconv.KeyToCore(pubkey) - require.NoError(t, err) - require.Equal(t, pk, key) - }) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) done := make(chan struct{}) @@ -1628,7 +1582,7 @@ func TestComponent_AggregateSyncCommitteeSelectionsVerify(t *testing.T) { selections := []*eth2exp.SyncCommitteeSelection{selection1, selection2} // Construct the validator api component. - vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, func(core.PubKey) {}) + vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0, "", false, nil) require.NoError(t, err) vapi.RegisterAwaitAggSigDB(func(ctx context.Context, duty core.Duty, pubkey core.PubKey) (core.SignedData, error) { From fd99ea57c99267469b3bae560387b250abcf35af Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Thu, 15 Dec 2022 22:34:15 +0530 Subject: [PATCH 6/6] replace non-blocking channel with blocking one --- app/app.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/app.go b/app/app.go index 9d89dc1f6..027cfda92 100644 --- a/app/app.go +++ b/app/app.go @@ -225,8 +225,8 @@ func Run(ctx context.Context, conf Config) (err error) { qbftDebug := newQBFTDebugger() - // Non-blocking channel to send seen public keys from validatorapi to monitoringapi. - seenPubkeys := make(chan core.PubKey, 1) + // seenPubkeys channel to send seen public keys from validatorapi to monitoringapi. + seenPubkeys := make(chan core.PubKey) pubkeys, err := getDVPubkeys(lock) if err != nil {