Merge pull request #3301 from autonomys/refactor-piece-handling
Simplify source piece & data handling during object retrieval
nazar-pc authored Dec 10, 2024
2 parents 436f462 + 9c4a417 commit e5fb80a
Showing 3 changed files with 57 additions and 39 deletions.
45 changes: 32 additions & 13 deletions crates/subspace-core-primitives/src/pieces.rs
@@ -100,6 +100,12 @@ impl PieceIndex {
/// Piece index 1.
pub const ONE: PieceIndex = PieceIndex(1);

/// Create new instance
#[inline]
pub const fn new(n: u64) -> Self {
Self(n)
}

/// Create piece index from bytes.
#[inline]
pub const fn from_bytes(bytes: [u8; Self::SIZE]) -> Self {
@@ -114,8 +120,8 @@ impl PieceIndex {

/// Segment index piece index corresponds to
#[inline]
pub fn segment_index(&self) -> SegmentIndex {
SegmentIndex::from(self.0 / ArchivedHistorySegment::NUM_PIECES as u64)
pub const fn segment_index(&self) -> SegmentIndex {
SegmentIndex::new(self.0 / ArchivedHistorySegment::NUM_PIECES as u64)
}

/// Position of a piece in a segment
@@ -130,28 +136,41 @@ impl PieceIndex {
#[inline]
pub const fn source_position(&self) -> u32 {
assert!(self.is_source());
self.position() / (Self::source_ratio() as u32)

let source_start = self.position() / RecordedHistorySegment::ERASURE_CODING_RATE.1 as u32
* RecordedHistorySegment::ERASURE_CODING_RATE.0 as u32;
let source_offset = self.position() % RecordedHistorySegment::ERASURE_CODING_RATE.1 as u32;

source_start + source_offset
}

/// Returns the piece index for a source position and segment index.
/// Panics if the piece is not a source piece.
#[inline]
pub const fn from_source_position(
source_position: u32,
segment_index: SegmentIndex,
) -> PieceIndex {
let source_position = source_position as u64;
let start = source_position / RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64
* RecordedHistorySegment::ERASURE_CODING_RATE.1 as u64;
let offset = source_position % RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64;

PieceIndex(segment_index.first_piece_index().0 + start + offset)
}

/// Is this piece index a source piece?
#[inline]
pub const fn is_source(&self) -> bool {
// Source pieces are interleaved with parity pieces, source first
self.0 % Self::source_ratio() == 0
self.0 % (RecordedHistorySegment::ERASURE_CODING_RATE.1 as u64)
< (RecordedHistorySegment::ERASURE_CODING_RATE.0 as u64)
}

/// Returns the next source piece index
#[inline]
pub const fn next_source_index(&self) -> PieceIndex {
PieceIndex(self.0 + Self::source_ratio() - (self.0 % Self::source_ratio()))
}

/// The ratio of source pieces to all pieces
#[inline]
const fn source_ratio() -> u64 {
// Assumes the result is an integer
(RecordedHistorySegment::ERASURE_CODING_RATE.1
/ RecordedHistorySegment::ERASURE_CODING_RATE.0) as u64
PieceIndex::from_source_position(self.source_position() + 1, self.segment_index())
}
}

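A minimal standalone sketch of the interleaving arithmetic introduced above, to make the source/parity mapping concrete. It assumes an erasure coding rate of (1, 2) (one parity piece per source piece) and 256 pieces per archived segment; these constants and all names below are illustrative stand-ins, not the crate's actual items.

// Standalone model of the new interleaving arithmetic. RATE and NUM_PIECES are
// illustrative assumptions standing in for
// RecordedHistorySegment::ERASURE_CODING_RATE and ArchivedHistorySegment::NUM_PIECES.
const RATE: (u64, u64) = (1, 2);
const NUM_PIECES: u64 = 256;

// Mirrors the new `PieceIndex::is_source`: of every RATE.1 consecutive pieces,
// the first RATE.0 are source pieces, the rest are parity.
const fn is_source(piece_index: u64) -> bool {
    piece_index % RATE.1 < RATE.0
}

// Mirrors the new `PieceIndex::source_position`: a piece's position within its
// segment mapped to its position among the segment's source pieces.
const fn source_position(position: u64) -> u64 {
    position / RATE.1 * RATE.0 + position % RATE.1
}

// Mirrors the new `PieceIndex::from_source_position` for segment 0
// (whose first piece index is 0).
const fn from_source_position(source_position: u64) -> u64 {
    source_position / RATE.0 * RATE.1 + source_position % RATE.0
}

fn main() {
    // With rate (1, 2) pieces alternate: 0 source, 1 parity, 2 source, ...
    assert!(is_source(0) && !is_source(1) && is_source(2));
    // Piece 4 is the third source piece in its segment (source position 2) ...
    assert_eq!(source_position(4), 2);
    // ... and mapping that source position back recovers piece index 4.
    assert_eq!(from_source_position(2), 4);
    // The round trip holds for every source piece in a segment, which is what
    // lets `next_source_index` be expressed through the two new helpers.
    for piece in (0..NUM_PIECES).filter(|i| is_source(*i)) {
        assert_eq!(from_source_position(source_position(piece)), piece);
    }
}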
10 changes: 6 additions & 4 deletions crates/subspace-core-primitives/src/segments.rs
@@ -86,13 +86,15 @@ impl SegmentIndex {
}

/// Get the first piece index in this segment.
pub fn first_piece_index(&self) -> PieceIndex {
PieceIndex::from(self.0 * ArchivedHistorySegment::NUM_PIECES as u64)
#[inline]
pub const fn first_piece_index(&self) -> PieceIndex {
PieceIndex::new(self.0 * ArchivedHistorySegment::NUM_PIECES as u64)
}

/// Get the last piece index in this segment.
pub fn last_piece_index(&self) -> PieceIndex {
PieceIndex::from((self.0 + 1) * ArchivedHistorySegment::NUM_PIECES as u64 - 1)
#[inline]
pub const fn last_piece_index(&self) -> PieceIndex {
PieceIndex::new((self.0 + 1) * ArchivedHistorySegment::NUM_PIECES as u64 - 1)
}

/// List of piece indexes that belong to this segment.
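These two methods become const (via the new const PieceIndex::new rather than the From impl) so the const fns added in pieces.rs, such as PieceIndex::from_source_position, can call first_piece_index. A minimal sketch of the mapping itself, assuming 256 pieces per archived segment (an illustrative value, not taken from this diff):

// Stand-ins for SegmentIndex::{first,last}_piece_index and PieceIndex::segment_index.
const NUM_PIECES: u64 = 256; // assumed value of ArchivedHistorySegment::NUM_PIECES

const fn first_piece_index(segment_index: u64) -> u64 {
    segment_index * NUM_PIECES
}

const fn last_piece_index(segment_index: u64) -> u64 {
    (segment_index + 1) * NUM_PIECES - 1
}

// Inverse mapping: which segment a piece index belongs to.
const fn segment_index(piece_index: u64) -> u64 {
    piece_index / NUM_PIECES
}

fn main() {
    // Under this assumption, segment 2 covers piece indexes 512..=767 ...
    assert_eq!(first_piece_index(2), 512);
    assert_eq!(last_piece_index(2), 767);
    // ... and every piece in that range maps back to segment 2.
    assert!((512..=767).all(|piece| segment_index(piece) == 2));
}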
41 changes: 19 additions & 22 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
@@ -278,18 +278,22 @@ where
.read_piece(next_source_piece_index, piece_index, piece_offset)
.await?;
next_source_piece_index = next_source_piece_index.next_source_index();
read_records_data.extend(piece.record().to_raw_record_chunks().flatten().copied());
// Discard piece data before the offset
read_records_data.extend(
piece
.record()
.to_raw_record_chunks()
.flatten()
.skip(piece_offset as usize)
.copied(),
);

if last_data_piece_in_segment {
// The last 2 bytes might contain segment padding, so we can't use them for object length or object data.
read_records_data.truncate(RawRecord::SIZE - 2);
read_records_data.truncate(read_records_data.len() - 2);
}

let data_length = self.decode_data_length(
&read_records_data[piece_offset as usize..],
piece_index,
piece_offset,
)?;
let data_length = self.decode_data_length(&read_records_data, piece_index, piece_offset)?;

let data_length = if let Some(data_length) = data_length {
data_length
@@ -311,12 +315,8 @@ where
next_source_piece_index = next_source_piece_index.next_source_index();
read_records_data.extend(piece.record().to_raw_record_chunks().flatten().copied());

self.decode_data_length(
&read_records_data[piece_offset as usize..],
piece_index,
piece_offset,
)?
.expect("Extra RawRecord is larger than the length encoding; qed")
self.decode_data_length(&read_records_data, piece_index, piece_offset)?
.expect("Extra RawRecord is larger than the length encoding; qed")
} else {
trace!(
piece_position_in_segment,
@@ -347,14 +347,10 @@
return Ok(None);
}

// Discard piece data before the offset
let mut data = read_records_data[piece_offset as usize..].to_vec();
drop(read_records_data);

// Read more pieces until we have enough data
if data_length as usize > data.len() {
if data_length as usize > read_records_data.len() {
let remaining_piece_count =
(data_length as usize - data.len()).div_ceil(RawRecord::SIZE);
(data_length as usize - read_records_data.len()).div_ceil(RawRecord::SIZE);
let remaining_piece_indexes = (next_source_piece_index..)
.filter(|i| i.is_source())
.take(remaining_piece_count)
@@ -363,14 +359,15 @@
.await?
.into_iter()
.for_each(|piece| {
data.extend(piece.record().to_raw_record_chunks().flatten().copied())
read_records_data
.extend(piece.record().to_raw_record_chunks().flatten().copied())
});
}

// Decode the data, and return it if it's valid
let data = Vec::<u8>::decode(&mut data.as_slice())?;
let read_records_data = Vec::<u8>::decode(&mut read_records_data.as_slice())?;

Ok(Some(data))
Ok(Some(read_records_data))
}

/// Fetch and assemble an object that can cross segment boundaries, which requires assembling
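A standalone sketch of the simplified buffer handling above, using plain byte slices in place of the crate's piece and record types; the record size, object offset, and padding width are illustrative assumptions. The point of the change is that the object offset is skipped once, when the first record is appended, so later steps (padding truncation, length decoding, extending with further records) work on a single read_records_data buffer without re-slicing it.

// Sketch of the new single-buffer flow; all sizes below are assumptions.
const RAW_RECORD_SIZE: usize = 1 << 10; // stand-in for RawRecord::SIZE
const SEGMENT_PADDING: usize = 2; // trailing bytes that may hold segment padding

// Append one record's raw bytes, discarding everything before `piece_offset`
// when it is given (only the first record carries the object's start offset).
fn extend_from_record(
    read_records_data: &mut Vec<u8>,
    record: &[u8],
    piece_offset: Option<usize>,
) {
    match piece_offset {
        Some(offset) => read_records_data.extend_from_slice(&record[offset..]),
        None => read_records_data.extend_from_slice(record),
    }
}

fn main() {
    let record = vec![0u8; RAW_RECORD_SIZE];
    let mut read_records_data = Vec::new();

    // First record: skip the object offset up front instead of slicing later.
    extend_from_record(&mut read_records_data, &record, Some(100));
    assert_eq!(read_records_data.len(), RAW_RECORD_SIZE - 100);

    // If this was the last data-carrying piece in its segment, the final bytes
    // may be segment padding; truncate relative to the current buffer length.
    let last_data_piece_in_segment = true;
    if last_data_piece_in_segment {
        read_records_data.truncate(read_records_data.len() - SEGMENT_PADDING);
    }

    // Subsequent records are appended whole; no further offset handling is needed.
    extend_from_record(&mut read_records_data, &record, None);
    assert_eq!(
        read_records_data.len(),
        2 * RAW_RECORD_SIZE - 100 - SEGMENT_PADDING
    );
}

The truncation is now relative to the buffer length rather than the fixed RawRecord::SIZE - 2, because once the offset has been skipped the buffer no longer starts at the beginning of the record.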
