Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Jul 21, 2022
1 parent e8f54ef commit f0a9b6a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 47 deletions.
29 changes: 15 additions & 14 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func Run(ctx context.Context, conf Config) (err error) {
}
initStartupMetrics(lockHashHex)

eth2Cl, beaconAddr, err := newETH2Client(ctx, conf, life, lock.Validators)
eth2Cl, err := newETH2Client(ctx, conf, life, lock.Validators)
if err != nil {
return err
}
Expand All @@ -204,7 +204,7 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

if err := wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl, beaconAddr, peerIDs); err != nil {
if err := wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey, eth2Cl, peerIDs); err != nil {
return err
}

Expand Down Expand Up @@ -275,7 +275,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
// wireCoreWorkflow wires the core workflow components.
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2client.Service,
beaconAddrs []string, peerIDs []peer.ID,
peerIDs []peer.ID,
) error {
// Convert and prep public keys and public shares
var (
Expand Down Expand Up @@ -335,7 +335,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, beaconAddrs, vapi); err != nil {
if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, eth2Cl, vapi); err != nil {
return err
}

Expand Down Expand Up @@ -420,13 +420,14 @@ func eth2PubKeys(validators []cluster.DistValidator) ([]eth2p0.BLSPubKey, error)
return pubkeys, nil
}

// newETH2Client returns a new eth2client and a beacon node addresses; it is either a beaconmock for simnet or a http client to a real beacon node.
// newETH2Client returns a new eth2client; it is either a beaconmock for
// simnet or a multi http client to a real beacon node.
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager,
validators []cluster.DistValidator,
) (eth2client.Service, []string, error) {
) (eth2client.Service, error) {
pubkeys, err := eth2PubKeys(validators)
if err != nil {
return nil, nil, err
return nil, err
}

if conf.SimnetBMock { // Configure the beacon mock.
Expand All @@ -440,16 +441,16 @@ func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager,
opts = append(opts, conf.TestConfig.SimnetBMockOpts...)
bmock, err := beaconmock.New(opts...)
if err != nil {
return nil, nil, err
return nil, err
}

life.RegisterStop(lifecycle.StopBeaconMock, lifecycle.HookFuncErr(bmock.Close))

return bmock, []string{bmock.HTTPAddr()}, nil
return bmock, nil
}

if len(conf.BeaconNodeAddrs) == 0 {
return nil, nil, errors.New("beacon node endpoints empty")
return nil, errors.New("beacon node endpoints empty")
}

eth2Cl, err := eth2wrap.NewHTTPService(ctx,
Expand All @@ -458,10 +459,10 @@ func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager,
eth2wrap.WithMultiMetrics(),
)
if err != nil {
return nil, nil, errors.Wrap(err, "new eth2 http client")
return nil, errors.Wrap(err, "new eth2 http client")
}

return eth2Cl, conf.BeaconNodeAddrs, nil
return eth2Cl, nil
}

// newConsensus returns a new consensus component and its start lifecycle hook.
Expand Down Expand Up @@ -519,8 +520,8 @@ func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
}

// wireVAPIRouter constructs the validator API router and registers it with the life cycle manager.
func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string, beaconAddrs []string, handler validatorapi.Handler) error {
vrouter, err := validatorapi.NewRouter(handler, beaconAddrs)
func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string, eth2Cl eth2client.Service, handler validatorapi.Handler) error {
vrouter, err := validatorapi.NewRouter(handler, eth2Cl)
if err != nil {
return errors.Wrap(err, "new monitoring server")
}
Expand Down
65 changes: 40 additions & 25 deletions core/validatorapi/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Handler interface {
// NewRouter returns a new validator http server router. The http router
// translates http requests related to the distributed validator to the validatorapi.Handler.
// All other requests are reserve-proxied to the beacon-node address.
func NewRouter(h Handler, beaconNodeAddrs []string) (*mux.Router, error) {
func NewRouter(h Handler, eth2Cl eth2client.Service) (*mux.Router, error) {
// Register subset of distributed validator related endpoints
endpoints := []struct {
Name string
Expand Down Expand Up @@ -127,7 +127,7 @@ func NewRouter(h Handler, beaconNodeAddrs []string) (*mux.Router, error) {
}

// Everything else is proxied
proxy, err := proxyHandler(beaconNodeAddrs)
proxy, err := proxyHandler(eth2Cl)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -443,38 +443,53 @@ func submitExit(p eth2client.VoluntaryExitSubmitter) handlerFunc {
}

// proxyHandler returns a reverse proxy handler.
func proxyHandler(targets []string) (http.HandlerFunc, error) {
if len(targets) == 0 {
return nil, errors.New("proxy targets empty")
}

// TODO(corver): Add support for proxing to an available (or best) target
targetURL, err := url.Parse(targets[0])
if err != nil {
return nil, errors.Wrap(err, "invalid proxy target address")
}
func proxyHandler(eth2Cl eth2client.Service) (http.HandlerFunc, error) {
return func(w http.ResponseWriter, r *http.Request) {
// Get active beacon node address.
targetURL, err := getBeaconNodeAddress(eth2Cl)
if err != nil {
ctx := log.WithTopic(r.Context(), "vapi")
log.Error(ctx, "Proxy target beacon node address", err)
writeError(ctx, w, "proxy", err)

// TODO(corver): Add support for multiple upstream targets via some form of load balancing.
proxy := httputil.NewSingleHostReverseProxy(targetURL)
return
}
// Get address for active beacon node
proxy := httputil.NewSingleHostReverseProxy(targetURL)

// Extend default proxy director with basic auth and host header.
defaultDirector := proxy.Director
proxy.Director = func(req *http.Request) {
if targetURL.User != nil {
password, _ := targetURL.User.Password()
req.SetBasicAuth(targetURL.User.Username(), password)
// Extend default proxy director with basic auth and host header.
defaultDirector := proxy.Director
proxy.Director = func(req *http.Request) {
if targetURL.User != nil {
password, _ := targetURL.User.Password()
req.SetBasicAuth(targetURL.User.Username(), password)
}
req.Host = targetURL.Host
defaultDirector(req)
}
req.Host = targetURL.Host
defaultDirector(req)
}
proxy.ErrorLog = stdlog.New(io.Discard, "", 0)
proxy.ErrorLog = stdlog.New(io.Discard, "", 0)

return func(w http.ResponseWriter, r *http.Request) {
defer observeAPILatency("proxy")()
proxy.ServeHTTP(proxyResponseWriter{w.(writeFlusher)}, r)
}, nil
}

// getBeaconNodeAddress returns an active beacon node proxy target address.
func getBeaconNodeAddress(eth2Cl eth2client.Service) (*url.URL, error) {
addr := eth2Cl.Address()
if addr == "none" {
// eth2multi returns "none" if no active clients.
return nil, errors.New("no active beacon node clients")
}

targetURL, err := url.Parse(addr)
if err != nil {
return nil, errors.Wrap(err, "invalid beacon node address", z.Str("address", addr))
}

return targetURL, nil
}

// writeResponse writes the 200 OK response and json response body.
func writeResponse(ctx context.Context, w http.ResponseWriter, endpoint string, response interface{}) {
w.WriteHeader(http.StatusOK)
Expand Down
17 changes: 14 additions & 3 deletions core/validatorapi/router_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRouterIntegration(t *testing.T) {
t.Skip("Skipping integration test since BEACON_URL not found")
}

r, err := NewRouter(Handler(nil), []string{beaconURL})
r, err := NewRouter(Handler(nil), testBeaconAddr(beaconURL))
require.NoError(t, err)

server := httptest.NewServer(r)
Expand Down Expand Up @@ -484,7 +484,7 @@ func testRouter(t *testing.T, handler testHandler, callback func(context.Context
proxy := httptest.NewServer(handler.newBeaconHandler(t))
defer proxy.Close()

r, err := NewRouter(handler, []string{proxy.URL})
r, err := NewRouter(handler, testBeaconAddr(proxy.URL))
require.NoError(t, err)

server := httptest.NewServer(r)
Expand All @@ -505,7 +505,7 @@ func testRawRouter(t *testing.T, handler testHandler, callback func(context.Cont
proxy := httptest.NewServer(handler.newBeaconHandler(t))
defer proxy.Close()

r, err := NewRouter(handler, []string{proxy.URL})
r, err := NewRouter(handler, testBeaconAddr(proxy.URL))
require.NoError(t, err)

server := httptest.NewServer(r)
Expand Down Expand Up @@ -617,3 +617,14 @@ func nest(data interface{}, nests ...string) interface{} {

return res
}

// testBeaconAddr implements eth2client.Service only returning an address.
type testBeaconAddr string

func (t testBeaconAddr) Name() string {
return string(t)
}

func (t testBeaconAddr) Address() string {
return string(t)
}
6 changes: 1 addition & 5 deletions testutil/beaconmock/beaconmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,7 @@ func (Mock) Name() string {
return "beacon-mock"
}

func (Mock) Address() string {
return "mock-address"
}

func (m Mock) HTTPAddr() string {
func (m Mock) Address() string {
return "http://" + m.httpServer.Addr
}

Expand Down

0 comments on commit f0a9b6a

Please sign in to comment.