Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: support multiple beacon nodes #830

Merged
merged 3 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
eth2client "github.com/attestantio/go-eth2-client"
eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
eth2http "github.com/attestantio/go-eth2-client/http"
eth2multi "github.com/attestantio/go-eth2-client/multi"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/coinbase/kryptology/pkg/signatures/bls/bls_sig"
"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -73,7 +74,7 @@ type Config struct {
DataDir string
MonitoringAddr string
ValidatorAPIAddr string
BeaconNodeAddr string
BeaconNodeAddrs []string
JaegerAddr string
JaegerService string
SimnetBMock bool
Expand Down Expand Up @@ -273,7 +274,8 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,

// wireCoreWorkflow wires the core workflow components.
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2client.Service, beaconAddr string, peerIDs []peer.ID,
lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, eth2Cl eth2client.Service,
beaconAddrs []string, peerIDs []peer.ID,
) error {
// Convert and prep public keys and public shares
var (
Expand Down Expand Up @@ -333,7 +335,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, beaconAddr, vapi); err != nil {
if err := wireVAPIRouter(life, conf.ValidatorAPIAddr, beaconAddrs, vapi); err != nil {
return err
}

Expand Down Expand Up @@ -418,11 +420,13 @@ func eth2PubKeys(validators []cluster.DistValidator) ([]eth2p0.BLSPubKey, error)
return pubkeys, nil
}

// newETH2Client returns a new eth2client and a beacon node address; it is either a beaconmock for simnet or a http client to a real beacon node.
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, validators []cluster.DistValidator) (eth2client.Service, string, error) {
// newETH2Client returns a new eth2client and a beacon node addresses; it is either a beaconmock for simnet or a http client to a real beacon node.
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager,
validators []cluster.DistValidator,
) (eth2client.Service, []string, error) {
pubkeys, err := eth2PubKeys(validators)
if err != nil {
return nil, "", err
return nil, nil, err
}

if conf.SimnetBMock { // Configure the beacon mock.
Expand All @@ -436,23 +440,27 @@ func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, va
opts = append(opts, conf.TestConfig.SimnetBMockOpts...)
bmock, err := beaconmock.New(opts...)
if err != nil {
return nil, "", err
return nil, nil, err
}

life.RegisterStop(lifecycle.StopBeaconMock, lifecycle.HookFuncErr(bmock.Close))

return bmock, bmock.HTTPAddr(), nil
return bmock, []string{bmock.HTTPAddr()}, nil
}

if len(conf.BeaconNodeAddrs) == 0 {
return nil, nil, errors.New("beacon node endpoints empty")
}

eth2Cl, err := eth2wrap.NewHTTPService(ctx,
eth2http.WithLogLevel(1),
eth2http.WithAddress(conf.BeaconNodeAddr),
eth2multi.WithLogLevel(1),
eth2multi.WithAddresses(conf.BeaconNodeAddrs),
)
if err != nil {
return nil, "", errors.Wrap(err, "new eth2 http client")
return nil, nil, errors.Wrap(err, "new eth2 http client")
}

return eth2Cl, conf.BeaconNodeAddr, nil
return eth2Cl, conf.BeaconNodeAddrs, nil
}

// newConsensus returns a new consensus component and its start lifecycle hook.
Expand Down Expand Up @@ -510,8 +518,8 @@ func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
}

// wireVAPIRouter constructs the validator API router and registers it with the life cycle manager.
func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string, beaconAddr string, handler validatorapi.Handler) error {
vrouter, err := validatorapi.NewRouter(handler, beaconAddr)
func wireVAPIRouter(life *lifecycle.Manager, vapiAddr string, beaconAddrs []string, handler validatorapi.Handler) error {
vrouter, err := validatorapi.NewRouter(handler, beaconAddrs)
if err != nil {
return errors.Wrap(err, "new monitoring server")
}
Expand Down
16 changes: 8 additions & 8 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"time"

eth2http "github.com/attestantio/go-eth2-client/http"
eth2multi "github.com/attestantio/go-eth2-client/multi"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand All @@ -46,23 +46,23 @@ var (
)

// NewHTTPService returns a new instrumented eth2 http service.
func NewHTTPService(ctx context.Context, params ...eth2http.Parameter) (*Service, error) {
eth2Svc, err := eth2http.New(ctx, params...)
func NewHTTPService(ctx context.Context, params ...eth2multi.Parameter) (*Service, error) {
eth2Svc, err := eth2multi.New(ctx, params...)
if err != nil {
return nil, errors.Wrap(err, "new eth2http")
return nil, errors.Wrap(err, "new eth2multi")
}

eth2Cl, ok := eth2Svc.(*eth2http.Service)
eth2Cl, ok := eth2Svc.(*eth2multi.Service)
if !ok {
return nil, errors.New("invalid eth2http service")
return nil, errors.New("invalid eth2multi service")
}

return &Service{Service: eth2Cl}, nil
}

// Service wraps an eth2http.Service adding prometheus metrics and error wrapping.
// Service wraps an eth2multi.Service adding prometheus metrics and error wrapping.
type Service struct {
*eth2http.Service
*eth2multi.Service
}

// latency measures endpoint latency.
Expand Down
14 changes: 0 additions & 14 deletions app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions app/eth2wrap/genwrap/genwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// this program. If not, see <http://www.gnu.org/licenses/>.

// Command genwrap provides a code generator for eth2client provider
// methods implemented by eth2http.Service.
// methods implemented by eth2multi.Service.
// It adds prometheus metrics and error wrapping.
package main

Expand Down Expand Up @@ -68,11 +68,13 @@ import (

// skip some provider methods.
skip = map[string]bool{
// eth2http doesn't implement these
// eth2multi doesn't implement these
"GenesisValidatorsRoot": true,
"Index": true,
"PubKey": true,
"SyncState": true,
"EpochFromStateID": true,
"NodeClient": true,

// these are cached, so no need to instrument.
"GenesisTime": true,
Expand Down
19 changes: 16 additions & 3 deletions cmd/cmd_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestCmdFlags(t *testing.T) {
P2PConfig *p2p.Config
Envs map[string]string
Datadir string
ErrorMsg string
}{
{
Name: "version verbose",
Expand All @@ -61,7 +62,10 @@ func TestCmdFlags(t *testing.T) {
{
Name: "run command",
Args: slice("run"),
Envs: map[string]string{"CHARON_DATA_DIR": "from_env"},
Envs: map[string]string{
"CHARON_DATA_DIR": "from_env",
"CHARON_BEACON_NODE_ENDPOINTS": "http://beacon.node",
},
AppConfig: &app.Config{
Log: log.Config{
Level: "info",
Expand All @@ -83,7 +87,7 @@ func TestCmdFlags(t *testing.T) {
DataDir: "from_env",
MonitoringAddr: "127.0.0.1:3620",
ValidatorAPIAddr: "127.0.0.1:3600",
BeaconNodeAddr: "http://localhost/",
BeaconNodeAddrs: []string{"http://beacon.node"},
JaegerAddr: "",
JaegerService: "charon",
},
Expand All @@ -100,6 +104,11 @@ func TestCmdFlags(t *testing.T) {
Denylist: "",
},
},
{
Name: "run require beacon addrs",
Args: slice("run"),
ErrorMsg: "either flag 'beacon-node-endpoints' or flag 'simnet-beacon-mock=true' must be specified",
},
}

for _, test := range tests {
Expand Down Expand Up @@ -137,7 +146,11 @@ func TestCmdFlags(t *testing.T) {
})

root.SetArgs(test.Args)
require.NoError(t, root.Execute())
if test.ErrorMsg != "" {
require.ErrorContains(t, root.Execute(), test.ErrorMsg)
} else {
require.NoError(t, root.Execute())
}
})
}
}
Expand Down
45 changes: 35 additions & 10 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"github.com/spf13/pflag"

"github.com/obolnetwork/charon/app"
"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/p2p"
)

Expand All @@ -44,7 +46,7 @@ func newRunCmd(runFunc func(context.Context, app.Config) error) *cobra.Command {
},
}

bindRunFlags(cmd.Flags(), &conf)
bindRunFlags(cmd, &conf)
bindDataDirFlag(cmd.Flags(), &conf.DataDir)
bindP2PFlags(cmd.Flags(), &conf.P2P)
bindLogFlags(cmd.Flags(), &conf.Log)
Expand All @@ -53,15 +55,38 @@ func newRunCmd(runFunc func(context.Context, app.Config) error) *cobra.Command {
return cmd
}

func bindRunFlags(flags *pflag.FlagSet, config *app.Config) {
flags.StringVar(&config.LockFile, "lock-file", ".charon/cluster-lock.json", "The path to the cluster lock file defining distributed validator cluster")
flags.StringVar(&config.BeaconNodeAddr, "beacon-node-endpoint", "http://localhost/", "Beacon node endpoint URL")
flags.StringVar(&config.ValidatorAPIAddr, "validator-api-address", "127.0.0.1:3600", "Listening address (ip and port) for validator-facing traffic proxying the beacon-node API")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:3620", "Listening address (ip and port) for the monitoring API (prometheus, pprof)")
flags.StringVar(&config.JaegerAddr, "jaeger-address", "", "Listening address for jaeger tracing")
flags.StringVar(&config.JaegerService, "jaeger-service", "charon", "Service name used for jaeger tracing")
flags.BoolVar(&config.SimnetBMock, "simnet-beacon-mock", false, "Enables an internal mock beacon node for running a simnet.")
flags.BoolVar(&config.SimnetVMock, "simnet-validator-mock", false, "Enables an internal mock validator client when running a simnet. Requires simnet-beacon-mock.")
func bindRunFlags(cmd *cobra.Command, config *app.Config) {
var beaconNodeAddr string
cmd.Flags().StringVar(&config.LockFile, "lock-file", ".charon/cluster-lock.json", "The path to the cluster lock file defining distributed validator cluster.")
cmd.Flags().StringVar(&beaconNodeAddr, "beacon-node-endpoint", "", "Beacon node endpoint URL. Deprecated, please use beacon-node-endpoints.")
cmd.Flags().StringSliceVar(&config.BeaconNodeAddrs, "beacon-node-endpoints", nil, "Comma separated list of one or more beacon node endpoint URLs.")
cmd.Flags().StringVar(&config.ValidatorAPIAddr, "validator-api-address", "127.0.0.1:3600", "Listening address (ip and port) for validator-facing traffic proxying the beacon-node API.")
cmd.Flags().StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:3620", "Listening address (ip and port) for the monitoring API (prometheus, pprof).")
cmd.Flags().StringVar(&config.JaegerAddr, "jaeger-address", "", "Listening address for jaeger tracing.")
cmd.Flags().StringVar(&config.JaegerService, "jaeger-service", "charon", "Service name used for jaeger tracing.")
cmd.Flags().BoolVar(&config.SimnetBMock, "simnet-beacon-mock", false, "Enables an internal mock beacon node for running a simnet.")
cmd.Flags().BoolVar(&config.SimnetVMock, "simnet-validator-mock", false, "Enables an internal mock validator client when running a simnet. Requires simnet-beacon-mock.")

preRunE := cmd.PreRunE // Allow multiple wraps of PreRunE.
cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
ctx := log.WithTopic(cmd.Context(), "cmd")
if beaconNodeAddr != "" {
log.Warn(ctx, "Deprecated flag 'beacon-node-endpoint' used, please use new flag 'beacon-node-endpoints'", nil)
config.BeaconNodeAddrs = []string{beaconNodeAddr}
} else if beaconNodeAddr != "" && len(config.BeaconNodeAddrs) > 0 {
log.Warn(ctx, "Deprecated flag 'beacon-node-endpoint' ignored since new flag 'beacon-node-endpoints' takes precedence",
nil, z.Str("beacon-node-endpoint", beaconNodeAddr), z.Any("beacon-node-endpoints", config.BeaconNodeAddrs))
} else if len(config.BeaconNodeAddrs) == 0 && !config.SimnetBMock {
return errors.New("either flag 'beacon-node-endpoints' or flag 'simnet-beacon-mock=true' must be specified")
// TODO(corver): Use MarkFlagsRequiredTogether once beacon-node-endpoint is removed.
}

if preRunE != nil {
return preRunE(cmd, args)
}

return nil
}
}

func bindLogFlags(flags *pflag.FlagSet, config *log.Config) {
Expand Down
13 changes: 9 additions & 4 deletions core/validatorapi/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Handler interface {
// NewRouter returns a new validator http server router. The http router
// translates http requests related to the distributed validator to the validatorapi.Handler.
// All other requests are reserve-proxied to the beacon-node address.
func NewRouter(h Handler, beaconNodeAddr string) (*mux.Router, error) {
func NewRouter(h Handler, beaconNodeAddrs []string) (*mux.Router, error) {
// Register subset of distributed validator related endpoints
endpoints := []struct {
Name string
Expand Down Expand Up @@ -127,7 +127,7 @@ func NewRouter(h Handler, beaconNodeAddr string) (*mux.Router, error) {
}

// Everything else is proxied
proxy, err := proxyHandler(beaconNodeAddr)
proxy, err := proxyHandler(beaconNodeAddrs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -443,8 +443,13 @@ func submitExit(p eth2client.VoluntaryExitSubmitter) handlerFunc {
}

// proxyHandler returns a reverse proxy handler.
func proxyHandler(target string) (http.HandlerFunc, error) {
targetURL, err := url.Parse(target)
func proxyHandler(targets []string) (http.HandlerFunc, error) {
if len(targets) == 0 {
return nil, errors.New("proxy targets empty")
}

// TODO(corver): Add support for proxing to an available (or best) target
targetURL, err := url.Parse(targets[0])
if err != nil {
return nil, errors.Wrap(err, "invalid proxy target address")
}
Expand Down
6 changes: 3 additions & 3 deletions core/validatorapi/router_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestRouterIntegration(t *testing.T) {
t.Skip("Skipping integration test since BEACON_URL not found")
}

r, err := NewRouter(Handler(nil), beaconURL)
r, err := NewRouter(Handler(nil), []string{beaconURL})
require.NoError(t, err)

server := httptest.NewServer(r)
Expand Down Expand Up @@ -484,7 +484,7 @@ func testRouter(t *testing.T, handler testHandler, callback func(context.Context
proxy := httptest.NewServer(handler.newBeaconHandler(t))
defer proxy.Close()

r, err := NewRouter(handler, proxy.URL)
r, err := NewRouter(handler, []string{proxy.URL})
require.NoError(t, err)

server := httptest.NewServer(r)
Expand All @@ -505,7 +505,7 @@ func testRawRouter(t *testing.T, handler testHandler, callback func(context.Cont
proxy := httptest.NewServer(handler.newBeaconHandler(t))
defer proxy.Close()

r, err := NewRouter(handler, proxy.URL)
r, err := NewRouter(handler, []string{proxy.URL})
require.NoError(t, err)

server := httptest.NewServer(r)
Expand Down
Loading