Skip to content

Commit

Permalink
go/worker/keymanager: Add an enclave rpc handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Yawning committed Apr 23, 2020
1 parent 34f5a8a commit 7a1fa12
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 21 deletions.
83 changes: 80 additions & 3 deletions go/worker/keymanager/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,71 @@ package keymanager
import (
"context"
"errors"
"sync"

"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/keymanager/api"
"github.com/oasislabs/oasis-core/go/keymanager/client"
"github.com/oasislabs/oasis-core/go/runtime/localstorage"
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
"github.com/oasislabs/oasis-core/go/worker/common/host/protocol"
)

var (
errMethodNotSupported = errors.New("worker/keymanager: method not supported")
errEndpointNotSupported = errors.New("worker/keymanager: RPC endpoint not supported")
errMethodNotSupported = errors.New("worker/keymanager: method not supported")

_ protocol.Handler = (*hostHandler)(nil)
)

type hostHandler struct {
sync.Mutex

w *Worker
remoteClient *client.Client
localStorage localstorage.LocalStorage
}

func (h *hostHandler) initRemoteClient(commonWorker *workerCommon.Worker) {
// Wait for the runtime (XXX: Do we need to do this?)
if _, err := h.w.runtime.RegistryDescriptor(h.w.ctx); err != nil {
h.w.logger.Error("failed to wait for registry descriptor",
"err", err,
)
return
}

remoteClient, err := client.New(h.w.ctx, h.w.runtime, commonWorker.KeyManager, commonWorker.Consensus.Registry(), commonWorker.Identity)
if err != nil {
h.w.logger.Error("failed to create remote client",
"err", err,
)
return
}

select {
case <-h.w.ctx.Done():
h.w.logger.Error("failed to wait for key manager",
"err", err,
)
case <-remoteClient.Initialized():
h.Lock()
defer h.Unlock()
h.remoteClient = remoteClient
}
}

func (h *hostHandler) getRemoteClient() (*client.Client, error) {
h.Lock()
defer h.Unlock()

if h.remoteClient != nil {
return h.remoteClient, nil
}

return nil, errEndpointNotSupported
}

func (h *hostHandler) Handle(ctx context.Context, body *protocol.Body) (*protocol.Body, error) {
// Local storage.
if body.HostLocalStorageGetRequest != nil {
Expand All @@ -34,10 +83,38 @@ func (h *hostHandler) Handle(ctx context.Context, body *protocol.Body) (*protoco
}
return &protocol.Body{HostLocalStorageSetResponse: &protocol.Empty{}}, nil
}
// RPC.
if body.HostRPCCallRequest != nil {
switch body.HostRPCCallRequest.Endpoint {
case api.EnclaveRPCEndpoint:
remoteClient, err := h.getRemoteClient()
if err != nil {
return nil, err
}

// Call into the remote key manager.
res, err := remoteClient.CallRemote(ctx, body.HostRPCCallRequest.Request)
if err != nil {
return nil, err
}
return &protocol.Body{HostRPCCallResponse: &protocol.HostRPCCallResponse{
Response: cbor.FixSliceForSerde(res),
}}, nil
default:
return nil, errEndpointNotSupported
}
}

return nil, errMethodNotSupported
}

func newHostHandler(w *Worker, localStorage localstorage.LocalStorage) protocol.Handler {
return &hostHandler{w, localStorage}
func newHostHandler(w *Worker, commonWorker *workerCommon.Worker, localStorage localstorage.LocalStorage) protocol.Handler {
h := &hostHandler{
w: w,
localStorage: localStorage,
}

go h.initRemoteClient(commonWorker)

return h
}
24 changes: 15 additions & 9 deletions go/worker/keymanager/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"context"
"fmt"

"github.com/pkg/errors"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/grpc/policy"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/common/node"
Expand Down Expand Up @@ -84,8 +84,9 @@ func New(
panic("common worker should have been enabled for key manager worker")
}

if err := w.runtimeID.UnmarshalHex(viper.GetString(CfgRuntimeID)); err != nil {
return nil, errors.Wrap(err, "worker/keymanager: failed to parse runtime ID")
var runtimeID common.Namespace
if err := runtimeID.UnmarshalHex(viper.GetString(CfgRuntimeID)); err != nil {
return nil, fmt.Errorf("worker/keymanager: failed to parse runtime ID: %w", err)
}

if workerRuntimeLoaderBinary == "" {
Expand All @@ -96,28 +97,33 @@ func New(
}

// Create local storage for the key manager.
path, err := runtimeRegistry.EnsureRuntimeStateDir(dataDir, w.runtimeID)
path, err := runtimeRegistry.EnsureRuntimeStateDir(dataDir, runtimeID)
if err != nil {
return nil, fmt.Errorf("worker/keymanager: failed to ensure runtime state directory: %w", err)
}
localStorage, err := localstorage.New(path, runtimeRegistry.LocalStorageFile, w.runtimeID)
localStorage, err := localstorage.New(path, runtimeRegistry.LocalStorageFile, runtimeID)
if err != nil {
return nil, fmt.Errorf("worker/keymanager: cannot create local storage: %w", err)
}

w.roleProvider, err = r.NewRuntimeRoleProvider(node.RoleKeyManager, w.runtimeID)
w.roleProvider, err = r.NewRuntimeRoleProvider(node.RoleKeyManager, runtimeID)
if err != nil {
return nil, fmt.Errorf("failed to create role provider: %w", err)
return nil, fmt.Errorf("worker/keymanager: failed to create role provider: %w", err)
}

w.runtime, err = commonWorker.RuntimeRegistry.NewUnmanagedRuntime(ctx, runtimeID)
if err != nil {
return nil, fmt.Errorf("worker/keymanager: failed to create runtime registry entry: %w", err)
}

w.workerHostCfg = host.Config{
Role: node.RoleKeyManager,
ID: w.runtimeID,
ID: runtimeID,
WorkerBinary: workerRuntimeLoaderBinary,
RuntimeBinary: runtimeBinary,
TEEHardware: teeHardware,
IAS: ias,
MessageHandler: newHostHandler(w, localStorage),
MessageHandler: newHostHandler(w, commonWorker, localStorage),
}

// Register the Keymanager EnclaveRPC transport gRPC service.
Expand Down
24 changes: 15 additions & 9 deletions go/worker/keymanager/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
roothash "github.com/oasislabs/oasis-core/go/roothash/api"
"github.com/oasislabs/oasis-core/go/roothash/api/block"
runtimeCommittee "github.com/oasislabs/oasis-core/go/runtime/committee"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
committeeCommon "github.com/oasislabs/oasis-core/go/worker/common/committee"
"github.com/oasislabs/oasis-core/go/worker/common/host"
Expand Down Expand Up @@ -63,7 +64,8 @@ type Worker struct { // nolint: maligned

initialSyncDone bool

runtimeID common.Namespace
//runtimeID common.Namespace
runtime runtimeRegistry.Runtime
workerHost host.Host
workerHostCfg host.Config

Expand Down Expand Up @@ -287,7 +289,7 @@ func (w *Worker) updateStatus(status *api.Status, startedEvent *host.StartedEven

// Register as we are now ready to handle requests.
w.roleProvider.SetAvailable(func(n *node.Node) error {
rt := n.AddOrUpdateRuntime(w.runtimeID)
rt := n.AddOrUpdateRuntime(w.runtime.ID())
rt.Version = startedEvent.Version
rt.ExtraInfo = cbor.Marshal(signedInitResp)
rt.Capabilities.TEE = startedEvent.CapabilityTEE
Expand Down Expand Up @@ -369,9 +371,13 @@ func (w *Worker) worker() { // nolint: gocyclo
}
defer rtSub.Close()

var workerHostCh <-chan *host.Event
var currentStatus *api.Status
var currentStartedEvent *host.StartedEvent
var (
workerHostCh <-chan *host.Event
currentStatus *api.Status
currentStartedEvent *host.StartedEvent

runtimeID = w.runtime.ID()
)
for {
select {
case ev := <-workerHostCh:
Expand Down Expand Up @@ -401,7 +407,7 @@ func (w *Worker) worker() { // nolint: gocyclo
)
}
case status := <-statusCh:
if !status.ID.Equal(&w.runtimeID) {
if !status.ID.Equal(&runtimeID) {
continue
}

Expand Down Expand Up @@ -461,7 +467,7 @@ func (w *Worker) worker() { // nolint: gocyclo
continue
}
case rt := <-rtCh:
if rt.Kind != registry.KindCompute || rt.KeyManager == nil || !rt.KeyManager.Equal(&w.runtimeID) {
if rt.Kind != registry.KindCompute || rt.KeyManager == nil || !rt.KeyManager.Equal(&runtimeID) {
continue
}
if clientRuntimes[rt.ID] != nil {
Expand Down Expand Up @@ -555,10 +561,10 @@ func (crw *clientRuntimeWatcher) updateExternalServicePolicyLocked(snapshot *com

// Fetch current KM node public keys, get their nodes, apply rules.
height := snapshot.GetGroupVersion()
status, err := crw.w.backend.GetStatus(crw.w.ctx, crw.w.runtimeID, height)
status, err := crw.w.backend.GetStatus(crw.w.ctx, crw.w.runtime.ID(), height)
if err != nil {
crw.w.logger.Error("worker/keymanager: unable to get KM status",
"runtimeID", crw.w.runtimeID,
"runtimeID", crw.w.runtime.ID(),
"err", err)
} else {
var kmNodes []*node.Node
Expand Down

0 comments on commit 7a1fa12

Please sign in to comment.