Skip to content

Commit

Permalink
Merge pull request #2503 from subspace/piece-getter-for-dsn-sync
Browse files Browse the repository at this point in the history
Piece getter for DSN sync
  • Loading branch information
nazar-pc authored Feb 5, 2024
2 parents a413a06 + 519aabb commit a2e0318
Show file tree
Hide file tree
Showing 22 changed files with 179 additions and 138 deletions.
6 changes: 2 additions & 4 deletions crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_sector_sync;
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::shim::ShimTable;
use subspace_proof_of_space::{Table, TableGenerator};
use subspace_verification::is_within_solution_range;
Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_plot_sync;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{HistorySize, PublicKey, Record, RecordedHistorySegment};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions};
use subspace_farmer_components::sector::sector_size;
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::auditing::audit_plot_sync;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::{Table, TableGenerator};

Expand Down
6 changes: 2 additions & 4 deletions crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{
plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector,
};
use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector};
use subspace_farmer_components::reading::read_piece;
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy, ReadAt, ReadAtSync};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down
59 changes: 58 additions & 1 deletion crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,69 @@ pub mod sector;
mod segment_reconstruction;

use crate::file_ext::FileExt;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::File;
use std::future::Future;
use std::io;
use subspace_core_primitives::HistorySize;
use std::sync::Arc;
use subspace_core_primitives::{ArchivedHistorySegment, HistorySize, Piece, PieceIndex};

use std::error::Error;

/// Defines retry policy on error during piece acquiring.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum PieceGetterRetryPolicy {
/// Retry N times (including zero)
Limited(u16),
/// No restrictions on retries
Unlimited,
}

impl Default for PieceGetterRetryPolicy {
#[inline]
fn default() -> Self {
Self::Limited(0)
}
}

/// Trait representing a way to get pieces
#[async_trait]
pub trait PieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.as_ref().get_piece(piece_index, retry_policy).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
_retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
Ok(self
.get(usize::try_from(u64::from(piece_index))?)
.map(Piece::from))
}
}

/// Enum to encapsulate the selection between [`ReadAtSync`] and [`ReadAtAsync]` variants
#[derive(Copy, Clone)]
Expand Down
61 changes: 2 additions & 59 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@ use crate::sector::{
SectorMetadata, SectorMetadataChecksummed,
};
use crate::segment_reconstruction::recover_missing_piece;
use crate::FarmerProtocolInfo;
use crate::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy};
use async_lock::Mutex;
use async_trait::async_trait;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parity_scale_codec::{Decode, Encode};
use rayon::prelude::*;
use std::error::Error;
use std::mem;
use std::simd::Simd;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::crypto::{blake3_hash, blake3_hash_parallel, Scalar};
use subspace_core_primitives::{
ArchivedHistorySegment, Blake3Hash, Piece, PieceIndex, PieceOffset, PublicKey, Record, SBucket,
SectorId, SectorIndex,
Blake3Hash, PieceIndex, PieceOffset, PublicKey, Record, SBucket, SectorId, SectorIndex,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_proof_of_space::{Table, TableGenerator};
Expand All @@ -42,60 +39,6 @@ fn default_backoff() -> ExponentialBackoff {
}
}

/// Defines retry policy on error during piece acquiring.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
pub enum PieceGetterRetryPolicy {
/// Retry N times (including zero)
Limited(u16),
/// No restrictions on retries
Unlimited,
}

impl Default for PieceGetterRetryPolicy {
#[inline]
fn default() -> Self {
Self::Limited(0)
}
}

/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is
/// simplifying dependency graph.
#[async_trait]
pub trait PieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.as_ref().get_piece(piece_index, retry_policy).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
_retry_policy: PieceGetterRetryPolicy,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
Ok(self
.get(usize::try_from(u64::from(piece_index))?)
.map(Piece::from))
}
}

/// Information about sector that was plotted
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::plotting::{PieceGetter, PieceGetterRetryPolicy};
use crate::{PieceGetter, PieceGetterRetryPolicy};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fmt, mem};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
Expand Down
3 changes: 1 addition & 2 deletions crates/subspace-farmer/src/piece_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use std::time::Duration;
use subspace_core_primitives::{
HistorySize, LastArchivedBlock, Piece, PieceIndex, SegmentHeader, SegmentIndex,
};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::identity;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::plotting::{PieceGetter, PlottedSector};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
use subspace_networking::KnownPeersManager;
use subspace_proof_of_space::Table;
use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse};
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use subspace_core_primitives::{
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::plotting;
use subspace_farmer_components::plotting::{
download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions,
PieceGetter, PieceGetterRetryPolicy, PlottedSector,
PlottedSector,
};
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_farmer_components::{plotting, PieceGetter, PieceGetterRetryPolicy};
use subspace_proof_of_space::Table;
use thiserror::Error;
use tokio::runtime::Handle;
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/utils/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parking_lot::Mutex;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ fn main() -> Result<(), Error> {
// Domain node needs slots notifications for bundle production.
force_new_slot_notifications: true,
subspace_networking: SubspaceNetworking::Create { config: dsn_config },
dsn_piece_getter: None,
sync_from_dsn: true,
is_timekeeper: false,
timekeeper_cpu_cores: Default::default(),
Expand Down
7 changes: 7 additions & 0 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::StreamExt;
use libp2p::PeerId;
use std::collections::HashSet;
use std::error::Error;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use subspace_core_primitives::{Piece, PieceIndex};
Expand Down Expand Up @@ -63,6 +64,12 @@ pub struct PieceProvider<PV> {
piece_validator: Option<PV>,
}

impl<PV> fmt::Debug for PieceProvider<PV> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PieceProvider").finish_non_exhaustive()
}
}

impl<PV> PieceProvider<PV>
where
PV: PieceValidator,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-node/src/commands/run/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ pub(super) fn create_consensus_chain_configuration(
// Domain node needs slots notifications for bundle production.
force_new_slot_notifications: domains_enabled,
subspace_networking: SubspaceNetworking::Create { config: dsn_config },
dsn_piece_getter: None,
sync_from_dsn,
is_timekeeper: timekeeper_options.timekeeper,
timekeeper_cpu_cores: timekeeper_options.timekeeper_cpu_cores,
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-service/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::dsn::DsnConfig;
use crate::sync_from_dsn::DsnSyncPieceGetter;
use prometheus_client::registry::Registry;
use sc_chain_spec::ChainSpec;
use sc_network::config::{
Expand Down Expand Up @@ -242,6 +243,8 @@ pub struct SubspaceConfiguration {
pub force_new_slot_notifications: bool,
/// Subspace networking (DSN).
pub subspace_networking: SubspaceNetworking,
/// DSN piece getter
pub dsn_piece_getter: Option<Arc<dyn DsnSyncPieceGetter + Send + Sync + 'static>>,
/// Enables DSN-sync on startup.
pub sync_from_dsn: bool,
/// Is this node a Timekeeper
Expand Down
17 changes: 15 additions & 2 deletions crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ pub mod config;
pub mod dsn;
mod metrics;
pub mod rpc;
mod sync_from_dsn;
pub mod sync_from_dsn;
pub mod transaction_pool;

use crate::config::{SubspaceConfiguration, SubspaceNetworking};
use crate::dsn::{create_dsn_instance, DsnConfigurationError};
use crate::metrics::NodeMetrics;
use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
use crate::transaction_pool::FullPool;
use core::sync::atomic::{AtomicU32, Ordering};
use cross_domain_message_gossip::xdm_gossip_peers_set_config;
Expand Down Expand Up @@ -105,6 +106,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{BlockNumber, PotSeed, REWARD_SIGNING_CONTEXT};
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use subspace_runtime_primitives::opaque::Block;
use subspace_runtime_primitives::{AccountId, Balance, Hash, Nonce};
Expand Down Expand Up @@ -844,6 +846,17 @@ where

network_wrapper.set(network_service.clone());
if config.sync_from_dsn {
let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| {
Arc::new(PieceProvider::new(
node.clone(),
Some(SegmentCommitmentPieceValidator::new(
node.clone(),
subspace_link.kzg().clone(),
segment_headers_store.clone(),
)),
))
});

let (observer, worker) = sync_from_dsn::create_observer_and_worker(
segment_headers_store.clone(),
Arc::clone(&network_service),
Expand All @@ -852,7 +865,7 @@ where
import_queue_service,
sync_target_block_number,
pause_sync,
subspace_link.kzg().clone(),
dsn_sync_piece_getter,
);
task_manager
.spawn_handle()
Expand Down
Loading

0 comments on commit a2e0318

Please sign in to comment.