diff --git a/Cargo.lock b/Cargo.lock index 568fa7c277..b8de4db210 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7474,7 +7474,6 @@ dependencies = [ "futures 0.3.25", "futures-timer", "jsonrpsee", - "log", "parity-scale-codec", "parking_lot 0.12.1", "sc-client-api", @@ -7492,6 +7491,7 @@ dependencies = [ "subspace-farmer-components", "subspace-networking", "subspace-rpc-primitives", + "tracing", ] [[package]] diff --git a/crates/sc-consensus-subspace-rpc/Cargo.toml b/crates/sc-consensus-subspace-rpc/Cargo.toml index 05c50667d7..dd3776c106 100644 --- a/crates/sc-consensus-subspace-rpc/Cargo.toml +++ b/crates/sc-consensus-subspace-rpc/Cargo.toml @@ -17,7 +17,6 @@ async-oneshot = "0.5.0" futures = "0.3.25" futures-timer = "3.0.2" jsonrpsee = { version = "0.16.2", features = ["server", "macros"] } -log = "0.4.17" parity-scale-codec = "3.2.1" parking_lot = "0.12.1" sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" } @@ -35,3 +34,4 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" } subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-rpc-primitives = { version = "0.1.0", path = "../subspace-rpc-primitives" } +tracing = "0.1.37" diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index aba430c25c..2ef2b4cafd 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -24,7 +24,6 @@ use jsonrpsee::core::{async_trait, Error as JsonRpseeError, RpcResult}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::SubscriptionResult; use jsonrpsee::SubscriptionSink; -use log::{error, warn}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_client_api::BlockBackend; @@ -41,13 +40,13 @@ use sp_core::crypto::ByteArray; use sp_core::H256; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, Zero}; -use std::marker::PhantomData; +use std::error::Error; use std::num::NonZeroU32; use std::sync::Arc; use std::time::Duration; use subspace_archiving::archiver::ArchivedSegment; use subspace_core_primitives::{ - RecordsRoot, SegmentIndex, Solution, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE, + RecordsRoot, RootBlock, SegmentIndex, Solution, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE, }; use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; @@ -55,6 +54,7 @@ use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, MAX_SEGMENT_INDEXES_PER_REQUEST, }; +use tracing::{error, warn}; const SOLUTION_TIMEOUT: Duration = Duration::from_secs(2); const REWARD_SIGNING_TIMEOUT: Duration = Duration::from_millis(500); @@ -101,6 +101,12 @@ pub trait SubspaceRpcApi { &self, segment_indexes: Vec, ) -> RpcResult>>; + + #[method(name = "subspace_rootBlocks")] + async fn root_blocks( + &self, + segment_indexes: Vec, + ) -> RpcResult>>; } #[derive(Default)] @@ -115,8 +121,15 @@ struct BlockSignatureSenders { senders: Vec>, } +pub trait RootBlockProvider { + fn get_root_block( + &self, + segment_index: SegmentIndex, + ) -> Result, Box>; +} + /// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace. -pub struct SubspaceRpc { +pub struct SubspaceRpc { client: Arc, executor: SubscriptionTaskExecutor, new_slot_notification_stream: SubspaceNotificationStream, @@ -126,7 +139,7 @@ pub struct SubspaceRpc { reward_signature_senders: Arc>, dsn_bootstrap_nodes: Vec, subspace_link: SubspaceLink, - _phantom: PhantomData, + root_block_provider: RBP, } /// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for @@ -136,7 +149,8 @@ pub struct SubspaceRpc { /// every subscriber, after which RPC server waits for the same number of /// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until /// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored. -impl SubspaceRpc { +impl SubspaceRpc { + #[allow(clippy::too_many_arguments)] /// Creates a new instance of the `SubspaceRpc` handler. pub fn new( client: Arc, @@ -148,6 +162,7 @@ impl SubspaceRpc { >, dsn_bootstrap_nodes: Vec, subspace_link: SubspaceLink, + root_block_provider: RBP, ) -> Self { Self { client, @@ -159,13 +174,13 @@ impl SubspaceRpc { reward_signature_senders: Arc::default(), dsn_bootstrap_nodes, subspace_link, - _phantom: PhantomData::default(), + root_block_provider, } } } #[async_trait] -impl SubspaceRpcApiServer for SubspaceRpc +impl SubspaceRpcApiServer for SubspaceRpc where Block: BlockT, Client: ProvideRuntimeApi @@ -175,6 +190,7 @@ where + Sync + 'static, Client::Api: SubspaceRuntimeApi, + RBP: RootBlockProvider + Send + Sync + 'static, { fn get_farmer_app_info(&self) -> RpcResult { let best_block_id = BlockId::Hash(self.client.info().best_hash); @@ -496,4 +512,42 @@ where records_root_result } + + async fn root_blocks( + &self, + segment_indexes: Vec, + ) -> RpcResult>> { + if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST { + error!( + "segment_indexes length exceed the limit: {} ", + segment_indexes.len() + ); + + return Err(JsonRpseeError::Custom(format!( + "segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}" + ))); + }; + + let records_root_result: Result, JsonRpseeError> = segment_indexes + .into_iter() + .map(|segment_index| { + let api_result = self + .root_block_provider + .get_root_block(segment_index) + .map_err(|_| { + JsonRpseeError::Custom( + "Internal error during `root_blocks` call".to_string(), + ) + }); + + api_result + }) + .collect(); + + if let Err(err) = &records_root_result { + error!(?err, "Failed to get root blocks."); + } + + records_root_result + } } diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index e7600a71cc..595611d6ef 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use futures::Stream; use std::pin::Pin; use subspace_archiving::archiver::ArchivedSegment; -use subspace_core_primitives::{RecordsRoot, SegmentIndex}; +use subspace_core_primitives::{RecordsRoot, RootBlock, SegmentIndex}; use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; @@ -50,4 +50,10 @@ pub trait NodeClient: Clone + Send + Sync + 'static { &self, segment_indexes: Vec, ) -> Result>, Error>; + + /// Get root blocks for the segments + async fn root_blocks( + &self, + segment_indexes: Vec, + ) -> Result>, Error>; } diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index 8797a77f07..eeba5e0aac 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -8,7 +8,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use std::pin::Pin; use std::sync::Arc; use subspace_archiving::archiver::ArchivedSegment; -use subspace_core_primitives::{RecordsRoot, SegmentIndex}; +use subspace_core_primitives::{RecordsRoot, RootBlock, SegmentIndex}; use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; @@ -133,4 +133,14 @@ impl NodeClient for NodeRpcClient { .request("subspace_recordsRoots", rpc_params![&segment_indexes]) .await?) } + + async fn root_blocks( + &self, + segment_indexes: Vec, + ) -> Result>, RpcError> { + Ok(self + .client + .request("subspace_rootBlocks", rpc_params![&segment_indexes]) + .await?) + } } diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 3edc7a6b2e..aed925ea1e 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -19,11 +19,13 @@ pub mod dsn; pub mod piece_cache; +pub mod root_blocks; pub mod rpc; use crate::dsn::create_dsn_instance; use crate::dsn::import_blocks::import_blocks as import_blocks_from_dsn; use crate::piece_cache::PieceCache; +use crate::root_blocks::{start_root_block_archiver, RootBlockCache}; use derive_more::{Deref, DerefMut, Into}; use domain_runtime_primitives::Hash as DomainHash; use dsn::start_dsn_archiver; @@ -506,11 +508,26 @@ where ); task_manager.spawn_essential_handle().spawn_essential( - "archiver", + "dsn-archiver", Some("subspace-networking"), Box::pin(dsn_archiving_fut.in_current_span()), ); + let root_block_cache = RootBlockCache::new(client.clone()); + + let root_block_archiving_fut = start_root_block_archiver( + root_block_cache.clone(), + subspace_link + .archived_segment_notification_stream() + .subscribe(), + ); + + task_manager.spawn_essential_handle().spawn_essential( + "root-block-archiver", + Some("subspace-networking"), + Box::pin(root_block_archiving_fut.in_current_span()), + ); + let dsn_bootstrap_nodes = { // Fall back to node itself as bootstrap node for DSN so farmer always has someone to // connect to @@ -689,6 +706,7 @@ where .clone(), dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(), subspace_link: subspace_link.clone(), + root_blocks_provider: root_block_cache.clone(), }; rpc::create_full(deps).map_err(Into::into) diff --git a/crates/subspace-service/src/root_blocks.rs b/crates/subspace-service/src/root_blocks.rs new file mode 100644 index 0000000000..e89fbc94b5 --- /dev/null +++ b/crates/subspace-service/src/root_blocks.rs @@ -0,0 +1,91 @@ +use futures::{Stream, StreamExt}; +use parity_scale_codec::{Decode, Encode}; +use sc_client_api::backend::AuxStore; +use sc_consensus_subspace::ArchivedSegmentNotification; +use sc_consensus_subspace_rpc::RootBlockProvider; +use std::error::Error; +use std::sync::Arc; +use subspace_core_primitives::{RootBlock, SegmentIndex}; +use tracing::{debug, error, trace}; + +/// Start an archiver that will listen for archived segments and send root block to the storage +pub(crate) async fn start_root_block_archiver( + mut root_block_cache: RootBlockCache, + mut archived_segment_notification_stream: impl Stream + Unpin, +) { + trace!("Subspace root block archiver started."); + + while let Some(ArchivedSegmentNotification { + archived_segment, .. + }) = archived_segment_notification_stream.next().await + { + let segment_index = archived_segment.root_block.segment_index(); + let result = root_block_cache.add_root_block(archived_segment.root_block); + + if let Err(err) = result { + error!(%segment_index, ?err, "Root block archiving failed."); + } else { + debug!(%segment_index, "Root block archived."); + } + } +} + +/// Cache of recently produced root blocks in aux storage +pub struct RootBlockCache { + aux_store: Arc, +} + +impl Clone for RootBlockCache { + fn clone(&self) -> Self { + Self { + aux_store: self.aux_store.clone(), + } + } +} + +impl RootBlockCache +where + AS: AuxStore, +{ + const KEY_PREFIX: &[u8] = b"segment-headers-cache"; + + /// Create new instance. + pub fn new(aux_store: Arc) -> Self { + Self { aux_store } + } + + /// Add root block to cache (likely as the result of archiving) + pub fn add_root_block(&mut self, root_block: RootBlock) -> Result<(), Box> { + let key = Self::key(root_block.segment_index()); + let value = root_block.encode(); + let insert_data = vec![(key.as_slice(), value.as_slice())]; + + self.aux_store.insert_aux(&insert_data, &Vec::new())?; + + Ok(()) + } + + fn key(segment_index: SegmentIndex) -> Vec { + Self::key_from_bytes(&u64::to_be_bytes(segment_index)) + } + + fn key_from_bytes(bytes: &[u8]) -> Vec { + (Self::KEY_PREFIX, bytes).encode() + } +} + +impl RootBlockProvider for RootBlockCache { + /// Get root block from storage + fn get_root_block( + &self, + segment_index: SegmentIndex, + ) -> Result, Box> { + Ok(self + .aux_store + .get_aux(&Self::key(segment_index))? + .map(|root_block| { + RootBlock::decode(&mut root_block.as_slice()) + .expect("Always correct root block unless DB is corrupted; qed") + })) + } +} diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index 32e821f96d..7ae4a071f9 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ -28,7 +28,7 @@ use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::{ ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceLink, }; -use sc_consensus_subspace_rpc::{SubspaceRpc, SubspaceRpcApiServer}; +use sc_consensus_subspace_rpc::{RootBlockProvider, SubspaceRpc, SubspaceRpcApiServer}; use sc_rpc::SubscriptionTaskExecutor; use sc_rpc_api::DenyUnsafe; use sc_rpc_spec_v2::chain_spec::{ChainSpec, ChainSpecApiServer}; @@ -44,7 +44,7 @@ use subspace_runtime_primitives::{AccountId, Balance, Index}; use substrate_frame_rpc_system::{System, SystemApiServer}; /// Full client dependencies. -pub struct FullDeps { +pub struct FullDeps { /// The client instance to use. pub client: Arc, /// Transaction pool instance. @@ -67,11 +67,13 @@ pub struct FullDeps { pub dsn_bootstrap_nodes: Vec, /// SubspaceLink shared state. pub subspace_link: SubspaceLink, + /// Root block provider. + pub root_blocks_provider: RBP, } /// Instantiate all full RPC extensions. -pub fn create_full( - deps: FullDeps, +pub fn create_full( + deps: FullDeps, ) -> Result, Box> where C: ProvideRuntimeApi @@ -86,6 +88,7 @@ where + BlockBuilder + sp_consensus_subspace::SubspaceApi, P: TransactionPool + 'static, + RPB: RootBlockProvider + Send + Sync + 'static, { let mut module = RpcModule::new(()); let FullDeps { @@ -99,6 +102,7 @@ where archived_segment_notification_stream, dsn_bootstrap_nodes, subspace_link, + root_blocks_provider, } = deps; let chain_name = chain_spec.name().to_string(); @@ -118,6 +122,7 @@ where archived_segment_notification_stream, dsn_bootstrap_nodes, subspace_link, + root_blocks_provider, ) .into_rpc(), )?;