Skip to content

Commit

Permalink
go/worker/consensus: Add public consensus services worker
Browse files Browse the repository at this point in the history
A public consensus services worker enables any full consensus node to expose
light client services to other nodes that may need them (e.g., they are needed
to support light clients).

The worker can be enabled using --worker.consensus.enabled and is disabled by
default. Enabling the public consensus services worker exposes the light
consensus client interface over publicly accessible gRPC.
  • Loading branch information
kostko committed May 3, 2020
1 parent c0554af commit c33045c
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 2 deletions.
9 changes: 9 additions & 0 deletions .changelog/2440.feature.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
go/worker/consensus: Add public consensus services worker

A public consensus services worker enables any full consensus node to expose
light client services to other nodes that may need them (e.g., they are needed
to support light clients).

The worker can be enabled using --worker.consensus.enabled and is disabled by
default. Enabling the public consensus services worker exposes the light
consensus client interface over publicly accessible gRPC.
22 changes: 21 additions & 1 deletion go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/oasislabs/oasis-core/go/worker/compute/executor"
"github.com/oasislabs/oasis-core/go/worker/compute/merge"
"github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler"
workerConsensus "github.com/oasislabs/oasis-core/go/worker/consensus"
workerKeymanager "github.com/oasislabs/oasis-core/go/worker/keymanager"
"github.com/oasislabs/oasis-core/go/worker/registration"
workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry"
Expand Down Expand Up @@ -125,6 +126,7 @@ type Node struct {
P2P *p2p.P2P
RegistrationWorker *registration.Worker
KeymanagerWorker *workerKeymanager.Worker
ConsensusWorker *workerConsensus.Worker
}

// Cleanup cleans up after the node has terminated.
Expand Down Expand Up @@ -338,6 +340,13 @@ func (n *Node) initWorkers(logger *logging.Logger) error {
}
n.svcMgr.Register(n.TransactionSchedulerWorker)

// Initialize the public consensus services worker.
n.ConsensusWorker, err = workerConsensus.New(n.CommonWorker)
if err != nil {
return err
}
n.svcMgr.Register(n.ConsensusWorker)

return nil
}

Expand Down Expand Up @@ -382,8 +391,18 @@ func (n *Node) startWorkers(logger *logging.Logger) error {
return err
}

// Start the public consensus services worker.
if err := n.ConsensusWorker.Start(); err != nil {
return fmt.Errorf("consensus worker: %w", err)
}

// Only start the external gRPC server if any workers are enabled.
if n.StorageWorker.Enabled() || n.TransactionSchedulerWorker.Enabled() || n.MergeWorker.Enabled() || n.KeymanagerWorker.Enabled() {
if n.StorageWorker.Enabled() ||
n.TransactionSchedulerWorker.Enabled() ||
n.MergeWorker.Enabled() ||
n.KeymanagerWorker.Enabled() ||
n.ConsensusWorker.Enabled() {

if err := n.CommonWorker.Grpc.Start(); err != nil {
logger.Error("failed to start external gRPC server",
"err", err,
Expand Down Expand Up @@ -791,6 +810,7 @@ func init() {
workerCommon.Flags,
workerStorage.Flags,
workerSentry.Flags,
workerConsensus.Flags,
crash.InitFlags(),
} {
Flags.AddFlagSet(v)
Expand Down
6 changes: 6 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
"github.com/oasislabs/oasis-core/go/worker/compute"
"github.com/oasislabs/oasis-core/go/worker/compute/txnscheduler"
workerConsensus "github.com/oasislabs/oasis-core/go/worker/consensus"
"github.com/oasislabs/oasis-core/go/worker/keymanager"
"github.com/oasislabs/oasis-core/go/worker/registration"
workerSentry "github.com/oasislabs/oasis-core/go/worker/sentry"
Expand Down Expand Up @@ -377,6 +378,11 @@ func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
return args
}

func (args *argBuilder) workerConsensusEnabled() *argBuilder {
args.vec = append(args.vec, "--"+workerConsensus.CfgWorkerEnabled)
return args
}

func (args *argBuilder) iasUseGenesis() *argBuilder {
args.vec = append(args.vec, "--ias.use_genesis")
return args
Expand Down
3 changes: 3 additions & 0 deletions go/oasis-test-runner/oasis/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ type ConsensusFixture struct { // nolint: maligned

// TendermintRecoverCorruptedWAL enables automatic recovery of corrupted Tendermint's WAL.
TendermintRecoverCorruptedWAL bool `json:"tendermint_recover_corrupted_wal"`

// EnableConsensusWorker enables the public consensus services worker.
EnableConsensusWorker bool `json:"enable_consensus_worker,omitempty"`
}

// TEEFixture is a TEE configuration fixture.
Expand Down
13 changes: 12 additions & 1 deletion go/oasis-test-runner/oasis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Validator struct {

tmAddress string
consensusPort uint16
clientPort uint16
}

// ValidatorCfg is the Oasis validator provisioning configuration.
Expand Down Expand Up @@ -66,6 +67,11 @@ func (val *Validator) ExportsPath() string {
return nodeExportsPath(val.dir)
}

// ExternalGRPCAddress returns the address of the node's external gRPC server.
func (val *Validator) ExternalGRPCAddress() string {
return fmt.Sprintf("127.0.0.1:%d", val.clientPort)
}

// Start starts an Oasis node.
func (val *Validator) Start() error {
return val.startNode()
Expand All @@ -91,6 +97,10 @@ func (val *Validator) startNode() error {
} else {
args = args.appendSeedNodes(val.net)
}
if val.consensus.EnableConsensusWorker {
args = args.workerClientPort(val.clientPort).
workerConsensusEnabled()
}

if len(val.net.validators) >= 1 && val == val.net.validators[0] {
args = args.supplementarysanityEnabled()
Expand Down Expand Up @@ -130,6 +140,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) {
entity: cfg.Entity,
sentries: cfg.Sentries,
consensusPort: net.nextNodePort,
clientPort: net.nextNodePort + 1,
}
val.doStartNode = val.startNode

Expand Down Expand Up @@ -191,7 +202,7 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) {
}

net.validators = append(net.validators, val)
net.nextNodePort++
net.nextNodePort += 2

if err := net.AddLogWatcher(&val.Node); err != nil {
net.logger.Error("failed to add log watcher",
Expand Down
95 changes: 95 additions & 0 deletions go/worker/consensus/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Package consensus implements publicly accessible consensus services.
package consensus

import (
flag "github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/oasislabs/oasis-core/go/common/logging"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
)

const (
// CfgWorkerEnabled enables the consensus services worker.
CfgWorkerEnabled = "worker.consensus.enabled"
)

// Flags has the configuration flags.
var Flags = flag.NewFlagSet("", flag.ContinueOnError)

// Worker is a worker providing publicly accessible consensus services.
//
// Currently this only exposes the consensus light client service.
type Worker struct {
enabled bool

commonWorker *workerCommon.Worker

quitCh chan struct{}

logger *logging.Logger
}

// Name returns the service name.
func (w *Worker) Name() string {
return "public consensus services worker"
}

// Enabled returns if worker is enabled.
func (w *Worker) Enabled() bool {
return w.enabled
}

// Start starts the worker.
func (w *Worker) Start() error {
if w.enabled {
w.logger.Info("starting public consensus services worker")
}
return nil
}

// Stop halts the service.
func (w *Worker) Stop() {
close(w.quitCh)
}

// Quit returns a channel that will be closed when the service terminates.
func (w *Worker) Quit() <-chan struct{} {
return w.quitCh
}

// Cleanup performs the service specific post-termination cleanup.
func (w *Worker) Cleanup() {
}

// New creates a new public consensus services worker.
func New(commonWorker *workerCommon.Worker) (*Worker, error) {
w := &Worker{
enabled: Enabled(),
commonWorker: commonWorker,
quitCh: make(chan struct{}),
logger: logging.GetLogger("worker/consensus"),
}

if w.enabled {
// Register the consensus light client service.
consensus.RegisterLightService(commonWorker.Grpc.Server(), commonWorker.Consensus)

// TODO: We could introduce a role bit for nodes providing these public services so that you
// could easily discover suitable nodes to use.
}

return w, nil
}

// Enabled reads our enabled flag from viper.
func Enabled() bool {
return viper.GetBool(CfgWorkerEnabled)
}

func init() {
Flags.Bool(CfgWorkerEnabled, false, "Enable public consensus services worker")

_ = viper.BindPFlags(Flags)
}

0 comments on commit c33045c

Please sign in to comment.