Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

More Pending Validation #1059

Merged
merged 16 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- mac os x install script installs cmake and qt
- The current git-commit hash is now included in the compile code of the core, and is checked against the same hash that was used to compile the wasm and a warning issued if it's not. [PR#1050](https://github.com/holochain/holochain-rust/pull/1036)
- Validation of link entries gets retried now if base or target of the link were not yet accessible on the validating node. This fixes a bug where links have been invalid due to network timing issues [PR#1054](https://github.com/holochain/holochain-rust/pull/1054).
- Validation of any entry gets retried now if the validation package could not be retrieved from the source [PR#1059](https://github.com/holochain/holochain-rust/pull/1059).
### Fixed

## [0.0.4-alpha] - 2019-02-15
Expand Down
3 changes: 3 additions & 0 deletions core/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ pub enum Action {
/// An entry could not be validated yet because dependencies are still missing.
/// This adds the entry to nucleus state's pending list.
AddPendingValidation(PendingValidation),

/// Clear an entry from the pending validation list
RemovePendingValidation(Address),
}

/// function signature for action handler functions
Expand Down
12 changes: 5 additions & 7 deletions core/src/dht/actions/hold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ use holochain_core_types::{
};
use std::{pin::Pin, sync::Arc};

pub async fn hold_entry<'a>(
entry_wh: EntryWithHeader,
pub async fn hold_entry(
entry_wh: &EntryWithHeader,
context: Arc<Context>,
) -> Result<Address, HolochainError> {
let action_wrapper = ActionWrapper::new(Action::Hold(entry_wh.clone()));
let address = entry_wh.entry.address();
let action_wrapper = ActionWrapper::new(Action::Hold(entry_wh.to_owned()));
dispatch_action(context.action_channel(), action_wrapper.clone());

await!(HoldEntryFuture {
context: context,
address: entry_wh.entry.address(),
})
await!(HoldEntryFuture { context, address })
}

pub struct HoldEntryFuture {
Expand Down
2 changes: 1 addition & 1 deletion core/src/network/handler/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn handle_store_entry(dht_data: EntryData, context: Arc<Context>) {
let entry_with_header: EntryWithHeader =
serde_json::from_str(&serde_json::to_string(&dht_data.entry_content).unwrap()).unwrap();
thread::spawn(move || {
match context.block_on(hold_entry_workflow(entry_with_header, context.clone())) {
match context.block_on(hold_entry_workflow(&entry_with_header, context.clone())) {
Err(error) => context.log(format!("err/net/dht: {}", error)),
_ => (),
}
Expand Down
22 changes: 22 additions & 0 deletions core/src/nucleus/actions/add_pending_validation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::{
action::{Action, ActionWrapper},
context::Context,
instance::dispatch_action,
network::entry_with_header::EntryWithHeader,
};
use holochain_core_types::cas::content::Address;
use std::sync::Arc;

pub fn add_pending_validation(
entry_with_header: EntryWithHeader,
dependencies: Vec<Address>,
context: &Arc<Context>,
) {
dispatch_action(
context.action_channel(),
ActionWrapper::new(Action::AddPendingValidation(Arc::new((
entry_with_header.to_owned(),
dependencies.clone(),
)))),
);
}
2 changes: 2 additions & 0 deletions core/src/nucleus/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod add_pending_validation;
pub mod build_validation_package;
pub mod call_zome_function;
pub mod get_entry;
pub mod initialize;
pub mod remove_pending_validation;
pub mod run_validation_callback;

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions core/src/nucleus/actions/remove_pending_validation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::{
action::{Action, ActionWrapper},
context::Context,
instance::dispatch_action,
};
use holochain_core_types::cas::content::Address;
use std::sync::Arc;

pub fn remove_pending_validation(address: Address, context: &Arc<Context>) {
dispatch_action(
context.action_channel(),
ActionWrapper::new(Action::RemovePendingValidation(address)),
);
}
2 changes: 1 addition & 1 deletion core/src/nucleus/reducers/add_pending_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub mod tests {
header: test_chain_header(),
};

let action_wrapper = ActionWrapper::new(Action::AddPendingValidation(Box::new((
let action_wrapper = ActionWrapper::new(Action::AddPendingValidation(Arc::new((
entry_with_header.clone(),
Vec::new(),
))));
Expand Down
3 changes: 3 additions & 0 deletions core/src/nucleus/reducers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod add_pending_validation;
pub mod init_application;
mod remove_pending_validation;
pub mod return_initialization_result;
pub mod return_validation_package;
pub mod return_validation_result;
Expand All @@ -12,6 +13,7 @@ use crate::{
reducers::{
add_pending_validation::reduce_add_pending_validation,
init_application::reduce_init_application,
remove_pending_validation::reduce_remove_pending_validation,
return_initialization_result::reduce_return_initialization_result,
return_validation_package::reduce_return_validation_package,
return_validation_result::reduce_return_validation_result,
Expand All @@ -27,6 +29,7 @@ use std::sync::Arc;
fn resolve_reducer(action_wrapper: &ActionWrapper) -> Option<NucleusReduceFn> {
match action_wrapper.action() {
Action::AddPendingValidation(_) => Some(reduce_add_pending_validation),
Action::RemovePendingValidation(_) => Some(reduce_remove_pending_validation),
Action::ReturnInitializationResult(_) => Some(reduce_return_initialization_result),
Action::InitApplication(_) => Some(reduce_init_application),
Action::ReturnZomeFunctionResult(_) => Some(reduce_return_zome_function_result),
Expand Down
63 changes: 63 additions & 0 deletions core/src/nucleus/reducers/remove_pending_validation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::{
action::{Action, ActionWrapper},
context::Context,
nucleus::state::NucleusState,
};
use std::sync::Arc;

/// Reduce AddPendingValidation Action.
thedavidmeister marked this conversation as resolved.
Show resolved Hide resolved
/// Inserts boxed EntryWithHeader and dependencies into state, referenced with
/// the entry's address.
#[allow(unknown_lints)]
#[allow(needless_pass_by_value)]
pub fn reduce_remove_pending_validation(
_context: Arc<Context>,
state: &mut NucleusState,
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let address = unwrap_to!(action => Action::RemovePendingValidation);
state.pending_validations.remove(address);
}

#[cfg(test)]
pub mod tests {
use super::*;
use crate::{
instance::tests::test_context,
network::entry_with_header::EntryWithHeader,
nucleus::{
reducers::add_pending_validation::reduce_add_pending_validation,
state::tests::test_nucleus_state,
},
};
use holochain_core_types::{
cas::content::AddressableContent, chain_header::test_chain_header, entry::Entry,
json::RawString,
};

#[test]
fn test_reduce_remove_pending_validation() {
let context = test_context("jimmy", None);
let mut state = test_nucleus_state();

let entry = Entry::App("package_entry".into(), RawString::from("test value").into());
let entry_with_header = EntryWithHeader {
entry: entry.clone(),
header: test_chain_header(),
};

let action_wrapper = ActionWrapper::new(Action::AddPendingValidation(Arc::new((
entry_with_header.clone(),
Vec::new(),
))));

reduce_add_pending_validation(context.clone(), &mut state, &action_wrapper);

assert!(state.pending_validations.contains_key(&entry.address()));

let action_wrapper = ActionWrapper::new(Action::RemovePendingValidation(entry.address()));

reduce_remove_pending_validation(context, &mut state, &action_wrapper)
}
lucksus marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions core/src/scheduled_jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

pub fn create_callback(context: Arc<Context>) -> impl 'static + FnMut() + Sync + Send {
move || {
context.log("debug/scheduled_jobs: tick");
pending_validations::run_pending_validations(context.clone());
}
}
45 changes: 33 additions & 12 deletions core/src/scheduled_jobs/pending_validations.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,47 @@
use crate::{
context::Context, network::entry_with_header::EntryWithHeader,
workflows::hold_link::hold_link_workflow,
context::Context,
network::entry_with_header::EntryWithHeader,
nucleus::actions::remove_pending_validation::remove_pending_validation,
workflows::{hold_entry::hold_entry_workflow, hold_link::hold_link_workflow},
};
use holochain_core_types::{
cas::content::{Address, AddressableContent},
entry::entry_type::EntryType,
error::error::HolochainError,
};
use holochain_core_types::{cas::content::Address, entry::entry_type::EntryType};
use std::{sync::Arc, thread};

pub type PendingValidation = Box<(EntryWithHeader, Vec<Address>)>;
pub type PendingValidation = Arc<(EntryWithHeader, Vec<Address>)>;

fn retry_validation(pending: PendingValidation, context: Arc<Context>) {
thread::spawn(move || context.block_on(hold_link_workflow(&pending.0, &context)));
thread::spawn(move || {
let result = match pending.0.entry.entry_type() {
EntryType::LinkAdd | EntryType::LinkRemove => {
context.block_on(hold_link_workflow(&pending.0, &context))
}
_ => context.block_on(hold_entry_workflow(&pending.0, context.clone())),
};

if Err(HolochainError::ValidationPending) != result {
remove_pending_validation(pending.0.entry.address(), &context);
}
});
}

pub fn run_pending_validations(context: Arc<Context>) {
context
let pending_validations = context
.state()
.unwrap()
.nucleus()
.pending_validations
.iter()
.for_each(|(_, boxed)| match boxed.0.entry.entry_type() {
EntryType::LinkAdd => retry_validation(boxed.clone(), context.clone()),
EntryType::LinkRemove => retry_validation(boxed.clone(), context.clone()),
_ => panic!("Pending validations are (currently) only implemented for links"),
});
.clone();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get this clone of a HashMap<Arc<...>> so we can drop the state lock immediately.


pending_validations.iter().for_each(|(_, pending)| {
context.log(dbg!(format!(
"debug/scheduled_jobs/run_pending_validations: found pending validation for {}: {}",
pending.0.entry.entry_type(),
pending.0.entry.address()
)));
retry_validation(pending.clone(), context.clone());
});
}
32 changes: 23 additions & 9 deletions core/src/workflows/hold_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@ use crate::{
network::{
actions::get_validation_package::get_validation_package, entry_with_header::EntryWithHeader,
},
nucleus::validation::validate_entry,
nucleus::{
actions::add_pending_validation::add_pending_validation, validation::validate_entry,
},
};

use holochain_core_types::{
cas::content::Address,
error::HolochainError,
validation::{EntryAction, EntryLifecycle, ValidationData},
};
use std::sync::Arc;

pub async fn hold_entry_workflow<'a>(
entry_with_header: EntryWithHeader,
entry_with_header: &EntryWithHeader,
context: Arc<Context>,
) -> Result<Address, HolochainError> {
let EntryWithHeader { entry, header } = &entry_with_header;
) -> Result<(), HolochainError> {
let EntryWithHeader { entry, header } = entry_with_header;

// 1. Get validation package from source
let maybe_validation_package = await!(get_validation_package(header.clone(), &context))?;
let validation_package = maybe_validation_package
.ok_or("Could not get validation package from source".to_string())?;
let maybe_validation_package = await!(get_validation_package(header.clone(), &context))
.map_err(|err| {
let message = "Could not get validation package from source! -> Add to pending...";
context.log(format!("debug/workflow/hold_entry: {}", message));
context.log(format!("debug/workflow/hold_entry: Error was: {:?}", err));
add_pending_validation(entry_with_header.to_owned(), Vec::new(), &context);
HolochainError::ValidationPending
})?;
let validation_package = maybe_validation_package.ok_or({
let message = "Source did respond to request but did not deliver validation package! This is weird! Entry is not valid!";
context.log(format!("debug/workflow/hold_entry: {}", message));
HolochainError::ValidationPending
lucksus marked this conversation as resolved.
Show resolved Hide resolved
})?;
context.log(format!("debug/workflow/hold_entry: got validation package"));

// 2. Create validation data struct
let validation_data = ValidationData {
Expand All @@ -36,7 +48,9 @@ pub async fn hold_entry_workflow<'a>(
await!(validate_entry(entry.clone(), validation_data, &context))?;

// 3. If valid store the entry in the local DHT shard
await!(hold_entry(entry_with_header, context))
await!(hold_entry(entry_with_header, context))?;

Ok(())
}

#[cfg(test)]
Expand Down
33 changes: 19 additions & 14 deletions core/src/workflows/hold_link.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{
action::{Action, ActionWrapper},
context::Context,
dht::actions::add_link::add_link,
network::{
Expand All @@ -8,7 +7,9 @@ use crate::{
nucleus::validation::validate_entry,
};

use crate::{instance::dispatch_action, nucleus::validation::ValidationError};
use crate::nucleus::{
actions::add_pending_validation::add_pending_validation, validation::ValidationError,
};
use holochain_core_types::{
entry::Entry,
error::HolochainError,
Expand All @@ -35,10 +36,20 @@ pub async fn hold_link_workflow<'a>(
context.log(format!(
"debug/workflow/hold_link: getting validation package..."
));
let maybe_validation_package = await!(get_validation_package(header.clone(), &context))?;
let validation_package = maybe_validation_package
.ok_or("Could not get validation package from source".to_string())?;
context.log(format!("debug/workflow/hold_link: got validation package!"));
let maybe_validation_package = await!(get_validation_package(header.clone(), &context))
.map_err(|err| {
let message = "Could not get validation package from source! -> Add to pending...";
context.log(format!("debug/workflow/hold_link: {}", message));
context.log(format!("debug/workflow/hold_link: Error was: {:?}", err));
add_pending_validation(entry_with_header.to_owned(), Vec::new(), context);
HolochainError::ValidationPending
})?;
let validation_package = maybe_validation_package.ok_or({
let message = "Source did respond to request but did not deliver validation package! This is weird! Entry is not valid!";
context.log(format!("debug/workflow/hold_link: {}", message));
HolochainError::ValidationPending
lucksus marked this conversation as resolved.
Show resolved Hide resolved
})?;
context.log(format!("debug/workflow/hold_link: got validation package"));

// 2. Create validation data struct
let validation_data = ValidationData {
Expand All @@ -52,15 +63,9 @@ pub async fn hold_link_workflow<'a>(
await!(validate_entry(entry.clone(), validation_data, &context)).map_err(|err| {
context.log(format!("debug/workflow/hold_link: invalid! {:?}", err));
if let ValidationError::UnresolvedDependencies(dependencies) = &err {
dispatch_action(
context.action_channel(),
ActionWrapper::new(Action::AddPendingValidation(Box::new((
entry_with_header.to_owned(),
dependencies.clone(),
)))),
);
add_pending_validation(entry_with_header.to_owned(), dependencies.clone(), &context);
}
err
HolochainError::ValidationPending
})?;
context.log(format!("debug/workflow/hold_link: is valid!"));

Expand Down
Loading