diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 49ba0dceae..da8512f466 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -54,10 +54,8 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector_sync; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, -}; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions}; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy}; use subspace_proof_of_space::shim::ShimTable; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_verification::is_within_solution_range; diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 0c28bfdf6e..969e2275f4 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -14,13 +14,11 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, -}; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index ef8c9c6899..90800c0a1e 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -8,11 +8,9 @@ use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{HistorySize, PublicKey, Record, RecordedHistorySegment}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, -}; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions}; use subspace_farmer_components::sector::sector_size; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index fb79fc6358..d7468dc364 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -19,13 +19,11 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, -}; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::{Table, TableGenerator}; diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 64174421b5..f9669fbac5 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -15,14 +15,12 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, -}; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; use subspace_farmer_components::reading::read_piece; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; -use subspace_farmer_components::{FarmerProtocolInfo, ReadAt, ReadAtSync}; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy, ReadAt, ReadAtSync}; use subspace_proof_of_space::chia::ChiaTable; use subspace_proof_of_space::Table; diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index c13f7f4237..0576792ff9 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -24,12 +24,69 @@ pub mod sector; mod segment_reconstruction; use crate::file_ext::FileExt; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::fs::File; use std::future::Future; use std::io; -use subspace_core_primitives::HistorySize; +use std::sync::Arc; +use subspace_core_primitives::{ArchivedHistorySegment, HistorySize, Piece, PieceIndex}; + +use std::error::Error; + +/// Defines retry policy on error during piece acquiring. +#[derive(PartialEq, Eq, Clone, Debug, Copy)] +pub enum PieceGetterRetryPolicy { + /// Retry N times (including zero) + Limited(u16), + /// No restrictions on retries + Unlimited, +} + +impl Default for PieceGetterRetryPolicy { + #[inline] + fn default() -> Self { + Self::Limited(0) + } +} + +/// Trait representing a way to get pieces +#[async_trait] +pub trait PieceGetter { + async fn get_piece( + &self, + piece_index: PieceIndex, + retry_policy: PieceGetterRetryPolicy, + ) -> Result, Box>; +} + +#[async_trait] +impl PieceGetter for Arc +where + T: PieceGetter + Send + Sync, +{ + async fn get_piece( + &self, + piece_index: PieceIndex, + retry_policy: PieceGetterRetryPolicy, + ) -> Result, Box> { + self.as_ref().get_piece(piece_index, retry_policy).await + } +} + +#[async_trait] +impl PieceGetter for ArchivedHistorySegment { + async fn get_piece( + &self, + piece_index: PieceIndex, + _retry_policy: PieceGetterRetryPolicy, + ) -> Result, Box> { + Ok(self + .get(usize::try_from(u64::from(piece_index))?) + .map(Piece::from)) + } +} /// Enum to encapsulate the selection between [`ReadAtSync`] and [`ReadAtAsync]` variants #[derive(Copy, Clone)] diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index d827636938..837596f70e 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -3,16 +3,14 @@ use crate::sector::{ SectorMetadata, SectorMetadataChecksummed, }; use crate::segment_reconstruction::recover_missing_piece; -use crate::FarmerProtocolInfo; +use crate::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy}; use async_lock::Mutex; -use async_trait::async_trait; use backoff::future::retry; use backoff::{Error as BackoffError, ExponentialBackoff}; use futures::stream::FuturesUnordered; use futures::StreamExt; use parity_scale_codec::{Decode, Encode}; use rayon::prelude::*; -use std::error::Error; use std::mem; use std::simd::Simd; use std::sync::Arc; @@ -20,8 +18,7 @@ use std::time::Duration; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::crypto::{blake3_hash, blake3_hash_parallel, Scalar}; use subspace_core_primitives::{ - ArchivedHistorySegment, Blake3Hash, Piece, PieceIndex, PieceOffset, PublicKey, Record, SBucket, - SectorId, SectorIndex, + Blake3Hash, PieceIndex, PieceOffset, PublicKey, Record, SBucket, SectorId, SectorIndex, }; use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; @@ -42,60 +39,6 @@ fn default_backoff() -> ExponentialBackoff { } } -/// Defines retry policy on error during piece acquiring. -#[derive(PartialEq, Eq, Clone, Debug, Copy)] -pub enum PieceGetterRetryPolicy { - /// Retry N times (including zero) - Limited(u16), - /// No restrictions on retries - Unlimited, -} - -impl Default for PieceGetterRetryPolicy { - #[inline] - fn default() -> Self { - Self::Limited(0) - } -} - -/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is -/// simplifying dependency graph. -#[async_trait] -pub trait PieceGetter { - async fn get_piece( - &self, - piece_index: PieceIndex, - retry_policy: PieceGetterRetryPolicy, - ) -> Result, Box>; -} - -#[async_trait] -impl PieceGetter for Arc -where - T: PieceGetter + Send + Sync, -{ - async fn get_piece( - &self, - piece_index: PieceIndex, - retry_policy: PieceGetterRetryPolicy, - ) -> Result, Box> { - self.as_ref().get_piece(piece_index, retry_policy).await - } -} - -#[async_trait] -impl PieceGetter for ArchivedHistorySegment { - async fn get_piece( - &self, - piece_index: PieceIndex, - _retry_policy: PieceGetterRetryPolicy, - ) -> Result, Box> { - Ok(self - .get(usize::try_from(u64::from(piece_index))?) - .map(Piece::from)) - } -} - /// Information about sector that was plotted #[derive(Debug, Clone, Encode, Decode)] pub struct PlottedSector { diff --git a/crates/subspace-farmer-components/src/segment_reconstruction.rs b/crates/subspace-farmer-components/src/segment_reconstruction.rs index 26571c6293..139b1e7ef1 100644 --- a/crates/subspace-farmer-components/src/segment_reconstruction.rs +++ b/crates/subspace-farmer-components/src/segment_reconstruction.rs @@ -1,4 +1,4 @@ -use crate::plotting::{PieceGetter, PieceGetterRetryPolicy}; +use crate::{PieceGetter, PieceGetterRetryPolicy}; use futures::stream::FuturesOrdered; use futures::StreamExt; use std::sync::atomic::{AtomicUsize, Ordering}; diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 1a1708f34a..0dc5e11574 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Duration; use std::{fmt, mem}; use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; -use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; +use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey}; use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; diff --git a/crates/subspace-farmer/src/piece_cache/tests.rs b/crates/subspace-farmer/src/piece_cache/tests.rs index 6b11150e0d..8166a8a15f 100644 --- a/crates/subspace-farmer/src/piece_cache/tests.rs +++ b/crates/subspace-farmer/src/piece_cache/tests.rs @@ -15,8 +15,7 @@ use std::time::Duration; use subspace_core_primitives::{ HistorySize, LastArchivedBlock, Piece, PieceIndex, SegmentHeader, SegmentIndex, }; -use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::identity; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 5c576d99a9..6c2f472d61 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -53,9 +53,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{PieceGetter, PlottedSector}; +use subspace_farmer_components::plotting::PlottedSector; use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed}; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_networking::KnownPeersManager; use subspace_proof_of_space::Table; use subspace_rpc_primitives::{FarmerAppInfo, SolutionResponse}; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index f278939967..b86bd47fb3 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -26,12 +26,12 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::FileExt; -use subspace_farmer_components::plotting; use subspace_farmer_components::plotting::{ download_sector, encode_sector, DownloadSectorOptions, DownloadedSector, EncodeSectorOptions, - PieceGetter, PieceGetterRetryPolicy, PlottedSector, + PlottedSector, }; use subspace_farmer_components::sector::SectorMetadataChecksummed; +use subspace_farmer_components::{plotting, PieceGetter, PieceGetterRetryPolicy}; use subspace_proof_of_space::Table; use thiserror::Error; use tokio::runtime::Handle; diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 5ceb099307..5944613408 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -6,7 +6,7 @@ use parking_lot::Mutex; use std::error::Error; use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex}; -use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy}; +use subspace_farmer_components::{PieceGetter, PieceGetterRetryPolicy}; use subspace_networking::libp2p::kad::RecordKey; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; diff --git a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs index d8269b3f42..7a94dd59cc 100644 --- a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs +++ b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs @@ -208,6 +208,7 @@ fn main() -> Result<(), Error> { // Domain node needs slots notifications for bundle production. force_new_slot_notifications: true, subspace_networking: SubspaceNetworking::Create { config: dsn_config }, + dsn_piece_getter: None, sync_from_dsn: true, is_timekeeper: false, timekeeper_cpu_cores: Default::default(), diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index c3e0e6901c..9563e9a43c 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -9,6 +9,7 @@ use futures::StreamExt; use libp2p::PeerId; use std::collections::HashSet; use std::error::Error; +use std::fmt; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use subspace_core_primitives::{Piece, PieceIndex}; @@ -63,6 +64,12 @@ pub struct PieceProvider { piece_validator: Option, } +impl fmt::Debug for PieceProvider { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PieceProvider").finish_non_exhaustive() + } +} + impl PieceProvider where PV: PieceValidator, diff --git a/crates/subspace-node/src/commands/run/consensus.rs b/crates/subspace-node/src/commands/run/consensus.rs index 291bb192a5..7bd57f7829 100644 --- a/crates/subspace-node/src/commands/run/consensus.rs +++ b/crates/subspace-node/src/commands/run/consensus.rs @@ -647,6 +647,7 @@ pub(super) fn create_consensus_chain_configuration( // Domain node needs slots notifications for bundle production. force_new_slot_notifications: domains_enabled, subspace_networking: SubspaceNetworking::Create { config: dsn_config }, + dsn_piece_getter: None, sync_from_dsn, is_timekeeper: timekeeper_options.timekeeper, timekeeper_cpu_cores: timekeeper_options.timekeeper_cpu_cores, diff --git a/crates/subspace-service/src/config.rs b/crates/subspace-service/src/config.rs index 7acb8588c1..1fa9e6d6dc 100644 --- a/crates/subspace-service/src/config.rs +++ b/crates/subspace-service/src/config.rs @@ -1,4 +1,5 @@ use crate::dsn::DsnConfig; +use crate::sync_from_dsn::DsnSyncPieceGetter; use prometheus_client::registry::Registry; use sc_chain_spec::ChainSpec; use sc_network::config::{ @@ -242,6 +243,8 @@ pub struct SubspaceConfiguration { pub force_new_slot_notifications: bool, /// Subspace networking (DSN). pub subspace_networking: SubspaceNetworking, + /// DSN piece getter + pub dsn_piece_getter: Option>, /// Enables DSN-sync on startup. pub sync_from_dsn: bool, /// Is this node a Timekeeper diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 937e22dc59..a829b0809c 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -28,12 +28,13 @@ pub mod config; pub mod dsn; mod metrics; pub mod rpc; -mod sync_from_dsn; +pub mod sync_from_dsn; pub mod transaction_pool; use crate::config::{SubspaceConfiguration, SubspaceNetworking}; use crate::dsn::{create_dsn_instance, DsnConfigurationError}; use crate::metrics::NodeMetrics; +use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator; use crate::transaction_pool::FullPool; use core::sync::atomic::{AtomicU32, Ordering}; use cross_domain_message_gossip::xdm_gossip_peers_set_config; @@ -105,6 +106,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{BlockNumber, PotSeed, REWARD_SIGNING_CONTEXT}; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::multiaddr::Protocol; +use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; use subspace_runtime_primitives::opaque::Block; use subspace_runtime_primitives::{AccountId, Balance, Hash, Nonce}; @@ -844,6 +846,17 @@ where network_wrapper.set(network_service.clone()); if config.sync_from_dsn { + let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| { + Arc::new(PieceProvider::new( + node.clone(), + Some(SegmentCommitmentPieceValidator::new( + node.clone(), + subspace_link.kzg().clone(), + segment_headers_store.clone(), + )), + )) + }); + let (observer, worker) = sync_from_dsn::create_observer_and_worker( segment_headers_store.clone(), Arc::clone(&network_service), @@ -852,7 +865,7 @@ where import_queue_service, sync_target_block_number, pause_sync, - subspace_link.kzg().clone(), + dsn_sync_piece_getter, ); task_manager .spawn_handle() diff --git a/crates/subspace-service/src/sync_from_dsn.rs b/crates/subspace-service/src/sync_from_dsn.rs index fd93b205c1..2af30a0fee 100644 --- a/crates/subspace-service/src/sync_from_dsn.rs +++ b/crates/subspace-service/src/sync_from_dsn.rs @@ -1,9 +1,9 @@ mod import_blocks; -mod piece_validator; +pub(super) mod piece_validator; mod segment_header_downloader; use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn; -use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator; +pub use crate::sync_from_dsn::import_blocks::DsnSyncPieceGetter; use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader; use futures::channel::mpsc; use futures::{select, FutureExt, StreamExt}; @@ -20,9 +20,7 @@ use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::SegmentIndex; -use subspace_networking::utils::piece_provider::PieceProvider; use subspace_networking::Node; use tracing::{info, warn}; @@ -47,7 +45,7 @@ enum NotificationReason { /// Create node observer that will track node state and send notifications to worker to start sync /// from DSN. #[allow(clippy::too_many_arguments)] -pub(super) fn create_observer_and_worker( +pub(super) fn create_observer_and_worker( segment_headers_store: SegmentHeadersStore, network_service: Arc::Hash>>, node: Node, @@ -55,7 +53,7 @@ pub(super) fn create_observer_and_worker( mut import_queue_service: Box>, sync_target_block_number: Arc, pause_sync: Arc, - kzg: Kzg, + piece_getter: PG, ) -> ( impl Future + Send + 'static, impl Future> + Send + 'static, @@ -71,6 +69,7 @@ where + Sync + 'static, Client::Api: SubspaceApi, + PG: DsnSyncPieceGetter + Send + Sync + 'static, { let (tx, rx) = mpsc::channel(0); let observer_fut = { @@ -88,7 +87,7 @@ where sync_target_block_number, pause_sync, rx, - &kzg, + &piece_getter, ) .await }; @@ -211,7 +210,7 @@ async fn create_substrate_network_observer( } #[allow(clippy::too_many_arguments)] -async fn create_worker( +async fn create_worker( segment_headers_store: SegmentHeadersStore, node: &Node, client: &Client, @@ -219,7 +218,7 @@ async fn create_worker( sync_target_block_number: Arc, pause_sync: Arc, mut notifications: mpsc::Receiver, - kzg: &Kzg, + piece_getter: &PG, ) -> Result<(), sc_service::Error> where Block: BlockT, @@ -232,6 +231,7 @@ where + 'static, Client::Api: SubspaceApi, IQS: ImportQueueService + ?Sized, + PG: DsnSyncPieceGetter, { let info = client.info(); let chain_constants = client @@ -248,14 +248,6 @@ where .best_number .saturating_sub(chain_constants.confirmation_depth_k().into()); let segment_header_downloader = SegmentHeaderDownloader::new(node); - let piece_provider = PieceProvider::new( - node.clone(), - Some(SegmentCommitmentPieceValidator::::new( - node, - kzg, - &segment_headers_store, - )), - ); // Node starts as offline, we'll wait for it to go online shrtly after let mut initial_pause_sync = Some(pause_sync.swap(true, Ordering::AcqRel)); @@ -268,7 +260,7 @@ where &segment_headers_store, &segment_header_downloader, client, - &piece_provider, + piece_getter, import_queue_service, &mut last_processed_segment_index, &mut last_processed_block_number, diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index 178e8981b9..b24c3ad6eb 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -15,6 +15,7 @@ // along with this program. If not, see . use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader; +use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; use sc_client_api::{AuxStore, BlockBackend, HeaderBackend}; @@ -26,16 +27,58 @@ use sp_consensus::BlockOrigin; use sp_runtime::generic::SignedBlock; use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One}; use sp_runtime::Saturating; +use std::error::Error; +use std::fmt; use std::num::NonZeroU16; +use std::sync::Arc; use std::time::Duration; use subspace_archiving::reconstructor::Reconstructor; use subspace_core_primitives::{ - ArchivedHistorySegment, BlockNumber, Piece, RecordedHistorySegment, SegmentIndex, + ArchivedHistorySegment, BlockNumber, Piece, PieceIndex, RecordedHistorySegment, SegmentIndex, }; use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator, RetryPolicy}; use tokio::sync::Semaphore; use tracing::warn; +/// Trait representing a way to get pieces for DSN sync purposes +#[async_trait] +pub trait DsnSyncPieceGetter: fmt::Debug { + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box>; +} + +#[async_trait] +impl DsnSyncPieceGetter for Arc +where + T: DsnSyncPieceGetter + Send + Sync + ?Sized, +{ + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box> { + self.as_ref().get_piece(piece_index).await + } +} + +#[async_trait] +impl DsnSyncPieceGetter for PieceProvider +where + PV: PieceValidator, +{ + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box> { + self.get_piece_from_dsn_cache( + piece_index, + RetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), + ) + .await + } +} + /// Get piece retry attempts number. const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed"); @@ -48,11 +91,11 @@ const WAIT_FOR_BLOCKS_TO_IMPORT: Duration = Duration::from_secs(1); /// Starts the process of importing blocks. /// /// Returns number of downloaded blocks. -pub async fn import_blocks_from_dsn( +pub(super) async fn import_blocks_from_dsn( segment_headers_store: &SegmentHeadersStore, segment_header_downloader: &SegmentHeaderDownloader<'_>, client: &Client, - piece_provider: &PieceProvider, + piece_getter: &PG, import_queue_service: &mut IQS, last_processed_segment_index: &mut SegmentIndex, last_processed_block_number: &mut ::Number, @@ -61,7 +104,7 @@ where Block: BlockT, AS: AuxStore + Send + Sync + 'static, Client: HeaderBackend + BlockBackend + Send + Sync + 'static, - PV: PieceValidator, + PG: DsnSyncPieceGetter, IQS: ImportQueueService + ?Sized, { { @@ -135,7 +178,7 @@ where } let blocks = - download_and_reconstruct_blocks(segment_index, piece_provider, &mut reconstructor) + download_and_reconstruct_blocks(segment_index, piece_getter, &mut reconstructor) .await?; let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize); @@ -242,13 +285,13 @@ where Ok(downloaded_blocks) } -async fn download_and_reconstruct_blocks( +async fn download_and_reconstruct_blocks( segment_index: SegmentIndex, - piece_provider: &PieceProvider, + piece_getter: &PG, reconstructor: &mut Reconstructor, ) -> Result)>, sc_service::Error> where - PV: PieceValidator, + PG: DsnSyncPieceGetter, { debug!(%segment_index, "Retrieving pieces of the segment"); @@ -279,13 +322,7 @@ where } } }; - let maybe_piece = match piece_provider - .get_piece_from_dsn_cache( - piece_index, - RetryPolicy::Limited(PIECE_GETTER_RETRY_NUMBER.get()), - ) - .await - { + let maybe_piece = match piece_getter.get_piece(piece_index).await { Ok(maybe_piece) => maybe_piece, Err(error) => { trace!( diff --git a/crates/subspace-service/src/sync_from_dsn/piece_validator.rs b/crates/subspace-service/src/sync_from_dsn/piece_validator.rs index c79ecdba1d..35ca09da2a 100644 --- a/crates/subspace-service/src/sync_from_dsn/piece_validator.rs +++ b/crates/subspace-service/src/sync_from_dsn/piece_validator.rs @@ -9,21 +9,21 @@ use subspace_networking::utils::piece_provider::PieceValidator; use subspace_networking::Node; use tracing::{error, warn}; -pub struct SegmentCommitmentPieceValidator<'a, AS> { - dsn_node: &'a Node, - kzg: &'a Kzg, - segment_headers_store: &'a SegmentHeadersStore, +pub(crate) struct SegmentCommitmentPieceValidator { + dsn_node: Node, + kzg: Kzg, + segment_headers_store: SegmentHeadersStore, } -impl<'a, AS> SegmentCommitmentPieceValidator<'a, AS> +impl SegmentCommitmentPieceValidator where AS: AuxStore + Send + Sync + 'static, { /// Segment headers must be in order from 0 to the last one that exists - pub fn new( - dsn_node: &'a Node, - kzg: &'a Kzg, - segment_headers_store: &'a SegmentHeadersStore, + pub(crate) fn new( + dsn_node: Node, + kzg: Kzg, + segment_headers_store: SegmentHeadersStore, ) -> Self { Self { dsn_node, @@ -34,7 +34,7 @@ where } #[async_trait] -impl<'a, AS> PieceValidator for SegmentCommitmentPieceValidator<'a, AS> +impl PieceValidator for SegmentCommitmentPieceValidator where AS: AuxStore + Send + Sync + 'static, { diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index ae8045e3aa..dcfcf773a4 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -39,10 +39,8 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector_sync; -use subspace_farmer_components::plotting::{ - plot_sector, PieceGetterRetryPolicy, PlotSectorOptions, PlottedSector, -}; -use subspace_farmer_components::FarmerProtocolInfo; +use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; +use subspace_farmer_components::{FarmerProtocolInfo, PieceGetterRetryPolicy}; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_runtime_primitives::opaque::Block; use subspace_service::{FullClient, NewFull};