Skip to content

Commit

Permalink
app/monitoringapi: detect all validators queried by VC (#1566)
Browse files Browse the repository at this point in the history
Configures monitoringapi to detect all validators queried by VC.

category: feature
ticket: #1501
  • Loading branch information
dB2510 authored Dec 15, 2022
1 parent 532b2be commit 708c345
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 32 deletions.
36 changes: 32 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,19 @@ func Run(ctx context.Context, conf Config) (err error) {

qbftDebug := newQBFTDebugger()

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)

pubkeys, err := getDVPubkeys(lock)
if err != nil {
return err
}

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs,
promRegistry, qbftDebug)
promRegistry, qbftDebug, pubkeys, seenPubkeys)

err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl,
peerIDs, sender, qbftDebug.AddInstance)
peerIDs, sender, qbftDebug.AddInstance, seenPubkeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -322,7 +330,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey,
eth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
qbftSniffer func(*pbv1.SniffedConsensusInstance),
qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeys chan core.PubKey,
) error {
// Convert and prep public keys and public shares
var (
Expand Down Expand Up @@ -405,7 +413,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb"))

vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, conf.BuilderAPI)
vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress,
conf.BuilderAPI, seenPubkeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -821,6 +830,25 @@ func setFeeRecipient(eth2Cl eth2wrap.Client, pubkeys []eth2p0.BLSPubKey, feeReci
}
}

// getDVPubkeys collects the core public key of every distributed validator
// listed in the given cluster.Lock, in the order the validators appear.
// It stops and returns the first error hit while obtaining or converting a key.
func getDVPubkeys(lock cluster.Lock) ([]core.PubKey, error) {
	var resp []core.PubKey

	for _, validator := range lock.Validators {
		blsKey, err := validator.PublicKey()
		if err != nil {
			return nil, err
		}

		// Convert the validator's public key into its core.PubKey representation.
		coreKey, err := tblsconv.KeyToCore(blsKey)
		if err != nil {
			return nil, err
		}

		resp = append(resp, coreKey)
	}

	return resp, nil
}

// httpServeHook wraps a http.Server.ListenAndServe function, swallowing http.ErrServerClosed.
type httpServeHook func() error

Expand Down
4 changes: 4 additions & 0 deletions app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
// readyzInsufficientPeers indicates the readyz is returning 500s since this node isn't connected
// to quorum peers via the P2P network.
readyzInsufficientPeers = 4
// readyzVCNotConfigured indicates the readyz is returning 500s since VC is not configured for this node.
readyzVCNotConfigured = 5
// readyzVCMissingValidators indicates the readyz is returning 500s since the VC is missing some validators.
readyzVCMissingValidators = 6
)

var (
Expand Down
37 changes: 35 additions & 2 deletions app/monitoringapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,24 @@ import (
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
)

// Errors describing why the ready check reports the node as not ready.
var (
errReadyUninitialised = errors.New("ready check uninitialised")
// errReadyInsufficientPeers is set by startReadyChecker once too few peers have been
// connected for minNotConnected consecutive check rounds.
errReadyInsufficientPeers = errors.New("quorum peers not connected")
errReadyBeaconNodeSyncing = errors.New("beacon node not synced")
errReadyBeaconNodeDown = errors.New("beacon node down")
// errReadyVCNotConfigured is set by startReadyChecker when none of the cluster's pubkeys
// were received on the seenPubkeys channel during the previous epoch interval.
errReadyVCNotConfigured = errors.New("vc not configured")
// errReadyVCMissingVals is set by startReadyChecker when some, but not all, of the cluster's
// pubkeys were received on the seenPubkeys channel during the previous epoch interval.
errReadyVCMissingVals = errors.New("vc missing some validators")
)

// wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager.
// It serves prometheus metrics, pprof profiling and the runtime enr.
func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string,
localNode *enode.LocalNode, tcpNode host.Host, eth2Cl eth2wrap.Client,
peerIDs []peer.ID, registry *prometheus.Registry, qbftDebug http.Handler,
pubkeys []core.PubKey, seenPubkeys chan core.PubKey,
) {
mux := http.NewServeMux()

Expand All @@ -66,7 +70,7 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string
writeResponse(w, http.StatusOK, "ok")
}))

readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock())
readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock(), pubkeys, seenPubkeys)
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
readyErr := readyErrFunc()
if readyErr != nil {
Expand Down Expand Up @@ -98,7 +102,9 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string
}

// startReadyChecker starts a goroutine that periodically runs the ready checks and returns a function which returns the resulting error.
func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error {
func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID,
clock clockwork.Clock, pubkeys []core.PubKey, seenPubkeys chan core.PubKey,
) func() error {
const minNotConnected = 6 // Require 6 rounds (1min) of too few connected
var (
mu sync.Mutex
Expand All @@ -107,11 +113,29 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client
)
go func() {
ticker := clock.NewTicker(10 * time.Second)
epochTicker := clock.NewTicker(32 * 12 * time.Second) // 32 slots * 12 second slot time
previous := make(map[core.PubKey]bool)

// newCurrent returns a new current map, populated with all the pubkeys.
newCurrent := func() map[core.PubKey]bool {
current := make(map[core.PubKey]bool)
for _, pubkey := range pubkeys {
current[pubkey] = true
}

return current
}

// Initialise current.
current := newCurrent()

for {
select {
case <-ctx.Done():
return
case <-epochTicker.Chan():
// Move current to previous and reset current to the full set of pubkeys.
previous, current = current, newCurrent()
case <-ticker.Chan():
if quorumPeersConnected(peerIDs, tcpNode) {
notConnectedRounds = 0
Expand All @@ -129,13 +153,22 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client
} else if notConnectedRounds >= minNotConnected {
err = errReadyInsufficientPeers
readyzGauge.Set(readyzInsufficientPeers)
} else if len(previous) == len(pubkeys) {
err = errReadyVCNotConfigured
readyzGauge.Set(readyzVCNotConfigured)
} else if len(previous) > 0 {
err = errReadyVCMissingVals
readyzGauge.Set(readyzVCMissingValidators)
} else {
readyzGauge.Set(readyzReady)
}

mu.Lock()
readyErr = err
mu.Unlock()
case pubkey := <-seenPubkeys:
// Delete pubkey if called by a VC.
delete(current, pubkey)
}
}
}()
Expand Down
50 changes: 44 additions & 6 deletions app/monitoringapi_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,65 @@ import (
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestStartChecker(t *testing.T) {
pubkeys := []core.PubKey{testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t)}
tests := []struct {
name string
isSyncing bool
numPeers int
absentPeers int
seenPubkeys []core.PubKey
err error
}{
{
name: "success",
isSyncing: false,
numPeers: 5,
absentPeers: 0,
seenPubkeys: pubkeys,
},
{
name: "syncing",
isSyncing: true,
numPeers: 5,
absentPeers: 0,
seenPubkeys: pubkeys,
err: errReadyBeaconNodeSyncing,
},
{
name: "too few peers",
isSyncing: false,
numPeers: 5,
absentPeers: 3,
seenPubkeys: pubkeys,
err: errReadyInsufficientPeers,
},
{
name: "vc not configured",
isSyncing: false,
numPeers: 4,
absentPeers: 0,
err: errReadyVCNotConfigured,
},
{
name: "vc missing some validators",
isSyncing: false,
numPeers: 4,
absentPeers: 0,
seenPubkeys: []core.PubKey{pubkeys[0]},
err: errReadyVCMissingVals,
},
{
name: "success",
isSyncing: false,
numPeers: 4,
absentPeers: 1,
seenPubkeys: pubkeys,
},
}

Expand Down Expand Up @@ -103,13 +125,21 @@ func TestStartChecker(t *testing.T) {
}

clock := clockwork.NewFakeClock()
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)
seenPubkeys := make(chan core.PubKey)
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock, pubkeys, seenPubkeys)

for _, pubkey := range tt.seenPubkeys {
seenPubkeys <- pubkey
}

// We wrap the Advance() calls with blockers to make sure that the ticker
// can go to sleep and produce ticks without time passing in parallel.
clock.BlockUntil(1)
clock.Advance(15 * time.Second)
clock.BlockUntil(1)
// Advance clock for first tick.
advanceClock(clock, 10*time.Second, 2)

// Advance clock for first epoch tick.
advanceClock(clock, 384*time.Second, 2)

// Advance clock for last tick.
advanceClock(clock, 10*time.Second, 2)

if tt.err != nil {
require.Eventually(t, func() bool {
Expand All @@ -129,3 +159,11 @@ func TestStartChecker(t *testing.T) {
})
}
}

// advanceClock moves the fake clock forward by duration, calling
// clock.BlockUntil(numTickers) immediately before and after the advance.
func advanceClock(clock clockwork.FakeClock, duration time.Duration, numTickers int) {
	// Blocking on both sides of Advance() makes sure that the tickers can go
	// to sleep and produce ticks without time passing in parallel.
	waitForTickers := func() {
		clock.BlockUntil(numTickers)
	}

	waitForTickers()
	clock.Advance(duration)
	waitForTickers()
}
6 changes: 5 additions & 1 deletion core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewComponentInsecure(_ *testing.T, eth2Cl eth2wrap.Client, shareIdx int) (*

// NewComponent returns a new instance of the validator API core workflow component.
func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*bls_sig.PublicKey,
shareIdx int, feeRecipientAddress string, builderAPI bool,
shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeys chan core.PubKey,
) (*Component, error) {
// Create pubkey mappings.
var (
Expand Down Expand Up @@ -110,6 +110,10 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*
return eth2p0.BLSPubKey{}, errors.New("unknown public share")
}

if seenPubkeys != nil {
seenPubkeys <- core.PubKeyFrom48Bytes(key)
}

return key, nil
}

Expand Down
Loading

0 comments on commit 708c345

Please sign in to comment.