Skip to content

Commit

Permalink
Merge pull request #2014 from subspace/gemini-3f-backport-farmer-thre…
Browse files Browse the repository at this point in the history
…ad-pools

Gemini 3f backport: farmer thread pools
  • Loading branch information
nazar-pc authored Sep 29, 2023
2 parents 2d04263 + 4895b88 commit d3914a1
Show file tree
Hide file tree
Showing 17 changed files with 594 additions and 375 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ tracing = "0.1.37"
[dev-dependencies]
criterion = "0.5.1"
futures = "0.3.28"
memmap2 = "0.7.1"
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-space", features = ["chia"] }

Expand Down
23 changes: 6 additions & 17 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::executor::block_on;
use memmap2::Mmap;
use rand::prelude::*;
use std::fs::OpenOptions;
use std::io::Write;
Expand All @@ -21,7 +20,7 @@ use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy,
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, ReadAt};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down Expand Up @@ -183,29 +182,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.unwrap();
}

let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() };

#[cfg(unix)]
{
plot_mmap.advise(memmap2::Advice::Random).unwrap();
}

group.throughput(Throughput::Elements(sectors_count));
group.bench_function("disk", move |b| {
group.bench_function("disk", |b| {
b.iter_custom(|iters| {
let start = Instant::now();
for _i in 0..iters {
for (sector_index, sector) in plot_mmap
.chunks_exact(sector_size)
.enumerate()
.map(|(sector_index, sector)| (sector_index as SectorIndex, sector))
{
for sector_index in 0..sectors_count as usize {
let sector = plot_file.offset(sector_index * sector_size);
audit_sector(
black_box(&public_key),
black_box(sector_index),
black_box(sector_index as SectorIndex),
black_box(&global_challenge),
black_box(solution_range),
black_box(sector),
black_box(&sector),
black_box(&plotted_sector.sector_metadata),
);
}
Expand Down
16 changes: 6 additions & 10 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::executor::block_on;
use memmap2::Mmap;
use rand::prelude::*;
use schnorrkel::Keypair;
use std::fs::OpenOptions;
Expand All @@ -21,7 +20,7 @@ use subspace_farmer_components::plotting::{plot_sector, PieceGetterRetryPolicy,
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, ReadAt};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down Expand Up @@ -234,15 +233,12 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.unwrap();
}

let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() };

#[cfg(unix)]
{
plot_mmap.advise(memmap2::Advice::Random).unwrap();
}
let sectors = (0..sectors_count as usize)
.map(|sector_offset| plot_file.offset(sector_offset * sector_size))
.collect::<Vec<_>>();

let solution_candidates = plot_mmap
.chunks_exact(sector_size)
let solution_candidates = sectors
.iter()
.map(|sector| {
audit_sector(
&public_key,
Expand Down
21 changes: 7 additions & 14 deletions crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::executor::block_on;
use memmap2::Mmap;
use rand::prelude::*;
use std::fs::OpenOptions;
use std::io::Write;
Expand All @@ -20,7 +19,7 @@ use subspace_farmer_components::reading::read_piece;
use subspace_farmer_components::sector::{
sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_farmer_components::{FarmerProtocolInfo, ReadAt};
use subspace_proof_of_space::chia::ChiaTable;
use subspace_proof_of_space::Table;

Expand Down Expand Up @@ -148,7 +147,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
group.throughput(Throughput::Elements(1));
group.bench_function("piece/memory", |b| {
b.iter(|| {
read_piece::<PosTable>(
read_piece::<PosTable, _>(
black_box(piece_offset),
black_box(&plotted_sector.sector_id),
black_box(&plotted_sector.sector_metadata),
Expand Down Expand Up @@ -183,24 +182,18 @@ pub fn criterion_benchmark(c: &mut Criterion) {
.unwrap();
}

let plot_mmap = unsafe { Mmap::map(&plot_file).unwrap() };

#[cfg(unix)]
{
plot_mmap.advise(memmap2::Advice::Random).unwrap();
}

group.throughput(Throughput::Elements(sectors_count));
group.bench_function("piece/disk", move |b| {
group.bench_function("piece/disk", |b| {
b.iter_custom(|iters| {
let start = Instant::now();
for _i in 0..iters {
for sector in plot_mmap.chunks_exact(sector_size) {
read_piece::<PosTable>(
for sector_index in 0..sectors_count as usize {
let sector = plot_file.offset(sector_index * sector_size);
read_piece::<PosTable, _>(
black_box(piece_offset),
black_box(&plotted_sector.sector_id),
black_box(&plotted_sector.sector_metadata),
black_box(sector),
black_box(&sector),
black_box(&erasure_coding),
black_box(&mut table_generator),
)
Expand Down
33 changes: 21 additions & 12 deletions crates/subspace-farmer-components/src/auditing.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::proving::SolutionCandidates;
use crate::sector::{SectorContentsMap, SectorMetadataChecksummed};
use crate::ReadAt;
use std::collections::VecDeque;
use std::mem;
use subspace_core_primitives::crypto::Scalar;
use subspace_core_primitives::{Blake2b256Hash, PublicKey, SectorId, SectorIndex, SolutionRange};
use subspace_verification::is_within_solution_range;
use tracing::warn;

#[derive(Debug, Clone)]
pub(crate) struct ChunkCandidate {
Expand All @@ -17,20 +19,23 @@ pub(crate) struct ChunkCandidate {
/// Audit a single sector and generate a stream of solutions, where `sector` must be positioned
/// correctly at the beginning of the sector (seek to desired offset before calling this function
/// and seek back afterwards if necessary).
pub fn audit_sector<'a>(
pub fn audit_sector<'a, Sector>(
public_key: &'a PublicKey,
sector_index: SectorIndex,
global_challenge: &Blake2b256Hash,
solution_range: SolutionRange,
sector: &'a [u8],
sector: &'a Sector,
sector_metadata: &'a SectorMetadataChecksummed,
) -> Option<SolutionCandidates<'a>> {
) -> Option<SolutionCandidates<'a, Sector>>
where
Sector: ReadAt + ?Sized,
{
let sector_id = SectorId::new(public_key.hash(), sector_index);

let sector_slot_challenge = sector_id.derive_sector_slot_challenge(global_challenge);
let s_bucket_audit_index = sector_slot_challenge.s_bucket_audit_index();
let s_bucket_audit_size = Scalar::FULL_BYTES
* usize::from(sector_metadata.s_bucket_sizes[usize::from(s_bucket_audit_index)]);
let s_bucket_audit_size =
usize::from(sector_metadata.s_bucket_sizes[usize::from(s_bucket_audit_index)]);
let s_bucket_audit_offset = Scalar::FULL_BYTES
* sector_metadata
.s_bucket_sizes
Expand All @@ -43,15 +48,19 @@ pub fn audit_sector<'a>(
let sector_contents_map_size =
SectorContentsMap::encoded_size(sector_metadata.pieces_in_sector);

// Read s-bucket
let s_bucket =
&sector[sector_contents_map_size + s_bucket_audit_offset..][..s_bucket_audit_size];
let s_bucket_audit_offset_in_sector = sector_contents_map_size + s_bucket_audit_offset;

// Map all winning chunks
let winning_chunks = s_bucket
.array_chunks::<{ Scalar::FULL_BYTES }>()
.enumerate()
.filter_map(|(chunk_offset, chunk)| {
let winning_chunks = (0..s_bucket_audit_size)
.filter_map(|chunk_offset| {
let mut chunk = [0; Scalar::FULL_BYTES];
if let Err(error) = sector.read_at(
&mut chunk,
s_bucket_audit_offset_in_sector + chunk_offset * Scalar::FULL_BYTES,
) {
warn!(%error, %sector_index, %chunk_offset, "Failed read chunk sector");
return None;
}
// Check all audit chunks within chunk, there might be more than one winning
let winning_audit_chunk_offsets = chunk
.array_chunks::<{ mem::size_of::<SolutionRange>() }>()
Expand Down
68 changes: 68 additions & 0 deletions crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,78 @@ pub mod reading;
pub mod sector;
mod segment_reconstruction;

use crate::file_ext::FileExt;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::fs::File;
use std::io;
use subspace_core_primitives::HistorySize;

/// Trait for reading data at specific offset
pub trait ReadAt: Send + Sync {
/// Get implementation of [`ReadAt`] that add specified offset to all attempted reads
fn offset(&self, offset: usize) -> ReadAtOffset<&Self>
where
Self: Sized,
{
ReadAtOffset {
inner: self,
offset,
}
}

/// Fill the buffer by reading bytes at a specific offset
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>;
}

impl ReadAt for [u8] {
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
if buf.len() + offset > self.len() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Buffer length with offset exceeds own length",
));
}

buf.copy_from_slice(&self[offset..][..buf.len()]);

Ok(())
}
}

impl ReadAt for Vec<u8> {
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
self.as_slice().read_at(buf, offset)
}
}

impl ReadAt for File {
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
self.read_exact_at(buf, offset as u64)
}
}

impl ReadAt for &File {
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
self.read_exact_at(buf, offset as u64)
}
}

/// Reader with fixed offset added to all attempted reads
pub struct ReadAtOffset<T> {
inner: T,
offset: usize,
}

impl<T> ReadAt for ReadAtOffset<T>
where
T: ReadAt,
{
fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
self.inner.read_at(buf, offset + self.offset)
}
}

// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
Expand Down
Loading

0 comments on commit d3914a1

Please sign in to comment.