Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add root block storage to node. #1120

Merged
merged 4 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

pub mod dsn;
pub mod piece_cache;
pub mod root_blocks;
pub mod rpc;

use crate::dsn::create_dsn_instance;
use crate::dsn::import_blocks::import_blocks as import_blocks_from_dsn;
use crate::piece_cache::PieceCache;
use crate::root_blocks::{start_root_block_archiver, RootBlockCache};
use derive_more::{Deref, DerefMut, Into};
use domain_runtime_primitives::Hash as DomainHash;
use dsn::start_dsn_archiver;
Expand Down Expand Up @@ -506,11 +508,24 @@ where
);

task_manager.spawn_essential_handle().spawn_essential(
"archiver",
"dsn-archiver",
Some("subspace-networking"),
Box::pin(dsn_archiving_fut.in_current_span()),
);

let root_block_archiving_fut = start_root_block_archiver(
RootBlockCache::new(client.clone()),
subspace_link
.archived_segment_notification_stream()
.subscribe(),
);

task_manager.spawn_essential_handle().spawn_essential(
"root-block-archiver",
Some("subspace-networking"),
Box::pin(root_block_archiving_fut.in_current_span()),
);

let dsn_bootstrap_nodes = {
// Fall back to node itself as bootstrap node for DSN so farmer always has someone to
// connect to
Expand Down
80 changes: 80 additions & 0 deletions crates/subspace-service/src/root_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use futures::{Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use sc_client_api::backend::AuxStore;
use sc_consensus_subspace::ArchivedSegmentNotification;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{RootBlock, SegmentIndex};
use tracing::{debug, error, trace};

/// Start an archiver that will listen for archived segments and send root block to the storage
pub(crate) async fn start_root_block_archiver<AS: AuxStore>(
mut root_block_cache: RootBlockCache<AS>,
mut archived_segment_notification_stream: impl Stream<Item = ArchivedSegmentNotification> + Unpin,
) {
trace!("Subspace root block archiver started.");

while let Some(ArchivedSegmentNotification {
archived_segment, ..
}) = archived_segment_notification_stream.next().await
{
let segment_index = archived_segment.root_block.segment_index();
let result = root_block_cache.add_root_block(archived_segment.root_block);

if let Err(err) = result {
error!(%segment_index, ?err, "Root block archiving failed.");
} else {
debug!(%segment_index, "Root block archived.");
}
}
}

/// Cache of recently produced root blocks in aux storage
pub struct RootBlockCache<AS> {
aux_store: Arc<AS>,
}

impl<AS> RootBlockCache<AS>
where
AS: AuxStore,
{
const KEY_PREFIX: &[u8] = b"root-block-cache";
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved

/// Create new instance.
pub fn new(aux_store: Arc<AS>) -> Self {
Self { aux_store }
}

/// Get root block from storage
pub fn get_root_block(
&self,
segment_index: SegmentIndex,
) -> Result<Option<RootBlock>, Box<dyn Error>> {
Ok(self
.aux_store
.get_aux(&Self::key(segment_index))?
.map(|root_block| {
RootBlock::decode(&mut root_block.as_slice())
.expect("Always correct root block unless DB is corrupted; qed")
}))
}

/// Add root block to cache (likely as the result of archiving)
pub fn add_root_block(&mut self, root_block: RootBlock) -> Result<(), Box<dyn Error>> {
let key = Self::key(root_block.segment_index());
let value = root_block.encode();
let insert_data = vec![(key.as_slice(), value.as_slice())];

self.aux_store.insert_aux(&insert_data, &Vec::new())?;

Ok(())
}

fn key(segment_index: SegmentIndex) -> Vec<u8> {
Self::key_from_bytes(&u64::to_be_bytes(segment_index))
}

fn key_from_bytes(bytes: &[u8]) -> Vec<u8> {
(Self::KEY_PREFIX, bytes).encode()
}
}