Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ParthDesai committed Aug 15, 2023
1 parent aeee259 commit 56d2682
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 195 deletions.
86 changes: 48 additions & 38 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ sdk-dsn = { path = "dsn" }
sdk-substrate = { path = "substrate" }
sdk-farmer = { path = "farmer" }

subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }

# The only triple tested and confirmed as working in `jemallocator` crate is `x86_64-unknown-linux-gnu`
[target.'cfg(all(target_arch = "x86_64", target_vendor = "unknown", target_os = "linux", target_env = "gnu"))'.dev-dependencies]
Expand All @@ -35,7 +35,7 @@ tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.3"

subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }

# The list of dependencies below (which can be both direct and indirect dependencies) are crates
# that are suspected to be CPU-intensive, and that are unlikely to require debugging (as some of
Expand Down
10 changes: 5 additions & 5 deletions dsn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/subs
sp-runtime = { version = "24.0.0", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }
sp-blockchain = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" }

subspace-networking = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-service = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-service = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
sc-consensus-subspace = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
99 changes: 13 additions & 86 deletions dsn/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::{Arc, Weak};

use anyhow::Context;
use derivative::Derivative;
use derive_builder::Builder;
use derive_more::{Deref, DerefMut, Display, From};
use either::*;
use futures::prelude::*;
use sc_consensus_subspace::SegmentHeadersStore;
use sdk_utils::{self, DropCollection, Multiaddr, MultiaddrWithPeerId};
Expand All @@ -15,16 +13,14 @@ use subspace_core_primitives::Piece;
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_networking::libp2p::kad::ProviderRecord;
use subspace_networking::{
PeerInfo, PeerInfoProvider, PieceAnnouncementRequestHandler, PieceAnnouncementResponse,
PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, ProviderStorage as _,
PeerInfo, PeerInfoProvider, PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse,
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
KADEMLIA_PROVIDER_TTL_IN_SECS,
};

use super::provider_storage_utils::MaybeProviderStorage;
use super::{FarmerProviderStorage, NodePieceCache, NodeProviderStorage, ProviderStorage};
use super::{FarmerProviderStorage, NodePieceCache, ProviderStorage};

/// Wrapper with default value for listen address
#[derive(
Expand Down Expand Up @@ -155,10 +151,6 @@ impl DsnBuilder {
}
}

const MAX_PROVIDER_RECORDS_LIMIT: NonZeroUsize = NonZeroUsize::new(100000).expect("100000 > 0"); // ~ 10 MB
const MAX_CONCURRENT_ANNOUNCEMENTS_QUEUE: NonZeroUsize =
NonZeroUsize::new(2000).expect("Not zero; qed");

/// Options for DSN
pub struct DsnOptions<C, ASNS, PieceByHash, SegmentHeaderByIndexes> {
/// Client to aux storage for node piece cache
Expand All @@ -180,7 +172,7 @@ pub struct DsnOptions<C, ASNS, PieceByHash, SegmentHeaderByIndexes> {
/// Farmer total allocated space across all plots
pub farmer_total_space_pledged: usize,
/// Segment header store
pub segment_header_store: SegmentHeadersStore<C>
pub segment_header_store: SegmentHeadersStore<C>,
}

/// Farmer piece store
Expand All @@ -200,9 +192,6 @@ pub struct DsnShared<C: sc_client_api::AuxStore + Send + Sync + 'static> {
/// Farmer piece store
#[derivative(Debug = "ignore")]
pub farmer_piece_store: Arc<tokio::sync::Mutex<Option<PieceStore>>>,
/// Provider records receiver
pub provider_records_receiver:
parking_lot::Mutex<Option<futures::channel::mpsc::Receiver<ProviderRecord>>>,
/// Farmer provider storage
pub farmer_provider_storage: MaybeProviderStorage<FarmerProviderStorage>,
/// Farmer archival storage pieces
Expand Down Expand Up @@ -253,7 +242,7 @@ impl Dsn {
get_piece_by_hash,
get_segment_header_by_segment_indexes,
farmer_total_space_pledged,
segment_header_store
segment_header_store,
} = options;
let farmer_readers_and_pieces = Arc::new(parking_lot::Mutex::new(None));
let farmer_piece_store = Arc::new(tokio::sync::Mutex::new(None));
Expand Down Expand Up @@ -303,7 +292,7 @@ impl Dsn {
listen_addresses,
reserved_nodes,
allow_non_global_addresses_in_dht,
provider_storage_path,
provider_storage_path: _,
in_connections: InConnections(max_established_incoming_connections),
out_connections: OutConnections(max_established_outgoing_connections),
target_connections: TargetConnections(target_connections),
Expand All @@ -312,96 +301,32 @@ impl Dsn {
boot_nodes,
} = self;

let peer_id = subspace_networking::peer_id(&keypair);
let bootstrap_nodes = boot_nodes.into_iter().map(Into::into).collect::<Vec<_>>();

let listen_on = listen_addresses.0.into_iter().map(Into::into).collect();

let networking_parameters_registry = subspace_networking::NetworkingParametersManager::new(
&base_path.join("known_addresses_db"),
bootstrap_nodes,
)
.context("Failed to open known addresses database for DSN")?
.boxed();

let external_provider_storage = match provider_storage_path {
Some(path) => Either::Left(subspace_networking::ParityDbProviderStorage::new(
&path,
MAX_PROVIDER_RECORDS_LIMIT,
peer_id,
)?),
None => Either::Right(subspace_networking::MemoryProviderStorage::new(peer_id)),
};

let node_provider_storage =
NodeProviderStorage::new(peer_id, piece_cache.clone(), external_provider_storage);
let provider_storage =
ProviderStorage::new(farmer_provider_storage.clone(), node_provider_storage);

let (provider_records_sender, provider_records_receiver) =
futures::channel::mpsc::channel(MAX_CONCURRENT_ANNOUNCEMENTS_QUEUE.get());
ProviderStorage::new(farmer_provider_storage.clone(), piece_cache.clone());

let mut default_networking_config = subspace_networking::Config::new(
protocol_version,
keypair,
provider_storage.clone(),
PeerInfoProvider::new_farmer(Box::new(farmer_archival_storage_pieces.clone())),
Some(PeerInfoProvider::new_farmer(Box::new(farmer_archival_storage_pieces.clone()))),
);
default_networking_config.kademlia.set_provider_record_ttl(KADEMLIA_PROVIDER_TTL_IN_SECS);

let config = subspace_networking::Config {
listen_on,
allow_non_global_addresses_in_dht,
networking_parameters_registry,
networking_parameters_registry: Some(networking_parameters_registry),
request_response_protocols: vec![
PieceAnnouncementRequestHandler::create({
move |peer_id, req| {
tracing::trace!(?req, %peer_id, "Piece announcement request received.");

let mut provider_records_sender = provider_records_sender.clone();
let provider_record = ProviderRecord {
provider: peer_id,
key: req.piece_index_hash.into(),
addresses: req.addresses.clone(),
expires: KADEMLIA_PROVIDER_TTL_IN_SECS
.map(|ttl| std::time::Instant::now() + ttl),
};

let result = match provider_storage.add_provider(provider_record.clone()) {
Ok(()) => {
if let Err(error) =
provider_records_sender.try_send(provider_record)
{
if !error.is_disconnected() {
let record = error.into_inner();
// TODO: This should be made a warning, but due to
// https://github.com/libp2p/rust-libp2p/discussions/3411 it'll
// take us some time to resolve
tracing::debug!(
?record.key,
?record.provider,
"Failed to add provider record to the channel."
);
}
};

Some(PieceAnnouncementResponse::Success)
}
Err(error) => {
tracing::error!(
%error,
%peer_id,
?req,
"Failed to add provider for received key."
);

None
}
};

futures::future::ready(result)
}
}),
PieceByHashRequestHandler::create({
let weak_readers_and_pieces = Arc::downgrade(&farmer_readers_and_pieces);
let farmer_piece_store = Arc::clone(&farmer_piece_store);
Expand Down Expand Up @@ -434,11 +359,14 @@ impl Dsn {
max_established_outgoing_connections,
general_target_connections: target_connections,
// maintain permanent connections between farmers
special_connected_peers_handler: Arc::new(PeerInfo::is_farmer),
special_connected_peers_handler: Some(Arc::new(PeerInfo::is_farmer)),
// other (non-farmer) connections
general_connected_peers_handler: Arc::new(|peer_info| !PeerInfo::is_farmer(peer_info)),
general_connected_peers_handler: Some(Arc::new(|peer_info| {
!PeerInfo::is_farmer(peer_info)
})),
max_pending_incoming_connections,
max_pending_outgoing_connections,
bootstrap_addresses: bootstrap_nodes,
..default_networking_config
};

Expand Down Expand Up @@ -490,7 +418,6 @@ impl Dsn {
DsnShared {
node,
farmer_piece_store,
provider_records_receiver: parking_lot::Mutex::new(Some(provider_records_receiver)),
farmer_provider_storage,
farmer_readers_and_pieces,
piece_cache,
Expand Down
15 changes: 3 additions & 12 deletions dsn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@ mod builder;
mod provider_storage_utils;

pub use builder::*;
use either::*;
/// Farmer piece cache
pub use subspace_farmer::utils::farmer_piece_cache::FarmerPieceCache;
use subspace_networking::ParityDbProviderStorage;
use tracing::warn;

/// Node piece cache
pub type NodePieceCache<C> = subspace_service::piece_cache::PieceCache<C>;
/// Farmer provider storage
pub type FarmerProviderStorage =
subspace_farmer::utils::farmer_provider_storage::FarmerProviderStorage<
ParityDbProviderStorage,
FarmerPieceCache,
>;
/// Node provider storage
pub type NodeProviderStorage<C> = subspace_service::dsn::node_provider_storage::NodeProviderStorage<
NodePieceCache<C>,
Either<ParityDbProviderStorage, subspace_networking::MemoryProviderStorage>,
>;
subspace_farmer::utils::farmer_provider_storage::FarmerProviderStorage<FarmerPieceCache>;

/// General provider storage
pub type ProviderStorage<C> = provider_storage_utils::AndProviderStorage<
provider_storage_utils::MaybeProviderStorage<FarmerProviderStorage>,
NodeProviderStorage<C>,
NodePieceCache<C>,
>;
14 changes: 7 additions & 7 deletions farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ sdk-utils = { path = "../utils" }
sdk-dsn = { path = "../dsn" }
sdk-traits = { path = "../traits" }

subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4", features = ["parallel", "chia"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "1e12a6e258e4562c7411c8dec40c155cf112ade4" }
subspace-core-primitives = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-erasure-coding = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-farmer = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-farmer-components = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-networking = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
subspace-proof-of-space = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051", features = ["parallel", "chia"] }
subspace-rpc-primitives = { git = "https://github.com/subspace/subspace", rev = "ecd8c8fc4da522a36eb667c702e9cf6e25862051" }
24 changes: 3 additions & 21 deletions farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy, PlottedSector};
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::PieceValidator;
use subspace_networking::ParityDbProviderStorage;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
use tokio::sync::{oneshot, watch, Mutex};
use tracing::{debug, error, trace, warn};
use tracing_futures::Instrument;

use self::builder::{PieceCacheSize, ProvidedKeysLimit};
use self::builder::PieceCacheSize;

/// Description of the cache
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
Expand Down Expand Up @@ -370,7 +369,7 @@ impl Config {
let Self {
max_concurrent_plots: _,
piece_cache_size: PieceCacheSize(piece_cache_size),
provided_keys_limit: ProvidedKeysLimit(provided_keys_limit),
provided_keys_limit: _,
max_pieces_in_sector,
} = self;

Expand All @@ -390,20 +389,8 @@ impl Config {
let piece_cache_db_path = base_path.join("piece_cache_db");

let (piece_store, piece_cache, farmer_provider_storage) = {
let provider_db_path = base_path.join("providers_db");

tracing::info!(
db_path = ?provider_db_path,
keys_limit = ?provided_keys_limit,
"Initializing provider storage..."
);

let peer_id = node.dsn().node.id();

let db_provider_storage =
ParityDbProviderStorage::new(&provider_db_path, provided_keys_limit, peer_id)
.context("Failed to create parity db provider storage")?;

tracing::info!(
db_path = ?piece_cache_db_path,
size = ?piece_cache_size,
Expand All @@ -419,12 +406,7 @@ impl Config {
current_size = ?piece_cache.size(),
"Piece cache initialized successfully"
);
let farmer_provider_storage = FarmerProviderStorage::new(
peer_id,
Arc::clone(&readers_and_pieces),
db_provider_storage,
piece_cache.clone(),
);
let farmer_provider_storage = FarmerProviderStorage::new(peer_id, piece_cache.clone());

(piece_store, Arc::new(Mutex::new(piece_cache)), farmer_provider_storage)
};
Expand Down
Loading

0 comments on commit 56d2682

Please sign in to comment.