From 93e59768dfa4a7139dc0eefe069523131873eabd Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 28 Jan 2021 16:20:06 +0100 Subject: [PATCH] runtime: Add support for runtime queries --- go/runtime/client/api/api.go | 19 ++ go/runtime/client/api/grpc.go | 39 ++++ go/runtime/client/client.go | 23 ++ go/runtime/client/tests/tester.go | 15 ++ go/runtime/host/helpers.go | 26 +++ go/runtime/host/mock/mock.go | 6 + go/runtime/host/protocol/types.go | 14 ++ runtime/src/dispatcher.rs | 298 ++++++++++++-------------- runtime/src/protocol.rs | 15 +- runtime/src/transaction/dispatcher.rs | 112 ++++++++-- runtime/src/types.rs | 36 +++- 11 files changed, 410 insertions(+), 193 deletions(-) diff --git a/go/runtime/client/api/api.go b/go/runtime/client/api/api.go index 81535df95d2..9f72dad0701 100644 --- a/go/runtime/client/api/api.go +++ b/go/runtime/client/api/api.go @@ -5,6 +5,7 @@ import ( "math" "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/errors" "github.com/oasisprotocol/oasis-core/go/common/pubsub" @@ -34,6 +35,8 @@ var ( ErrNotSynced = errors.New(ModuleName, 4, "client: not finished initial sync") // ErrCheckTxFailed is an error returned if the local transaction check fails. ErrCheckTxFailed = errors.New(ModuleName, 5, "client: transaction check failed") + // ErrNoHostedRuntime is returned when the hosted runtime is not available locally. + ErrNoHostedRuntime = errors.New(ModuleName, 6, "client: no hosted runtime is available") ) // RuntimeClient is the runtime client interface. @@ -65,6 +68,9 @@ type RuntimeClient interface { // GetEvents returns all events emitted in a given block. GetEvents(ctx context.Context, request *GetEventsRequest) ([]*Event, error) + // Query makes a runtime-specific query. + Query(ctx context.Context, request *QueryRequest) (*QueryResponse, error) + // QueryTx queries the indexer for a specific runtime transaction. QueryTx(ctx context.Context, request *QueryTxRequest) (*TxResult, error) @@ -146,6 +152,19 @@ type Event struct { TxHash hash.Hash `json:"tx_hash"` } +// QueryRequest is a Query request. +type QueryRequest struct { + RuntimeID common.Namespace `json:"runtime_id"` + Round uint64 `json:"round"` + Method string `json:"method"` + Args cbor.RawMessage `json:"args"` +} + +// QueryResponse is a response to the runtime query. +type QueryResponse struct { + Data cbor.RawMessage `json:"data"` +} + // QueryTxRequest is a QueryTx request. type QueryTxRequest struct { RuntimeID common.Namespace `json:"runtime_id"` diff --git a/go/runtime/client/api/grpc.go b/go/runtime/client/api/grpc.go index 03a11211717..07c1621c41c 100644 --- a/go/runtime/client/api/grpc.go +++ b/go/runtime/client/api/grpc.go @@ -36,6 +36,8 @@ var ( methodGetTxs = serviceName.NewMethod("GetTxs", GetTxsRequest{}) // methodGetEvents is the GetEvents method. methodGetEvents = serviceName.NewMethod("GetEvents", GetEventsRequest{}) + // methodQuery is the Query method. + methodQuery = serviceName.NewMethod("Query", QueryRequest{}) // methodQueryTx is the QueryTx method. methodQueryTx = serviceName.NewMethod("QueryTx", QueryTxRequest{}) // methodQueryTxs is the QueryTxs method. @@ -83,6 +85,10 @@ var ( MethodName: methodGetEvents.ShortName(), Handler: handlerGetEvents, }, + { + MethodName: methodQuery.ShortName(), + Handler: handlerQuery, + }, { MethodName: methodQueryTx.ShortName(), Handler: handlerQueryTx, @@ -327,6 +333,31 @@ func handlerGetEvents( // nolint: golint return interceptor(ctx, &rq, info, handler) } +func handlerQuery( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var rq QueryRequest + if err := dec(&rq); err != nil { + return nil, err + } + if interceptor == nil { + rsp, err := srv.(RuntimeClient).Query(ctx, &rq) + return rsp, errorWrapNotFound(err) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodQuery.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + rsp, err := srv.(RuntimeClient).Query(ctx, req.(*QueryRequest)) + return rsp, errorWrapNotFound(err) + } + return interceptor(ctx, &rq, info, handler) +} + func handlerQueryTx( // nolint: golint srv interface{}, ctx context.Context, @@ -502,6 +533,14 @@ func (c *runtimeClient) GetEvents(ctx context.Context, request *GetEventsRequest return rsp, nil } +func (c *runtimeClient) Query(ctx context.Context, request *QueryRequest) (*QueryResponse, error) { + var rsp QueryResponse + if err := c.conn.Invoke(ctx, methodQuery.FullName(), request, &rsp); err != nil { + return nil, err + } + return &rsp, nil +} + func (c *runtimeClient) QueryTx(ctx context.Context, request *QueryTxRequest) (*TxResult, error) { var rsp TxResult if err := c.conn.Invoke(ctx, methodQueryTx.FullName(), request, &rsp); err != nil { diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index 501ae11be9b..d6b2316c5c0 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -364,6 +364,29 @@ func (c *runtimeClient) GetBlockByHash(ctx context.Context, request *api.GetBloc return c.GetBlock(ctx, &api.GetBlockRequest{RuntimeID: request.RuntimeID, Round: round}) } +// Implements api.RuntimeClient. +func (c *runtimeClient) Query(ctx context.Context, request *api.QueryRequest) (*api.QueryResponse, error) { + hrt, ok := c.hosts[request.RuntimeID] + if !ok { + return nil, api.ErrNoHostedRuntime + } + rt := hrt.GetHostedRuntime() + if rt == nil { + return nil, api.ErrNoHostedRuntime + } + + blk, err := c.GetBlock(ctx, &api.GetBlockRequest{RuntimeID: request.RuntimeID, Round: request.Round}) + if err != nil { + return nil, err + } + + data, err := rt.Query(ctx, blk, request.Method, request.Args) + if err != nil { + return nil, err + } + return &api.QueryResponse{Data: data}, nil +} + // Implements api.RuntimeClient. func (c *runtimeClient) QueryTx(ctx context.Context, request *api.QueryTxRequest) (*api.TxResult, error) { tagIndexer, err := c.tagIndexer(request.RuntimeID) diff --git a/go/runtime/client/tests/tester.go b/go/runtime/client/tests/tester.go index b0870c1bb42..33ca1f0f6fe 100644 --- a/go/runtime/client/tests/tester.go +++ b/go/runtime/client/tests/tester.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/runtime/client/api" ) @@ -178,4 +179,18 @@ func testQuery( genBlk2, err := c.GetGenesisBlock(ctx, runtimeID) require.NoError(t, err, "GetGenesisBlock2") require.EqualValues(t, genBlk, genBlk2, "GetGenesisBlock should match previous GetGenesisBlock") + + // Query runtime. + // Since we are using the mock runtime host the response should be a CBOR-serialized method name + // with the added " world" string. + rsp, err := c.Query(ctx, &api.QueryRequest{ + RuntimeID: runtimeID, + Round: blk.Header.Round, + Method: "hello", + }) + require.NoError(t, err, "Query") + var decMethod string + err = cbor.Unmarshal(rsp.Data, &decMethod) + require.NoError(t, err, "cbor.Unmarshal()") + require.EqualValues(t, "hello world", decMethod, "Query response should be correct") } diff --git a/go/runtime/host/helpers.go b/go/runtime/host/helpers.go index 7f8979f9ddb..eee8c6465da 100644 --- a/go/runtime/host/helpers.go +++ b/go/runtime/host/helpers.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/oasisprotocol/oasis-core/go/common/cbor" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" @@ -26,6 +27,9 @@ type RichRuntime interface { // CheckTx requests the runtime to check a given transaction. CheckTx(ctx context.Context, rb *block.Block, lb *consensus.LightBlock, tx []byte) error + + // Query requests the runtime to answer a runtime-specific query. + Query(ctx context.Context, rb *block.Block, method string, args cbor.RawMessage) (cbor.RawMessage, error) } type richRuntime struct { @@ -63,6 +67,28 @@ func (r *richRuntime) CheckTx(ctx context.Context, rb *block.Block, lb *consensu return nil } +// Implements RichRuntime. +func (r *richRuntime) Query(ctx context.Context, rb *block.Block, method string, args cbor.RawMessage) (cbor.RawMessage, error) { + if rb == nil { + return nil, ErrInvalidArgument + } + + resp, err := r.Call(ctx, &protocol.Body{ + RuntimeQueryRequest: &protocol.RuntimeQueryRequest{ + Method: method, + Header: rb.Header, + Args: args, + }, + }) + switch { + case err != nil: + return nil, err + case resp.RuntimeQueryResponse == nil: + return nil, fmt.Errorf("%w: malformed runtime response", ErrInternal) + } + return resp.RuntimeQueryResponse.Data, nil +} + // NewRichRuntime creates a new higher-level wrapper for a given runtime. It provides additional // convenience functions for talking with a runtime. func NewRichRuntime(rt Runtime) RichRuntime { diff --git a/go/runtime/host/mock/mock.go b/go/runtime/host/mock/mock.go index 77281bec431..e4d4cde3ebc 100644 --- a/go/runtime/host/mock/mock.go +++ b/go/runtime/host/mock/mock.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/errors" "github.com/oasisprotocol/oasis-core/go/common/pubsub" @@ -104,6 +105,11 @@ func (r *runtime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body return &protocol.Body{RuntimeCheckTxBatchResponse: &protocol.RuntimeCheckTxBatchResponse{ Results: results, }}, nil + case body.RuntimeQueryRequest != nil: + rq := body.RuntimeQueryRequest + return &protocol.Body{RuntimeQueryResponse: &protocol.RuntimeQueryResponse{ + Data: cbor.Marshal(rq.Method + " world"), + }}, nil default: return nil, fmt.Errorf("(mock) method not supported") } diff --git a/go/runtime/host/protocol/types.go b/go/runtime/host/protocol/types.go index 608856e12fe..7d518c02fca 100644 --- a/go/runtime/host/protocol/types.go +++ b/go/runtime/host/protocol/types.go @@ -84,6 +84,8 @@ type Body struct { RuntimeAbortResponse *Empty `json:",omitempty"` RuntimeKeyManagerPolicyUpdateRequest *RuntimeKeyManagerPolicyUpdateRequest `json:",omitempty"` RuntimeKeyManagerPolicyUpdateResponse *Empty `json:",omitempty"` + RuntimeQueryRequest *RuntimeQueryRequest `json:",omitempty"` + RuntimeQueryResponse *RuntimeQueryResponse `json:",omitempty"` // Host interface. HostRPCCallRequest *HostRPCCallRequest `json:",omitempty"` @@ -269,6 +271,18 @@ type RuntimeKeyManagerPolicyUpdateRequest struct { SignedPolicyRaw []byte `json:"signed_policy_raw"` } +// RuntimeQueryRequest is a runtime query request message body. +type RuntimeQueryRequest struct { + Method string `json:"method"` + Header block.Header `json:"header"` + Args cbor.RawMessage `json:"args,omitempty"` +} + +// RuntimeQueryRequest is a runtime query response message body. +type RuntimeQueryResponse struct { + Data cbor.RawMessage `json:"data,omitempty"` +} + // HostRPCCallRequest is a host RPC call request message body. type HostRPCCallRequest struct { Endpoint string `json:"endpoint"` diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index b7278913fa5..a7c5699d303 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -9,7 +9,7 @@ use std::{ thread, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Result as AnyResult}; use crossbeam::channel; use io_context::Context; use slog::Logger; @@ -23,7 +23,9 @@ use crate::{ }, logger::get_logger, }, - consensus::roothash::{self, Block, ComputeResultsHeader, COMPUTE_RESULTS_HEADER_CONTEXT}, + consensus::roothash::{ + self, Block, ComputeResultsHeader, Header, COMPUTE_RESULTS_HEADER_CONTEXT, + }, enclave_rpc::{ demux::Demux as RpcDemux, dispatcher::Dispatcher as RpcDispatcher, @@ -45,11 +47,11 @@ use crate::{ types::TxnBatch, Context as TxnContext, }, - types::{Body, ComputedBatch, HostStorageEndpoint}, + types::{Body, ComputedBatch, Error, HostStorageEndpoint}, }; /// Maximum amount of requests that can be in the dispatcher queue. -const BACKLOG_SIZE: usize = 10; +const BACKLOG_SIZE: usize = 1000; /// Interface for dispatcher initializers. pub trait Initializer: Send + Sync { @@ -148,14 +150,14 @@ impl Dispatcher { } /// Queue a new request to be dispatched. - pub fn queue_request(&self, ctx: Context, id: u64, body: Body) -> Result<()> { + pub fn queue_request(&self, ctx: Context, id: u64, body: Body) -> AnyResult<()> { self.queue_tx.try_send((ctx, id, body))?; Ok(()) } /// Signals to dispatcher that it should abort and waits for the abort to /// complete. - pub fn abort_and_wait(&self, ctx: Context, id: u64, req: Body) -> Result<()> { + pub fn abort_and_wait(&self, ctx: Context, id: u64, req: Body) -> AnyResult<()> { self.abort_batch.store(true, Ordering::SeqCst); // Queue the request to break the dispatch loop in case nothing is // being processed at the moment. @@ -168,7 +170,7 @@ impl Dispatcher { &self, initializer: Box, rx: channel::Receiver, - ) -> Result<()> { + ) -> AnyResult<()> { // Wait for the protocol instance to be available. let protocol = { let mut guard = self.protocol.lock().unwrap(); @@ -207,85 +209,93 @@ impl Dispatcher { self.abort_tx.try_send(())?; } - match rx.recv() { - Ok((ctx, id, Body::RuntimeRPCCallRequest { request })) => { + let (ctx, id, request) = match rx.recv() { + Ok(data) => data, + Err(error) => { + error!(self.logger, "Error while waiting for request"; "err" => %error); + break 'dispatch; + } + }; + + let result = match request { + Body::RuntimeRPCCallRequest { request } => { // RPC call. - self.dispatch_rpc( - &mut rpc_demux, - &mut rpc_dispatcher, - &protocol, - ctx, - id, - request, - ); + self.dispatch_rpc(&mut rpc_demux, &mut rpc_dispatcher, &protocol, ctx, request) } - Ok((ctx, id, Body::RuntimeLocalRPCCallRequest { request })) => { + Body::RuntimeLocalRPCCallRequest { request } => { // Local RPC call. - self.dispatch_local_rpc(&mut rpc_dispatcher, &protocol, ctx, id, request); + self.dispatch_local_rpc(&mut rpc_dispatcher, &protocol, ctx, request) } - Ok(( - ctx, - id, - Body::RuntimeExecuteTxBatchRequest { - message_results, - io_root, - inputs, - block, - }, - )) => { + Body::RuntimeExecuteTxBatchRequest { + message_results, + io_root, + inputs, + block, + } => { // Transaction execution. self.dispatch_txn( &mut cache, &mut txn_dispatcher, &protocol, ctx, - id, io_root, inputs, block, message_results, false, - ); + ) } - Ok((ctx, id, Body::RuntimeCheckTxBatchRequest { inputs, block })) => { + Body::RuntimeCheckTxBatchRequest { inputs, block } => { // Transaction check. self.dispatch_txn( &mut cache_check, &mut txn_dispatcher, &protocol, ctx, - id, Hash::default(), inputs, block, vec![], true, - ); + ) } - Ok((ctx, id, Body::RuntimeKeyManagerPolicyUpdateRequest { signed_policy_raw })) => { + Body::RuntimeKeyManagerPolicyUpdateRequest { signed_policy_raw } => { // KeyManager policy update local RPC call. - self.handle_km_policy_update( - &mut rpc_dispatcher, + self.handle_km_policy_update(&mut rpc_dispatcher, ctx, signed_policy_raw) + } + Body::RuntimeQueryRequest { + method, + header, + args, + } => { + // Query. + self.dispatch_query( + &mut cache_check, + &mut txn_dispatcher, &protocol, ctx, - id, - signed_policy_raw, - ); + method, + header, + args, + ) } - Ok((_ctx, _id, Body::RuntimeAbortRequest {})) => { + Body::RuntimeAbortRequest {} => { // We handle the RuntimeAbortRequest here so that we break // the recv loop and re-check abort flag. info!(self.logger, "Received abort request"); + continue 'dispatch; } - Ok(_) => { + _ => { error!(self.logger, "Unsupported request type"); break 'dispatch; } - Err(error) => { - error!(self.logger, "Error while waiting for request"; "err" => %error); - break 'dispatch; - } - } + }; + + let response = match result { + Ok(body) => body, + Err(error) => Body::Error(error), + }; + protocol.send_response(id, response).unwrap(); } info!(self.logger, "Runtime call dispatcher is terminating"); @@ -293,45 +303,83 @@ impl Dispatcher { Ok(()) } + fn dispatch_query( + &self, + cache: &mut Cache, + txn_dispatcher: &mut dyn TxnDispatcher, + protocol: &Arc, + ctx: Context, + method: String, + header: Header, + args: cbor::Value, + ) -> Result { + debug!(self.logger, "Received query request"; + "state_root" => ?header.state_root, + "round" => ?header.round, + ); + + // Verify that the runtime ID matches the block's namespace. This is a protocol violation + // as the compute node should never change the runtime ID. + if header.namespace != protocol.get_runtime_id() { + panic!( + "block namespace does not match runtime id (namespace: {:?} runtime ID: {:?})", + header.namespace, + protocol.get_runtime_id(), + ); + } + + // Create a new context and dispatch the batch. + let ctx = ctx.freeze(); + cache.maybe_replace(Root { + namespace: header.namespace, + version: header.round, + hash: header.state_root, + }); + + let untrusted_local = Arc::new(ProtocolUntrustedLocalStorage::new( + Context::create_child(&ctx), + protocol.clone(), + )); + + let txn_ctx = TxnContext::new(ctx.clone(), &header, &[], true); + let mut overlay = OverlayTree::new(&mut cache.mkvs); + let result = StorageContext::enter(&mut overlay, untrusted_local, || { + txn_dispatcher.query(txn_ctx, &method, args) + }); + + result.map(|data| Body::RuntimeQueryResponse { data }) + } + fn txn_check_batch( &self, _ctx: Arc, - request_id: u64, cache: &mut Cache, - protocol: &Arc, - txn_dispatcher: &mut Box, + txn_dispatcher: &mut dyn TxnDispatcher, txn_ctx: TxnContext, untrusted_local: Arc, inputs: TxnBatch, _io_root: Hash, - ) -> Result<()> { + ) -> Result { let mut overlay = OverlayTree::new(&mut cache.mkvs); let results = StorageContext::enter(&mut overlay, untrusted_local.clone(), || { txn_dispatcher.check_batch(txn_ctx, &inputs) - })?; + }); debug!(self.logger, "Transaction batch check complete"); - // Send the result back. - protocol - .send_response(request_id, Body::RuntimeCheckTxBatchResponse { results }) - .unwrap(); - - Ok(()) + results.map(|results| Body::RuntimeCheckTxBatchResponse { results }) } fn txn_execute_batch( &self, ctx: Arc, - request_id: u64, cache: &mut Cache, - protocol: &Arc, - txn_dispatcher: &mut Box, + txn_dispatcher: &mut dyn TxnDispatcher, txn_ctx: TxnContext, untrusted_local: Arc, mut inputs: TxnBatch, io_root: Hash, - ) -> Result<()> { + ) -> Result { let header = txn_ctx.header.clone(); let mut overlay = OverlayTree::new(&mut cache.mkvs); let mut results = StorageContext::enter(&mut overlay, untrusted_local.clone(), || { @@ -421,38 +469,29 @@ impl Dispatcher { Signature::default() }; - let result = ComputedBatch { - header, - io_write_log, - state_write_log, - rak_sig, - messages: results.messages, - }; - - // Send the result back. - protocol - .send_response( - request_id, - Body::RuntimeExecuteTxBatchResponse { batch: result }, - ) - .unwrap(); - - Ok(()) + Ok(Body::RuntimeExecuteTxBatchResponse { + batch: ComputedBatch { + header, + io_write_log, + state_write_log, + rak_sig, + messages: results.messages, + }, + }) } fn dispatch_txn( &self, cache: &mut Cache, - txn_dispatcher: &mut Box, + txn_dispatcher: &mut dyn TxnDispatcher, protocol: &Arc, ctx: Context, - id: u64, io_root: Hash, inputs: TxnBatch, block: Block, message_results: Vec, check_only: bool, - ) { + ) -> Result { debug!(self.logger, "Received transaction batch request"; "state_root" => ?block.header.state_root, "round" => block.header.round + 1, @@ -483,12 +522,10 @@ impl Dispatcher { protocol.clone(), )); let txn_ctx = TxnContext::new(ctx.clone(), &block.header, &message_results, check_only); - let results = if check_only { + if check_only { self.txn_check_batch( ctx, - id, cache, - &protocol, txn_dispatcher, txn_ctx, untrusted_local, @@ -498,30 +535,13 @@ impl Dispatcher { } else { self.txn_execute_batch( ctx, - id, cache, - &protocol, txn_dispatcher, txn_ctx, untrusted_local, inputs, io_root, ) - }; - - // Return and error response in case of failure. - if let Err(error) = results { - warn!(self.logger, "Dispatching batch error"; "err" => %error); - protocol - .send_response( - id, - Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: format!("{}", error), - }, - ) - .unwrap(); } } @@ -531,9 +551,8 @@ impl Dispatcher { rpc_dispatcher: &mut RpcDispatcher, protocol: &Arc, ctx: Context, - id: u64, request: Vec, - ) { + ) -> Result { debug!(self.logger, "Received RPC call request"); // Process frame. @@ -542,22 +561,10 @@ impl Dispatcher { Ok(result) => result, Err(error) => { error!(self.logger, "Error while processing frame"; "err" => %error); - - protocol - .send_response( - id, - Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: format!("{}", error), - }, - ) - .unwrap(); - return; + return Err(Error::new("dispatcher", 1, &format!("{}", error))); } }; - let protocol_response; if let Some((session_id, session_info, message, untrusted_plaintext)) = result { // Dispatch request. assert!( @@ -574,14 +581,11 @@ impl Dispatcher { "untrusted_plaintext" => ?untrusted_plaintext, "method" => ?req.method ); - let err_reponse = Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: "Request's method doesn't match untrusted_plaintext copy." - .to_string(), - }; - protocol.send_response(id, err_reponse).unwrap(); - return; + return Err(Error::new( + "dispatcher", + 1, + "Request's method doesn't match untrusted_plaintext copy.", + )); } // Request, dispatch. @@ -607,15 +611,11 @@ impl Dispatcher { match rpc_demux.write_message(session_id, response, &mut buffer) { Ok(_) => { // Transmit response. - protocol_response = Body::RuntimeRPCCallResponse { response: buffer }; + Ok(Body::RuntimeRPCCallResponse { response: buffer }) } Err(error) => { error!(self.logger, "Error while writing response"; "err" => %error); - protocol_response = Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: format!("{}", error), - }; + Err(Error::new("dispatcher", 1, &format!("{}", error))) } } } @@ -625,33 +625,23 @@ impl Dispatcher { match rpc_demux.close(session_id, &mut buffer) { Ok(_) => { // Transmit response. - protocol_response = Body::RuntimeRPCCallResponse { response: buffer }; + Ok(Body::RuntimeRPCCallResponse { response: buffer }) } Err(error) => { error!(self.logger, "Error while closing session"; "err" => %error); - protocol_response = Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: format!("{}", error), - }; + Err(Error::new("dispatcher", 1, &format!("{}", error))) } } } msg => { warn!(self.logger, "Ignoring invalid RPC message type"; "msg" => ?msg); - protocol_response = Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: "invalid RPC message type".to_owned(), - }; + Err(Error::new("dispatcher", 1, "invalid RPC message type")) } } } else { // Send back any handshake frames. - protocol_response = Body::RuntimeRPCCallResponse { response: buffer }; + Ok(Body::RuntimeRPCCallResponse { response: buffer }) } - - protocol.send_response(id, protocol_response).unwrap(); } fn dispatch_local_rpc( @@ -659,12 +649,12 @@ impl Dispatcher { rpc_dispatcher: &mut RpcDispatcher, protocol: &Arc, ctx: Context, - id: u64, request: Vec, - ) { + ) -> Result { debug!(self.logger, "Received local RPC call request"); - let req: RpcRequest = cbor::from_slice(&request).unwrap(); + let req: RpcRequest = cbor::from_slice(&request) + .map_err(|_| Error::new("dispatcher", 1, "malformed request"))?; // Request, dispatch. let ctx = ctx.freeze(); @@ -685,26 +675,20 @@ impl Dispatcher { debug!(self.logger, "Local RPC call dispatch complete"); let response = cbor::to_vec(&response); - let protocol_response = Body::RuntimeLocalRPCCallResponse { response }; - - protocol.send_response(id, protocol_response).unwrap(); + Ok(Body::RuntimeLocalRPCCallResponse { response }) } fn handle_km_policy_update( &self, rpc_dispatcher: &mut RpcDispatcher, - protocol: &Arc, _ctx: Context, - id: u64, signed_policy_raw: Vec, - ) { + ) -> Result { debug!(self.logger, "Received km policy update request"); rpc_dispatcher.handle_km_policy_update(signed_policy_raw); debug!(self.logger, "KM policy update request complete"); - protocol - .send_response(id, Body::RuntimeKeyManagerPolicyUpdateResponse {}) - .unwrap(); + Ok(Body::RuntimeKeyManagerPolicyUpdateResponse {}) } } diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index 27a1570e247..01b9b51b246 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -21,7 +21,7 @@ use crate::{ rak::RAK, storage::KeyValue, tracing, - types::{Body, Message, MessageType}, + types::{Body, Error, Message, MessageType}, BUILD_INFO, }; @@ -146,7 +146,7 @@ impl Protocol { self.encode_message(message)?; match rx.recv()? { - Body::Error { message, .. } => Err(anyhow!("{}", message)), + Body::Error(Error { message, .. }) => Err(anyhow!("{}", message)), body => Ok(body), } } @@ -206,11 +206,7 @@ impl Protocol { // is no need to do anything more. return Ok(()); } - Err(error) => Body::Error { - module: "".to_owned(), // XXX: Error codes. - code: 1, // XXX: Error codes. - message: format!("{}", error), - }, + Err(error) => Body::Error(Error::new("dispatcher", 1, &format!("{}", error))), }; // Send response back. @@ -345,6 +341,11 @@ impl Protocol { self.dispatcher.queue_request(ctx, id, req)?; Ok(None) } + req @ Body::RuntimeQueryRequest { .. } => { + self.can_handle_runtime_requests()?; + self.dispatcher.queue_request(ctx, id, req)?; + Ok(None) + } req => { warn!(self.logger, "Received unsupported request"; "req" => format!("{:?}", req)); Err(ProtocolError::MethodNotSupported.into()) diff --git a/runtime/src/transaction/dispatcher.rs b/runtime/src/transaction/dispatcher.rs index 6524baa5377..8513eac42d7 100644 --- a/runtime/src/transaction/dispatcher.rs +++ b/runtime/src/transaction/dispatcher.rs @@ -7,7 +7,7 @@ use std::{ }, }; -use anyhow::{anyhow, Context as AnyContext, Result}; +use anyhow::{Context as AnyContext, Result as AnyResult}; use serde::{de::DeserializeOwned, Serialize}; use thiserror::Error; @@ -28,16 +28,74 @@ use crate::{ /// to process transactions. pub trait Dispatcher { /// Execute the transactions in the given batch. - fn execute_batch(&self, ctx: Context, batch: &TxnBatch) -> Result; + fn execute_batch( + &self, + ctx: Context, + batch: &TxnBatch, + ) -> Result; /// Check the transactions in the given batch for validity. - fn check_batch(&self, ctx: Context, batch: &TxnBatch) -> Result>; + fn check_batch( + &self, + ctx: Context, + batch: &TxnBatch, + ) -> Result, RuntimeError>; /// Invoke the finalizer (if any). - fn finalize(&self, new_storage_root: Hash); + fn finalize(&self, _new_storage_root: Hash) { + // Default implementation does nothing. + } /// Configure abort batch flag. - fn set_abort_batch_flag(&mut self, abort_batch: Arc); + fn set_abort_batch_flag(&mut self, _abort_batch: Arc) { + // Default implementation does nothing. + } + + /// Process a query. + fn query( + &self, + _ctx: Context, + _method: &str, + _args: cbor::Value, + ) -> Result { + // Default implementation returns an error. + Err(RuntimeError::new("dispatcher", 1, "query not supported")) + } +} + +impl Dispatcher for Box { + fn execute_batch( + &self, + ctx: Context, + batch: &TxnBatch, + ) -> Result { + T::execute_batch(&*self, ctx, batch) + } + + fn check_batch( + &self, + ctx: Context, + batch: &TxnBatch, + ) -> Result, RuntimeError> { + T::check_batch(&*self, ctx, batch) + } + + fn finalize(&self, new_storage_root: Hash) { + T::finalize(&*self, new_storage_root) + } + + fn set_abort_batch_flag(&mut self, abort_batch: Arc) { + T::set_abort_batch_flag(&mut *self, abort_batch) + } + + fn query( + &self, + ctx: Context, + method: &str, + args: cbor::Value, + ) -> Result { + T::query(&*self, ctx, method, args) + } } /// Result of processing an ExecuteTx. @@ -69,14 +127,22 @@ impl NoopDispatcher { } impl Dispatcher for NoopDispatcher { - fn execute_batch(&self, _ctx: Context, _batch: &TxnBatch) -> Result { + fn execute_batch( + &self, + _ctx: Context, + _batch: &TxnBatch, + ) -> Result { Ok(ExecuteBatchResult { results: Vec::new(), messages: Vec::new(), }) } - fn check_batch(&self, _ctx: Context, _batch: &TxnBatch) -> Result> { + fn check_batch( + &self, + _ctx: Context, + _batch: &TxnBatch, + ) -> Result, RuntimeError> { Ok(Vec::new()) } @@ -164,16 +230,16 @@ pub struct MethodDescriptor { /// Handler for a runtime method. pub trait MethodHandler { /// Invoke the method implementation and return a response. - fn handle(&self, call: &Call, ctx: &mut Context) -> Result; + fn handle(&self, call: &Call, ctx: &mut Context) -> AnyResult; } impl MethodHandler for F where Call: 'static, Output: 'static, - F: Fn(&Call, &mut Context) -> Result + 'static, + F: Fn(&Call, &mut Context) -> AnyResult + 'static, { - fn handle(&self, call: &Call, ctx: &mut Context) -> Result { + fn handle(&self, call: &Call, ctx: &mut Context) -> AnyResult { (*self)(&call, ctx) } } @@ -185,7 +251,7 @@ pub trait MethodHandlerDispatch { fn get_descriptor(&self) -> &MethodDescriptor; /// Dispatches the given raw call. - fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Result; + fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> AnyResult; } struct MethodHandlerDispatchImpl { @@ -204,7 +270,7 @@ where &self.descriptor } - fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Result { + fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> AnyResult { let call = cbor::from_value(call.args).context("unable to parse call arguments")?; let response = self.handler.handle(&call, ctx)?; @@ -241,7 +307,7 @@ impl Method { } /// Dispatch method call. - pub fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> Result { + pub fn dispatch(&self, call: TxnCall, ctx: &mut Context) -> AnyResult { self.dispatcher.dispatch(call, ctx) } } @@ -343,7 +409,7 @@ impl MethodDispatcher { } } - fn dispatch_fallible(&self, call: &Vec, ctx: &mut Context) -> Result { + fn dispatch_fallible(&self, call: &Vec, ctx: &mut Context) -> AnyResult { let call: TxnCall = cbor::from_slice(call).context("unable to parse call")?; match self.methods.get(&call.method) { @@ -357,7 +423,11 @@ impl MethodDispatcher { } impl Dispatcher for MethodDispatcher { - fn check_batch(&self, mut ctx: Context, batch: &TxnBatch) -> Result> { + fn check_batch( + &self, + mut ctx: Context, + batch: &TxnBatch, + ) -> Result, RuntimeError> { if let Some(ref ctx_init) = self.ctx_initializer { ctx_init.init(&mut ctx); } @@ -376,7 +446,7 @@ impl Dispatcher for MethodDispatcher { .map(|b| b.load(Ordering::SeqCst)) .unwrap_or(false) { - return Err(anyhow!("batch aborted")); + return Err(RuntimeError::new("dispatcher", 1, "batch aborted")); } results.push(self.dispatch_check(call, &mut ctx)); let _ = ctx.take_tags(); @@ -385,7 +455,11 @@ impl Dispatcher for MethodDispatcher { Ok(results) } - fn execute_batch(&self, mut ctx: Context, batch: &TxnBatch) -> Result { + fn execute_batch( + &self, + mut ctx: Context, + batch: &TxnBatch, + ) -> Result { if let Some(ref ctx_init) = self.ctx_initializer { ctx_init.init(&mut ctx); } @@ -404,7 +478,7 @@ impl Dispatcher for MethodDispatcher { .map(|b| b.load(Ordering::SeqCst)) .unwrap_or(false) { - return Err(anyhow!("batch aborted")); + return Err(RuntimeError::new("dispatcher", 1, "batch aborted")); } results.push(self.dispatch_execute(call, &mut ctx)); } @@ -456,7 +530,7 @@ mod tests { MethodDescriptor { name: "dummy".to_owned(), }, - |call: &Complex, ctx: &mut Context| -> Result { + |call: &Complex, ctx: &mut Context| -> AnyResult { assert_eq!(ctx.header.timestamp, TEST_TIMESTAMP); Ok(Complex { diff --git a/runtime/src/types.rs b/runtime/src/types.rs index a29d11b63b3..32876b25106 100644 --- a/runtime/src/types.rs +++ b/runtime/src/types.rs @@ -2,6 +2,7 @@ use serde::{self, Deserialize, Deserializer, Serialize, Serializer}; use serde_bytes; use serde_repr::*; +use thiserror::Error; use crate::{ common::{ @@ -13,7 +14,7 @@ use crate::{ namespace::Namespace, sgx::avr::AVR, }, - consensus::roothash::{self, Block, ComputeResultsHeader}, + consensus::roothash::{self, Block, ComputeResultsHeader, Header}, storage::mkvs::{sync, WriteLog}, transaction::types::TxnBatch, }; @@ -63,14 +64,7 @@ pub enum Body { Empty {}, // An error response. - Error { - #[serde(default)] - module: String, - #[serde(default)] - code: u32, - #[serde(default)] - message: String, - }, + Error(Error), // Runtime interface. RuntimeInfoRequest { @@ -140,6 +134,14 @@ pub enum Body { signed_policy_raw: Vec, }, RuntimeKeyManagerPolicyUpdateResponse {}, + RuntimeQueryRequest { + method: String, + header: Header, + args: cbor::Value, + }, + RuntimeQueryResponse { + data: cbor::Value, + }, // Host interface. HostRPCCallRequest { @@ -182,16 +184,30 @@ pub enum Body { } /// A serializable error. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Error, Serialize, Deserialize)] +#[error("module: {module} code: {code} message: {message}")] pub struct Error { #[serde(default)] pub module: String, + #[serde(default)] pub code: u32, + #[serde(default)] pub message: String, } +impl Error { + /// Create a new error. + pub fn new(module: &str, code: u32, msg: &str) -> Self { + Self { + module: module.to_owned(), + code, + message: msg.to_owned(), + } + } +} + /// Result of a CheckTx operation. #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct CheckTxResult {