Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry runtime calls if the proof is invalid #2195

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 57 additions & 48 deletions bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ impl<TPlat: Platform> Background<TPlat> {
}
};

let (runtime_call_lock, virtual_machine) = precall
let (mut runtime_call_lock, mut virtual_machine) = precall
.start(
function_to_call,
call_parameters.clone(),
Expand All @@ -1356,56 +1356,65 @@ impl<TPlat: Platform> Background<TPlat> {
.await
.unwrap(); // TODO: don't unwrap

// Now that we have obtained the virtual machine, we can perform the call.
// This is a CPU-only operation that executes the virtual machine.
// The virtual machine might access the storage.
// TODO: finish doc

let mut runtime_call = match read_only_runtime_host::run(read_only_runtime_host::Config {
virtual_machine,
function_to_call,
parameter: call_parameters,
}) {
Ok(vm) => vm,
Err((err, prototype)) => {
runtime_call_lock.unlock(prototype);
return Err(RuntimeCallError::StartError(err));
}
};

loop {
match runtime_call {
read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => {
let output = success.virtual_machine.value().as_ref().to_vec();
runtime_call_lock.unlock(success.virtual_machine.into_prototype());
break Ok(output);
}
read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => {
runtime_call_lock.unlock(error.prototype);
break Err(RuntimeCallError::ReadOnlyRuntime(error.detail));
}
read_only_runtime_host::RuntimeHostVm::StorageGet(get) => {
let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) {
Ok(v) => v,
Err(err) => {
runtime_call_lock.unlock(
// Now that we have obtained the virtual machine, we can perform the call.
// This is a CPU-only operation that executes the virtual machine.
// The virtual machine might access the storage.
// TODO: finish doc

let mut runtime_call =
match read_only_runtime_host::run(read_only_runtime_host::Config {
virtual_machine,
function_to_call,
parameter: call_parameters.clone(),
}) {
Ok(vm) => vm,
Err((err, prototype)) => {
runtime_call_lock.unlock_finish(prototype);
return Err(RuntimeCallError::StartError(err));
}
};

loop {
match runtime_call {
read_only_runtime_host::RuntimeHostVm::Finished(Ok(success)) => {
let output = success.virtual_machine.value().as_ref().to_vec();
runtime_call_lock.unlock_finish(success.virtual_machine.into_prototype());
return Ok(output);
}
read_only_runtime_host::RuntimeHostVm::Finished(Err(error)) => {
runtime_call_lock.unlock_finish(error.prototype);
return Err(RuntimeCallError::ReadOnlyRuntime(error.detail));
}
read_only_runtime_host::RuntimeHostVm::StorageGet(get) => {
if let Ok(storage_value) =
runtime_call_lock.storage_entry(&get.key_as_vec())
{
runtime_call = get.inject_value(storage_value.map(iter::once));
continue;
}

let (new_lock, new_vm) = runtime_call_lock
.unlock_retry(
read_only_runtime_host::RuntimeHostVm::StorageGet(get)
.into_prototype(),
);
break Err(RuntimeCallError::Call(err));
}
};
runtime_call = get.inject_value(storage_value.map(iter::once));
}
read_only_runtime_host::RuntimeHostVm::NextKey(nk) => {
// TODO:
runtime_call_lock.unlock(
read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype(),
);
break Err(RuntimeCallError::NextKeyForbidden);
}
read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => {
runtime_call = storage_root.resume(runtime_call_lock.block_storage_root());
)
.await
.unwrap(); // TODO: don't unwrap
runtime_call_lock = new_lock;
virtual_machine = new_vm;
break;
}
read_only_runtime_host::RuntimeHostVm::NextKey(nk) => {
// TODO:
runtime_call_lock.unlock_finish(
read_only_runtime_host::RuntimeHostVm::NextKey(nk).into_prototype(),
);
return Err(RuntimeCallError::NextKeyForbidden);
}
read_only_runtime_host::RuntimeHostVm::StorageRoot(storage_root) => {
runtime_call = storage_root.resume(runtime_call_lock.block_storage_root());
}
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions bin/light-base/src/json_rpc_service/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<TPlat: Platform> Background<TPlat> {
storage_top_trie_changes: Default::default(),
}) {
Err((error, prototype)) => {
runtime_call_lock.unlock(prototype);
runtime_call_lock.unlock_finish(prototype);
methods::ServerToClient::chainHead_unstable_callEvent {
subscription: (&subscription_id).into(),
result: methods::ChainHeadCallEvent::Error {
Expand All @@ -203,8 +203,9 @@ impl<TPlat: Platform> Background<TPlat> {
runtime_host::RuntimeHostVm::Finished(Ok(success)) => {
let output =
success.virtual_machine.value().as_ref().to_owned();
runtime_call_lock
.unlock(success.virtual_machine.into_prototype());
runtime_call_lock.unlock_finish(
success.virtual_machine.into_prototype(),
);
break methods::ServerToClient::chainHead_unstable_callEvent {
subscription: (&subscription_id).into(),
result: methods::ChainHeadCallEvent::Done {
Expand All @@ -214,7 +215,7 @@ impl<TPlat: Platform> Background<TPlat> {
.to_json_call_object_parameters(None);
}
runtime_host::RuntimeHostVm::Finished(Err(error)) => {
runtime_call_lock.unlock(error.prototype);
runtime_call_lock.unlock_finish(error.prototype);
break methods::ServerToClient::chainHead_unstable_callEvent {
subscription: (&subscription_id).into(),
result: methods::ChainHeadCallEvent::Error {
Expand All @@ -230,7 +231,8 @@ impl<TPlat: Platform> Background<TPlat> {
{
Ok(v) => v,
Err(error) => {
runtime_call_lock.unlock(
// TODO: call unlock_retry instead of ending with error
runtime_call_lock.unlock_finish(
runtime_host::RuntimeHostVm::StorageGet(
get,
)
Expand All @@ -250,7 +252,7 @@ impl<TPlat: Platform> Background<TPlat> {
}
runtime_host::RuntimeHostVm::NextKey(nk) => {
// TODO: implement somehow
runtime_call_lock.unlock(
runtime_call_lock.unlock_finish(
runtime_host::RuntimeHostVm::NextKey(nk)
.into_prototype(),
);
Expand All @@ -264,7 +266,7 @@ impl<TPlat: Platform> Background<TPlat> {
}
runtime_host::RuntimeHostVm::PrefixKeys(nk) => {
// TODO: implement somehow
runtime_call_lock.unlock(
runtime_call_lock.unlock_finish(
runtime_host::RuntimeHostVm::PrefixKeys(nk)
.into_prototype(),
);
Expand Down
114 changes: 74 additions & 40 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,46 +725,55 @@ impl<'a, TPlat: Platform> RuntimeLock<'a, TPlat> {
) -> Result<(RuntimeCallLock<'a>, executor::host::HostVmPrototype), RuntimeCallError> {
// TODO: DRY :-/ this whole thing is messy

// Perform the call proof request.
// Note that `guarded` is not locked.
// TODO: there's no way to verify that the call proof is actually correct; we have to ban the peer and restart the whole call process if it turns out that it's not
// TODO: also, an empty proof will be reported as an error right now, which is weird
let call_proof = self
.service
.sync_service
.clone()
.call_proof_query(
self.block_number,
protocol::CallProofRequestConfig {
block_hash: self.hash,
method,
parameter_vectored: parameter_vectored.clone(),
},
total_attempts,
timeout_per_request,
max_parallel,
)
.await
.map_err(RuntimeCallError::CallProof);

let (guarded, virtual_machine) = match self.runtime.runtime.as_ref() {
Ok(r) => {
let mut lock = r.virtual_machine.lock().await;
let vm = lock.take().unwrap();
(lock, vm)
}
Err(err) => {
return Err(RuntimeCallError::InvalidRuntime(err.clone()));
}
};
let mut attempts_remaining = total_attempts;

while attempts_remaining > 0 {
attempts_remaining -= 1;

// Perform the call proof request.
// Note that `guarded` is not locked.
// TODO: there's no way to verify that the call proof is actually correct; we have to ban the peer and restart the whole call process if it turns out that it's not
// TODO: also, an empty proof will be reported as an error right now, which is weird
let call_proof = self
.service
.sync_service
.clone()
.call_proof_query(
self.block_number,
protocol::CallProofRequestConfig {
block_hash: self.hash,
method,
parameter_vectored: parameter_vectored.clone(),
},
1, // TODO: we do this as `total_attempts` is enforced manually; this can only be done properly after a refactoring of `call_proof_query`
timeout_per_request,
max_parallel,
)
.await
.map_err(RuntimeCallError::CallProof);

let (guarded, virtual_machine) = match self.runtime.runtime.as_ref() {
Ok(r) => {
let mut lock = r.virtual_machine.lock().await;
let vm = lock.take().unwrap();
(lock, vm)
}
Err(err) => {
return Err(RuntimeCallError::InvalidRuntime(err.clone()));
}
};

let lock = RuntimeCallLock {
guarded,
block_state_root_hash: self.block_state_root_hash,
call_proof,
};
let lock = RuntimeCallLock {
guarded,
block_state_root_hash: self.block_state_root_hash,
call_proof,
attempts_remaining,
};

Ok((lock, virtual_machine))
return Ok((lock, virtual_machine));
}

todo!() // Err(RuntimeCallError::)
}
}

Expand All @@ -774,6 +783,7 @@ pub struct RuntimeCallLock<'a> {
guarded: MutexGuard<'a, Option<executor::host::HostVmPrototype>>,
block_state_root_hash: [u8; 32],
call_proof: Result<Vec<Vec<u8>>, RuntimeCallError>,
attempts_remaining: u32,
}

impl<'a> RuntimeCallLock<'a> {
Expand Down Expand Up @@ -862,10 +872,34 @@ impl<'a> RuntimeCallLock<'a> {
Ok(output.into_iter())
}

/// End the runtime call.
/// End the runtime call, then .
///
/// The provided runtime is put back into the state of the [`RuntimeService`] while the
/// networking request is in progress, then extracted again.
pub async fn unlock_retry(
mut self,
vm: executor::host::HostVmPrototype,
) -> Result<(RuntimeCallLock<'a>, executor::host::HostVmPrototype), RuntimeCallError> {
// We're destroying `self` below. Let's first extract the fields we need.
let block_state_root_hash = self.block_state_root_hash;
let attempts_remaining = self.attempts_remaining;

// Unlock everything.
debug_assert!(self.guarded.is_none());
*self.guarded = Some(vm);
drop(self);

if attempts_remaining == 0 {
// TODO: return Err()
}

todo!()
}

/// End the runtime call, either after a success or because we are giving up.
///
/// This method **must** be called.
pub fn unlock(mut self, vm: executor::host::HostVmPrototype) {
pub fn unlock_finish(mut self, vm: executor::host::HostVmPrototype) {
debug_assert!(self.guarded.is_none());
*self.guarded = Some(vm);
}
Expand Down
Loading