Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: push readyz in background #867

Merged
merged 7 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

if err := wireMonitoringAPI(life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs); err != nil {
if err := wireMonitoringAPI(ctx, life, conf.MonitoringAddr, localEnode, tcpNode, eth2Cl, peerIDs); err != nil {
return err
}

Expand Down
90 changes: 61 additions & 29 deletions app/monitoringapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"math"
"net/http"
"net/http/pprof"
"sync"
"time"

eth2client "github.com/attestantio/go-eth2-client"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
Expand All @@ -35,7 +37,7 @@ import (

// wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager.
// It serves prometheus metrics, pprof profiling and the runtime enr.
func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Svc eth2client.Service, peerIDs []peer.ID) error {
func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, addr string, localNode *enode.LocalNode, tcpNode host.Host, eth2Svc eth2client.Service, peerIDs []peer.ID) error {
mux := http.NewServeMux()

// Serve prometheus metrics
Expand All @@ -56,7 +58,16 @@ func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.Lo
return errors.New("invalid eth2 service")
}

mux.Handle("/readyz", newReadyHandler(tcpNode, eth2Cl, peerIDs))
readyErrFunc := startReadyChecker(ctx, tcpNode, eth2Cl, peerIDs, clockwork.NewRealClock())
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
readyErr := readyErrFunc()
if readyErr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can inline

Copy link
Contributor Author

@dB2510 dB2510 Jul 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah but i avoided this because it needs to send response as error string which will result in calling readErrFunc() twice and again asking for mutex.

writeResponse(w, http.StatusInternalServerError, readyErr.Error())
return
}

writeResponse(w, http.StatusOK, "ok")
})

// Copied from net/http/pprof/pprof.go
mux.HandleFunc("/debug/pprof/", pprof.Index)
Expand All @@ -76,32 +87,46 @@ func wireMonitoringAPI(life *lifecycle.Manager, addr string, localNode *enode.Lo
return nil
}

// newReadyHandler returns a http.HandlerFunc which returns 200 when both the beacon node is synced and all quorum peers can be pinged in parallel within a timeout. Returns 500 otherwise.
func newReadyHandler(tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), time.Second)
defer cancel()

var ready float64
defer func() { readyzGauge.Set(ready) }()

syncing, err := beaconNodeSyncing(ctx, eth2Cl)
if err != nil {
writeResponse(w, http.StatusInternalServerError, "Failed to get beacon sync state")
return
} else if syncing {
writeResponse(w, http.StatusInternalServerError, "Beacon node not synced")
return
// startReadyChecker returns function which returns an error resulting from ready checks periodically.
func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2client.NodeSyncingProvider, peerIDs []peer.ID, clock clockwork.Clock) func() error {
var (
mu sync.Mutex
readyErr error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest initialising the error with non-nil, "errReadyUninit = "ready check uninitialised"

)
go func() {
Copy link
Contributor

@corverroos corverroos Jul 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest extracting a function isReady(ctx) bool which you call here and in newReadyHandler

Copy link
Contributor

@corverroos corverroos Jul 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a test for isReady please 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should ONLY have a async "isReady" resolver, and then readyz just reads the latest value..?

ticker := clock.NewTicker(10 * time.Second)

for {
select {
case <-ctx.Done():
return
case <-ticker.Chan():
mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remember to decrease mutex locked scope to absolute minimum, and remember to NEVER do IO (disk/network) calls while a lock is held.

syncing, err := beaconNodeSyncing(ctx, eth2Cl)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoided logging errors here, since that would spam the logs

if err != nil {
readyErr = errors.New("failed to get beacon sync state")
readyzGauge.Set(0)
} else if syncing {
readyErr = errors.New("beacon node not synced")
readyzGauge.Set(0)
} else if peersReady(ctx, peerIDs, tcpNode) != nil {
readyErr = errors.New("couldn't ping all peers")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest using error sentinels (var errReadyNoPing = errors.New("blah blah")) and use them for comparison in tests

readyzGauge.Set(0)
} else {
readyErr = nil
readyzGauge.Set(1)
}

mu.Unlock()
}
}
}()

err = peersReady(ctx, peerIDs, tcpNode)
if err != nil {
writeResponse(w, http.StatusInternalServerError, "Couldn't ping all peers")
return
}
return func() error {
mu.Lock()
defer mu.Unlock()

ready = 1
writeResponse(w, http.StatusOK, "ok")
return readyErr
}
}

Expand Down Expand Up @@ -133,19 +158,26 @@ func peersReady(ctx context.Context, peerIDs []peer.ID, tcpNode host.Host) error

var (
// Require quorum successes (excluding self). Formula from IBFT 2.0 paper https://arxiv.org/pdf/1909.10194.pdf
require = int(math.Ceil(float64(len(peerIDs)*2)/3)) - 1
actual int
require = int(math.Ceil(float64(len(peerIDs)*2)/3)) - 1
actual int
errCount int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest: okCount, errCount

)
for {
select {
case <-ctx.Done():
return ctx.Err()
case res := <-results:
if res.Error != nil {
continue
errCount++
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added error counter to return error if are unable to ping quorum number of peers

} else {
actual++
}

// Return error if we cannot reach quorum peers.
if errCount > (len(peerIDs) - require - 1) {
return errors.New("not enough peers")
}

actual++
if actual == require {
return nil
}
Expand Down
195 changes: 195 additions & 0 deletions app/monitoringapi_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package app

import (
"context"
"testing"
"time"

eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

func TestStartCheckerSuccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bmock, err := beaconmock.New()
require.NoError(t, err)

const numNodes = 3
var (
peers []peer.ID
hosts []host.Host
hostsInfo []peer.AddrInfo
)

for i := 0; i < numNodes; i++ {
h := testutil.CreateHost(t, testutil.AvailableAddr(t))
info := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
hostsInfo = append(hostsInfo, info)
peers = append(peers, h.ID())
hosts = append(hosts, h)
}

// connect each host with its peers
for i := 0; i < numNodes; i++ {
for k := 0; k < numNodes; k++ {
if i == k {
continue
}
hosts[i].Peerstore().AddAddrs(hostsInfo[k].ID, hostsInfo[k].Addrs, peerstore.PermanentAddrTTL)
}
}

clock := clockwork.NewFakeClock()
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)

// We wrap the Advance() calls with blockers to make sure that the ticker
// can go to sleep and produce ticks without time passing in parallel.
clock.BlockUntil(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockUntil only works for calls to Sleep and After, not Timer.Chan().. so you'll need to another way.

clock.Advance(15 * time.Second)
clock.BlockUntil(1)

require.NoError(t, readyErrFunc())
}

func TestStartCheckerSyncing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bmock, err := beaconmock.New()
require.NoError(t, err)

bmock.NodeSyncingFunc = func(ctx context.Context) (*eth2v1.SyncState, error) {
return &eth2v1.SyncState{IsSyncing: true}, nil
}

const numNodes = 3
var (
peers []peer.ID
hosts []host.Host
hostsInfo []peer.AddrInfo
)

for i := 0; i < numNodes; i++ {
h := testutil.CreateHost(t, testutil.AvailableAddr(t))
info := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
hostsInfo = append(hostsInfo, info)
peers = append(peers, h.ID())
hosts = append(hosts, h)
}

// connect each host with its peers
for i := 0; i < numNodes; i++ {
for k := 0; k < numNodes; k++ {
if i == k {
continue
}
hosts[i].Peerstore().AddAddrs(hostsInfo[k].ID, hostsInfo[k].Addrs, peerstore.PermanentAddrTTL)
}
}

clock := clockwork.NewFakeClock()
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)

// We wrap the Advance() calls with blockers to make sure that the ticker
// can go to sleep and produce ticks without time passing in parallel.
clock.BlockUntil(1)
clock.Advance(15 * time.Second)
clock.BlockUntil(1)

// Infinite loop required since mutexes are non-deterministic.
for {
err := readyErrFunc()
if err != nil {
require.EqualError(t, err, "beacon node not synced")
break
}
}
}

func TestStartCheckerPingFail(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lots of duplication in these tests, can't we make it a table test rather?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah sure

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bmock, err := beaconmock.New()
require.NoError(t, err)

bmock.NodeSyncingFunc = func(ctx context.Context) (*eth2v1.SyncState, error) {
return &eth2v1.SyncState{IsSyncing: false}, nil
}

const numNodes = 5
var (
peers []peer.ID
hosts []host.Host
hostsInfo []peer.AddrInfo
)

for i := 0; i < numNodes; i++ {
h := testutil.CreateHost(t, testutil.AvailableAddr(t))
info := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
hostsInfo = append(hostsInfo, info)
peers = append(peers, h.ID())
hosts = append(hosts, h)
}

// Connect each host with its peers except 3 peers so that quorum number of peers cannot connect.
for i := 0; i < numNodes; i++ {
for k := 0; k < numNodes-3; k++ {
if i == k {
continue
}
hosts[i].Peerstore().AddAddrs(hostsInfo[k].ID, hostsInfo[k].Addrs, peerstore.PermanentAddrTTL)
}
}

clock := clockwork.NewFakeClock()
readyErrFunc := startReadyChecker(ctx, hosts[0], bmock, peers, clock)

// We wrap the Advance() calls with blockers to make sure that the ticker
// can go to sleep and produce ticks without time passing in parallel.
clock.BlockUntil(1)
clock.Advance(15 * time.Second)
clock.BlockUntil(1)

// Infinite loop required since mutexes are non-deterministic.
for {
err := readyErrFunc()
if err != nil {
require.EqualError(t, err, "couldn't ping all peers")
break
}
}
}