Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consensus bootstrap streaming #3192

Merged
merged 44 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cf391a6
Change streaming step struct and consensus bootstrap function.
Eitu33 Oct 28, 2022
f69da2c
Start updating the manage_bootstrap function.
Eitu33 Oct 28, 2022
381a00a
Update state and consensus end cond.
Eitu33 Nov 2, 2022
9edbb8e
Start updating messages.
Eitu33 Nov 2, 2022
f388005
Rename all messages accordingly.
Eitu33 Nov 2, 2022
1983038
Update bootstrap messages.
Eitu33 Nov 2, 2022
c9a114e
Update consensus mock.
Eitu33 Nov 2, 2022
8d3c862
Merge source branch and fix conflicts.
Eitu33 Nov 2, 2022
c921b02
Updat bootstrap test.
Eitu33 Nov 2, 2022
f62133b
Fix never-ending streaming issue.
Eitu33 Nov 2, 2022
b3adf32
Update consensus' get_bootstrap_part finished behaviour.
Eitu33 Nov 2, 2022
eacbcad
Fix bootstrap client consensus part handling issue & update test_boot…
Eitu33 Nov 3, 2022
b68d2a9
Debug logs.
Eitu33 Nov 3, 2022
44ea6ac
Fix logs compilation.
Eitu33 Nov 3, 2022
d0f4bf7
get_bootstrap_part debug updates.
Eitu33 Nov 4, 2022
2343b34
Fix debug compilation
Eitu33 Nov 4, 2022
b180402
Fix part of get_bootstrap_part behaviour.
Eitu33 Nov 4, 2022
1d35f42
Temporary one batch consensus bootstrap for global behaviour testing.
Eitu33 Nov 4, 2022
0092bde
Update list_required_active_blocks return value and re-enable consens…
Eitu33 Nov 4, 2022
006de38
Disable matching slot finish condition.
Eitu33 Nov 4, 2022
8c2da1b
Consensus streaming cursor udpate, according bootstrap messages updat…
Eitu33 Nov 7, 2022
8164532
Update consensus bootstrap cursor and prehashset ser & deser
Eitu33 Nov 7, 2022
149b943
Refactor consensus get_bootstrap_part
Eitu33 Nov 7, 2022
b7de571
Update consensus get_bootstrap_part behaviour
Eitu33 Nov 7, 2022
1d6d1c9
Update consensus get_bootstrap_part documentation
Eitu33 Nov 7, 2022
f23d68c
Slight get_bootstrap_part behaviour change + debug logs
Eitu33 Nov 8, 2022
0b52838
Update mock and bootstrap test
Eitu33 Nov 8, 2022
2347fa4
Remove the full operations from ExportActiveBlock graph
Eitu33 Nov 8, 2022
ce6b04a
Bootstrap client consensus cursor computing and better server diagnos…
Eitu33 Nov 8, 2022
eb55cbe
Remove useless client BootstrapSuccess bootstrap info double check
Eitu33 Nov 8, 2022
de5cd6e
Get rid of some useless debug logs
Eitu33 Nov 8, 2022
5634043
Add consensus bootstrap parameters to the configs
Eitu33 Nov 8, 2022
19abe9d
Modify bootstrap server streaming logic
Eitu33 Nov 10, 2022
989e932
Handle outdated ids in consensus get_bootstrap_part, update tests acc…
Eitu33 Nov 10, 2022
d4708d4
Add consensus outdated ids the bootstrap part message info
Eitu33 Nov 10, 2022
cbf9a1a
Remove outdated blocks on receive in bootstrap client
Eitu33 Nov 10, 2022
698b526
Review updates
Eitu33 Nov 10, 2022
fa7a177
Remove operations form export active block
Eitu33 Nov 14, 2022
2789da7
Remove export active block ope mismatch check
Eitu33 Nov 14, 2022
f8ca89e
Re-enable client bootstrap debug logs
Eitu33 Nov 14, 2022
2722190
Imporve consensus get_bootstrap_part doc
Eitu33 Nov 15, 2022
a4f7d9a
Remove unused deserializers and config parameters
Eitu33 Nov 15, 2022
ce414e4
Prune consensus streaming cursor
Eitu33 Nov 16, 2022
0958e68
Remove todo's
Eitu33 Nov 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions massa-async-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl AsyncPool {
let left_bound = match cursor {
StreamingStep::Started => Unbounded,
StreamingStep::Ongoing(last_id) => Excluded(last_id),
StreamingStep::Finished => return (pool_part, cursor),
StreamingStep::Finished(_) => return (pool_part, cursor),
};
let mut pool_part_last_id: Option<AsyncMessageId> = None;
for (id, message) in self.messages.range((left_bound, Unbounded)) {
Expand All @@ -171,7 +171,7 @@ impl AsyncPool {
if let Some(last_id) = pool_part_last_id {
(pool_part, StreamingStep::Ongoing(last_id))
} else {
(pool_part, StreamingStep::Finished)
(pool_part, StreamingStep::Finished(None))
}
}

Expand All @@ -191,7 +191,7 @@ impl AsyncPool {
if let Some(message_id) = self.messages.last_key_value().map(|(&id, _)| id) {
StreamingStep::Ongoing(message_id)
} else {
StreamingStep::Finished
StreamingStep::Finished(None)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ async-speed-limit = { git = "https://github.com/adrien-zinger/async-speed-limit"
"default",
"tokio",
] }
fix-hidden-lifetime-bug = "0.2.5"
displaydoc = "0.2"
futures = "0.3"
num_enum = "0.5"
Expand Down
89 changes: 47 additions & 42 deletions massa-bootstrap/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use crate::{
/// This function will send the starting point to receive a stream of the ledger and will receive and process each part until receive a `BootstrapServerMessage::FinalStateFinished` message from the server.
/// `next_bootstrap_message` passed as parameter must be `BootstrapClientMessage::AskFinalStatePart` enum variant.
/// `next_bootstrap_message` will be updated after receiving each part so that in case of connection lost we can restart from the last message we processed.
async fn stream_final_state(
async fn stream_final_state_and_consensus(
cfg: &BootstrapConfig,
client: &mut BootstrapClientBinder,
next_bootstrap_message: &mut BootstrapClientMessage,
global_bootstrap_state: &mut GlobalBootstrapState,
) -> Result<(), BootstrapError> {
if let BootstrapClientMessage::AskFinalStatePart { .. } = &next_bootstrap_message {
if let BootstrapClientMessage::AskBootstrapPart { .. } = &next_bootstrap_message {
match tokio::time::timeout(
cfg.write_timeout.into(),
client.send(next_bootstrap_message),
Expand Down Expand Up @@ -57,15 +57,18 @@ async fn stream_final_state(
Ok(Ok(msg)) => msg,
};
match msg {
BootstrapServerMessage::FinalStatePart {
BootstrapServerMessage::BootstrapPart {
slot,
ledger_part,
async_pool_part,
pos_cycle_part,
pos_credits_part,
exec_ops_part,
final_state_changes,
consensus_part,
consensus_outdated_ids,
} => {
// Set final state
let mut write_final_state = global_bootstrap_state.final_state.write();
let last_ledger_step = write_final_state.ledger.set_ledger_part(ledger_part)?;
let last_pool_step =
Expand Down Expand Up @@ -100,15 +103,42 @@ async fn stream_final_state(
}
}
write_final_state.slot = slot;

// Set consensus blocks
if let Some(graph) = global_bootstrap_state.graph.as_mut() {
// Extend the final blocks with the received part
graph.final_blocks.extend(consensus_part.final_blocks);
// Remove every outdated block
graph.final_blocks.retain(|block_export| {
!consensus_outdated_ids.contains(&block_export.block.id)
});
} else {
global_bootstrap_state.graph = Some(consensus_part);
}
let last_consensus_step = StreamingStep::Ongoing(
// Note that this unwrap call is safe because of the above conditional statement
global_bootstrap_state
.graph
.as_ref()
.unwrap()
.final_blocks
.iter()
.map(|b_export| b_export.block.id)
.collect(),
);

// Set new message in case of disconnection
*next_bootstrap_message = BootstrapClientMessage::AskFinalStatePart {
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapPart {
last_slot: Some(slot),
last_ledger_step,
last_pool_step,
last_cycle_step,
last_credits_step,
last_ops_step,
last_consensus_step,
};

// Logs for an easier diagnostic if needed
debug!(
AurelienFT marked this conversation as resolved.
Show resolved Hide resolved
"client final state bootstrap cursors: {:?}",
next_bootstrap_message
Expand All @@ -118,21 +148,22 @@ async fn stream_final_state(
final_state_changes.len()
);
}
BootstrapServerMessage::FinalStateFinished => {
BootstrapServerMessage::BootstrapFinished => {
info!("State bootstrap complete");
// Set next bootstrap message
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapPeers;
return Ok(());
}
BootstrapServerMessage::SlotTooOld => {
info!("Slot is too old retry bootstrap from scratch");
*next_bootstrap_message = BootstrapClientMessage::AskFinalStatePart {
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapPart {
last_slot: None,
last_ledger_step: StreamingStep::Started,
last_pool_step: StreamingStep::Started,
last_cycle_step: StreamingStep::Started,
last_credits_step: StreamingStep::Started,
last_ops_step: StreamingStep::Started,
last_consensus_step: StreamingStep::Started,
};
panic!("Bootstrap failed, try to bootstrap again.");
}
Expand Down Expand Up @@ -268,9 +299,14 @@ async fn bootstrap_from_server(
// Loop to ask data to the server depending on the last message we sent
loop {
match next_bootstrap_message {
BootstrapClientMessage::AskFinalStatePart { .. } => {
stream_final_state(cfg, client, next_bootstrap_message, global_bootstrap_state)
.await?;
BootstrapClientMessage::AskBootstrapPart { .. } => {
stream_final_state_and_consensus(
cfg,
client,
next_bootstrap_message,
global_bootstrap_state,
)
.await?;
}
BootstrapClientMessage::AskBootstrapPeers => {
let peers = match send_client_message(
Expand All @@ -289,36 +325,9 @@ async fn bootstrap_from_server(
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};
global_bootstrap_state.peers = Some(peers);
*next_bootstrap_message = BootstrapClientMessage::AskConsensusState;
}
BootstrapClientMessage::AskConsensusState => {
let state = match send_client_message(
next_bootstrap_message,
client,
write_timeout,
cfg.read_timeout.into(),
"ask consensus state timed out",
)
.await?
{
BootstrapServerMessage::ConsensusState { graph } => graph,
BootstrapServerMessage::BootstrapError { error } => {
return Err(BootstrapError::ReceivedError(error))
}
other => return Err(BootstrapError::UnexpectedServerMessage(other)),
};
global_bootstrap_state.graph = Some(state);
*next_bootstrap_message = BootstrapClientMessage::BootstrapSuccess;
}
BootstrapClientMessage::BootstrapSuccess => {
if global_bootstrap_state.graph.is_none() {
*next_bootstrap_message = BootstrapClientMessage::AskConsensusState;
continue;
}
if global_bootstrap_state.peers.is_none() {
*next_bootstrap_message = BootstrapClientMessage::AskBootstrapPeers;
continue;
}
match tokio::time::timeout(write_timeout, client.send(next_bootstrap_message)).await
{
Err(_) => Err(std::io::Error::new(
Expand Down Expand Up @@ -389,12 +398,7 @@ async fn connect_to_server(
bootstrap_config.max_async_pool_changes,
bootstrap_config.max_async_pool_length,
bootstrap_config.max_async_message_data,
bootstrap_config.max_function_name_length,
bootstrap_config.max_parameters_size,
bootstrap_config.max_ledger_changes_count,
bootstrap_config.max_op_datastore_entry_count,
bootstrap_config.max_op_datastore_key_length,
bootstrap_config.max_op_datastore_value_length,
bootstrap_config.max_changes_slot_count,
bootstrap_config.max_rolls_length,
bootstrap_config.max_production_stats_length,
Expand Down Expand Up @@ -444,13 +448,14 @@ pub async fn get_state(
let mut shuffled_list = bootstrap_config.bootstrap_list.clone();
shuffled_list.shuffle(&mut StdRng::from_entropy());
let mut next_bootstrap_message: BootstrapClientMessage =
BootstrapClientMessage::AskFinalStatePart {
BootstrapClientMessage::AskBootstrapPart {
last_slot: None,
last_ledger_step: StreamingStep::Started,
last_pool_step: StreamingStep::Started,
last_cycle_step: StreamingStep::Started,
last_credits_step: StreamingStep::Started,
last_ops_step: StreamingStep::Started,
last_consensus_step: StreamingStep::Started,
};
let mut global_bootstrap_state = GlobalBootstrapState::new(final_state.clone());
loop {
Expand Down
20 changes: 0 additions & 20 deletions massa-bootstrap/src/client_binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ pub struct BootstrapClientBinder {
max_async_pool_changes: u64,
max_async_pool_length: u64,
max_async_message_data: u64,
max_function_name_length: u16,
max_parameters_size: u32,
max_ledger_changes_count: u64,
max_op_datastore_entry_count: u64,
max_op_datastore_key_length: u8,
max_op_datastore_value_length: u64,
max_changes_slot_count: u64,
max_rolls_length: u64,
max_production_stats_length: u64,
Expand Down Expand Up @@ -79,12 +74,7 @@ impl BootstrapClientBinder {
max_async_pool_changes: u64,
max_async_pool_length: u64,
max_async_message_data: u64,
max_function_name_length: u16,
max_parameters_size: u32,
max_ledger_changes_count: u64,
max_op_datastore_entry_count: u64,
max_op_datastore_key_length: u8,
max_op_datastore_value_length: u64,
max_changes_slot_count: u64,
max_rolls_length: u64,
max_production_stats_length: u64,
Expand Down Expand Up @@ -114,12 +104,7 @@ impl BootstrapClientBinder {
max_async_pool_changes,
max_async_pool_length,
max_async_message_data,
max_function_name_length,
max_parameters_size,
max_ledger_changes_count,
max_op_datastore_entry_count,
max_op_datastore_key_length,
max_op_datastore_value_length,
max_changes_slot_count,
max_rolls_length,
max_production_stats_length,
Expand Down Expand Up @@ -183,12 +168,7 @@ impl BootstrapClientBinder {
self.max_datastore_key_length,
self.max_datastore_value_length,
self.max_datastore_entry_count,
self.max_function_name_length,
self.max_parameters_size,
self.max_bootstrap_error_length,
self.max_op_datastore_entry_count,
self.max_op_datastore_key_length,
self.max_op_datastore_value_length,
self.max_changes_slot_count,
self.max_rolls_length,
self.max_production_stats_length,
Expand Down
17 changes: 7 additions & 10 deletions massa-bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
#![feature(ip)]
#![feature(let_chains)]

#[macro_use]
extern crate fix_hidden_lifetime_bug;

pub use establisher::types::Establisher;
use massa_consensus_exports::bootstrapable_graph::BootstrapableGraph;
use massa_final_state::FinalState;
Expand Down Expand Up @@ -46,26 +43,26 @@ pub mod tests;

/// a collection of the bootstrap state snapshots of all relevant modules
pub struct GlobalBootstrapState {
/// state of the final state
pub final_state: Arc<RwLock<FinalState>>,

/// state of the consensus graph
pub graph: Option<BootstrapableGraph>,

/// timestamp correction in milliseconds
pub compensation_millis: i64,

/// list of network peers
pub peers: Option<BootstrapPeers>,

/// state of the final state
pub final_state: Arc<RwLock<FinalState>>,
/// timestamp correction in milliseconds
pub compensation_millis: i64,
}

impl GlobalBootstrapState {
fn new(final_state: Arc<RwLock<FinalState>>) -> Self {
Self {
final_state,
graph: None,
compensation_millis: Default::default(),
peers: None,
final_state,
compensation_millis: Default::default(),
}
}
}
Loading