diff --git a/app/app.go b/app/app.go
index 6fc132633..0af0e78b1 100644
--- a/app/app.go
+++ b/app/app.go
@@ -205,7 +205,7 @@ func Run(ctx context.Context, conf Config) (err error) {
 		return err
 	}
 
-	if err := wireMonitoringAPI(life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs); err != nil {
+	if err := wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs); err != nil {
 		return err
 	}
 
diff --git a/app/monitoringapi.go b/app/monitoringapi.go
index 95135681a..7346b2ac5 100644
--- a/app/monitoringapi.go
+++ b/app/monitoringapi.go
@@ -20,10 +20,12 @@ import (
 	"math"
 	"net/http"
 	"net/http/pprof"
+	"sync"
 	"time"
 
 	eth2client "github.com/attestantio/go-eth2-client"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/jonboulle/clockwork"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
@@ -33,9 +35,16 @@ import (
 	"github.com/obolnetwork/charon/app/lifecycle"
 )
 
+var (
+	errReadyUnInit           = errors.New("ready check uninitialised")
+	errReadyPingFailing      = errors.New("couldn't ping all peers")
+	errReadySyncing          = errors.New("beacon node not synced")
+	errReadyBeaconNodeFailed = errors.New("failed to get beacon sync state")
+)
+
 // wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager.
 // It serves prometheus metrics, pprof profiling and the runtime enr.
-func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Svc eth2client.Service, peerIDs []peer.ID) error {
+func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Svc eth2client.Service, peerIDs []peer.ID) error {
 	mux := http.NewServeMux()
 
 	// Serve prometheus metrics
@@ -56,7 +65,16 @@ func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.Lo
 		return errors.New("invalid eth2 service")
 	}
 
-	mux.Handle("/readyz", newReadyHandler(tcpNode, eth2Cl, peerIDs))
+	readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock())
+	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
+		readyErr := readyErrFunc()
+		if readyErr != nil {
+			writeResponse(w, http.StatusInternalServerError, readyErr.Error())
+			return
+		}
+
+		writeResponse(w, http.StatusOK, "ok")
+	})
 
 	// Copied from net/http/pprof/pprof.go
 	mux.HandleFunc("/debug/pprof/", pprof.Index)
@@ -76,32 +94,46 @@ func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.Lo
 	return nil
 }
 
-// newReadyHandler returns a http.HandlerFunc which returns 200 when both the beacon node is synced and all quorum peers can be pinged in parallel within a timeout. Returns 500 otherwise.
-func newReadyHandler(tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID) http.HandlerFunc {
-	return func(w http.ResponseWriter, r *http.Request) {
-		ctx, cancel := context.WithTimeout(r.Context(), time.Second)
-		defer cancel()
-
-		var ready float64
-		defer func() { readyzGauge.Set(ready) }()
-
-		syncing, err := beaconNodeSyncing(ctx, eth2Cl)
-		if err != nil {
-			writeResponse(w, http.StatusInternalServerError, "Failed to get beacon sync state")
-			return
-		} else if syncing {
-			writeResponse(w, http.StatusInternalServerError, "Beacon node not synced")
-			return
+// startReadyChecker starts a goroutine that periodically runs the ready checks and returns a function that reports the latest resulting error (nil if ready).
+func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error {
+	var (
+		mu       sync.Mutex
+		readyErr = errReadyUnInit
+	)
+	go func() {
+		ticker := clock.NewTicker(10 * time.Second)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.Chan():
+				syncing, err := beaconNodeSyncing(ctx, eth2Cl)
+				if err != nil {
+					err = errReadyBeaconNodeFailed
+					readyzGauge.Set(0)
+				} else if syncing {
+					err = errReadySyncing
+					readyzGauge.Set(0)
+				} else if peersReady(ctx, peerIDs, tcpNode) != nil {
+					err = errReadyPingFailing
+					readyzGauge.Set(0)
+				} else {
+					readyzGauge.Set(1)
+				}
+
+				mu.Lock()
+				readyErr = err
+				mu.Unlock()
+			}
 		}
+	}()
 
-		err = peersReady(ctx, peerIDs, tcpNode)
-		if err != nil {
-			writeResponse(w, http.StatusInternalServerError, "Couldn't ping all peers")
-			return
-		}
+	return func() error {
+		mu.Lock()
+		defer mu.Unlock()
 
-		ready = 1
-		writeResponse(w, http.StatusOK, "ok")
+		return readyErr
 	}
 }
 
@@ -133,8 +165,9 @@ func peersReady(ctx context.Context, peerIDs []peer.ID, tcpNode host.Host) error
 	var (
 		// Require quorum successes (excluding self). Formula from IBFT 2.0 paper https://arxiv.org/pdf/1909.10194.pdf
-		require = int(math.Ceil(float64(len(peerIDs)*2)/3)) - 1
-		actual  int
+		require  = int(math.Ceil(float64(len(peerIDs)*2)/3)) - 1
+		okCount  int
+		errCount int
 	)
 
 	for {
 		select {
@@ -142,11 +175,17 @@ func peersReady(ctx context.Context, peerIDs []peer.ID, tcpNode host.Host) error
 			return ctx.Err()
 		case res := <-results:
 			if res.Error != nil {
-				continue
+				errCount++
+			} else {
+				okCount++
+			}
+
+			// Return error if we cannot reach quorum peers.
+			if errCount > (len(peerIDs) - require - 1) {
+				return errors.New("not enough peers")
 			}
 
-			actual++
-			if actual == require {
+			if okCount == require {
 				return nil
 			}
 		}
diff --git a/app/monitoringapi_internal_test.go b/app/monitoringapi_internal_test.go
new file mode 100644
index 000000000..48b6844b1
--- /dev/null
+++ b/app/monitoringapi_internal_test.go
@@ -0,0 +1,134 @@
+// Copyright © 2022 Obol Labs Inc.
+//
+// This program is free software: you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation, either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but WITHOUT
+// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+// more details.
+//
+// You should have received a copy of the GNU General Public License along with
+// this program. If not, see <https://www.gnu.org/licenses/>.
+
+package app
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
+	"github.com/jonboulle/clockwork"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/peerstore"
+	"github.com/stretchr/testify/require"
+
+	"github.com/obolnetwork/charon/testutil"
+	"github.com/obolnetwork/charon/testutil/beaconmock"
+)
+
+func TestStartChecker(t *testing.T) {
+	tests := []struct {
+		name        string
+		isSyncing   bool
+		numPeers    int
+		absentPeers int
+		err         error
+	}{
+		{
+			name:        "success",
+			isSyncing:   false,
+			numPeers:    5,
+			absentPeers: 0,
+		},
+		{
+			name:        "syncing",
+			isSyncing:   true,
+			numPeers:    5,
+			absentPeers: 0,
+			err:         errReadySyncing,
+		},
+		{
+			name:        "peer ping failing",
+			isSyncing:   false,
+			numPeers:    5,
+			absentPeers: 3,
+			err:         errReadyPingFailing,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			bmock, err := beaconmock.New()
+			require.NoError(t, err)
+
+			bmock.NodeSyncingFunc = func(ctx context.Context) (*eth2v1.SyncState, error) {
+				return &eth2v1.SyncState{IsSyncing: tt.isSyncing}, nil
+			}
+
+			var (
+				peers     []peer.ID
+				hosts     []host.Host
+				hostsInfo []peer.AddrInfo
+			)
+
+			for i := 0; i < tt.numPeers; i++ {
+				h := testutil.CreateHost(t, testutil.AvailableAddr(t))
+				info := peer.AddrInfo{
+					ID:    h.ID(),
+					Addrs: h.Addrs(),
+				}
+				hostsInfo = append(hostsInfo, info)
+				peers = append(peers, h.ID())
+				hosts = append(hosts, h)
+			}
+
+			// Connect each host with its peers.
+			for i := 0; i < tt.numPeers; i++ {
+				for k := 0; k < tt.numPeers-tt.absentPeers; k++ {
+					if i == k {
+						continue
+					}
+					hosts[i].Peerstore().AddAddrs(hostsInfo[k].ID, hostsInfo[k].Addrs, peerstore.PermanentAddrTTL)
+				}
+			}
+
+			clock := clockwork.NewFakeClock()
+			readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)
+
+			// We wrap the Advance() calls with blockers to make sure that the ticker
+			// can go to sleep and produce ticks without time passing in parallel.
+			clock.BlockUntil(1)
+			clock.Advance(15 * time.Second)
+			clock.BlockUntil(1)
+
+			if tt.err != nil {
+				require.Eventually(t, func() bool {
+					err = readyErrFunc()
+					if err != nil {
+						require.EqualError(t, err, tt.err.Error())
+						return true
+					}
+
+					return false
+				}, 100*time.Millisecond, time.Millisecond)
+			} else {
+				require.Eventually(t, func() bool {
+					err = readyErrFunc()
+					if err == nil {
+						return true
+					}
+
+					return false
+				}, 100*time.Millisecond, time.Millisecond)
+			}
+		})
+	}
+}