Skip to content

Commit

Permalink
Persist Kademlia cache to disk (#1773)
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Sep 26, 2023
1 parent f6c36b9 commit 63737cd
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 234 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
/target_dirs
/.vscode/settings.json
**/.DS_Store
*.cache
47 changes: 11 additions & 36 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
use super::NetworkingMetrics;
use crate::NodeImplementation;
use async_compatibility_layer::{
art::{async_block_on, async_sleep, async_spawn, async_timeout},
art::{async_block_on, async_sleep, async_spawn},
channel::{unbounded, UnboundedReceiver, UnboundedSendError, UnboundedSender},
};
use async_lock::RwLock;
use async_trait::async_trait;
use bimap::BiHashMap;
use bincode::Options;
use hotshot_constants::{KAD_DEFAULT_REPUB_INTERVAL_SEC, LOOK_AHEAD};
use hotshot_constants::LOOK_AHEAD;
use hotshot_task::{boxed_sync, BoxSyncFuture};
use hotshot_types::{
data::ViewNumber,
Expand Down Expand Up @@ -82,7 +82,7 @@ struct Libp2pNetworkInner<M: NetworkMsg, K: SignatureKey + 'static> {
/// this node's public key
pk: K,
/// handle to control the network
handle: Arc<NetworkNodeHandle<(), K>>,
handle: Arc<NetworkNodeHandle<()>>,
/// Receiver for broadcast messages
broadcast_recv: UnboundedReceiver<M>,
/// Sender for broadcast messages
Expand All @@ -93,8 +93,6 @@ struct Libp2pNetworkInner<M: NetworkMsg, K: SignatureKey + 'static> {
direct_recv: UnboundedReceiver<M>,
/// Sender for node lookup (relevant view number, key of node) (None for shutdown)
node_lookup_send: UnboundedSender<Option<(ViewNumber, K)>>,
/// Sender for shutdown of the peer cache's garbage collection task
cache_gc_shutdown_send: UnboundedSender<()>,
/// this is really cheating to enable local tests
/// hashset of (bootstrap_addr, peer_id)
bootstrap_addrs: PeerInfoVec,
Expand Down Expand Up @@ -200,6 +198,8 @@ where
// setting to sane defaults
.ttl(None)
.republication_interval(None)
// this removes the cache for tests
.dht_cache_location(None)
.build()
.unwrap()
} else {
Expand All @@ -220,6 +220,8 @@ where
// setting to sane defaults
.ttl(None)
.republication_interval(None)
// this removes the cache for tests
.dht_cache_location(None)
.build()
.unwrap()
};
Expand Down Expand Up @@ -287,7 +289,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
) -> Result<Libp2pNetwork<M, K>, NetworkError> {
assert!(bootstrap_addrs_len > 4, "Need at least 5 bootstrap nodes");
let network_handle = Arc::new(
NetworkNodeHandle::<(), K>::new(config, id)
Box::pin(NetworkNodeHandle::<()>::new(config, id))
.await
.map_err(Into::<NetworkError>::into)?,
);
Expand Down Expand Up @@ -318,7 +320,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
let (direct_send, direct_recv) = unbounded();
let (broadcast_send, broadcast_recv) = unbounded();
let (node_lookup_send, node_lookup_recv) = unbounded();
let (cache_gc_shutdown_send, cache_gc_shutdown_recv) = unbounded::<()>();

let result = Libp2pNetwork {
inner: Arc::new(Libp2pNetworkInner {
Expand All @@ -336,7 +337,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
metrics: NetworkingMetrics::new(&*metrics),
topic_map,
node_lookup_send,
cache_gc_shutdown_send,
// Start the latest view from 0. "Latest" refers to "most recent view we are polling for
// proposals on". We need this because to have consensus info injected we need a working
// network already. In the worst case, we send a few lookups we don't need.
Expand All @@ -345,20 +345,15 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
};

result.spawn_event_generator(direct_send, broadcast_send);
result.spawn_node_lookup(node_lookup_recv, cache_gc_shutdown_recv);
result.spawn_node_lookup(node_lookup_recv);
result.spawn_connect(id);

Ok(result)
}

/// Spawns task for looking up nodes pre-emptively
/// as well as garbage collecting the peer cache
#[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
fn spawn_node_lookup(
&self,
node_lookup_recv: UnboundedReceiver<Option<(ViewNumber, K)>>,
cache_gc_shutdown_send: UnboundedReceiver<()>,
) {
fn spawn_node_lookup(&self, node_lookup_recv: UnboundedReceiver<Option<(ViewNumber, K)>>) {
let handle = self.inner.handle.clone();
let dht_timeout = self.inner.dht_timeout;
let latest_seen_view = self.inner.latest_seen_view.clone();
Expand All @@ -375,32 +370,13 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {

// only run if we are not too close to the next view number
if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
// look up node, caching if applicable
// look up the node for this public key, bounded by `dht_timeout`
if let Err(err) = handle_.lookup_node::<K>(pk.clone(), dht_timeout).await {
error!("Failed to perform lookup for key {:?}: {}", pk, err);
};
}
}
});

// deals with garbage collecting the lookup queue
let handle_ = handle.clone();
async_spawn(async move {
loop {
let ttl = handle_
.config()
.ttl
.unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC * 8));
if async_timeout(ttl, cache_gc_shutdown_send.recv())
.await
.is_err()
{
handle_.prune_peer_cache().await;
} else {
break;
}
}
});
}

/// Initiates connection to the outside world
Expand Down Expand Up @@ -573,7 +549,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> ConnectedNetwork<M, K> for Libp2p
{
let closure = async move {
self.inner.node_lookup_send.send(None).await.unwrap();
self.inner.cache_gc_shutdown_send.send(()).await.unwrap();
if self.inner.handle.is_killed() {
error!("Called shut down when already shut down! Noop.");
} else {
Expand Down
Loading

0 comments on commit 63737cd

Please sign in to comment.