Segment headers store #1692

Merged
merged 7 commits into from
Jul 26, 2023
99 changes: 97 additions & 2 deletions crates/sc-consensus-subspace/src/archiver.rs
@@ -18,9 +18,10 @@ use crate::{
get_chain_constants, ArchivedSegmentNotification, BlockImportingNotification, SubspaceLink,
SubspaceNotificationSender,
};
use codec::Encode;
use codec::{Decode, Encode};
use futures::StreamExt;
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
@@ -33,11 +34,105 @@ use sp_objects::ObjectsApi;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Zero};
use std::future::Future;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader};
use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};

#[derive(Debug)]
struct SegmentHeadersStoreInner<AS> {
aux_store: Arc<AS>,
next_key_index: AtomicU16,
/// In-memory cache of segment headers
cache: Mutex<Vec<SegmentHeader>>,
}

/// Persistent storage of segment headers
#[derive(Debug)]
pub struct SegmentHeadersStore<AS> {
inner: Arc<SegmentHeadersStoreInner<AS>>,
}

impl<AS> Clone for SegmentHeadersStore<AS> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}

impl<AS> SegmentHeadersStore<AS>
where
AS: AuxStore,
{
const KEY_PREFIX: &[u8] = b"segment-headers";
const INITIAL_CACHE_CAPACITY: usize = 1_000;

/// Create new instance
pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
let mut next_key_index = 0;

while let Some(segment_headers) =
aux_store
.get_aux(&Self::key(next_key_index))?
.map(|segment_header| {
Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
.expect("Always correct segment header unless DB is corrupted; qed")
})
{
cache.extend(segment_headers);
next_key_index += 1;
}

Ok(Self {
inner: Arc::new(SegmentHeadersStoreInner {
aux_store,
next_key_index: AtomicU16::new(next_key_index),
cache: Mutex::new(cache),
}),
})
}

/// Returns last observed segment index
pub fn max_segment_index(&self) -> SegmentIndex {
SegmentIndex::from(self.inner.cache.lock().len().saturating_sub(1) as u64)
}

/// Add segment headers
pub fn add_segment_headers(
&self,
segment_headers: &[SegmentHeader],
) -> Result<(), sp_blockchain::Error> {
// TODO: Check that segment headers are inserted sequentially
// TODO: Do compaction when we have too many keys: combine multiple segment headers into a
// single entry for faster retrievals and more compact storage
let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
let key = Self::key(key_index);
let value = segment_headers.encode();
let insert_data = vec![(key.as_slice(), value.as_slice())];

self.inner.aux_store.insert_aux(&insert_data, &[])?;
self.inner.cache.lock().extend_from_slice(segment_headers);
Member

Does it mean ever-growing memory usage? How do you estimate memory usage during the network life?

Member Author

Yes, it is estimated to be less than 1 GiB for 1 PiB of pieces, so should not be an issue for a very long time. We can always switch to loading data from disk, but it is much better to have in-memory cache.
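
A back-of-the-envelope check of this estimate can be sketched as follows. The thread does not state how many bytes of pieces one segment covers or how large a cached header is, so both are parameters here, and the function name and constants are hypothetical helpers rather than anything from the pull request.

```rust
/// Binary size units used in the estimate.
pub const MIB: u128 = 1 << 20;
pub const GIB: u128 = 1 << 30;
pub const PIB: u128 = 1 << 50;

/// Estimated bytes of cache needed to hold one header per archived segment.
///
/// All three inputs are assumptions supplied by the caller; none of them is
/// taken from the pull request itself.
pub fn estimated_cache_bytes(
    total_piece_bytes: u128,
    piece_bytes_per_segment: u128,
    header_bytes: u128,
) -> u128 {
    // Round up: a partially filled segment still gets a header.
    let segments = (total_piece_bytes + piece_bytes_per_segment - 1) / piece_bytes_per_segment;
    segments * header_bytes
}
```

Assuming, purely for illustration, 256 MiB of pieces per segment and 128 bytes per cached header, `estimated_cache_bytes(PIB, 256 * MIB, 128)` gives 4,194,304 segments and 512 MiB of headers, which is consistent with the "less than 1 GiB" figure.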


Ok(())
}

/// Get a single segment header
pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
self.inner
.cache
.lock()
.get(u64::from(segment_index) as usize)
.copied()
}

fn key(key_index: u16) -> Vec<u8> {
(Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
}
}

/// How deep (in segments) should block be in order to be finalized.
///
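
The storage pattern added in archiver.rs (one batch of headers per sequentially numbered key, with the cache rebuilt at startup by reading keys until one is missing) can be modelled with the standard library alone. This is a minimal sketch under stated assumptions, not the pull request's implementation: `MockAuxStore` is a hypothetical stand-in for `AuxStore`, `Header` is a plain `u64` instead of `SegmentHeader`, the byte encoding is hand-rolled rather than SCALE, and `max_index` returns an `Option` (a variation on the `saturating_sub` approach) so that an empty store can be told apart from one holding a single header.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Mutex;

/// Stand-in for `SegmentHeader`; the real type is larger and SCALE-encoded.
pub type Header = u64;

/// Hypothetical in-memory replacement for the node's `AuxStore`.
#[derive(Default)]
pub struct MockAuxStore {
    entries: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}

impl MockAuxStore {
    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.entries.lock().unwrap().get(key).cloned()
    }

    pub fn insert(&self, key: Vec<u8>, value: Vec<u8>) {
        self.entries.lock().unwrap().insert(key, value);
    }
}

pub struct HeadersStore<'a> {
    aux_store: &'a MockAuxStore,
    next_key_index: AtomicU16,
    cache: Mutex<Vec<Header>>,
}

impl<'a> HeadersStore<'a> {
    const KEY_PREFIX: &'static [u8] = b"segment-headers";

    /// Rebuild the cache by reading keys 0, 1, 2, ... until one is missing.
    pub fn new(aux_store: &'a MockAuxStore) -> Self {
        let mut cache = Vec::new();
        let mut next_key_index = 0u16;
        while let Some(bytes) = aux_store.get(&Self::key(next_key_index)) {
            cache.extend(decode(&bytes));
            next_key_index += 1;
        }
        Self {
            aux_store,
            next_key_index: AtomicU16::new(next_key_index),
            cache: Mutex::new(cache),
        }
    }

    /// Persist one batch under the next key, then mirror it into the cache.
    pub fn add(&self, headers: &[Header]) {
        let key_index = self.next_key_index.fetch_add(1, Ordering::SeqCst);
        self.aux_store.insert(Self::key(key_index), encode(headers));
        self.cache.lock().unwrap().extend_from_slice(headers);
    }

    /// Index of the last cached header, or `None` while the cache is empty.
    pub fn max_index(&self) -> Option<u64> {
        self.cache.lock().unwrap().len().checked_sub(1).map(|i| i as u64)
    }

    /// Look up a single header by its position in the cache.
    pub fn get(&self, index: u64) -> Option<Header> {
        self.cache.lock().unwrap().get(index as usize).copied()
    }

    /// Key layout of this sketch: the prefix followed by a little-endian index.
    fn key(key_index: u16) -> Vec<u8> {
        let mut key = Self::KEY_PREFIX.to_vec();
        key.extend_from_slice(&key_index.to_le_bytes());
        key
    }
}

/// Hand-rolled encoding for the sketch: 8 little-endian bytes per header.
fn encode(headers: &[Header]) -> Vec<u8> {
    headers.iter().flat_map(|h| h.to_le_bytes()).collect()
}

fn decode(bytes: &[u8]) -> Vec<Header> {
    bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            Header::from_le_bytes(buf)
        })
        .collect()
}
```

Creating a second `HeadersStore` over the same `MockAuxStore` after a few `add` calls yields the same cache contents, which is the property the real store depends on across node restarts.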
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/src/lib.rs
@@ -31,7 +31,7 @@ use crate::archiver::FINALIZATION_DEPTH_IN_SEGMENTS;
use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream};
use crate::slot_worker::SubspaceSlotWorker;
pub use crate::slot_worker::SubspaceSyncOracle;
pub use archiver::create_subspace_archiver;
pub use archiver::{create_subspace_archiver, SegmentHeadersStore};
use futures::channel::mpsc;
use futures::StreamExt;
use log::{debug, info, trace, warn};
16 changes: 2 additions & 14 deletions crates/subspace-core-primitives/src/segments.rs
@@ -24,6 +24,8 @@ use serde::{Deserialize, Serialize};
Eq,
PartialEq,
Hash,
From,
Into,
Encode,
Decode,
Add,
@@ -58,20 +60,6 @@ impl Step for SegmentIndex {
}
}

impl From<u64> for SegmentIndex {
#[inline]
fn from(original: u64) -> Self {
Self(original)
}
}

impl From<SegmentIndex> for u64 {
#[inline]
fn from(original: SegmentIndex) -> Self {
original.0
}
}

impl SegmentIndex {
/// Segment index 0.
pub const ZERO: SegmentIndex = SegmentIndex(0);
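
The change to segments.rs drops the hand-written `From` impls in favor of `From` and `Into` entries in the derive list. The sketch below uses a simplified stand-in type, not the real `SegmentIndex`, to spell out the two conversions the archiver code relies on (`SegmentIndex::from(u64)` and `u64::from(segment_index)`). That the derives come from the `derive_more` crate is an assumption, since the import lies outside the hunks shown, and `cache_position` is a hypothetical helper.

```rust
/// Simplified stand-in for `SegmentIndex`: a newtype around `u64`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct SegmentIndex(u64);

// The two manual impls below are what the derived `From` and `Into` are
// expected to provide for a single-field newtype.
impl From<u64> for SegmentIndex {
    #[inline]
    fn from(original: u64) -> Self {
        Self(original)
    }
}

impl From<SegmentIndex> for u64 {
    #[inline]
    fn from(original: SegmentIndex) -> Self {
        original.0
    }
}

/// Hypothetical helper mirroring the lookup in `get_segment_header`:
/// a segment index is converted to a position in the in-memory cache.
pub fn cache_position(segment_index: SegmentIndex) -> usize {
    u64::from(segment_index) as usize
}
```

Either form keeps call sites unchanged, so replacing the manual impls with derives only reduces boilerplate.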