From cee27ab1bef1498367fe1b3a15e9a638a112c4c4 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 19 May 2020 13:56:01 +0200 Subject: [PATCH] runtime: Notify runtimes of its key manager policy updates --- .changelog/2919.bugfix.md | 6 + client/src/rpc/client.rs | 20 ++- .../scenario/e2e/keymanager_upgrade.go | 29 ++-- go/runtime/host/protocol/connection.go | 26 ++- go/runtime/host/protocol/types.go | 35 ++-- go/runtime/host/tests/tester.go | 54 ++++--- go/worker/common/committee/runtime_host.go | 153 ++++++++++++++---- go/worker/common/runtime_host.go | 26 +-- go/worker/compute/executor/committee/node.go | 10 +- .../compute/txnscheduler/committee/node.go | 10 +- go/worker/keymanager/worker.go | 29 ++-- keymanager-client/src/client.rs | 40 +++-- runtime/src/dispatcher.rs | 27 ++++ runtime/src/protocol.rs | 6 + runtime/src/rpc/dispatcher.rs | 21 +++ runtime/src/types.rs | 8 +- tests/runtimes/simple-keyvalue/src/main.rs | 14 +- 17 files changed, 364 insertions(+), 150 deletions(-) create mode 100644 .changelog/2919.bugfix.md diff --git a/.changelog/2919.bugfix.md b/.changelog/2919.bugfix.md new file mode 100644 index 00000000000..2955b934c2d --- /dev/null +++ b/.changelog/2919.bugfix.md @@ -0,0 +1,6 @@ +runtime: Notify runtimes of its key manager policy updates + +Before runtimes were unaware of any key-manager policy updates. The runtime +only queried for the active key-manager policy at startup. This is now changed +so that the host notifies runtimes of any key-manager policy changes and +runtime updates the policies. diff --git a/client/src/rpc/client.rs b/client/src/rpc/client.rs index 7190c8ee579..56d135da38c 100644 --- a/client/src/rpc/client.rs +++ b/client/src/rpc/client.rs @@ -1,7 +1,10 @@ //! Enclave RPC client. -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, +use std::{ + collections::HashSet, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, }; use failure::{Fail, Fallible}; @@ -19,7 +22,7 @@ use tokio_executor::spawn; #[cfg(not(target_env = "sgx"))] use oasis_core_runtime::common::runtime::RuntimeId; use oasis_core_runtime::{ - common::cbor, + common::{cbor, sgx::avr::EnclaveIdentity}, protocol::Protocol, rpc::{ session::{Builder, Session}, @@ -372,6 +375,15 @@ impl RpcClient { }), ) } + + /// Update session enclaves if changed. + pub fn update_enclaves(&self, enclaves: Option>) { + let mut session = self.inner.session.lock().unwrap(); + if session.builder.get_remote_enclaves() != &enclaves { + session.builder = session.builder.clone().remote_enclaves(enclaves); + session.reset(); + } + } } #[cfg(test)] diff --git a/go/oasis-test-runner/scenario/e2e/keymanager_upgrade.go b/go/oasis-test-runner/scenario/e2e/keymanager_upgrade.go index 2f1ebc04f8e..9768a6f6312 100644 --- a/go/oasis-test-runner/scenario/e2e/keymanager_upgrade.go +++ b/go/oasis-test-runner/scenario/e2e/keymanager_upgrade.go @@ -294,21 +294,16 @@ func (sc *kmUpgradeImpl) Run(childEnv *env.Env) error { return fmt.Errorf("old keymanager node shutdown: %w", err) } - // The last part of the test won't work until: - // https://github.com/oasislabs/oasis-core/issues/2919 - /* - // Run test again. - sc.logger.Info("starting a second client to check if key manager works") - sc.runtimeImpl.clientArgs = []string{"--key", "key2"} - cmd, err = sc.startClient(childEnv) - if err != nil { - return err - } - client2ErrCh := make(chan error) - go func() { - client2ErrCh <- cmd.Wait() - }() - return sc.wait(childEnv, cmd, client2ErrCh) - */ - return nil + // Run client again. + sc.logger.Info("starting a second client to check if key manager works") + sc.runtimeImpl.clientArgs = []string{"--key", "key2"} + cmd, err = sc.startClient(childEnv) + if err != nil { + return err + } + client2ErrCh := make(chan error) + go func() { + client2ErrCh <- cmd.Wait() + }() + return sc.wait(childEnv, cmd, client2ErrCh) } diff --git a/go/runtime/host/protocol/connection.go b/go/runtime/host/protocol/connection.go index 0d93924474e..5fb1393c16e 100644 --- a/go/runtime/host/protocol/connection.go +++ b/go/runtime/host/protocol/connection.go @@ -65,6 +65,28 @@ type Handler interface { Handle(ctx context.Context, body *Body) (*Body, error) } +// Notifier is a protocol runtime notifier interface. +type Notifier interface { + // Start the notifier. + Start() error + + // Stop the notifier. + Stop() +} + +// NoOpNotifier is the default no-op runtime notifier implementation. +type NoOpNotifier struct { +} + +// Start the no-op notifier. +func (n *NoOpNotifier) Start() error { + return nil +} + +// Stop the no-op notifier. +func (n *NoOpNotifier) Stop() { +} + // Connection is a Runtime Host Protocol connection interface. type Connection interface { // Close closes the connection. @@ -340,10 +362,6 @@ func (c *connection) handleMessage(ctx context.Context, message *Message) { var allowed bool state := c.getState() switch { - case state == stateInitializing: - // Only whitelisted methods are allowed. - body := message.Body - allowed = body.HostKeyManagerPolicyRequest != nil case state == stateReady: // All requests allowed. allowed = true diff --git a/go/runtime/host/protocol/types.go b/go/runtime/host/protocol/types.go index 3d341eb2d11..4b9d574c93f 100644 --- a/go/runtime/host/protocol/types.go +++ b/go/runtime/host/protocol/types.go @@ -51,6 +51,12 @@ type Message struct { SpanContext []byte `json:"span_context"` } +// RuntimeKeyManagerPolicyUpdateRequest is a runtime key manager policy request +// message body. +type RuntimeKeyManagerPolicyUpdateRequest struct { + SignedPolicyRaw []byte `json:"signed_policy_raw"` +} + // Body is a protocol message body. type Body struct { Empty *Empty `json:",omitempty"` @@ -77,18 +83,18 @@ type Body struct { RuntimeExecuteTxBatchResponse *RuntimeExecuteTxBatchResponse `json:",omitempty"` RuntimeAbortRequest *Empty `json:",omitempty"` RuntimeAbortResponse *Empty `json:",omitempty"` + RuntimeKeyManagerPolicyUpdateRequest *RuntimeKeyManagerPolicyUpdateRequest `json:",omitempty"` + RuntimeKeyManagerPolicyUpdateResponse *Empty `json:",omitempty"` // Host interface. - HostKeyManagerPolicyRequest *HostKeyManagerPolicyRequest `json:",omitempty"` - HostKeyManagerPolicyResponse *HostKeyManagerPolicyResponse `json:",omitempty"` - HostRPCCallRequest *HostRPCCallRequest `json:",omitempty"` - HostRPCCallResponse *HostRPCCallResponse `json:",omitempty"` - HostStorageSyncRequest *HostStorageSyncRequest `json:",omitempty"` - HostStorageSyncResponse *HostStorageSyncResponse `json:",omitempty"` - HostLocalStorageGetRequest *HostLocalStorageGetRequest `json:",omitempty"` - HostLocalStorageGetResponse *HostLocalStorageGetResponse `json:",omitempty"` - HostLocalStorageSetRequest *HostLocalStorageSetRequest `json:",omitempty"` - HostLocalStorageSetResponse *Empty `json:",omitempty"` + HostRPCCallRequest *HostRPCCallRequest `json:",omitempty"` + HostRPCCallResponse *HostRPCCallResponse `json:",omitempty"` + HostStorageSyncRequest *HostStorageSyncRequest `json:",omitempty"` + HostStorageSyncResponse *HostStorageSyncResponse `json:",omitempty"` + HostLocalStorageGetRequest *HostLocalStorageGetRequest `json:",omitempty"` + HostLocalStorageGetResponse *HostLocalStorageGetResponse `json:",omitempty"` + HostLocalStorageSetRequest *HostLocalStorageSetRequest `json:",omitempty"` + HostLocalStorageSetResponse *Empty `json:",omitempty"` } // Type returns the message type by determining the name of the first non-nil member. @@ -226,15 +232,6 @@ type RuntimeExecuteTxBatchResponse struct { Batch ComputedBatch `json:"batch"` } -// HostKeyManagerPolicyRequest is a host key manager policy request message body. -type HostKeyManagerPolicyRequest struct { -} - -// HostKeyManagerPolicyResponse is a host key manager policy response message body. -type HostKeyManagerPolicyResponse struct { - SignedPolicyRaw []byte `json:"signed_policy_raw"` -} - // HostRPCCallRequest is a host RPC call request message body. type HostRPCCallRequest struct { Endpoint string `json:"endpoint"` diff --git a/go/runtime/host/tests/tester.go b/go/runtime/host/tests/tester.go index 53d6139fd1f..fba45822e85 100644 --- a/go/runtime/host/tests/tester.go +++ b/go/runtime/host/tests/tester.go @@ -28,30 +28,6 @@ type mockMessageHandler struct{} // Implements host.Handler. func (h *mockMessageHandler) Handle(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { - // Key manager. - if body.HostKeyManagerPolicyRequest != nil { - // Generate a dummy key manager policy for tests. - policy := keymanager.PolicySGX{ - Serial: 1, - Enclaves: map[sgx.EnclaveIdentity]*keymanager.EnclavePolicySGX{}, - } - sigPolicy := keymanager.SignedPolicySGX{ - Policy: policy, - } - for _, signer := range keymanager.TestSigners[1:] { - sig, err := signature.Sign(signer, keymanager.PolicySGXSignatureContext, cbor.Marshal(policy)) - if err != nil { - return nil, fmt.Errorf("failed to sign mock policy: %w", err) - } - - sigPolicy.Signatures = append(sigPolicy.Signatures, *sig) - } - - return &protocol.Body{HostKeyManagerPolicyResponse: &protocol.HostKeyManagerPolicyResponse{ - SignedPolicyRaw: cbor.Marshal(sigPolicy), - }}, nil - } - return nil, fmt.Errorf("method not supported") } @@ -97,6 +73,29 @@ func TestProvisioner( } } +func mockKeyManagerPolicyRequest() (*protocol.Body, error) { + // Generate a dummy key manager policy for tests. + policy := keymanager.PolicySGX{ + Serial: 1, + Enclaves: map[sgx.EnclaveIdentity]*keymanager.EnclavePolicySGX{}, + } + sigPolicy := keymanager.SignedPolicySGX{ + Policy: policy, + } + for _, signer := range keymanager.TestSigners[1:] { + sig, err := signature.Sign(signer, keymanager.PolicySGXSignatureContext, cbor.Marshal(policy)) + if err != nil { + return nil, fmt.Errorf("failed to sign mock policy: %w", err) + } + + sigPolicy.Signatures = append(sigPolicy.Signatures, *sig) + } + + return &protocol.Body{RuntimeKeyManagerPolicyUpdateRequest: &protocol.RuntimeKeyManagerPolicyUpdateRequest{ + SignedPolicyRaw: cbor.Marshal(sigPolicy), + }}, nil +} + func testBasic(t *testing.T, cfg host.Config, p host.Provisioner) { require := require.New(t) @@ -125,6 +124,13 @@ func testBasic(t *testing.T, cfg host.Config, p host.Provisioner) { require.NoError(err, "Call") require.NotNil(rsp.Empty, "runtime response to RuntimePingRequest should return an Empty body") + req, err := mockKeyManagerPolicyRequest() + require.NoError(err, "mockKeyManagerPolicyRequest") + + rsp, err = r.Call(ctx, req) + require.NoError(err, "KeyManagerPolicyRequest Call") + require.NotNil(rsp.RuntimeKeyManagerPolicyUpdateResponse, "runtime response to KeyManagerPolicyRequest should return an RuntimeKeyManagerPolicyUpdateResponse body") + // Request the runtime to stop. r.Stop() diff --git a/go/worker/common/committee/runtime_host.go b/go/worker/common/committee/runtime_host.go index 785e881cca6..7b886fd5472 100644 --- a/go/worker/common/committee/runtime_host.go +++ b/go/worker/common/committee/runtime_host.go @@ -3,21 +3,28 @@ package committee import ( "context" "errors" - "fmt" + "sync" + "time" + "github.com/cenkalti/backoff/v4" "github.com/opentracing/opentracing-go" "github.com/oasislabs/oasis-core/go/common/cbor" - consensus "github.com/oasislabs/oasis-core/go/consensus/api" + "github.com/oasislabs/oasis-core/go/common/logging" keymanagerApi "github.com/oasislabs/oasis-core/go/keymanager/api" keymanagerClient "github.com/oasislabs/oasis-core/go/keymanager/client" - registry "github.com/oasislabs/oasis-core/go/registry/api" + "github.com/oasislabs/oasis-core/go/runtime/host" "github.com/oasislabs/oasis-core/go/runtime/host/protocol" "github.com/oasislabs/oasis-core/go/runtime/localstorage" runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" storage "github.com/oasislabs/oasis-core/go/storage/api" ) +const ( + retryInterval = 1 * time.Second + maxRetries = 15 +) + var ( errMethodNotSupported = errors.New("method not supported") errEndpointNotSupported = errors.New("RPC endpoint not supported") @@ -34,31 +41,6 @@ type computeRuntimeHostHandler struct { } func (h *computeRuntimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { - // Key manager. - if body.HostKeyManagerPolicyRequest != nil { - rt, err := h.runtime.RegistryDescriptor(ctx) - if err != nil { - return nil, fmt.Errorf("runtime host: failed to obtain runtime descriptor: %w", err) - } - if rt.KeyManager == nil { - return nil, errors.New("runtime has no key manager") - } - status, err := h.keyManager.GetStatus(ctx, ®istry.NamespaceQuery{ - ID: *rt.KeyManager, - Height: consensus.HeightLatest, - }) - if err != nil { - return nil, err - } - - var policy keymanagerApi.SignedPolicySGX - if status != nil && status.Policy != nil { - policy = *status.Policy - } - return &protocol.Body{HostKeyManagerPolicyResponse: &protocol.HostKeyManagerPolicyResponse{ - SignedPolicyRaw: cbor.Marshal(policy), - }}, nil - } // RPC. if body.HostRPCCallRequest != nil { switch body.HostRPCCallRequest.Endpoint { @@ -125,6 +107,121 @@ func (n *Node) GetRuntime() runtimeRegistry.Runtime { return n.Runtime } +// computeRuntimeHostNotifier is a runtime host notifier suitable for compute +// runtimes. +type computeRuntimeHostNotifier struct { + sync.Mutex + + ctx context.Context + + stopCh chan struct{} + + started bool + runtime runtimeRegistry.Runtime + host host.Runtime + keyManager keymanagerApi.Backend + + logger *logging.Logger +} + +func (n *computeRuntimeHostNotifier) watchPolicyUpdates() { + // Wait for the runtime. + rt, err := n.runtime.RegistryDescriptor(n.ctx) + if err != nil { + n.logger.Error("failed to wait for registry descriptor", + "err", err, + ) + return + } + if rt.KeyManager == nil { + n.logger.Info("no keymanager needed, not watching for policy updates") + return + } + + stCh, stSub := n.keyManager.WatchStatuses() + defer stSub.Close() + n.logger.Info("watching policy updates", "keymanager_runtime", rt.KeyManager) + + for { + select { + case <-n.ctx.Done(): + n.logger.Warn("contex canceled") + return + case <-n.stopCh: + n.logger.Warn("termination requested") + return + case st := <-stCh: + n.logger.Debug("got policy update", "status", st) + + // Ignore status updates if key manager is not yet known (is nil) + // or if the status update is for a different key manager. + if !st.ID.Equal(rt.KeyManager) { + continue + } + + raw := cbor.Marshal(st.Policy) + req := &protocol.Body{RuntimeKeyManagerPolicyUpdateRequest: &protocol.RuntimeKeyManagerPolicyUpdateRequest{ + SignedPolicyRaw: raw, + }} + + var response *protocol.Body + call := func() error { + resp, err := n.host.Call(n.ctx, req) + if err != nil { + n.logger.Error("failed to dispatch RPC call to runtime", + "err", err, + ) + return err + } + response = resp + return nil + } + + retry := backoff.WithMaxRetries(backoff.NewConstantBackOff(retryInterval), maxRetries) + err := backoff.Retry(call, backoff.WithContext(retry, n.ctx)) + if err != nil { + n.logger.Error("failed dispatching key manager policy update to runtime", + "err", err, + ) + continue + } + n.logger.Debug("key manager policy updated dispatched", "response", response) + } + } +} + +// Implements protocol.Notifier. +func (n *computeRuntimeHostNotifier) Start() error { + n.Lock() + defer n.Unlock() + + if n.started { + return nil + } + n.started = true + + go n.watchPolicyUpdates() + + return nil +} + +// Implements protocol.Notifier. +func (n *computeRuntimeHostNotifier) Stop() { + close(n.stopCh) +} + +// Implements RuntimeHostHandlerFactory. +func (n *Node) NewNotifier(ctx context.Context, host host.Runtime) protocol.Notifier { + return &computeRuntimeHostNotifier{ + ctx: ctx, + stopCh: make(chan struct{}), + runtime: n.Runtime, + host: host, + keyManager: n.KeyManager, + logger: logging.GetLogger("committee/runtime-host"), + } +} + // Implements RuntimeHostHandlerFactory. func (n *Node) NewRuntimeHostHandler() protocol.Handler { return &computeRuntimeHostHandler{ diff --git a/go/worker/common/runtime_host.go b/go/worker/common/runtime_host.go index 9d1e7b1913f..71fa7d48049 100644 --- a/go/worker/common/runtime_host.go +++ b/go/worker/common/runtime_host.go @@ -14,8 +14,9 @@ import ( type RuntimeHostNode struct { sync.Mutex - cfg *RuntimeHostConfig - factory RuntimeHostHandlerFactory + cfg *RuntimeHostConfig + factory RuntimeHostHandlerFactory + notifier protocol.Notifier runtime host.Runtime } @@ -24,35 +25,37 @@ type RuntimeHostNode struct { // // This method may return before the runtime is fully provisioned. The returned runtime will not be // started automatically, you must call Start explicitly. -func (n *RuntimeHostNode) ProvisionHostedRuntime(ctx context.Context) (host.Runtime, error) { +func (n *RuntimeHostNode) ProvisionHostedRuntime(ctx context.Context) (host.Runtime, protocol.Notifier, error) { rt, err := n.factory.GetRuntime().RegistryDescriptor(ctx) if err != nil { - return nil, fmt.Errorf("failed to get runtime registry descriptor: %w", err) + return nil, nil, fmt.Errorf("failed to get runtime registry descriptor: %w", err) } provisioner, ok := n.cfg.Provisioners[rt.TEEHardware] if !ok { - return nil, fmt.Errorf("no provisioner suitable for TEE hardware '%s'", rt.TEEHardware) + return nil, nil, fmt.Errorf("no provisioner suitable for TEE hardware '%s'", rt.TEEHardware) } // Get a copy of the configuration template for the given runtime and apply updates. cfg, ok := n.cfg.Runtimes[rt.ID] if !ok { - return nil, fmt.Errorf("missing runtime host configuration for runtime '%s'", rt.ID) + return nil, nil, fmt.Errorf("missing runtime host configuration for runtime '%s'", rt.ID) } cfg.MessageHandler = n.factory.NewRuntimeHostHandler() // Provision the runtime. prt, err := provisioner.NewRuntime(ctx, cfg) if err != nil { - return nil, fmt.Errorf("failed to provision runtime: %w", err) + return nil, nil, fmt.Errorf("failed to provision runtime: %w", err) } + notifier := n.factory.NewNotifier(ctx, prt) n.Lock() n.runtime = prt + n.notifier = notifier n.Unlock() - return prt, nil + return prt, notifier, nil } // GetHostedRuntime returns the provisioned hosted runtime (if any). @@ -63,14 +66,17 @@ func (n *RuntimeHostNode) GetHostedRuntime() host.Runtime { return rt } -// RuntimeHostHandlerFactory is an interface that can be used to create new runtime handlers when -// provisioning hosted runtimes. +// RuntimeHostHandlerFactory is an interface that can be used to create new runtime handlers and +// notifiers when provisioning hosted runtimes. type RuntimeHostHandlerFactory interface { // GetRuntime returns the registered runtime for which a runtime host handler is to be created. GetRuntime() runtimeRegistry.Runtime // NewRuntimeHostHandler creates a new runtime host handler. NewRuntimeHostHandler() protocol.Handler + + // NewNotifier creates a new runtime host notifier. + NewNotifier(ctx context.Context, host host.Runtime) protocol.Notifier } // NewRuntimeHostNode creates a new runtime host node. diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index b4b77a191cf..a31189f0e69 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -859,7 +859,7 @@ func (n *Node) worker() { n.logger.Info("starting committee node") // Provision the hosted runtime. - hrt, err := n.ProvisionHostedRuntime(n.ctx) + hrt, hrtNotifier, err := n.ProvisionHostedRuntime(n.ctx) if err != nil { n.logger.Error("failed to provision hosted runtime", "err", err, @@ -884,6 +884,14 @@ func (n *Node) worker() { } defer hrt.Stop() + if err = hrtNotifier.Start(); err != nil { + n.logger.Error("failed to start runtime notifier", + "err", err, + ) + return + } + defer hrtNotifier.Stop() + // We are initialized. close(n.initCh) diff --git a/go/worker/compute/txnscheduler/committee/node.go b/go/worker/compute/txnscheduler/committee/node.go index 39bf6124aba..686951092b1 100644 --- a/go/worker/compute/txnscheduler/committee/node.go +++ b/go/worker/compute/txnscheduler/committee/node.go @@ -490,7 +490,7 @@ func (n *Node) worker() { var hrtEventCh <-chan *host.Event if n.checkTxEnabled { // Provision hosted runtime. - hrt, err := n.ProvisionHostedRuntime(n.ctx) + hrt, hrtNotifier, err := n.ProvisionHostedRuntime(n.ctx) if err != nil { n.logger.Error("failed to provision hosted runtime", "err", err, @@ -515,6 +515,14 @@ func (n *Node) worker() { return } defer hrt.Stop() + + if err = hrtNotifier.Start(); err != nil { + n.logger.Error("failed to start runtime notifier", + "err", err, + ) + return + } + defer hrtNotifier.Stop() } // Initialize transaction scheduler's algorithm. diff --git a/go/worker/keymanager/worker.go b/go/worker/keymanager/worker.go index 1ee358ccb1b..28fe91abe92 100644 --- a/go/worker/keymanager/worker.go +++ b/go/worker/keymanager/worker.go @@ -128,6 +128,11 @@ func (w *Worker) GetRuntime() runtimeRegistry.Runtime { return w.runtime } +// Implements workerCommon.RuntimeHostHandlerFactory. +func (w *Worker) NewNotifier(ctx context.Context, host host.Runtime) protocol.Notifier { + return &protocol.NoOpNotifier{} +} + // Implements workerCommon.RuntimeHostHandlerFactory. func (w *Worker) NewRuntimeHostHandler() protocol.Handler { return w.runtimeHostHandler @@ -161,13 +166,6 @@ func (w *Worker) callLocal(ctx context.Context, data []byte) ([]byte, error) { return nil, err } - if response.Error != nil { - w.logger.Error("error from runtime", - "err", response.Error.Message, - ) - return nil, fmt.Errorf("worker/keymanager: error from runtime: %s", response.Error.Message) - } - resp := response.RuntimeRPCCallResponse if resp == nil { w.logger.Error("malformed response from runtime", @@ -245,12 +243,6 @@ func (w *Worker) updateStatus(status *api.Status, startedEvent *host.StartedEven ) return err } - if response.Error != nil { - w.logger.Error("error initializing enclave", - "err", response.Error.Message, - ) - return fmt.Errorf("worker/keymanager: error initializing enclave: %s", response.Error.Message) - } resp := response.RuntimeLocalRPCCallResponse if resp == nil { @@ -447,7 +439,8 @@ func (w *Worker) worker() { // nolint: gocyclo // Start key manager runtime. w.logger.Info("provisioning key manager runtime") - hrt, err = w.ProvisionHostedRuntime(w.ctx) + var hrtNotifier protocol.Notifier + hrt, hrtNotifier, err = w.ProvisionHostedRuntime(w.ctx) if err != nil { w.logger.Error("failed to provision key manager runtime", "err", err, @@ -471,6 +464,14 @@ func (w *Worker) worker() { // nolint: gocyclo return } defer hrt.Stop() + + if err = hrtNotifier.Start(); err != nil { + w.logger.Error("failed to start runtime notifier", + "err", err, + ) + return + } + defer hrtNotifier.Stop() } currentStatus = status diff --git a/keymanager-client/src/client.rs b/keymanager-client/src/client.rs index 61735f5f0ab..0f5b8d71d3e 100644 --- a/keymanager-client/src/client.rs +++ b/keymanager-client/src/client.rs @@ -4,16 +4,15 @@ use std::{ sync::{Arc, RwLock}, }; +use failure::Fallible; use futures::{future, prelude::*}; #[cfg(not(target_env = "sgx"))] use grpcio::Channel; use io_context::Context; use lru::LruCache; -#[cfg(target_env = "sgx")] use std::iter::FromIterator; -#[cfg(target_env = "sgx")] -use oasis_core_runtime::{common::cbor, protocol::ProtocolError, types::Body}; +use oasis_core_runtime::common::cbor; use oasis_core_client::{create_rpc_api_client, BoxFuture, RpcClient}; use oasis_core_keymanager_api_common::*; @@ -85,8 +84,10 @@ impl RemoteClient { /// Create a new key manager client with runtime-internal transport. /// - /// Using this method automatically obtains valid key manager enclave identities via the - /// worker-host protocol. + /// Using this method valid enclave identities won't be preset and should + /// be obtained via the worker-host protocol and updated with the set_policy + /// method. In case of sgx, the session establishment will fail until the + /// initial policies will be updated. pub fn new_runtime( runtime_id: RuntimeId, protocol: Arc, @@ -101,23 +102,7 @@ impl RemoteClient { let _ = signers; #[cfg(target_env = "sgx")] - let enclaves: Option> = match protocol - .make_request(Context::background(), Body::HostKeyManagerPolicyRequest {}) - { - Ok(Body::HostKeyManagerPolicyResponse { signed_policy_raw }) => { - let untrusted_policy: SignedPolicySGX = match cbor::from_slice(&signed_policy_raw) { - Ok(sp) => sp, - Err(err) => panic!("error obtaining list of KM enclaves: {}", err), - }; - let policy = untrusted_policy - .verify() - .expect("failed to verify KM policy"); - Some(HashSet::from_iter(policy.enclaves.keys().cloned())) - } - Ok(_) => panic!(ProtocolError::InvalidResponse), - Err(_) => panic!("cannot obtain list of KM enclaves"), - }; - + let enclaves = Some(HashSet::new()); #[cfg(not(target_env = "sgx"))] let enclaves = None; @@ -149,6 +134,17 @@ impl RemoteClient { keys_cache_sizes, ) } + + /// Set client allowed enclaves from key manager policy. + pub fn set_policy(&self, signed_policy_raw: Vec) -> Fallible<()> { + let untrusted_policy: SignedPolicySGX = cbor::from_slice(&signed_policy_raw)?; + let policy = untrusted_policy.verify()?; + let client = &self.inner.rpc_client.rpc_client; + let policies: HashSet = + HashSet::from_iter(policy.enclaves.keys().cloned()); + client.update_enclaves(Some(policies)); + Ok(()) + } } impl KeyManagerClient for RemoteClient { diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index b3d240e4ebe..e27ba2b1c46 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -243,6 +243,16 @@ impl Dispatcher { true, ); } + Ok((ctx, id, Body::RuntimeKeyManagerPolicyUpdateRequest { signed_policy_raw })) => { + // KeyManager policy update local RPC call. + self.handle_km_policy_update( + &mut rpc_dispatcher, + &protocol, + ctx, + id, + signed_policy_raw, + ); + } Ok(_) => { error!(self.logger, "Unsupported request type"); break 'dispatch; @@ -586,6 +596,23 @@ impl Dispatcher { protocol.send_response(id, protocol_response).unwrap(); } + + fn handle_km_policy_update( + &self, + rpc_dispatcher: &mut RpcDispatcher, + protocol: &Arc, + _ctx: Context, + id: u64, + signed_policy_raw: Vec, + ) { + debug!(self.logger, "Received km policy update request"); + rpc_dispatcher.handle_km_policy_update(signed_policy_raw); + debug!(self.logger, "KM policy update request complete"); + + protocol + .send_response(id, Body::RuntimeKeyManagerPolicyUpdateResponse {}) + .unwrap(); + } } struct Cache { diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index 4afda7beba3..69391ea90fb 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -324,6 +324,12 @@ impl Protocol { self.dispatcher.queue_request(ctx, id, req)?; Ok(None) } + req @ Body::RuntimeKeyManagerPolicyUpdateRequest { .. } => { + info!(self.logger, "Received key manager policy update request"); + self.can_handle_runtime_requests()?; + self.dispatcher.queue_request(ctx, id, req)?; + Ok(None) + } req => { warn!(self.logger, "Received unsupported request"; "req" => format!("{:?}", req)); Err(ProtocolError::MethodNotSupported.into()) diff --git a/runtime/src/rpc/dispatcher.rs b/runtime/src/rpc/dispatcher.rs index acbf309854c..b39b3a1ba19 100644 --- a/runtime/src/rpc/dispatcher.rs +++ b/runtime/src/rpc/dispatcher.rs @@ -124,12 +124,17 @@ impl Method { } } +/// Key manager policy update handler callback. +pub type KeyManagerPolicyHandler = dyn Fn(Vec) -> (); + /// RPC call dispatcher. pub struct Dispatcher { /// Registered RPC methods. methods: HashMap, /// Registered local RPC methods. local_methods: HashMap, + /// Registered key manager policy handler. + km_policy_handler: Option>, /// Registered context initializer. ctx_initializer: Option>, } @@ -140,6 +145,7 @@ impl Dispatcher { Self { methods: HashMap::new(), local_methods: HashMap::new(), + km_policy_handler: None, ctx_initializer: None, } } @@ -211,4 +217,19 @@ impl Dispatcher { }, } } + + // Handle key manager policy update. + pub fn handle_km_policy_update(&self, signed_policy_raw: Vec) { + self.km_policy_handler + .as_ref() + .map(|handler| handler(signed_policy_raw)); + } + + /// Update key manager policy update handler. + pub fn set_keymanager_policy_update_handler( + &mut self, + f: Option>, + ) { + self.km_policy_handler = f; + } } diff --git a/runtime/src/types.rs b/runtime/src/types.rs index 8f63a010d6d..a2f73a4afdd 100644 --- a/runtime/src/types.rs +++ b/runtime/src/types.rs @@ -124,13 +124,13 @@ pub enum Body { RuntimeExecuteTxBatchResponse { batch: ComputedBatch, }, - - // Host interface. - HostKeyManagerPolicyRequest {}, - HostKeyManagerPolicyResponse { + RuntimeKeyManagerPolicyUpdateRequest { #[serde(with = "serde_bytes")] signed_policy_raw: Vec, }, + RuntimeKeyManagerPolicyUpdateResponse {}, + + // Host interface. HostRPCCallRequest { endpoint: String, #[serde(with = "serde_bytes")] diff --git a/tests/runtimes/simple-keyvalue/src/main.rs b/tests/runtimes/simple-keyvalue/src/main.rs index d7bf67f18e2..f0774d33cd4 100644 --- a/tests/runtimes/simple-keyvalue/src/main.rs +++ b/tests/runtimes/simple-keyvalue/src/main.rs @@ -241,7 +241,7 @@ fn main() { let init = |protocol: &Arc, rak: &Arc, _rpc_demux: &mut RpcDemux, - _rpc: &mut RpcDispatcher| + rpc: &mut RpcDispatcher| -> Option> { let mut txn = TxnMethDispatcher::new(); with_api! { register_runtime_txn_methods!(txn, api); } @@ -255,11 +255,21 @@ fn main() { 1024, trusted_policy_signers(), )); + let initializer_client = km_client.clone(); + + #[cfg(not(target_env = "sgx"))] + let _ = rpc; + #[cfg(target_env = "sgx")] + rpc.set_keymanager_policy_update_handler(Some(Box::new(move |raw_signed_policy| { + km_client + .set_policy(raw_signed_policy) + .expect("failed to update km client policy"); + }))); txn.set_context_initializer(move |ctx: &mut TxnContext| { ctx.runtime = Box::new(Context { test_runtime_id: rt_id.clone(), - km_client: km_client.clone(), + km_client: initializer_client.clone(), }) });