Skip to content

Commit

Permalink
[reconfig] Add enable_reconfig flag to checkpoint state
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Oct 12, 2022
1 parent a14db1a commit 75d8223
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 67 deletions.
1 change: 1 addition & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ impl AuthorityState {
&genesis_committee,
secret.public().into(),
secret.clone(),
false,
)
.expect("Should not fail to open local checkpoint DB");
if let Some(consensus_sender) = consensus_sender {
Expand Down
12 changes: 3 additions & 9 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,25 +406,19 @@ where
pub async fn spawn_checkpoint_process(
self: Arc<Self>,
metrics: CheckpointMetrics,
enable_reconfig: bool,
) -> JoinHandle<()> {
self.spawn_checkpoint_process_with_config(
CheckpointProcessControl::default(),
metrics,
enable_reconfig,
)
.await
self.spawn_checkpoint_process_with_config(CheckpointProcessControl::default(), metrics)
.await
}

pub async fn spawn_checkpoint_process_with_config(
self: Arc<Self>,
checkpoint_process_control: CheckpointProcessControl,
metrics: CheckpointMetrics,
enable_reconfig: bool,
) -> JoinHandle<()> {
// Spawn task to take care of checkpointing
tokio::task::spawn(async move {
checkpoint_process(self, &checkpoint_process_control, metrics, enable_reconfig).await;
checkpoint_process(self, &checkpoint_process_control, metrics).await;
})
}
}
35 changes: 13 additions & 22 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ pub async fn checkpoint_process<A>(
active_authority: Arc<ActiveAuthority<A>>,
timing: &CheckpointProcessControl,
metrics: CheckpointMetrics,
enable_reconfig: bool,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone + Reconfigurable,
{
Expand All @@ -167,8 +166,7 @@ pub async fn checkpoint_process<A>(
let mut last_cert_time = Instant::now();

loop {
let result =
checkpoint_process_step(active_authority.clone(), timing, enable_reconfig).await;
let result = checkpoint_process_step(active_authority.clone(), timing).await;
let state_checkpoints = &active_authority.state.checkpoints;
let next_cp_seq = state_checkpoints.lock().next_checkpoint();
match result {
Expand All @@ -189,22 +187,17 @@ pub async fn checkpoint_process<A>(
.checkpoint_sequence_number
.set((next_cp_seq - 1) as i64);
last_cert_time = Instant::now();
if enable_reconfig {
if state_checkpoints.lock().is_ready_to_start_epoch_change() {
while let Err(err) = active_authority.start_epoch_change().await {
error!(?next_cp_seq, "Failed to start epoch change: {:?}", err);
tokio::time::sleep(timing.epoch_change_retry_delay).await;
}
// No delay to minimize the reconfiguration latency.
continue;
} else if state_checkpoints.lock().is_ready_to_finish_epoch_change() {
while let Err(err) = active_authority.finish_epoch_change().await {
error!(
?next_cp_seq,
"Failed to finish epoch change: {:?}", err
);
tokio::time::sleep(timing.epoch_change_retry_delay).await;
}
if state_checkpoints.lock().is_ready_to_start_epoch_change() {
while let Err(err) = active_authority.start_epoch_change().await {
error!(?next_cp_seq, "Failed to start epoch change: {:?}", err);
tokio::time::sleep(timing.epoch_change_retry_delay).await;
}
// No delay to minimize the reconfiguration latency.
continue;
} else if state_checkpoints.lock().is_ready_to_finish_epoch_change() {
while let Err(err) = active_authority.finish_epoch_change().await {
error!(?next_cp_seq, "Failed to finish epoch change: {:?}", err);
tokio::time::sleep(timing.epoch_change_retry_delay).await;
}
}
tokio::time::sleep(timing.long_pause_between_checkpoints).await;
Expand Down Expand Up @@ -271,7 +264,6 @@ pub async fn checkpoint_process<A>(
pub async fn checkpoint_process_step<A>(
active_authority: Arc<ActiveAuthority<A>>,
timing: &CheckpointProcessControl,
enable_reconfig: bool,
) -> Result<CheckpointStepResult, CheckpointStepError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone + Reconfigurable,
Expand Down Expand Up @@ -370,7 +362,6 @@ where
my_proposal.signed_summary.auth_signature.epoch,
*my_proposal.sequence_number(),
transactions,
enable_reconfig,
)
.await
.map_err(|err| CheckpointStepError::CheckpointSignBlocked(Box::new(err)))?;
Expand All @@ -383,7 +374,6 @@ pub async fn sync_and_sign_new_checkpoint<A>(
epoch: EpochId,
seq: CheckpointSequenceNumber,
transactions: BTreeSet<ExecutionDigests>,
enable_reconfig: bool,
) -> SuiResult
where
A: AuthorityAPI + Send + Sync + 'static + Clone + Reconfigurable,
Expand All @@ -409,6 +399,7 @@ where
return Err(SuiError::CheckpointingError { error });
}

let enable_reconfig = active_authority.state.checkpoints.lock().enable_reconfig;
let next_epoch_committee = if enable_reconfig {
// Ready to start epoch change means that we have finalized the last second checkpoint,
// and now we are about to finalize the last checkpoint of the epoch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn checkpoint_active_flow_happy_path() {
.unwrap(),
);
let _active_handle = active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests(), false)
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
}

Expand Down Expand Up @@ -121,7 +121,6 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
.spawn_checkpoint_process_with_config(
Default::default(),
CheckpointMetrics::new_for_tests(),
false,
)
.await;
});
Expand Down Expand Up @@ -213,7 +212,6 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
.spawn_checkpoint_process_with_config(
CheckpointProcessControl::default(),
CheckpointMetrics::new_for_tests(),
false,
)
.await;
});
Expand Down Expand Up @@ -303,7 +301,6 @@ async fn test_empty_checkpoint() {
.spawn_checkpoint_process_with_config(
CheckpointProcessControl::default(),
CheckpointMetrics::new_for_tests(),
false,
)
.await;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn pending_exec_storage_notify() {
.unwrap(),
);
let _active_handle = active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests(), false)
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
}

Expand Down Expand Up @@ -134,7 +134,7 @@ async fn pending_exec_full() {

active_state.clone().spawn_execute_process().await;
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests(), false)
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
});
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/checkpoints/causal_order_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ mod tests {
&committee,
k.public().into(),
Arc::pin(k.copy()),
false,
)
.unwrap();

Expand Down
26 changes: 21 additions & 5 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct CheckpointStore {

memory_locals: Arc<CheckpointLocals>,

/// Whether reconfiguration is enabled.
pub enable_reconfig: bool,

/// Consensus sender
sender: Option<Box<dyn ConsensusSender>>,

Expand Down Expand Up @@ -274,6 +277,7 @@ impl CheckpointStore {
current_committee: &Committee,
name: AuthorityName,
secret: StableSyncAuthoritySigner,
enable_reconfig: bool,
) -> Result<CheckpointStore, SuiError> {
let tables =
CheckpointStoreTables::open_tables_read_write(path.to_path_buf(), db_options, None);
Expand All @@ -287,6 +291,7 @@ impl CheckpointStore {
name,
secret,
memory_locals,
enable_reconfig,
sender: None,
tables,
})
Expand Down Expand Up @@ -835,27 +840,38 @@ impl CheckpointStore {

pub fn is_ready_to_start_epoch_change(&mut self) -> bool {
let next_seq = self.next_checkpoint();
next_seq % CHECKPOINT_COUNT_PER_EPOCH == 0 && next_seq != 0
self.enable_reconfig && next_seq % CHECKPOINT_COUNT_PER_EPOCH == 0 && next_seq != 0
}

pub fn is_ready_to_finish_epoch_change(&mut self) -> bool {
let next_seq = self.next_checkpoint();
next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1
self.enable_reconfig && next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1
}

/// Checks whether we should reject consensus transaction.
/// We stop accepting consensus transactions after we received the last fragment needed to
/// create the second last checkpoint of the epoch. We continue to reject consensus transactions
/// until we finish the last checkpoint.
pub fn should_reject_consensus_transaction(&mut self) -> bool {
// Never reject consensus message if reconfiguration is not enabled.
if !self.enable_reconfig {
return false;
}
let next_seq = self.next_checkpoint();
// Either we just finished constructing the second last checkpoint,
// or just finished constructing the last checkpoint.
((next_seq + 1) % CHECKPOINT_COUNT_PER_EPOCH == 0 || self.is_ready_to_start_epoch_change())
// Either we just finished constructing the second last checkpoint
if (next_seq + 1) % CHECKPOINT_COUNT_PER_EPOCH == 0
&& self
.memory_locals
.checkpoint_to_be_constructed
.is_completed()
{
return true;
}
// Or we are already in the process of constructing the last checkpoint.
if next_seq % CHECKPOINT_COUNT_PER_EPOCH == 0 && next_seq != 0 {
return true;
}
false
}

pub fn validators_already_fragmented_with(
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fn random_ckpoint_store_num(
&committee,
k.public().into(),
Arc::pin(k.copy()),
false,
)
.unwrap();
(path, cps)
Expand Down Expand Up @@ -97,6 +98,7 @@ fn crash_recovery() {
&committee,
k.public().into(),
Arc::pin(k.copy()),
false,
)
.unwrap();

Expand Down Expand Up @@ -141,6 +143,7 @@ fn crash_recovery() {
&committee,
k.public().into(),
Arc::pin(k.copy()),
false,
)
.unwrap();

Expand Down Expand Up @@ -751,6 +754,7 @@ fn checkpoint_integration() {
&committee,
k.public().into(),
Arc::pin(k.copy()),
false,
)
.unwrap();

Expand Down
9 changes: 9 additions & 0 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sui_types::{
SUI_SYSTEM_STATE_OBJECT_ID,
};

use crate::authority::AuthorityState;
use crate::checkpoints::reconstruction::SpanGraph;
use crate::{
authority::TemporaryStore,
Expand All @@ -39,6 +40,7 @@ async fn test_start_epoch_change() {
let genesis_objects = vec![object.clone(), gas_object.clone()];
// Create authority_aggregator and authority states.
let (net, states, _) = init_local_authorities(4, genesis_objects.clone()).await;
enable_reconfig(&states);
let state = states[0].clone();

// Check that we initialized the genesis epoch.
Expand Down Expand Up @@ -173,6 +175,7 @@ async fn test_finish_epoch_change() {
// Create authority_aggregator and authority states.
let genesis_objects = vec![];
let (net, states, _) = init_local_authorities(4, genesis_objects.clone()).await;
enable_reconfig(&states);
let actives: Vec<_> = states
.iter()
.map(|state| {
Expand Down Expand Up @@ -247,3 +250,9 @@ async fn test_finish_epoch_change() {
assert!(response.signed_effects.is_some());
}
}

fn enable_reconfig(states: &[Arc<AuthorityState>]) {
for state in states {
state.checkpoints.lock().enable_reconfig = true;
}
}
10 changes: 9 additions & 1 deletion crates/sui-core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ pub(crate) async fn init_state(
let (tx_reconfigure_consensus, _rx_reconfigure_consensus) = tokio::sync::mpsc::channel(10);
let committee_store = Arc::new(CommitteeStore::new(epoch_path, &committee, None));
let checkpoint_store = Arc::new(parking_lot::Mutex::new(
CheckpointStore::open(&checkpoint_path, None, &committee, name, secrete.clone()).unwrap(),
CheckpointStore::open(
&checkpoint_path,
None,
&committee,
name,
secrete.clone(),
false,
)
.unwrap(),
));
AuthorityState::new(
name,
Expand Down
6 changes: 2 additions & 4 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl SuiNode {
&committee,
config.protocol_public_key(),
secret.clone(),
config.enable_reconfig,
)?));

let index_store = if is_validator {
Expand Down Expand Up @@ -256,10 +257,7 @@ impl SuiNode {
Some(
active_authority
.clone()
.spawn_checkpoint_process(
CheckpointMetrics::new(&prometheus_registry),
config.enable_reconfig,
)
.spawn_checkpoint_process(CheckpointMetrics::new(&prometheus_registry))
.await,
)
} else {
Expand Down
Loading

0 comments on commit 75d8223

Please sign in to comment.