Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add root block storage to node. #1120

Merged
merged 4 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ async-oneshot = "0.5.0"
futures = "0.3.25"
futures-timer = "3.0.2"
jsonrpsee = { version = "0.16.2", features = ["server", "macros"] }
log = "0.4.17"
parity-scale-codec = "3.2.1"
parking_lot = "0.12.1"
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
Expand All @@ -35,3 +34,4 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti
subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" }
subspace-networking = { version = "0.1.0", path = "../subspace-networking" }
subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" }
tracing = "0.1.37"
70 changes: 62 additions & 8 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use jsonrpsee::core::{async_trait, Error as JsonRpseeError, RpcResult};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::SubscriptionSink;
use log::{error, warn};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_client_api::BlockBackend;
Expand All @@ -41,20 +40,21 @@ use sp_core::crypto::ByteArray;
use sp_core::H256;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Zero};
use std::marker::PhantomData;
use std::error::Error;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{
RecordsRoot, SegmentIndex, Solution, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
RecordsRoot, RootBlock, SegmentIndex, Solution, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_INDEXES_PER_REQUEST,
};
use tracing::{error, warn};

const SOLUTION_TIMEOUT: Duration = Duration::from_secs(2);
const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -101,6 +101,12 @@ pub trait SubspaceRpcApi {
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<RecordsRoot>>>;

#[method(name = "subspace_rootBlocks")]
async fn root_blocks(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<RootBlock>>>;
}

#[derive(Default)]
Expand All @@ -115,8 +121,15 @@ struct BlockSignatureSenders {
senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}

pub trait RootBlockProvider {
fn get_root_block(
&self,
segment_index: SegmentIndex,
) -> Result<Option<RootBlock>, Box<dyn Error>>;
}

/// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace.
pub struct SubspaceRpc<Block: BlockT, Client> {
pub struct SubspaceRpc<Block: BlockT, Client, RBP: RootBlockProvider> {
client: Arc<Client>,
executor: SubscriptionTaskExecutor,
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
Expand All @@ -126,7 +139,7 @@ pub struct SubspaceRpc<Block: BlockT, Client> {
reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
_phantom: PhantomData<Block>,
root_block_provider: RBP,
}

/// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for
Expand All @@ -136,7 +149,8 @@ pub struct SubspaceRpc<Block: BlockT, Client> {
/// every subscriber, after which RPC server waits for the same number of
/// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until
/// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored.
impl<Block: BlockT, Client> SubspaceRpc<Block, Client> {
impl<Block: BlockT, Client, RBP: RootBlockProvider> SubspaceRpc<Block, Client, RBP> {
#[allow(clippy::too_many_arguments)]
/// Creates a new instance of the `SubspaceRpc` handler.
pub fn new(
client: Arc<Client>,
Expand All @@ -148,6 +162,7 @@ impl<Block: BlockT, Client> SubspaceRpc<Block, Client> {
>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
root_block_provider: RBP,
) -> Self {
Self {
client,
Expand All @@ -159,13 +174,13 @@ impl<Block: BlockT, Client> SubspaceRpc<Block, Client> {
reward_signature_senders: Arc::default(),
dsn_bootstrap_nodes,
subspace_link,
_phantom: PhantomData::default(),
root_block_provider,
}
}
}

#[async_trait]
impl<Block, Client> SubspaceRpcApiServer for SubspaceRpc<Block, Client>
impl<Block, Client, RBP> SubspaceRpcApiServer for SubspaceRpc<Block, Client, RBP>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
Expand All @@ -175,6 +190,7 @@ where
+ Sync
+ 'static,
Client::Api: SubspaceRuntimeApi<Block, FarmerPublicKey>,
RBP: RootBlockProvider + Send + Sync + 'static,
{
fn get_farmer_app_info(&self) -> RpcResult<FarmerAppInfo> {
let best_block_id = BlockId::Hash(self.client.info().best_hash);
Expand Down Expand Up @@ -496,4 +512,42 @@ where

records_root_result
}

async fn root_blocks(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<RootBlock>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}"
)));
};

let records_root_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
.into_iter()
.map(|segment_index| {
let api_result = self
.root_block_provider
.get_root_block(segment_index)
.map_err(|_| {
JsonRpseeError::Custom(
"Internal error during `root_blocks` call".to_string(),
)
});

api_result
})
.collect();

if let Err(err) = &records_root_result {
error!(?err, "Failed to get root blocks.");
}

records_root_result
}
}
8 changes: 7 additions & 1 deletion crates/subspace-farmer/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{RecordsRoot, SegmentIndex};
use subspace_core_primitives::{RecordsRoot, RootBlock, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -50,4 +50,10 @@ pub trait NodeClient: Clone + Send + Sync + 'static {
&self,
segment_indexes: Vec<SegmentIndex>,
) -> Result<Vec<Option<RecordsRoot>>, Error>;

/// Get root blocks for the segments
async fn root_blocks(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> Result<Vec<Option<RootBlock>>, Error>;
}
12 changes: 11 additions & 1 deletion crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{RecordsRoot, SegmentIndex};
use subspace_core_primitives::{RecordsRoot, RootBlock, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -133,4 +133,14 @@ impl NodeClient for NodeRpcClient {
.request("subspace_recordsRoots", rpc_params![&segment_indexes])
.await?)
}

async fn root_blocks(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> Result<Vec<Option<RootBlock>>, RpcError> {
Ok(self
.client
.request("subspace_rootBlocks", rpc_params![&segment_indexes])
.await?)
}
}
20 changes: 19 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

pub mod dsn;
pub mod piece_cache;
pub mod root_blocks;
pub mod rpc;

use crate::dsn::create_dsn_instance;
use crate::dsn::import_blocks::import_blocks as import_blocks_from_dsn;
use crate::piece_cache::PieceCache;
use crate::root_blocks::{start_root_block_archiver, RootBlockCache};
use derive_more::{Deref, DerefMut, Into};
use domain_runtime_primitives::Hash as DomainHash;
use dsn::start_dsn_archiver;
Expand Down Expand Up @@ -506,11 +508,26 @@ where
);

task_manager.spawn_essential_handle().spawn_essential(
"archiver",
"dsn-archiver",
Some("subspace-networking"),
Box::pin(dsn_archiving_fut.in_current_span()),
);

let root_block_cache = RootBlockCache::new(client.clone());

let root_block_archiving_fut = start_root_block_archiver(
root_block_cache.clone(),
subspace_link
.archived_segment_notification_stream()
.subscribe(),
);

task_manager.spawn_essential_handle().spawn_essential(
"root-block-archiver",
Some("subspace-networking"),
Box::pin(root_block_archiving_fut.in_current_span()),
);

let dsn_bootstrap_nodes = {
// Fall back to node itself as bootstrap node for DSN so farmer always has someone to
// connect to
Expand Down Expand Up @@ -689,6 +706,7 @@ where
.clone(),
dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(),
subspace_link: subspace_link.clone(),
root_blocks_provider: root_block_cache.clone(),
};

rpc::create_full(deps).map_err(Into::into)
Expand Down
91 changes: 91 additions & 0 deletions crates/subspace-service/src/root_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use futures::{Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use sc_client_api::backend::AuxStore;
use sc_consensus_subspace::ArchivedSegmentNotification;
use sc_consensus_subspace_rpc::RootBlockProvider;
use std::error::Error;
use std::sync::Arc;
use subspace_core_primitives::{RootBlock, SegmentIndex};
use tracing::{debug, error, trace};

/// Start an archiver that will listen for archived segments and send root block to the storage
pub(crate) async fn start_root_block_archiver<AS: AuxStore>(
mut root_block_cache: RootBlockCache<AS>,
mut archived_segment_notification_stream: impl Stream<Item = ArchivedSegmentNotification> + Unpin,
) {
trace!("Subspace root block archiver started.");

while let Some(ArchivedSegmentNotification {
archived_segment, ..
}) = archived_segment_notification_stream.next().await
{
let segment_index = archived_segment.root_block.segment_index();
let result = root_block_cache.add_root_block(archived_segment.root_block);

if let Err(err) = result {
error!(%segment_index, ?err, "Root block archiving failed.");
} else {
debug!(%segment_index, "Root block archived.");
}
}
}

/// Cache of recently produced root blocks in aux storage
pub struct RootBlockCache<AS> {
aux_store: Arc<AS>,
}

impl<AS> Clone for RootBlockCache<AS> {
fn clone(&self) -> Self {
Self {
aux_store: self.aux_store.clone(),
}
}
}

impl<AS> RootBlockCache<AS>
where
AS: AuxStore,
{
const KEY_PREFIX: &[u8] = b"segment-headers-cache";

/// Create new instance.
pub fn new(aux_store: Arc<AS>) -> Self {
Self { aux_store }
}

/// Add root block to cache (likely as the result of archiving)
pub fn add_root_block(&mut self, root_block: RootBlock) -> Result<(), Box<dyn Error>> {
let key = Self::key(root_block.segment_index());
let value = root_block.encode();
let insert_data = vec![(key.as_slice(), value.as_slice())];

self.aux_store.insert_aux(&insert_data, &Vec::new())?;

Ok(())
}

fn key(segment_index: SegmentIndex) -> Vec<u8> {
Self::key_from_bytes(&u64::to_be_bytes(segment_index))
}

fn key_from_bytes(bytes: &[u8]) -> Vec<u8> {
(Self::KEY_PREFIX, bytes).encode()
}
}

impl<AS: AuxStore> RootBlockProvider for RootBlockCache<AS> {
/// Get root block from storage
fn get_root_block(
&self,
segment_index: SegmentIndex,
) -> Result<Option<RootBlock>, Box<dyn Error>> {
Ok(self
.aux_store
.get_aux(&Self::key(segment_index))?
.map(|root_block| {
RootBlock::decode(&mut root_block.as_slice())
.expect("Always correct root block unless DB is corrupted; qed")
}))
}
}
Loading