Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime/dispatcher: Break recv loop on abort request #3023

Merged
merged 1 commit into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/3023.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
runtime/dispatcher: Break recv loop on abort request
27 changes: 23 additions & 4 deletions go/runtime/host/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ import (
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

// This needs to be large as some runtimes can take a long time to initialize due to remote
// attestation taking a long time.
const recvTimeout = 120 * time.Second
const (
// This needs to be large because some runtimes take a long time to initialize when remote
// attestation is slow.
recvTimeout = 120 * time.Second

// Runtime is already started at this point so we can have a smaller timeout than above.
recvAbortTimeout = 10 * time.Second
)

// mockMessageHandler is a mock message handler which only implements a small subset of methods.
type mockMessageHandler struct{}
Expand Down Expand Up @@ -192,7 +197,21 @@ func testRestart(t *testing.T, cfg host.Config, p host.Provisioner) {
select {
case ev := <-evCh:
require.Nil(ev.Stopped, "unexpected stop event")
case <-time.After(recvTimeout):
case <-time.After(recvAbortTimeout):
}

// Trigger another non-force abort (runtime should not be restarted).
// NOTE: The Abort request above reaches the dispatcher before the first
// iteration of the dispatch loop, so it is handled before the dispatcher
// blocks in the recv loop. By this point the runtime dispatcher is
// guaranteed to be running and waiting on requests.
err = r.Abort(context.Background(), false)
require.NoError(err, "Abort(force=false)")

// There should be no stop event.
select {
case ev := <-evCh:
require.Nil(ev.Stopped, "unexpected stop event")
case <-time.After(recvAbortTimeout):
}
}
10 changes: 9 additions & 1 deletion runtime/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ impl Dispatcher {

/// Signals to the dispatcher that it should abort and waits for the abort to
/// complete.
pub fn abort_and_wait(&self) -> Result<()> {
pub fn abort_and_wait(&self, ctx: Context, id: u64, req: Body) -> Result<()> {
self.abort_batch.store(true, Ordering::SeqCst);
// Queue the request to break the dispatch loop in case nothing is
// being processed at the moment.
self.queue_request(ctx, id, req)?;
// Wait for abort.
self.abort_rx.recv().map_err(|error| anyhow!("{}", error))
}
Expand Down Expand Up @@ -286,6 +289,11 @@ impl Dispatcher {
signed_policy_raw,
);
}
Ok((_ctx, _id, Body::RuntimeAbortRequest {})) => {
// We handle the RuntimeAbortRequest here so that we break
// the recv loop and re-check the abort flag.
info!(self.logger, "Received abort request");
}
Ok(_) => {
error!(self.logger, "Unsupported request type");
break 'dispatch;
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ impl Protocol {
info!(self.logger, "Received worker shutdown request");
Err(ProtocolError::MethodNotSupported.into())
}
Body::RuntimeAbortRequest {} => {
req @ Body::RuntimeAbortRequest {} => {
info!(self.logger, "Received worker abort request");
self.can_handle_runtime_requests()?;
self.dispatcher.abort_and_wait()?;
self.dispatcher.abort_and_wait(ctx, id, req)?;
info!(self.logger, "Handled worker abort request");
Ok(Some(Body::RuntimeAbortResponse {}))
}
Expand Down