From 2ddcbd708fb612fe7022c304c78f545dd24e5d74 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Sun, 3 May 2020 21:18:53 +0200 Subject: [PATCH] go/worker/consensusrpc: Add public consensus RPC services worker A public consensus services worker enables any full consensus node to expose light client services to other nodes that may need them (e.g., they are needed to support light clients). The worker can be enabled using --worker.consensusrpc.enabled and is disabled by default. Enabling the public consensus services worker exposes the light consensus client interface over publicly accessible gRPC. --- .changelog/2440.feature.2.md | 9 ++ go/common/node/node.go | 17 +-- go/consensus/api/light.go | 5 +- go/oasis-node/cmd/node/node.go | 22 +++- go/oasis-test-runner/oasis/args.go | 6 ++ go/oasis-test-runner/oasis/fixture.go | 3 + go/oasis-test-runner/oasis/validator.go | 13 ++- go/oasis-test-runner/scenario/e2e/runtime.go | 6 +- go/worker/consensusrpc/worker.go | 105 +++++++++++++++++++ 9 files changed, 172 insertions(+), 14 deletions(-) create mode 100644 .changelog/2440.feature.2.md create mode 100644 go/worker/consensusrpc/worker.go diff --git a/.changelog/2440.feature.2.md b/.changelog/2440.feature.2.md new file mode 100644 index 00000000000..4b619ad6b64 --- /dev/null +++ b/.changelog/2440.feature.2.md @@ -0,0 +1,9 @@ +go/worker/consensusrpc: Add public consensus RPC services worker + +A public consensus services worker enables any full consensus node to expose +light client services to other nodes that may need them (e.g., they are needed +to support light clients). + +The worker can be enabled using `--worker.consensusrpc.enabled` and is +disabled by default. Enabling the public consensus services worker exposes +the light consensus client interface over publicly accessible gRPC. diff --git a/go/common/node/node.go b/go/common/node/node.go index 9bd4ca08ded..402fe8093a7 100644 --- a/go/common/node/node.go +++ b/go/common/node/node.go @@ -69,18 +69,20 @@ type Node struct { type RolesMask uint32 const ( - // RoleComputeWorker is Oasis compute worker role. + // RoleComputeWorker is the compute worker role. RoleComputeWorker RolesMask = 1 << 0 - // RoleStorageWorker is Oasis storage worker role. + // RoleStorageWorker is the storage worker role. RoleStorageWorker RolesMask = 1 << 1 - // RoleKeyManager is the Oasis key manager role. + // RoleKeyManager is the the key manager role. RoleKeyManager RolesMask = 1 << 2 - // RoleValidator is the Oasis validator role. + // RoleValidator is the validator role. RoleValidator RolesMask = 1 << 3 + // RoleConsensusRPC is the public consensus RPC services worker role. + RoleConsensusRPC RolesMask = 1 << 4 // RoleReserved are all the bits of the Oasis node roles bitmask // that are reserved and must not be used. - RoleReserved RolesMask = ((1 << 32) - 1) & ^((RoleValidator << 1) - 1) + RoleReserved RolesMask = ((1 << 32) - 1) & ^((RoleConsensusRPC << 1) - 1) ) // IsSingleRole returns true if RolesMask encodes a single valid role. @@ -102,11 +104,14 @@ func (m RolesMask) String() string { ret = append(ret, "storage") } if m&RoleKeyManager != 0 { - ret = append(ret, "key_manager") + ret = append(ret, "key-manager") } if m&RoleValidator != 0 { ret = append(ret, "validator") } + if m&RoleConsensusRPC != 0 { + ret = append(ret, "consensus-rpc") + } return strings.Join(ret, ",") } diff --git a/go/consensus/api/light.go b/go/consensus/api/light.go index 029f8978737..bef57e18177 100644 --- a/go/consensus/api/light.go +++ b/go/consensus/api/light.go @@ -36,9 +36,8 @@ type ValidatorSet struct { type Parameters struct { // Height contains the block height these consensus parameters are for. Height int64 `json:"height"` - - // TODO: Consider also including consensus/genesis.Parameters which are backend-agnostic. - // Meta contains the consensus backend specific consensus parameters. Meta []byte `json:"meta"` + + // TODO: Consider also including consensus/genesis.Parameters which are backend-agnostic. } diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 7c58ebe854f..586c211ed4b 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -61,6 +61,7 @@ import ( "github.com/oasislabs/oasis-core/go/worker/compute/executor" "github.com/oasislabs/oasis-core/go/worker/compute/merge" "github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler" + workerConsensusRPC "github.com/oasislabs/oasis-core/go/worker/consensusrpc" workerKeymanager "github.com/oasislabs/oasis-core/go/worker/keymanager" "github.com/oasislabs/oasis-core/go/worker/registration" workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry" @@ -126,6 +127,7 @@ type Node struct { P2P *p2p.P2P RegistrationWorker *registration.Worker KeymanagerWorker *workerKeymanager.Worker + ConsensusWorker *workerConsensusRPC.Worker } // Cleanup cleans up after the node has terminated. @@ -340,6 +342,13 @@ func (n *Node) initWorkers(logger *logging.Logger) error { } n.svcMgr.Register(n.TransactionSchedulerWorker) + // Initialize the public consensus services worker. + n.ConsensusWorker, err = workerConsensusRPC.New(n.CommonWorker, n.RegistrationWorker) + if err != nil { + return err + } + n.svcMgr.Register(n.ConsensusWorker) + return nil } @@ -384,8 +393,18 @@ func (n *Node) startWorkers(logger *logging.Logger) error { return err } + // Start the public consensus services worker. + if err := n.ConsensusWorker.Start(); err != nil { + return fmt.Errorf("consensus worker: %w", err) + } + // Only start the external gRPC server if any workers are enabled. - if n.StorageWorker.Enabled() || n.TransactionSchedulerWorker.Enabled() || n.MergeWorker.Enabled() || n.KeymanagerWorker.Enabled() { + if n.StorageWorker.Enabled() || + n.TransactionSchedulerWorker.Enabled() || + n.MergeWorker.Enabled() || + n.KeymanagerWorker.Enabled() || + n.ConsensusWorker.Enabled() { + if err := n.CommonWorker.Grpc.Start(); err != nil { logger.Error("failed to start external gRPC server", "err", err, @@ -793,6 +812,7 @@ func init() { workerCommon.Flags, workerStorage.Flags, workerSentry.Flags, + workerConsensusRPC.Flags, crash.InitFlags(), } { Flags.AddFlagSet(v) diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index 0ee717542df..7b6f45e229a 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -30,6 +30,7 @@ import ( "github.com/oasislabs/oasis-core/go/worker/common/p2p" "github.com/oasislabs/oasis-core/go/worker/compute" "github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler" + workerConsensusRPC "github.com/oasislabs/oasis-core/go/worker/consensusrpc" "github.com/oasislabs/oasis-core/go/worker/keymanager" "github.com/oasislabs/oasis-core/go/worker/registration" workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry" @@ -377,6 +378,11 @@ func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder { return args } +func (args *argBuilder) workerConsensusRPCEnabled() *argBuilder { + args.vec = append(args.vec, "--"+workerConsensusRPC.CfgWorkerEnabled) + return args +} + func (args *argBuilder) iasUseGenesis() *argBuilder { args.vec = append(args.vec, "--ias.use_genesis") return args diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index c84851ba998..b7c9a594792 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -133,6 +133,9 @@ type ConsensusFixture struct { // nolint: maligned // TendermintRecoverCorruptedWAL enables automatic recovery of corrupted Tendermint's WAL. TendermintRecoverCorruptedWAL bool `json:"tendermint_recover_corrupted_wal"` + + // EnableConsensusRPCWorker enables the public consensus RPC services worker. + EnableConsensusRPCWorker bool `json:"enable_consensusrpc_worker,omitempty"` } // TEEFixture is a TEE configuration fixture. diff --git a/go/oasis-test-runner/oasis/validator.go b/go/oasis-test-runner/oasis/validator.go index 67b2a2abe0c..d31fe8c9c7b 100644 --- a/go/oasis-test-runner/oasis/validator.go +++ b/go/oasis-test-runner/oasis/validator.go @@ -25,6 +25,7 @@ type Validator struct { tmAddress string consensusPort uint16 + clientPort uint16 } // ValidatorCfg is the Oasis validator provisioning configuration. @@ -66,6 +67,11 @@ func (val *Validator) ExportsPath() string { return nodeExportsPath(val.dir) } +// ExternalGRPCAddress returns the address of the node's external gRPC server. +func (val *Validator) ExternalGRPCAddress() string { + return fmt.Sprintf("127.0.0.1:%d", val.clientPort) +} + // Start starts an Oasis node. func (val *Validator) Start() error { return val.startNode() @@ -91,6 +97,10 @@ func (val *Validator) startNode() error { } else { args = args.appendSeedNodes(val.net) } + if val.consensus.EnableConsensusRPCWorker { + args = args.workerClientPort(val.clientPort). + workerConsensusRPCEnabled() + } if len(val.net.validators) >= 1 && val == val.net.validators[0] { args = args.supplementarysanityEnabled() @@ -130,6 +140,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) { entity: cfg.Entity, sentries: cfg.Sentries, consensusPort: net.nextNodePort, + clientPort: net.nextNodePort + 1, } val.doStartNode = val.startNode @@ -191,7 +202,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) { } net.validators = append(net.validators, val) - net.nextNodePort++ + net.nextNodePort += 2 if err := net.AddLogWatcher(&val.Node); err != nil { net.logger.Error("failed to add log watcher", diff --git a/go/oasis-test-runner/scenario/e2e/runtime.go b/go/oasis-test-runner/scenario/e2e/runtime.go index 52eba16d2f7..497d3605b19 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime.go +++ b/go/oasis-test-runner/scenario/e2e/runtime.go @@ -193,9 +193,9 @@ func (sc *runtimeImpl) Fixture() (*oasis.NetworkFixture, error) { }, }, Validators: []oasis.ValidatorFixture{ - oasis.ValidatorFixture{Entity: 1}, - oasis.ValidatorFixture{Entity: 1}, - oasis.ValidatorFixture{Entity: 1}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, + oasis.ValidatorFixture{Entity: 1, Consensus: oasis.ConsensusFixture{EnableConsensusRPCWorker: true}}, }, KeymanagerPolicies: []oasis.KeymanagerPolicyFixture{ oasis.KeymanagerPolicyFixture{Runtime: 0, Serial: 1}, diff --git a/go/worker/consensusrpc/worker.go b/go/worker/consensusrpc/worker.go new file mode 100644 index 00000000000..4376978c5a7 --- /dev/null +++ b/go/worker/consensusrpc/worker.go @@ -0,0 +1,105 @@ +// Package consensus implements publicly accessible consensus services. +package consensus + +import ( + "fmt" + + flag "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/oasislabs/oasis-core/go/common/logging" + "github.com/oasislabs/oasis-core/go/common/node" + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + workerCommon "github.com/oasislabs/oasis-core/go/worker/common" + "github.com/oasislabs/oasis-core/go/worker/registration" +) + +const ( + // CfgWorkerEnabled enables the consensus RPC services worker. + CfgWorkerEnabled = "worker.consensusrpc.enabled" +) + +// Flags has the configuration flags. +var Flags = flag.NewFlagSet("", flag.ContinueOnError) + +// Worker is a worker providing publicly accessible consensus services. +// +// Currently this only exposes the consensus light client service. +type Worker struct { + enabled bool + + commonWorker *workerCommon.Worker + + quitCh chan struct{} + + logger *logging.Logger +} + +// Name returns the service name. +func (w *Worker) Name() string { + return "public consensus RPC services worker" +} + +// Enabled returns if worker is enabled. +func (w *Worker) Enabled() bool { + return w.enabled +} + +// Start starts the worker. +func (w *Worker) Start() error { + if w.enabled { + w.logger.Info("starting public consensus RPC services worker") + } + return nil +} + +// Stop halts the service. +func (w *Worker) Stop() { + close(w.quitCh) +} + +// Quit returns a channel that will be closed when the service terminates. +func (w *Worker) Quit() <-chan struct{} { + return w.quitCh +} + +// Cleanup performs the service specific post-termination cleanup. +func (w *Worker) Cleanup() { +} + +// New creates a new public consensus services worker. +func New(commonWorker *workerCommon.Worker, registration *registration.Worker) (*Worker, error) { + w := &Worker{ + enabled: Enabled(), + commonWorker: commonWorker, + quitCh: make(chan struct{}), + logger: logging.GetLogger("worker/consensusrpc"), + } + + if w.enabled { + // Register the consensus light client service. + consensus.RegisterLightService(commonWorker.Grpc.Server(), commonWorker.Consensus) + + // Publish our role to ease discovery for clients. + rp, err := registration.NewRoleProvider(node.RoleConsensusRPC) + if err != nil { + return nil, fmt.Errorf("failed to create role provider: %w", err) + } + + // The consensus RPC service is available immediately. + rp.SetAvailable(func(*node.Node) error { return nil }) + } + + return w, nil +} + +// Enabled reads our enabled flag from viper. +func Enabled() bool { + return viper.GetBool(CfgWorkerEnabled) +} + +func init() { + Flags.Bool(CfgWorkerEnabled, false, "Enable public consensus RPC services worker") + + _ = viper.BindPFlags(Flags) +}