Commit cc5a9c3
Merge pull request #3213 from autonomys/remove-cache-index-generic
Remove cache index generic
nazar-pc authored Nov 6, 2024
2 parents 4452df8 + d1d5600 commit cc5a9c3
Showing 9 changed files with 65 additions and 134 deletions.
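In short: the CacheIndex type parameter previously carried by FarmerCache, FarmerCacheWorker, FarmerCacheOffset and PieceCachesState is replaced by a single private type alias (type CacheIndex = u8;) inside farmer_cache.rs, and the cluster-specific ClusterCacheIndex (u16) alias is removed. The sketch below condenses the call-site effect visible in the hunks that follow; the names are taken from the diff itself, but the two snippets are illustrative fragments rather than a complete program.

// Before: each caller picked an index type and passed it explicitly.
let (farmer_cache, farmer_cache_worker) =
    FarmerCache::<CacheIndex>::new(node_client.clone(), peer_id, Some(&mut registry));

// After: the index type is an internal detail of the farmer_cache module,
// so the turbofish (and the per-binary type alias) disappears.
let (farmer_cache, farmer_cache_worker) =
    FarmerCache::new(node_client.clone(), peer_id, Some(&mut registry));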
@@ -15,9 +15,7 @@ use std::future::{ready, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::{Duration, Instant};
-use subspace_farmer::cluster::cache::{
-ClusterCacheIdentifyBroadcast, ClusterCacheIndex, ClusterPieceCache,
-};
+use subspace_farmer::cluster::cache::{ClusterCacheIdentifyBroadcast, ClusterPieceCache};
use subspace_farmer::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::{PieceCache, PieceCacheId};
@@ -88,7 +86,7 @@ impl KnownCaches {
pub(super) async fn maintain_caches(
cache_group: &str,
nats_client: &NatsClient,
-farmer_cache: FarmerCache<ClusterCacheIndex>,
+farmer_cache: FarmerCache,
) -> anyhow::Result<()> {
let mut known_caches = KnownCaches::default();

@@ -69,8 +69,6 @@ const MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS: u64 = 7 * 1024 * 1024 * 1024
const FARM_ERROR_PRINT_INTERVAL: Duration = Duration::from_secs(30);
const PLOTTING_RETRY_INTERVAL: Duration = Duration::from_secs(5);

-type CacheIndex = u8;
-
#[derive(Debug, Parser)]
struct CpuPlottingOptions {
/// How many sectors a farmer will download concurrently. Limits memory usage of
@@ -424,7 +422,7 @@ where
let should_start_prometheus_server = !prometheus_listen_on.is_empty();

let (farmer_cache, farmer_cache_worker) =
-FarmerCache::<CacheIndex>::new(node_client.clone(), peer_id, Some(&mut registry));
+FarmerCache::new(node_client.clone(), peer_id, Some(&mut registry));

let node_client = CachingProxyNodeClient::new(node_client)
.await
@@ -79,7 +79,7 @@ pub(in super::super) struct NetworkArgs {
}

#[allow(clippy::too_many_arguments)]
-pub(in super::super) fn configure_network<FarmIndex, CacheIndex, NC>(
+pub(in super::super) fn configure_network<FarmIndex, NC>(
protocol_prefix: String,
base_path: &Path,
keypair: Keypair,
@@ -96,15 +96,12 @@ pub(in super::super) fn configure_network<FarmIndex, CacheIndex, NC>(
}: NetworkArgs,
weak_plotted_pieces: Weak<AsyncRwLock<PlottedPieces<FarmIndex>>>,
node_client: NC,
-farmer_cache: FarmerCache<CacheIndex>,
+farmer_cache: FarmerCache,
prometheus_metrics_registry: Option<&mut Registry>,
-) -> Result<(Node, NodeRunner<FarmerCache<CacheIndex>>), anyhow::Error>
+) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error>
where
FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
usize: From<FarmIndex>,
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-usize: From<CacheIndex>,
-CacheIndex: TryFrom<usize>,
NC: NodeClientExt + Clone,
{
let known_peers_registry = KnownPeersManager::new(KnownPeersManagerConfig {
crates/subspace-farmer/src/cluster/cache.rs (3 changes: 0 additions & 3 deletions)
@@ -27,9 +27,6 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};

const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

-/// Type alias for cache index used by cluster.
-pub type ClusterCacheIndex = u16;
-
/// Broadcast with identification details by caches
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheIdentifyBroadcast {
crates/subspace-farmer/src/cluster/controller.rs (10 changes: 4 additions & 6 deletions)
@@ -6,9 +6,7 @@
//! client implementations designed to work with cluster controller and a service function to drive
//! the backend part of the controller.
-use crate::cluster::cache::{
-ClusterCacheIndex, ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest,
-};
+use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
use crate::cluster::nats_client::{
GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
};
@@ -608,7 +606,7 @@ pub async fn controller_service<NC, PG>(
nats_client: &NatsClient,
node_client: &NC,
piece_getter: &PG,
-farmer_cache: &FarmerCache<ClusterCacheIndex>,
+farmer_cache: &FarmerCache,
instance: &str,
primary_instance: bool,
) -> anyhow::Result<()>
@@ -906,7 +904,7 @@ where

async fn find_piece_responder(
nats_client: &NatsClient,
-farmer_cache: &FarmerCache<ClusterCacheIndex>,
+farmer_cache: &FarmerCache,
) -> anyhow::Result<()> {
nats_client
.request_responder(
@@ -921,7 +919,7 @@

async fn find_pieces_responder(
nats_client: &NatsClient,
-farmer_cache: &FarmerCache<ClusterCacheIndex>,
+farmer_cache: &FarmerCache,
) -> anyhow::Result<()> {
nats_client
.stream_request_responder(
crates/subspace-farmer/src/farmer_cache.rs (47 changes: 15 additions & 32 deletions)
@@ -25,7 +25,6 @@ use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
-use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
@@ -55,23 +54,20 @@ const IS_PIECE_MAYBE_STORED_TIMEOUT: Duration = Duration::from_millis(100);

type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
+type CacheIndex = u8;

#[derive(Default, Debug)]
struct Handlers {
progress: Handler<f32>,
}

#[derive(Debug, Clone, Copy)]
-struct FarmerCacheOffset<CacheIndex> {
+struct FarmerCacheOffset {
cache_index: CacheIndex,
piece_offset: PieceCacheOffset,
}

-impl<CacheIndex> FarmerCacheOffset<CacheIndex>
-where
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-CacheIndex: TryFrom<usize>,
-{
+impl FarmerCacheOffset {
fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
Self {
cache_index,
@@ -121,9 +117,9 @@ impl CacheBackend {
}

#[derive(Debug)]
-struct CacheState<CacheIndex> {
-cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
-cache_free_offsets: Vec<FarmerCacheOffset<CacheIndex>>,
+struct CacheState {
+cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
+cache_free_offsets: Vec<FarmerCacheOffset>,
backend: CacheBackend,
}

@@ -140,25 +136,22 @@ enum WorkerCommand {
/// Farmer cache worker used to drive the farmer cache backend
#[derive(Debug)]
#[must_use = "Farmer cache will not work unless its worker is running"]
-pub struct FarmerCacheWorker<NC, CacheIndex>
+pub struct FarmerCacheWorker<NC>
where
NC: fmt::Debug,
{
peer_id: PeerId,
node_client: NC,
-piece_caches: Arc<AsyncRwLock<PieceCachesState<CacheIndex>>>,
+piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
plot_caches: Arc<PlotCaches>,
handlers: Arc<Handlers>,
worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
metrics: Option<Arc<FarmerCacheMetrics>>,
}

-impl<NC, CacheIndex> FarmerCacheWorker<NC, CacheIndex>
+impl<NC> FarmerCacheWorker<NC>
where
NC: NodeClient,
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-usize: From<CacheIndex>,
-CacheIndex: TryFrom<usize>,
{
/// Run the cache worker with provided piece getter.
///
@@ -1020,10 +1013,10 @@ impl PlotCaches {
/// where piece cache is not enough to store all the pieces on the network, while there is a lot of
/// space in the plot that is not used by sectors yet and can be leverage as extra caching space.
#[derive(Debug, Clone)]
-pub struct FarmerCache<CacheIndex> {
+pub struct FarmerCache {
peer_id: PeerId,
/// Individual dedicated piece caches
-piece_caches: Arc<AsyncRwLock<PieceCachesState<CacheIndex>>>,
+piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
/// Additional piece caches
plot_caches: Arc<PlotCaches>,
handlers: Arc<Handlers>,
@@ -1032,12 +1025,7 @@ pub struct FarmerCache<CacheIndex> {
metrics: Option<Arc<FarmerCacheMetrics>>,
}

-impl<CacheIndex> FarmerCache<CacheIndex>
-where
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-usize: From<CacheIndex>,
-CacheIndex: TryFrom<usize>,
-{
+impl FarmerCache {
/// Create new piece cache instance and corresponding worker.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in
@@ -1046,7 +1034,7 @@ where
node_client: NC,
peer_id: PeerId,
registry: Option<&mut Registry>,
-) -> (Self, FarmerCacheWorker<NC, CacheIndex>)
+) -> (Self, FarmerCacheWorker<NC>)
where
NC: NodeClient,
{
@@ -1496,7 +1484,7 @@

fn find_piece_internal(
&self,
-caches: &PieceCachesState<CacheIndex>,
+caches: &PieceCachesState,
piece_index: PieceIndex,
) -> Option<(PieceCacheId, PieceCacheOffset)> {
let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
@@ -1562,12 +1550,7 @@
}
}

-impl<CacheIndex> LocalRecordProvider for FarmerCache<CacheIndex>
-where
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-usize: From<CacheIndex>,
-CacheIndex: TryFrom<usize>,
-{
+impl LocalRecordProvider for FarmerCache {
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs (58 changes: 18 additions & 40 deletions)
@@ -1,30 +1,23 @@
-use crate::farmer_cache::{CacheBackend, FarmerCacheOffset};
+use crate::farmer_cache::{CacheBackend, CacheIndex, FarmerCacheOffset};
use std::collections::btree_map::Values;
use std::collections::{BTreeMap, HashMap, VecDeque};
-use std::fmt;
-use std::hash::Hash;
use subspace_core_primitives::pieces::PieceIndex;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::KeyWithDistance;
use tracing::{debug, trace};

-#[derive(Debug, Clone)]
-pub(super) struct PieceCachesState<CacheIndex> {
-stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
-dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
+#[derive(Debug, Default, Clone)]
+pub(super) struct PieceCachesState {
+stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset>,
+dangling_free_offsets: VecDeque<FarmerCacheOffset>,
backends: Vec<CacheBackend>,
}

-impl<CacheIndex> PieceCachesState<CacheIndex>
-where
-CacheIndex: Hash + Eq + Copy + fmt::Debug + fmt::Display + Send + Sync + 'static,
-usize: From<CacheIndex>,
-CacheIndex: TryFrom<usize>,
-{
+impl PieceCachesState {
pub(super) fn new(
-stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
-dangling_free_offsets: VecDeque<FarmerCacheOffset<CacheIndex>>,
+stored_pieces: BTreeMap<KeyWithDistance, FarmerCacheOffset>,
+dangling_free_offsets: VecDeque<FarmerCacheOffset>,
backends: Vec<CacheBackend>,
) -> Self {
Self {
@@ -39,7 +32,7 @@ where
.fold(0usize, |acc, backend| acc + backend.total_capacity as usize)
}

-pub(super) fn pop_free_offset(&mut self) -> Option<FarmerCacheOffset<CacheIndex>> {
+pub(super) fn pop_free_offset(&mut self) -> Option<FarmerCacheOffset> {
match self.dangling_free_offsets.pop_front() {
Some(free_offset) => {
debug!(?free_offset, "Popped dangling free offset");
@@ -71,10 +64,7 @@
}
}

-pub(super) fn get_stored_piece(
-&self,
-key: &KeyWithDistance,
-) -> Option<&FarmerCacheOffset<CacheIndex>> {
+pub(super) fn get_stored_piece(&self, key: &KeyWithDistance) -> Option<&FarmerCacheOffset> {
self.stored_pieces.get(key)
}

@@ -85,21 +75,19 @@ where
pub(super) fn push_stored_piece(
&mut self,
key: KeyWithDistance,
-cache_offset: FarmerCacheOffset<CacheIndex>,
-) -> Option<FarmerCacheOffset<CacheIndex>> {
+cache_offset: FarmerCacheOffset,
+) -> Option<FarmerCacheOffset> {
self.stored_pieces.insert(key, cache_offset)
}

-pub(super) fn stored_pieces_offsets(
-&self,
-) -> Values<'_, KeyWithDistance, FarmerCacheOffset<CacheIndex>> {
+pub(super) fn stored_pieces_offsets(&self) -> Values<'_, KeyWithDistance, FarmerCacheOffset> {
self.stored_pieces.values()
}

pub(super) fn remove_stored_piece(
&mut self,
key: &KeyWithDistance,
-) -> Option<FarmerCacheOffset<CacheIndex>> {
+) -> Option<FarmerCacheOffset> {
self.stored_pieces.remove(key)
}

@@ -116,7 +104,7 @@ where
})
}

-pub(super) fn push_dangling_free_offset(&mut self, offset: FarmerCacheOffset<CacheIndex>) {
+pub(super) fn push_dangling_free_offset(&mut self, offset: FarmerCacheOffset) {
trace!(?offset, "Pushing dangling free offset");
self.dangling_free_offsets.push_back(offset);
}
@@ -132,8 +120,8 @@ where
pub(super) fn reuse(
self,
) -> (
-BTreeMap<KeyWithDistance, FarmerCacheOffset<CacheIndex>>,
-VecDeque<FarmerCacheOffset<CacheIndex>>,
+BTreeMap<KeyWithDistance, FarmerCacheOffset>,
+VecDeque<FarmerCacheOffset>,
) {
let Self {
mut stored_pieces,
@@ -149,7 +137,7 @@ where
pub(super) fn should_replace(
&mut self,
key: &KeyWithDistance,
-) -> Option<(KeyWithDistance, FarmerCacheOffset<CacheIndex>)> {
+) -> Option<(KeyWithDistance, FarmerCacheOffset)> {
if !self.should_include_key_internal(key) {
return None;
}
@@ -192,13 +180,3 @@ where
}
}
}

-impl<CacheIndex> Default for PieceCachesState<CacheIndex> {
-fn default() -> Self {
-Self {
-stored_pieces: BTreeMap::default(),
-dangling_free_offsets: VecDeque::default(),
-backends: Vec::default(),
-}
-}
-}
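A note on the block removed just above: the hand-written Default impl was presumably needed because #[derive(Default)] on the generic PieceCachesState<CacheIndex> would have added a CacheIndex: Default bound that the collection fields never actually require. With the type parameter gone, the #[derive(Debug, Default, Clone)] added at the top of this file is sufficient. The following standalone sketch illustrates that general Rust behaviour; the Derived and NoDefault names are invented for the example and do not appear in the crate.

use std::collections::VecDeque;

// A type that deliberately does not implement Default.
#[allow(dead_code)] // never instantiated; it only exists to show the missing bound
struct NoDefault;

// #[derive(Default)] on Derived<T> would generate `impl<T: Default> Default`,
// so Derived::<NoDefault>::default() would not compile even though
// VecDeque<T> is default-constructible for any T. That is the usual reason
// for writing the impl by hand, as the removed code above did.
struct Derived<T> {
    items: VecDeque<T>,
}

impl<T> Default for Derived<T> {
    fn default() -> Self {
        Self {
            items: VecDeque::default(),
        }
    }
}

fn main() {
    // Works for any T, including one that has no Default impl.
    let d = Derived::<NoDefault>::default();
    assert!(d.items.is_empty());
}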