Skip to content

Commit

Permalink
Merge pull request #1101 from subspace/dsn-optimizations-part-4
Browse files Browse the repository at this point in the history
DSN optimizations (part 4)
  • Loading branch information
nazar-pc authored Jan 23, 2023
2 parents fc29a0c + 93717f8 commit 0c4ba93
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 232 deletions.
2 changes: 0 additions & 2 deletions crates/sp-lightclient/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use sp_runtime::{Digest, DigestItem};
use std::error::Error;
use std::io::Cursor;
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::atomic::AtomicBool;
use subspace_archiving::archiver::{ArchivedSegment, Archiver};
use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::crypto::kzg::Kzg;
Expand Down Expand Up @@ -111,7 +110,6 @@ impl Farmer {
&public_key,
sector_index,
&piece_receiver,
&AtomicBool::new(false),
&farmer_protocol_info,
&kzg,
&sector_codec,
Expand Down
3 changes: 0 additions & 3 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use memmap2::Mmap;
use std::fs::OpenOptions;
use std::io::Write;
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use std::{env, fs, io};
use subspace_archiving::archiver::Archiver;
Expand Down Expand Up @@ -54,7 +53,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
)
.unwrap();

let cancelled = AtomicBool::new(false);
let farmer_protocol_info = FarmerProtocolInfo {
record_size: NonZeroU32::new(RECORD_SIZE).unwrap(),
recorded_history_segment_size: RECORDED_HISTORY_SEGMENT_SIZE,
Expand All @@ -71,7 +69,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
&public_key,
sector_index,
&BenchPieceReceiver::new(piece),
&cancelled,
&farmer_protocol_info,
&kzg,
&sector_codec,
Expand Down
4 changes: 0 additions & 4 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use rayon::current_num_threads;
use rayon::prelude::*;
use std::io;
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use subspace_archiving::archiver::Archiver;
use subspace_core_primitives::crypto::kzg;
Expand Down Expand Up @@ -45,7 +44,6 @@ fn criterion_benchmark(c: &mut Criterion) {
)
.unwrap();

let cancelled = AtomicBool::new(false);
let farmer_protocol_info = FarmerProtocolInfo {
record_size: NonZeroU32::new(RECORD_SIZE).unwrap(),
recorded_history_segment_size: RECORDED_HISTORY_SEGMENT_SIZE,
Expand All @@ -62,7 +60,6 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(&public_key),
black_box(sector_index),
black_box(&piece_receiver),
black_box(&cancelled),
black_box(&farmer_protocol_info),
black_box(&kzg),
black_box(&sector_codec),
Expand All @@ -85,7 +82,6 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(&public_key),
black_box(sector_index),
black_box(&piece_receiver),
black_box(&cancelled),
black_box(&farmer_protocol_info),
black_box(&kzg),
black_box(&sector_codec),
Expand Down
3 changes: 0 additions & 3 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use schnorrkel::Keypair;
use std::fs::OpenOptions;
use std::io::Write;
use std::num::{NonZeroU32, NonZeroU64};
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use std::{env, fs, io};
use subspace_archiving::archiver::Archiver;
Expand Down Expand Up @@ -56,7 +55,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
)
.unwrap();

let cancelled = AtomicBool::new(false);
let farmer_protocol_info = FarmerProtocolInfo {
record_size: NonZeroU32::new(RECORD_SIZE).unwrap(),
recorded_history_segment_size: RECORDED_HISTORY_SEGMENT_SIZE,
Expand All @@ -75,7 +73,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
&public_key,
sector_index,
&BenchPieceReceiver::new(piece),
&cancelled,
&farmer_protocol_info,
&kzg,
&sector_codec,
Expand Down
42 changes: 16 additions & 26 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use futures::StreamExt;
use parity_scale_codec::Encode;
use std::error::Error;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use subspace_core_primitives::crypto::kzg;
use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError};
use subspace_core_primitives::{
Piece, PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE,
};
use thiserror::Error;
use tracing::{debug, info};
use tracing::info;

/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is
/// simplifying dependency graph.
Expand All @@ -25,6 +25,19 @@ pub trait PieceReceiver {
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
}

#[async_trait]
impl<T> PieceReceiver for Arc<T>
where
T: PieceReceiver + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.as_ref().get_piece(piece_index).await
}
}

/// Information about sector that was plotted
#[derive(Debug, Clone)]
pub struct PlottedSector {
Expand All @@ -41,9 +54,6 @@ pub struct PlottedSector {
/// Plotting status
#[derive(Debug, Error)]
pub enum PlottingError {
/// Plotting was cancelled
#[error("Plotting was cancelled")]
Cancelled,
/// Piece not found, can't create sector, this should never happen
#[error("Piece {piece_index} not found, can't create sector, this should never happen")]
PieceNotFound {
Expand Down Expand Up @@ -79,7 +89,6 @@ pub async fn plot_sector<PR, S, SM>(
public_key: &PublicKey,
sector_index: u64,
piece_receiver: &PR,
cancelled: &AtomicBool,
farmer_protocol_info: &FarmerProtocolInfo,
kzg: &Kzg,
sector_codec: &SectorCodec,
Expand Down Expand Up @@ -119,7 +128,6 @@ where
sector_index,
piece_receiver,
&piece_indexes,
cancelled,
)
.await?;

Expand Down Expand Up @@ -181,22 +189,16 @@ async fn plot_pieces_in_batches_non_blocking<PR: PieceReceiver>(
sector_index: u64,
piece_receiver: &PR,
piece_indexes: &[PieceIndex],
cancelled: &AtomicBool,
) -> Result<(), PlottingError> {
let mut pieces_receiving_futures = piece_indexes
.iter()
.map(|piece_index| async {
let piece_result = match check_cancellation(cancelled, sector_index) {
Ok(()) => piece_receiver.get_piece(*piece_index).await,
Err(error) => Err(error.into()),
};
let piece_result = piece_receiver.get_piece(*piece_index).await;
(*piece_index, piece_result)
})
.collect::<FuturesOrdered<_>>();

while let Some((piece_index, piece_result)) = pieces_receiving_futures.next().await {
check_cancellation(cancelled, sector_index)?;

let piece = piece_result
.map_err(|error| PlottingError::FailedToRetrievePiece { piece_index, error })?
.ok_or(PlottingError::PieceNotFound { piece_index })?;
Expand All @@ -213,15 +215,3 @@ async fn plot_pieces_in_batches_non_blocking<PR: PieceReceiver>(

Ok(())
}

fn check_cancellation(cancelled: &AtomicBool, sector_index: u64) -> Result<(), PlottingError> {
if cancelled.load(Ordering::Acquire) {
debug!(
%sector_index,
"Plotting was cancelled, interrupting plotting"
);
return Err(PlottingError::Cancelled);
}

Ok(())
}
118 changes: 111 additions & 7 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,108 @@ mod dsn;
mod farmer_provider_storage;
mod piece_storage;

use crate::commands::farm::dsn::{configure_dsn, start_announcements_processor};
use crate::commands::farm::dsn::{configure_dsn, start_announcements_processor, PieceStorage};
use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal};
use crate::{DiskFarm, FarmingArgs};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::future::{select, Either};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::error::Error;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use subspace_core_primitives::{PieceIndexHash, SectorIndex, PLOT_SECTOR_SIZE};
use subspace_core_primitives::crypto::kzg::{test_public_parameters, Kzg};
use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash, SectorIndex, PLOT_SECTOR_SIZE};
use subspace_farmer::single_disk_plot::piece_reader::PieceReader;
use subspace_farmer::single_disk_plot::{SingleDiskPlot, SingleDiskPlotOptions};
use subspace_farmer::utils::piece_validator::RecordsRootPieceValidator;
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
use subspace_farmer_components::plotting::PieceReceiver;
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::utils::pieces::announce_single_piece_index_with_backoff;
use subspace_networking::utils::pieces::{
announce_single_piece_index_hash_with_backoff, announce_single_piece_index_with_backoff,
};
use subspace_networking::{Node, PieceProvider, ToMultihash};
use tokio::sync::broadcast;
use tracing::{debug, error, info};
use zeroize::Zeroizing;

const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed");

struct FarmerPieceReceiver<PV, PS> {
piece_provider: PieceProvider<PV>,
piece_storage: Arc<tokio::sync::Mutex<PS>>,
node: Node,
}

impl<PV, PS> FarmerPieceReceiver<PV, PS> {
fn new(
piece_provider: PieceProvider<PV>,
piece_storage: Arc<tokio::sync::Mutex<PS>>,
node: Node,
) -> Self {
Self {
piece_provider,
piece_storage,
node,
}
}
}

#[async_trait]
impl<PV, PS> PieceReceiver for FarmerPieceReceiver<PV, PS>
where
PV: subspace_networking::PieceValidator,
PS: PieceStorage + Send + 'static,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
let piece_index_hash = PieceIndexHash::from_index(piece_index);
let key = piece_index_hash.to_multihash().into();

let maybe_should_store = {
let piece_storage = self.piece_storage.lock().await;
if let Some(piece) = piece_storage.get_piece(&key) {
return Ok(Some(piece));
}

piece_storage.should_include_in_storage(&key)
};

let maybe_piece = self.piece_provider.get_piece(piece_index).await?;

if let Some(piece) = &maybe_piece {
if maybe_should_store {
let mut piece_storage = self.piece_storage.lock().await;
if piece_storage.should_include_in_storage(&key)
&& piece_storage.get_piece(&key).is_none()
{
piece_storage.add_piece(key, piece.clone());
if let Err(error) =
announce_single_piece_index_hash_with_backoff(piece_index_hash, &self.node)
.await
{
debug!(
?error,
?piece_index_hash,
"Announcing retrieved and cached piece index hash failed"
);
}
}
}
}

Ok(maybe_piece)
}
}

#[derive(Debug, Copy, Clone)]
struct PieceDetails {
plot_offset: usize,
Expand Down Expand Up @@ -69,7 +150,7 @@ pub(crate) async fn farm_multi_disk(
farming_args.max_concurrent_plots.get(),
));

let (node, mut node_runner, wrapped_piece_storage) = {
let (node, mut node_runner, piece_storage) = {
// TODO: Temporary networking identity derivation from the first disk farm identity.
let directory = disk_farms
.first()
Expand All @@ -92,8 +173,30 @@ pub(crate) async fn farm_multi_disk(
configure_dsn(base_path, keypair, dsn, &readers_and_pieces).await?
};

let _announcements_processing_handler =
start_announcements_processor(node.clone(), wrapped_piece_storage)?;
let piece_storage = Arc::new(tokio::sync::Mutex::new(piece_storage));

let _announcements_processing_handler = start_announcements_processor(
node.clone(),
Arc::clone(&piece_storage),
Arc::downgrade(&readers_and_pieces),
)?;

let kzg = Kzg::new(test_public_parameters());
let records_roots_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));
let piece_provider = PieceProvider::new(
node.clone(),
Some(RecordsRootPieceValidator::new(
node.clone(),
node_client.clone(),
kzg.clone(),
records_roots_cache,
)),
);
let piece_receiver = Arc::new(FarmerPieceReceiver::new(
piece_provider,
piece_storage,
node.clone(),
));

let mut single_disk_plots = Vec::with_capacity(disk_farms.len());

Expand All @@ -118,7 +221,8 @@ pub(crate) async fn farm_multi_disk(
allocated_space: disk_farm.allocated_plotting_space,
node_client,
reward_address,
dsn_node: node.clone(),
kzg: kzg.clone(),
piece_receiver: piece_receiver.clone(),
concurrent_plotting_semaphore: Arc::clone(&concurrent_plotting_semaphore),
});

Expand Down
Loading

0 comments on commit 0c4ba93

Please sign in to comment.