Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app/monitoringapi: detect all validators queried by VC #1566

Merged
merged 6 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,19 @@ func Run(ctx context.Context, conf Config) (err error) {

qbftDebug := newQBFTDebugger()

// Buffered (capacity 1) channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey, 1)
dB2510 marked this conversation as resolved.
Show resolved Hide resolved

pubkeys, err := getDVPubkeys(lock)
if err != nil {
return err
}

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs,
promRegistry, qbftDebug)
promRegistry, qbftDebug, pubkeys, seenPubkeys)

err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl,
peerIDs, sender, qbftDebug.AddInstance)
peerIDs, sender, qbftDebug.AddInstance, seenPubkeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -322,7 +330,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey,
eth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
qbftSniffer func(*pbv1.SniffedConsensusInstance),
qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeys chan core.PubKey,
) error {
// Convert and prep public keys and public shares
var (
Expand Down Expand Up @@ -405,7 +413,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb"))

vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress, conf.BuilderAPI)
vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx, lock.FeeRecipientAddress,
conf.BuilderAPI, seenPubkeys)
if err != nil {
return err
}
Expand Down Expand Up @@ -821,6 +830,25 @@ func setFeeRecipient(eth2Cl eth2wrap.Client, pubkeys []eth2p0.BLSPubKey, feeReci
}
}

// getDVPubkeys returns the public keys of all distributed validators defined
// in the given cluster lock, converted to the core workflow representation.
// It returns an error if any validator key cannot be resolved or converted.
func getDVPubkeys(lock cluster.Lock) ([]core.PubKey, error) {
	// Pre-size since the number of validators is known up front.
	pubkeys := make([]core.PubKey, 0, len(lock.Validators))
	for _, dv := range lock.Validators {
		// Resolve the distributed validator's BLS public key.
		pk, err := dv.PublicKey()
		if err != nil {
			return nil, err
		}

		// Convert the BLS key into the core workflow public key type.
		pubkey, err := tblsconv.KeyToCore(pk)
		if err != nil {
			return nil, err
		}

		pubkeys = append(pubkeys, pubkey)
	}

	return pubkeys, nil
}

// httpServeHook wraps a http.Server.ListenAndServe function, swallowing http.ErrServerClosed.
type httpServeHook func() error

Expand Down
4 changes: 4 additions & 0 deletions app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
// readyzInsufficientPeers indicates the readyz is returning 500s since this node isn't connected
// to quorum peers via the P2P network.
readyzInsufficientPeers = 4
// readyzVCNotConfigured indicates the readyz is returning 500s since VC is not configured for this node.
readyzVCNotConfigured = 5
// readyzVCMissingValidators indicates the readyz endpoint is returning 500s since the VC is missing some validators.
readyzVCMissingValidators = 6
)

var (
Expand Down
37 changes: 35 additions & 2 deletions app/monitoringapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,24 @@ import (
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
)

var (
errReadyUninitialised = errors.New("ready check uninitialised")
errReadyInsufficientPeers = errors.New("quorum peers not connected")
errReadyBeaconNodeSyncing = errors.New("beacon node not synced")
errReadyBeaconNodeDown = errors.New("beacon node down")
errReadyVCNotConfigured = errors.New("vc not configured")
errReadyVCMissingVals = errors.New("vc missing some validators")
)

// wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager.
// It serves prometheus metrics, pprof profiling and the runtime enr.
func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string,
localNode *enode.LocalNode, tcpNode host.Host, eth2Cl eth2wrap.Client,
peerIDs []peer.ID, registry *prometheus.Registry, qbftDebug http.Handler,
pubkeys []core.PubKey, seenPubkeys chan core.PubKey,
) {
mux := http.NewServeMux()

Expand All @@ -66,7 +70,7 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string
writeResponse(w, http.StatusOK, "ok")
}))

readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock())
readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock(), pubkeys, seenPubkeys)
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
readyErr := readyErrFunc()
if readyErr != nil {
Expand Down Expand Up @@ -98,7 +102,9 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string
}

// startReadyChecker returns function which returns an error resulting from ready checks periodically.
func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error {
func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID,
clock clockwork.Clock, pubkeys []core.PubKey, seenPubkeys chan core.PubKey,
) func() error {
const minNotConnected = 6 // Require 6 rounds (1min) of too few connected
var (
mu sync.Mutex
Expand All @@ -107,11 +113,29 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client
)
go func() {
ticker := clock.NewTicker(10 * time.Second)
epochTicker := clock.NewTicker(32 * 12 * time.Second) // 32 slots * 12 second slot time
previous := make(map[core.PubKey]bool)
dB2510 marked this conversation as resolved.
Show resolved Hide resolved

// newCurrent returns a new current map, populated with all the pubkeys.
newCurrent := func() map[core.PubKey]bool {
current := make(map[core.PubKey]bool)
for _, pubkey := range pubkeys {
current[pubkey] = true
}

return current
}

// Initialise current.
current := newCurrent()

for {
select {
case <-ctx.Done():
return
case <-epochTicker.Chan():
// Copy current to previous and clear current.
previous, current = current, newCurrent()
case <-ticker.Chan():
if quorumPeersConnected(peerIDs, tcpNode) {
notConnectedRounds = 0
Expand All @@ -129,13 +153,22 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client
} else if notConnectedRounds >= minNotConnected {
err = errReadyInsufficientPeers
readyzGauge.Set(readyzInsufficientPeers)
} else if len(previous) == len(pubkeys) {
err = errReadyVCNotConfigured
readyzGauge.Set(readyzVCNotConfigured)
} else if len(previous) > 0 {
err = errReadyVCMissingVals
readyzGauge.Set(readyzVCMissingValidators)
} else {
readyzGauge.Set(readyzReady)
}

mu.Lock()
readyErr = err
mu.Unlock()
case pubkey := <-seenPubkeys:
// Delete pubkey if called by a VC.
delete(current, pubkey)
}
}
}()
Expand Down
50 changes: 44 additions & 6 deletions app/monitoringapi_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,65 @@ import (
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestStartChecker(t *testing.T) {
pubkeys := []core.PubKey{testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t), testutil.RandomCorePubKey(t)}
tests := []struct {
name string
isSyncing bool
numPeers int
absentPeers int
seenPubkeys []core.PubKey
err error
}{
{
name: "success",
isSyncing: false,
numPeers: 5,
absentPeers: 0,
seenPubkeys: pubkeys,
},
{
name: "syncing",
isSyncing: true,
numPeers: 5,
absentPeers: 0,
seenPubkeys: pubkeys,
err: errReadyBeaconNodeSyncing,
},
{
name: "too few peers",
isSyncing: false,
numPeers: 5,
absentPeers: 3,
seenPubkeys: pubkeys,
err: errReadyInsufficientPeers,
},
{
name: "vc not configured",
isSyncing: false,
numPeers: 4,
absentPeers: 0,
err: errReadyVCNotConfigured,
},
{
name: "vc missing some validators",
isSyncing: false,
numPeers: 4,
absentPeers: 0,
seenPubkeys: []core.PubKey{pubkeys[0]},
err: errReadyVCMissingVals,
},
{
name: "success",
isSyncing: false,
numPeers: 4,
absentPeers: 1,
seenPubkeys: pubkeys,
},
}

Expand Down Expand Up @@ -103,13 +125,21 @@ func TestStartChecker(t *testing.T) {
}

clock := clockwork.NewFakeClock()
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)
seenPubkeys := make(chan core.PubKey)
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock, pubkeys, seenPubkeys)

for _, pubkey := range tt.seenPubkeys {
seenPubkeys <- pubkey
}

// We wrap the Advance() calls with blockers to make sure that the ticker
// can go to sleep and produce ticks without time passing in parallel.
clock.BlockUntil(1)
clock.Advance(15 * time.Second)
clock.BlockUntil(1)
// Advance clock for first tick.
advanceClock(clock, 10*time.Second, 2)

// Advance clock for first epoch tick.
advanceClock(clock, 384*time.Second, 2)

// Advance clock for last tick.
advanceClock(clock, 10*time.Second, 2)

if tt.err != nil {
require.Eventually(t, func() bool {
Expand All @@ -129,3 +159,11 @@ func TestStartChecker(t *testing.T) {
})
}
}

// advanceClock steps the fake clock forward by d in a tick-safe manner: it
// first waits until tickers goroutines are blocked on the clock (so they are
// asleep and ready to receive ticks), advances time, then waits for them to
// block again so no ticks fire while time is still moving in parallel.
func advanceClock(fake clockwork.FakeClock, d time.Duration, tickers int) {
	fake.BlockUntil(tickers)
	fake.Advance(d)
	fake.BlockUntil(tickers)
}
6 changes: 5 additions & 1 deletion core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewComponentInsecure(_ *testing.T, eth2Cl eth2wrap.Client, shareIdx int) (*

// NewComponent returns a new instance of the validator API core workflow component.
func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*bls_sig.PublicKey,
shareIdx int, feeRecipientAddress string, builderAPI bool,
shareIdx int, feeRecipientAddress string, builderAPI bool, seenPubkeys chan core.PubKey,
) (*Component, error) {
// Create pubkey mappings.
var (
Expand Down Expand Up @@ -110,6 +110,10 @@ func NewComponent(eth2Cl eth2wrap.Client, pubShareByKey map[*bls_sig.PublicKey]*
return eth2p0.BLSPubKey{}, errors.New("unknown public share")
}

if seenPubkeys != nil {
seenPubkeys <- core.PubKeyFrom48Bytes(key)
}

return key, nil
}

Expand Down
Loading