[consensus] support committee with 1 node #20530

Merged 3 commits on Dec 6, 2024

102 changes: 101 additions & 1 deletion consensus/core/src/authority_node.rs
@@ -256,7 +256,8 @@ where
block_manager,
// For streaming RPC, Core will be notified when consumer is available.
// For non-streaming RPC, there is no way to know so default to true.
!N::Client::SUPPORT_STREAMING,
// When there is only one (this) authority, assume subscriber exists.
!N::Client::SUPPORT_STREAMING || context.committee.size() == 1,
commit_observer,
core_signals,
protocol_keypair,
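
The one-line change above extends the existing default for the Core subscriber flag: with non-streaming RPC there is no signal when a consumer attaches, and with a single-authority committee there are no peers that could subscribe at all. A minimal sketch of that decision, under the assumption of a hypothetical helper (`assume_subscriber_exists` and its parameters are illustrative, not the real constructor API):

```rust
// Sketch only: mirrors the boolean passed to Core above; `supports_streaming`
// stands in for N::Client::SUPPORT_STREAMING and `committee_size` for
// context.committee.size().
fn assume_subscriber_exists(supports_streaming: bool, committee_size: usize) -> bool {
    // Non-streaming RPC gives no signal when a consumer attaches, so default to true.
    // A single-node committee has no peers that could subscribe, so also default to true.
    !supports_streaming || committee_size == 1
}

fn main() {
    assert!(assume_subscriber_exists(false, 4)); // non-streaming: always assume a subscriber
    assert!(assume_subscriber_exists(true, 1)); // single-node committee: assume a subscriber
    assert!(!assume_subscriber_exists(true, 4)); // streaming with peers: wait to be notified
}
```
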
@@ -573,6 +574,105 @@ mod tests {
}
}

#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn test_small_committee(
#[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
#[values(1, 2, 3)] num_authorities: usize,
) {
let db_registry = Registry::new();
DBMetrics::init(&db_registry);

let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]);
let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE();

let temp_dirs = (0..num_authorities)
.map(|_| TempDir::new().unwrap())
.collect::<Vec<_>>();

let mut output_receivers = Vec::with_capacity(committee.size());
let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size());
let mut boot_counters = vec![0; num_authorities];

for (index, _authority_info) in committee.authorities() {
let (authority, receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
keypairs.clone(),
network_type,
boot_counters[index],
protocol_config.clone(),
)
.await;
boot_counters[index] += 1;
output_receivers.push(receiver);
authorities.push(authority);
}

const NUM_TRANSACTIONS: u8 = 15;
let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
for i in 0..NUM_TRANSACTIONS {
let txn = vec![i; 16];
submitted_transactions.insert(txn.clone());
authorities[i as usize % authorities.len()]
.transaction_client()
.submit(vec![txn])
.await
.unwrap();
}

for receiver in &mut output_receivers {
let mut expected_transactions = submitted_transactions.clone();
loop {
let committed_subdag =
tokio::time::timeout(Duration::from_secs(1), receiver.recv())
.await
.unwrap()
.unwrap();
for b in committed_subdag.blocks {
for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
assert!(
expected_transactions.remove(&txn),
"Transaction not submitted or already seen: {:?}",
txn
);
}
}
assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
if expected_transactions.is_empty() {
break;
}
}
}

// Stop authority 0.
let index = committee.to_authority_index(0).unwrap();
authorities.remove(index.value()).stop().await;
sleep(Duration::from_secs(10)).await;

// Restart authority 0 and let it run.
let (authority, receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
keypairs.clone(),
network_type,
boot_counters[index],
protocol_config.clone(),
)
.await;
boot_counters[index] += 1;
output_receivers[index] = receiver;
authorities.insert(index.value(), authority);
sleep(Duration::from_secs(10)).await;

// Stop all authorities and exit.
for authority in authorities {
authority.stop().await;
}
}

#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn test_amnesia_recovery_success(
20 changes: 4 additions & 16 deletions consensus/core/src/core.rs
@@ -131,10 +131,7 @@ impl Core {
.with_pipeline(true)
.build();

// Recover the last proposed block
let last_proposed_block = dag_state
.read()
.get_last_block_for_authority(context.own_index);
let last_proposed_block = dag_state.read().get_last_proposed_block();

// Recover the last included ancestor rounds based on the last proposed block. That will allow
// to perform the next block proposal by using ancestor blocks of higher rounds and avoid
@@ -207,10 +204,6 @@ impl Core {
"Waiting for {} ms while recovering ancestors from storage",
wait_ms
);
println!(
"Waiting for {} ms while recovering ancestors from storage",
wait_ms
);
std::thread::sleep(Duration::from_millis(wait_ms));
}
// Recover the last available quorum to correctly advance the threshold clock.
@@ -223,13 +216,10 @@
{
last_proposed_block
} else {
let last_proposed_block = self
.dag_state
.read()
.get_last_block_for_authority(self.context.own_index);
let last_proposed_block = self.dag_state.read().get_last_proposed_block();

if self.should_propose() {
assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery");
assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher than genesis should have been produced during recovery");
}

// if no new block proposed then just re-broadcast the last proposed one to ensure liveness.
@@ -1001,9 +991,7 @@ impl Core {
}

fn last_proposed_block(&self) -> VerifiedBlock {
self.dag_state
.read()
.get_last_block_for_authority(self.context.own_index)
self.dag_state.read().get_last_proposed_block()
}
}

15 changes: 8 additions & 7 deletions consensus/core/src/dag_state.rs
@@ -452,7 +452,13 @@ impl DagState {
blocks.first().cloned().unwrap()
}

/// Retrieves the last block proposed for the specified `authority`. If no block is found in cache
/// Gets the last proposed block from this authority.
/// If no block is proposed yet, returns the genesis block.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
self.get_last_block_for_authority(self.context.own_index)
}

/// Retrieves the last accepted block from the specified `authority`. If no block is found in cache
/// then the genesis block is returned as no other block has been received from that authority.
pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock {
if let Some(last) = self.recent_refs_by_authority[authority].last() {
@@ -2209,12 +2215,7 @@ mod test {
.find(|block| block.author() == context.own_index)
.unwrap();

assert_eq!(
dag_state
.read()
.get_last_block_for_authority(context.own_index),
my_genesis
);
assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis);
}

// WHEN adding some blocks for authorities, only the last ones should be returned
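
The two accessors documented above differ only in which authority they target: get_last_proposed_block is a thin wrapper over get_last_block_for_authority with the node's own index, and both fall back to the genesis block when no block has been proposed or received. A self-contained sketch of that shape with simplified stand-in types (Block and DagStateSketch are illustrative, not the crate's real VerifiedBlock and DagState):

```rust
// Sketch with simplified stand-in types: the point is the delegation pattern,
// where the own-authority getter is a thin wrapper over the per-authority one.
#[derive(Clone, Debug, PartialEq)]
struct Block {
    author: usize,
    round: u64,
}

struct DagStateSketch {
    own_index: usize,
    // Last block seen per authority; None means only genesis is known.
    last_blocks: Vec<Option<Block>>,
}

impl DagStateSketch {
    fn get_last_block_for_authority(&self, authority: usize) -> Block {
        self.last_blocks[authority]
            .clone()
            // Fall back to the genesis block (round 0) when nothing was received.
            .unwrap_or(Block { author: authority, round: 0 })
    }

    fn get_last_proposed_block(&self) -> Block {
        self.get_last_block_for_authority(self.own_index)
    }
}

fn main() {
    let dag = DagStateSketch {
        own_index: 0,
        last_blocks: vec![None, Some(Block { author: 1, round: 5 })],
    };
    // Authority 0 has proposed nothing yet, so its genesis block is returned.
    assert_eq!(dag.get_last_proposed_block(), Block { author: 0, round: 0 });
    assert_eq!(dag.get_last_block_for_authority(1).round, 5);
}
```
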
2 changes: 1 addition & 1 deletion consensus/core/src/round_prober.rs
@@ -159,7 +159,7 @@ impl<C: NetworkClient> RoundProber<C> {
.collect::<Vec<_>>();
let last_proposed_round = local_highest_accepted_rounds[own_index];

// For our own index, the highest recieved & accepted round is our last
// For our own index, the highest received & accepted round is our last
// accepted round or our last proposed round.
highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds();
highest_accepted_rounds[own_index] = local_highest_accepted_rounds;
26 changes: 16 additions & 10 deletions consensus/core/src/synchronizer.rs
@@ -710,6 +710,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);

let context = self.context.clone();
let dag_state = self.dag_state.clone();
let network_client = self.network_client.clone();
let block_verifier = self.block_verifier.clone();
let core_dispatcher = self.core_dispatcher.clone();
@@ -753,12 +754,17 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
};

// Get the highest of all the results. Retry until at least `f+1` results have been gathered.
let mut total_stake;
let mut highest_round;
let mut retries = 0;
let mut retry_delay_step = Duration::from_millis(500);
'main: loop {
total_stake = 0;
if context.committee.size() == 1 {
highest_round = dag_state.read().get_last_proposed_block().round();
info!("Only one node in the network, will not try fetching own last block from peers.");
break 'main;
}

let mut total_stake = 0;
Reviewer comment (Contributor): Nice fix as well, moving the total_stake here.

highest_round = 0;

// Ask all the other peers about our last block
@@ -811,16 +817,16 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
if context.committee.reached_validity(total_stake) {
info!("{} out of {} total stake returned acceptable results for our own last block with highest round {}, with {retries} retries.", total_stake, context.committee.total_stake(), highest_round);
break 'main;
} else {
retries += 1;
context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);
}

sleep(retry_delay_step).await;
retries += 1;
context.metrics.node_metrics.sync_last_known_own_block_retries.inc();
warn!("Not enough stake: {} out of {} total stake returned acceptable results for our own last block with highest round {}. Will now retry {retries}.", total_stake, context.committee.total_stake(), highest_round);

retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
}
sleep(retry_delay_step).await;

retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
}

// Update the Core with the highest detected round
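
The restructured loop above does three things: it short-circuits when the committee has a single node (there are no peers to ask about our own last block), it resets total_stake at the start of every attempt (the fix the reviewer comment points at), and it backs off by 1.5x per retry up to a 4-second cap until reached_validity (f+1 stake) is satisfied. A minimal sketch of that retry shape, where poll_peers and validity_threshold are stand-ins for the real peer requests and the committee.reached_validity check:

```rust
use std::time::Duration;

// Sketch only: `poll_peers` stands in for one round of asking all peers about
// our own last block; it returns (stake_that_answered, highest_round_seen).
async fn fetch_own_last_round<F>(
    committee_size: usize,
    own_last_proposed_round: u32,
    validity_threshold: u64,
    mut poll_peers: F,
) -> u32
where
    F: FnMut() -> (u64, u32),
{
    // With a single (this) authority there are no peers to ask.
    if committee_size == 1 {
        return own_last_proposed_round;
    }

    const MAX_RETRY_DELAY_STEP: Duration = Duration::from_millis(4_000);
    let mut retry_delay_step = Duration::from_millis(500);
    loop {
        // Accumulated stake is reset on every attempt, not carried across retries.
        let (total_stake, highest_round) = poll_peers();
        if total_stake >= validity_threshold {
            return highest_round;
        }
        // Back off before the next attempt: grow the delay 1.5x, capped at the max step.
        tokio::time::sleep(retry_delay_step).await;
        retry_delay_step = Duration::from_secs_f64(retry_delay_step.as_secs_f64() * 1.5);
        retry_delay_step = retry_delay_step.min(MAX_RETRY_DELAY_STEP);
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut attempts = 0;
    let round = fetch_own_last_round(4, 0, 3, || {
        attempts += 1;
        // The first attempt returns too little stake, the second reaches the threshold.
        if attempts < 2 { (1, 0) } else { (3, 42) }
    })
    .await;
    assert_eq!(round, 42);
}
```
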
14 changes: 9 additions & 5 deletions crates/sui-benchmark/tests/simtest.rs
@@ -25,7 +25,7 @@ mod test {
LocalValidatorAggregatorProxy, ValidatorProxy,
};
use sui_config::node::AuthorityOverloadConfig;
use sui_config::{ExecutionCacheConfig, AUTHORITIES_DB_NAME, SUI_KEYSTORE_FILENAME};
use sui_config::{AUTHORITIES_DB_NAME, SUI_KEYSTORE_FILENAME};
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::authority::framework_injection;
use sui_core::authority::AuthorityState;
@@ -173,13 +173,10 @@ mod test {
test_cluster.wait_for_epoch_all_nodes(1).await;
}

#[ignore("Disabled due to flakiness - re-enable when failure is fixed")]
#[sim_test(config = "test_config()")]
async fn test_simulated_load_reconfig_restarts() {
// TODO added to invalidate a failing test seed in CI. Remove me
tokio::time::sleep(Duration::from_secs(1)).await;
sui_protocol_config::ProtocolConfig::poison_get_for_min_version();
let test_cluster = build_test_cluster(4, 1000, 1).await;
let test_cluster = build_test_cluster(4, 5_000, 1).await;
let node_restarter = test_cluster
.random_node_restarter()
.with_kill_interval_secs(5, 15)
@@ -188,6 +185,13 @@
test_simulated_load(test_cluster, 120).await;
}

#[sim_test(config = "test_config()")]
async fn test_simulated_load_small_committee_reconfig() {
sui_protocol_config::ProtocolConfig::poison_get_for_min_version();
let test_cluster = build_test_cluster(1, 5_000, 0).await;
test_simulated_load(test_cluster, 120).await;
}

/// Get a list of nodes that we don't want to kill in the crash recovery tests.
/// This includes the client node which is the node that is running the test, as well as
/// rpc fullnode which are needed to run the benchmark.
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
@@ -53,7 +53,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
async fn test_mysticeti_manager() {
// GIVEN
let configs = ConfigBuilder::new_with_temp_dir()
.committee_size(1.try_into().unwrap())
.committee_size(4.try_into().unwrap())
.build();

let config = &configs.validator_configs()[0];