Skip to content

Commit

Permalink
Merge pull request #4276 from oasisprotocol/kostko/fix/rt-dispatch-no…
Browse files Browse the repository at this point in the history
…panic

runtime: Initialize query cache with correct root
  • Loading branch information
kostko authored Sep 23, 2021
2 parents ac71808 + 468cf70 commit eca2d99
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 25 deletions.
1 change: 1 addition & 0 deletions .changelog/4276.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime: Refactor protocol type, expose via `transaction::Context`
2 changes: 1 addition & 1 deletion client/src/enclave_rpc/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Transport for RuntimeTransport {
) -> BoxFuture<Result<Vec<u8>, AnyError>> {
// NOTE: This is not actually async in SGX, but futures should be
// dispatched on the current thread anyway.
let rsp = self.protocol.make_request(
let rsp = self.protocol.call_host(
ctx,
Body::HostRPCCallRequest {
endpoint: self.endpoint.clone(),
Expand Down
6 changes: 4 additions & 2 deletions runtime/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl CacheSet {

/// Cache used for queries.
pub fn query(&self, root: Root) -> Rc<RefCell<Cache>> {
QUERY_CACHE.with(|caches| {
let cache = QUERY_CACHE.with(|caches| {
let mut caches = caches.borrow_mut();
if let Some(cache) = caches.get(&root.version) {
return cache.clone();
Expand All @@ -65,7 +65,9 @@ impl CacheSet {
let cache = Rc::new(RefCell::new(Cache::new(&self.protocol)));
caches.put(root.version, cache.clone());
cache
})
});
cache.borrow_mut().maybe_replace(&self.protocol, root);
cache
}
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/src/consensus/tendermint/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ impl Io {

let result = self
.protocol
.make_request(
.call_host(
Context::background(),
Body::HostFetchConsensusBlockRequest { height },
)
Expand Down
33 changes: 24 additions & 9 deletions runtime/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ impl Drop for AbortOnPanic {
}
}

impl From<tokio::task::JoinError> for Error {
fn from(e: tokio::task::JoinError) -> Self {
Error::new(
"dispatcher",
1,
&format!("error while processing request: {}", e),
)
}
}

/// State related to dispatching a runtime transaction.
struct TxDispatchState {
consensus_block: LightBlock,
Expand Down Expand Up @@ -420,6 +430,7 @@ impl Dispatcher {
state: TxDispatchState,
) -> Result<Body, Error> {
debug!(self.logger, "Received query request";
"method" => &method,
"state_root" => ?state.header.state_root,
"round" => ?state.header.round,
);
Expand All @@ -438,6 +449,7 @@ impl Dispatcher {
));
}

let protocol = protocol.clone();
let txn_dispatcher = txn_dispatcher.clone();

tokio::task::spawn_blocking(move || {
Expand All @@ -458,6 +470,7 @@ impl Dispatcher {

let txn_ctx = TxnContext::new(
ctx.freeze(),
protocol,
consensus_state,
&mut overlay,
&state.header,
Expand All @@ -471,13 +484,13 @@ impl Dispatcher {
.query(txn_ctx, &method, args)
.map(|data| Body::RuntimeQueryResponse { data })
})
.await
.unwrap()
.await?
}

fn txn_check_batch(
&self,
ctx: Arc<Context>,
protocol: Arc<Protocol>,
cache_set: cache::CacheSet,
txn_dispatcher: &dyn TxnDispatcher,
inputs: TxnBatch,
Expand All @@ -498,6 +511,7 @@ impl Dispatcher {

let txn_ctx = TxnContext::new(
ctx.clone(),
protocol,
consensus_state,
&mut overlay,
&state.header,
Expand All @@ -516,6 +530,7 @@ impl Dispatcher {
fn txn_execute_batch(
&self,
ctx: Arc<Context>,
protocol: Arc<Protocol>,
cache_set: cache::CacheSet,
txn_dispatcher: &dyn TxnDispatcher,
mut inputs: TxnBatch,
Expand All @@ -539,6 +554,7 @@ impl Dispatcher {

let txn_ctx = TxnContext::new(
ctx.clone(),
protocol,
consensus_state,
&mut overlay,
header,
Expand Down Expand Up @@ -686,15 +702,17 @@ impl Dispatcher {
}

let ctx = ctx.freeze();
let protocol = protocol.clone();
let dispatcher = self.clone();
let txn_dispatcher = txn_dispatcher.clone();

tokio::task::spawn_blocking(move || {
if state.check_only {
dispatcher.txn_check_batch(ctx, cache_set, &txn_dispatcher, inputs, state)
dispatcher.txn_check_batch(ctx, protocol, cache_set, &txn_dispatcher, inputs, state)
} else {
dispatcher.txn_execute_batch(
ctx,
protocol,
cache_set,
&txn_dispatcher,
inputs,
Expand All @@ -703,8 +721,7 @@ impl Dispatcher {
)
}
})
.await
.unwrap()
.await?
}

async fn dispatch_rpc(
Expand Down Expand Up @@ -770,8 +787,7 @@ impl Dispatcher {
let response = rpc_dispatcher.dispatch(req, rpc_ctx);
RpcMessage::Response(response)
})
.await
.unwrap();
.await?;

// Note: MKVS commit is omitted, this MUST be global side-effect free.

Expand Down Expand Up @@ -853,8 +869,7 @@ impl Dispatcher {
let response = cbor::to_vec(response);
Ok(Body::RuntimeLocalRPCCallResponse { response })
})
.await
.unwrap()
.await?
}

fn handle_km_policy_update(
Expand Down
19 changes: 12 additions & 7 deletions runtime/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ pub struct Protocol {

impl Protocol {
/// Create a new protocol handler instance.
pub fn new(stream: Stream, rak: Arc<RAK>, dispatcher: Arc<Dispatcher>, config: Config) -> Self {
pub(crate) fn new(
stream: Stream,
rak: Arc<RAK>,
dispatcher: Arc<Dispatcher>,
config: Config,
) -> Self {
let logger = get_logger("runtime/protocol");

let (outgoing_tx, outgoing_rx) = channel::unbounded();
Expand Down Expand Up @@ -164,7 +169,7 @@ impl Protocol {
}

/// Start the protocol handler loop.
pub fn start(self: &Arc<Protocol>) {
pub(crate) fn start(self: &Arc<Protocol>) {
// Spawn write end in a separate thread.
let protocol = self.clone();
let write_thread = std::thread::spawn(move || protocol.io_write());
Expand Down Expand Up @@ -203,8 +208,8 @@ impl Protocol {
info!(self.logger, "Protocol writer thread is terminating");
}

/// Make a new request to the worker host and wait for the response.
pub fn make_request(&self, _ctx: Context, body: Body) -> Result<Body, Error> {
/// Make a new request to the runtime host and wait for the response.
pub fn call_host(&self, _ctx: Context, body: Body) -> Result<Body, Error> {
let id = self.last_request_id.fetch_add(1, Ordering::SeqCst) as u64;
let message = Message {
id,
Expand All @@ -231,7 +236,7 @@ impl Protocol {
}
}

/// Send an async response to a previous request back to the worker host.
/// Send an async response to a previous request back to the host.
pub fn send_response(&self, id: u64, body: Body) -> anyhow::Result<()> {
self.send_message(Message {
id,
Expand Down Expand Up @@ -506,7 +511,7 @@ impl KeyValue for ProtocolUntrustedLocalStorage {

match self
.protocol
.make_request(ctx, Body::HostLocalStorageGetRequest { key })?
.call_host(ctx, Body::HostLocalStorageGetRequest { key })?
{
Body::HostLocalStorageGetResponse { value } => Ok(value),
_ => Err(ProtocolError::InvalidResponse.into()),
Expand All @@ -518,7 +523,7 @@ impl KeyValue for ProtocolUntrustedLocalStorage {

match self
.protocol
.make_request(ctx, Body::HostLocalStorageSetRequest { key, value })?
.call_host(ctx, Body::HostLocalStorageSetRequest { key, value })?
{
Body::HostLocalStorageSetResponse {} => Ok(()),
_ => Err(ProtocolError::InvalidResponse.into()),
Expand Down
10 changes: 5 additions & 5 deletions runtime/src/storage/mkvs/sync/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl HostReadSyncer {
}
}

fn make_request_with_proof(
fn call_host_with_proof(
&self,
ctx: Context,
request: StorageSyncRequest,
Expand All @@ -36,7 +36,7 @@ impl HostReadSyncer {
endpoint: self.endpoint,
request,
});
match self.protocol.make_request(ctx, request) {
match self.protocol.call_host(ctx, request) {
Ok(Body::HostStorageSyncResponse(StorageSyncResponse::ProofResponse(response))) => {
Ok(response)
}
Expand All @@ -52,18 +52,18 @@ impl ReadSync for HostReadSyncer {
}

fn sync_get(&mut self, ctx: Context, request: GetRequest) -> Result<ProofResponse> {
self.make_request_with_proof(ctx, StorageSyncRequest::SyncGet(request))
self.call_host_with_proof(ctx, StorageSyncRequest::SyncGet(request))
}

fn sync_get_prefixes(
&mut self,
ctx: Context,
request: GetPrefixesRequest,
) -> Result<ProofResponse> {
self.make_request_with_proof(ctx, StorageSyncRequest::SyncGetPrefixes(request))
self.call_host_with_proof(ctx, StorageSyncRequest::SyncGetPrefixes(request))
}

fn sync_iterate(&mut self, ctx: Context, request: IterateRequest) -> Result<ProofResponse> {
self.make_request_with_proof(ctx, StorageSyncRequest::SyncIterate(request))
self.call_host_with_proof(ctx, StorageSyncRequest::SyncIterate(request))
}
}
5 changes: 5 additions & 0 deletions runtime/src/transaction/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ use crate::{
roothash::{Header, RoundResults},
state::ConsensusState,
},
protocol::Protocol,
storage::MKVS,
};

/// Transaction context.
pub struct Context<'a> {
/// I/O context.
pub io_ctx: Arc<IoContext>,
/// Low-level access to the underlying Runtime Host Protocol.
pub protocol: Arc<Protocol>,
/// Consensus state tree.
pub consensus_state: ConsensusState,
/// Runtime state.
Expand All @@ -37,6 +40,7 @@ impl<'a> Context<'a> {
/// Construct new transaction context.
pub fn new(
io_ctx: Arc<IoContext>,
protocol: Arc<Protocol>,
consensus_state: ConsensusState,
runtime_state: &'a mut dyn MKVS,
header: &'a Header,
Expand All @@ -47,6 +51,7 @@ impl<'a> Context<'a> {
) -> Self {
Self {
io_ctx,
protocol,
consensus_state,
runtime_state,
header,
Expand Down

0 comments on commit eca2d99

Please sign in to comment.