Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexes Payload store by view number #2449

Merged
merged 6 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use hotshot_task_impls::{events::HotShotEvent, network::NetworkTaskKind};
use hotshot_types::traits::node_implementation::ChannelMaps;

use hotshot_types::{
consensus::{Consensus, ConsensusMetricsValue, PayloadStore, View, ViewInner, ViewQueue},
consensus::{Consensus, ConsensusMetricsValue, View, ViewInner, ViewQueue},
data::Leaf,
error::StorageSnafu,
event::EventType,
Expand Down Expand Up @@ -217,19 +217,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
);

let mut saved_leaves = HashMap::new();
let mut saved_payloads = PayloadStore::default();
let mut saved_payloads = BTreeMap::new();
saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
let payload_commitment = anchored_leaf.get_payload_commitment();
if let Some(payload) = anchored_leaf.get_block_payload() {
let encoded_txns = match payload.encode() {
let encoded_txns: Vec<u8> = match payload.encode() {
// TODO (Keyao) [VALIDATED_STATE] - Avoid collect/copy on the encoded transaction bytes.
// <https://github.com/EspressoSystems/HotShot/issues/2115>
Ok(encoded) => encoded.into_iter().collect(),
Err(e) => {
return Err(HotShotError::BlockError { source: e });
}
};
saved_payloads.insert(payload_commitment, encoded_txns);
saved_payloads.insert(anchored_leaf.get_view_number(), encoded_txns.clone());
saved_payloads.insert(TYPES::Time::new(1), encoded_txns);
}

let start_view = anchored_leaf.get_view_number();
Expand Down
5 changes: 2 additions & 3 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
let liveness_check = justify_qc.get_view_number() > consensus.locked_view;

let high_qc = consensus.high_qc.clone();
let locked_view = consensus.locked_view;

let locked_view = consensus.locked_view;

drop(consensus);

Expand Down Expand Up @@ -702,7 +701,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
// If the block payload is available for this leaf, include it in
// the leaf chain that we send to the client.
if let Some(encoded_txns) =
consensus.saved_payloads.get(leaf.get_payload_commitment())
consensus.saved_payloads.get(&leaf.get_view_number())
{
let payload = BlockPayload::from_bytes(
encoded_txns.clone().into_iter(),
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
// Record the payload we have promised to make available.
consensus
.saved_payloads
.insert(payload_commitment, proposal.data.encoded_transactions);
.insert(view, proposal.data.encoded_transactions);
}
HotShotEvent::DAVoteRecv(ref vote) => {
debug!("DA vote recv, Main Task {:?}", vote.get_view_number());
Expand Down
8 changes: 4 additions & 4 deletions crates/testing/tests/unreliable_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn libp2p_network_sync() {
.gen_launcher::<TestTypes, Libp2pImpl>(0)
.launch()
.run_test()
.await
.await;
}

#[cfg(test)]
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn libp2p_network_async() {
.gen_launcher::<TestTypes, Libp2pImpl>(0)
.launch()
.run_test()
.await
.await;
}

#[cfg(test)]
Expand Down Expand Up @@ -260,7 +260,7 @@ async fn libp2p_network_partially_sync() {
.gen_launcher::<TestTypes, Libp2pImpl>(0)
.launch()
.run_test()
.await
.await;
}

#[cfg(test)]
Expand Down Expand Up @@ -339,5 +339,5 @@ async fn libp2p_network_chaos() {
.gen_launcher::<TestTypes, Libp2pImpl>(0)
.launch()
.run_test()
.await
.await;
}
76 changes: 7 additions & 69 deletions crates/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use crate::{
use displaydoc::Display;

use crate::{
data::{Leaf, VidCommitment},
data::Leaf,
error::HotShotError,
simple_certificate::{DACertificate, QuorumCertificate},
traits::{
Expand All @@ -17,9 +17,9 @@ use crate::{
utils::Terminator,
};
use commit::Commitment;
use derivative::Derivative;

use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex},
};
use tracing::error;
Expand Down Expand Up @@ -53,9 +53,8 @@ pub struct Consensus<TYPES: NodeType> {

/// Saved payloads.
///
/// Contains the block payload commitment and encoded transactions for every leaf in
/// `saved_leaves` if that payload is available.
pub saved_payloads: PayloadStore,
/// Encoded transactions for every view if we got a payload for that view.
pub saved_payloads: BTreeMap<TYPES::Time, Vec<u8>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other PR. Let's update the comment.


/// The `locked_qc` view number
pub locked_view: TYPES::Time,
Expand Down Expand Up @@ -316,21 +315,14 @@ impl<TYPES: NodeType> Consensus<TYPES> {
// perform gc
self.saved_da_certs
.retain(|view_number, _| *view_number >= old_anchor_view);
self.state_map
.range(old_anchor_view..new_anchor_view)
.filter_map(|(_view_number, view)| view.get_payload_commitment())
.for_each(|payload_commitment| {
self.saved_payloads.remove(payload_commitment);
});
self.state_map
.range(old_anchor_view..new_anchor_view)
.filter_map(|(_view_number, view)| view.get_leaf_commitment())
.for_each(|leaf| {
if let Some(removed) = self.saved_leaves.remove(&leaf) {
self.saved_payloads.remove(removed.get_payload_commitment());
}
self.saved_leaves.remove(&leaf);
});
self.state_map = self.state_map.split_off(&new_anchor_view);
self.saved_payloads = self.saved_payloads.split_off(&new_anchor_view);
}

/// Gets the last decided state
Expand All @@ -347,57 +339,3 @@ impl<TYPES: NodeType> Consensus<TYPES> {
self.saved_leaves.get(&leaf).unwrap().clone()
}
}

/// Mapping from block payload commitments to the encoded transactions.
///
/// Entries in this mapping are reference-counted, so multiple consensus objects can refer to the
/// same block, and the block will only be deleted after _all_ such objects are garbage collected.
/// For example, multiple leaves may temporarily reference the same block on different branches,
/// before all but one branch are ultimately garbage collected.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default(bound = ""))]
pub struct PayloadStore(HashMap<VidCommitment, (Vec<u8>, u64)>);

impl PayloadStore {
/// Save the encoded transactions for later retrieval.
///
/// After calling this function, and before the corresponding call to [`remove`](Self::remove),
/// `self.get(payload_commitment)` will return `Some(encoded_transactions)`.
///
/// This function will increment a reference count on the saved payload commitment, so that
/// multiple calls to [`insert`](Self::insert) for the same payload commitment result in
/// multiple owning references to the payload commitment. [`remove`](Self::remove) must be
/// called once for each reference before the payload commitment will be deallocated.
pub fn insert(&mut self, payload_commitment: VidCommitment, encoded_transactions: Vec<u8>) {
self.0
.entry(payload_commitment)
.and_modify(|(_, refcount)| *refcount += 1)
.or_insert((encoded_transactions, 1));
}

/// Get the saved encoded transactions, if available.
///
/// If the encoded transactions has been saved with [`insert`](Self::insert), this function
/// will retrieve it. It may return [`None`] if a block with the given commitment has not been
/// saved or if the block has been dropped with [`remove`](Self::remove).
#[must_use]
pub fn get(&self, payload_commitment: VidCommitment) -> Option<&Vec<u8>> {
self.0.get(&payload_commitment).map(|(encoded, _)| encoded)
}

/// Drop a reference to the saved encoded transactions.
///
/// If the set exists and this call drops the last reference to it, the set will be returned,
/// Otherwise, the return value is [`None`].
pub fn remove(&mut self, payload_commitment: VidCommitment) -> Option<Vec<u8>> {
if let Entry::Occupied(mut e) = self.0.entry(payload_commitment) {
let (_, refcount) = e.get_mut();
*refcount -= 1;
if *refcount == 0 {
let (encoded, _) = e.remove();
return Some(encoded);
}
}
None
}
}
Loading