From 24af4e5d79f495fbd4ba2a6d430f0eee90d99434 Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 17 Jun 2020 13:48:00 +0200 Subject: [PATCH] runtime/dispatcher: Break recv loop on abort request --- go/runtime/host/tests/tester.go | 13 +++++++++---- runtime/src/dispatcher.rs | 10 +++++++++- runtime/src/protocol.rs | 4 ++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/go/runtime/host/tests/tester.go b/go/runtime/host/tests/tester.go index bdd564913af..e4737f41084 100644 --- a/go/runtime/host/tests/tester.go +++ b/go/runtime/host/tests/tester.go @@ -19,9 +19,14 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" ) -// This needs to be large as some runtimes can take a long time to initialize due to remote -// attestation taking a long time. -const recvTimeout = 120 * time.Second +const ( + // This needs to be large as some runtimes can take a long time to initialize due to remote + // attestation taking a long time. + recvTimeout = 120 * time.Second + + // Runtime is already started at this point so we can have a smaller timeout than above. + recvAbortTimeout = 10 * time.Second +) // mockMessageHandler is a mock message handler which only implements a small subset of methods. type mockMessageHandler struct{} @@ -192,7 +197,7 @@ func testRestart(t *testing.T, cfg host.Config, p host.Provisioner) { select { case ev := <-evCh: require.Nil(ev.Stopped, "unexpected stop event") - case <-time.After(recvTimeout): + case <-time.After(recvAbortTimeout): } } diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index 34cde839bcc..1391034b218 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -155,8 +155,11 @@ impl Dispatcher { /// Signals to dispatcher that it should abort and waits for the abort to /// complete. - pub fn abort_and_wait(&self) -> Result<()> { + pub fn abort_and_wait(&self, ctx: Context, id: u64, req: Body) -> Result<()> { self.abort_batch.store(true, Ordering::SeqCst); + // Queue the request to break the dispatch loop in case nothing is + // being processed at the moment. + self.queue_request(ctx, id, req)?; // Wait for abort. self.abort_rx.recv().map_err(|error| anyhow!("{}", error)) } @@ -286,6 +289,11 @@ impl Dispatcher { signed_policy_raw, ); } + Ok((_ctx, _id, Body::RuntimeAbortRequest {})) => { + // We handle the RuntimeAbortRequest here so that we break + // the recv loop and re-check abort flag. + info!(self.logger, "Received abort request"); + } Ok(_) => { error!(self.logger, "Unsupported request type"); break 'dispatch; diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index f4a0e206bf5..485c7d39a9b 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -268,10 +268,10 @@ impl Protocol { info!(self.logger, "Received worker shutdown request"); Err(ProtocolError::MethodNotSupported.into()) } - Body::RuntimeAbortRequest {} => { + req @ Body::RuntimeAbortRequest {} => { info!(self.logger, "Received worker abort request"); self.can_handle_runtime_requests()?; - self.dispatcher.abort_and_wait()?; + self.dispatcher.abort_and_wait(ctx, id, req)?; info!(self.logger, "Handled worker abort request"); Ok(Some(Body::RuntimeAbortResponse {})) }