diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index d099333012..65a128dd98 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -26,11 +26,11 @@ use jsonrpsee::types::SubscriptionResult; use jsonrpsee::SubscriptionSink; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; -use sc_client_api::BlockBackend; +use sc_client_api::{AuxStore, BlockBackend}; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::{ - ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceLink, - SubspaceSyncOracle, + ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, + SegmentHeadersStore, SubspaceSyncOracle, }; use sc_rpc::SubscriptionTaskExecutor; use sc_utils::mpsc::TracingUnboundedSender; @@ -41,21 +41,20 @@ use sp_consensus_slots::Slot; use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi as SubspaceRuntimeApi}; use sp_core::crypto::ByteArray; use sp_core::H256; -use sp_runtime::traits::{Block as BlockT, Zero}; +use sp_runtime::traits::Block as BlockT; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::error::Error; +use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; -use subspace_core_primitives::{ - Piece, PieceIndex, SegmentCommitment, SegmentHeader, SegmentIndex, Solution, -}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex, Solution}; use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; use subspace_rpc_primitives::{ FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, - SolutionResponse, MAX_SEGMENT_INDEXES_PER_REQUEST, + SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST, }; use tracing::{debug, error, warn}; @@ -108,12 +107,6 @@ pub trait SubspaceRpcApi { )] fn subscribe_node_sync_status_change(&self); - #[method(name = "subspace_segmentCommitments")] - async fn segment_commitments( - &self, - segment_indexes: Vec, - ) -> RpcResult>>; - #[method(name = "subspace_segmentHeaders")] async fn segment_headers( &self, @@ -148,13 +141,6 @@ struct BlockSignatureSenders { senders: Vec>, } -pub trait SegmentHeaderProvider { - fn get_segment_header( - &self, - segment_index: SegmentIndex, - ) -> Result, Box>; -} - pub trait PieceProvider { fn get_piece_by_index( &self, @@ -163,10 +149,9 @@ pub trait PieceProvider { } /// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace. -pub struct SubspaceRpc +pub struct SubspaceRpc where Block: BlockT, - RBP: SegmentHeaderProvider, PP: PieceProvider, SO: SyncOracle + Send + Sync + Clone + 'static, { @@ -178,13 +163,13 @@ where solution_response_senders: Arc>, reward_signature_senders: Arc>, dsn_bootstrap_nodes: Vec, - subspace_link: SubspaceLink, - segment_header_provider: RBP, + segment_headers_store: SegmentHeadersStore, piece_provider: Option, archived_segment_acknowledgement_senders: Arc>, next_subscription_id: AtomicU64, sync_oracle: SubspaceSyncOracle, + _block: PhantomData, } /// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for @@ -194,12 +179,12 @@ where /// every subscriber, after which RPC server waits for the same number of /// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until /// timeout is exceeded. 
The first valid solution for a particular slot wins, others are ignored. -impl SubspaceRpc +impl SubspaceRpc where Block: BlockT, - RBP: SegmentHeaderProvider, PP: PieceProvider, SO: SyncOracle + Send + Sync + Clone + 'static, + AS: AuxStore + Send + Sync + 'static, { #[allow(clippy::too_many_arguments)] /// Creates a new instance of the `SubspaceRpc` handler. @@ -212,8 +197,7 @@ where ArchivedSegmentNotification, >, dsn_bootstrap_nodes: Vec, - subspace_link: SubspaceLink, - segment_header_provider: RBP, + segment_headers_store: SegmentHeadersStore, piece_provider: Option, sync_oracle: SubspaceSyncOracle, ) -> Self { @@ -226,18 +210,18 @@ where solution_response_senders: Arc::default(), reward_signature_senders: Arc::default(), dsn_bootstrap_nodes, - subspace_link, - segment_header_provider, + segment_headers_store, piece_provider, archived_segment_acknowledgement_senders: Arc::default(), next_subscription_id: AtomicU64::default(), sync_oracle, + _block: PhantomData, } } } #[async_trait] -impl SubspaceRpcApiServer for SubspaceRpc +impl SubspaceRpcApiServer for SubspaceRpc where Block: BlockT, Client: ProvideRuntimeApi @@ -247,9 +231,9 @@ where + Sync + 'static, Client::Api: SubspaceRuntimeApi, - RBP: SegmentHeaderProvider + Send + Sync + 'static, PP: PieceProvider + Send + Sync + 'static, SO: SyncOracle + Send + Sync + Clone + 'static, + AS: AuxStore + Send + Sync + 'static, { fn get_farmer_app_info(&self) -> RpcResult { let best_hash = self.client.info().best_hash; @@ -653,93 +637,25 @@ where Ok(()) } - // TODO: Remove as unnecessary, `segment_headers` can be used instead - async fn segment_commitments( - &self, - segment_indexes: Vec, - ) -> RpcResult>> { - if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST { - error!( - "segment_indexes length exceed the limit: {} ", - segment_indexes.len() - ); - - return Err(JsonRpseeError::Custom(format!( - "segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}" - ))); - }; - - let runtime_api = self.client.runtime_api(); - let best_hash = self.client.info().best_hash; - let best_block_number = self.client.info().best_number; - - let segment_commitment_result: Result, JsonRpseeError> = segment_indexes - .into_iter() - .map(|segment_index| { - let api_result = runtime_api - .segment_commitment(best_hash, segment_index) - .map_err(|_| { - JsonRpseeError::Custom( - "Internal error during `segment_commitment` call".to_string(), - ) - }); - - api_result.map(|maybe_segment_commitment| { - // This is not a very nice hack due to the fact that at the time first block is - // produced extrinsics with segment headers are not yet in runtime. 
- if maybe_segment_commitment.is_none() && best_block_number.is_zero() { - self.subspace_link - .segment_commitment_by_segment_index(segment_index) - } else { - maybe_segment_commitment - } - }) - }) - .collect(); - - if let Err(ref err) = segment_commitment_result { - error!( - "Failed to get data from runtime API (segment_commitment): {}", - err - ); - } - - segment_commitment_result - } - async fn segment_headers( &self, segment_indexes: Vec, ) -> RpcResult>> { - if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST { + if segment_indexes.len() > MAX_SEGMENT_HEADERS_PER_REQUEST { error!( "segment_indexes length exceed the limit: {} ", segment_indexes.len() ); return Err(JsonRpseeError::Custom(format!( - "segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}" + "segment_indexes length exceed the limit {MAX_SEGMENT_HEADERS_PER_REQUEST}" ))); }; - let segment_commitment_result: Result, JsonRpseeError> = segment_indexes + Ok(segment_indexes .into_iter() - .map(|segment_index| { - self.segment_header_provider - .get_segment_header(segment_index) - .map_err(|_| { - JsonRpseeError::Custom( - "Internal error during `segment_headers` call".to_string(), - ) - }) - }) - .collect(); - - if let Err(err) = &segment_commitment_result { - error!(?err, "Failed to get segment headers."); - } - - segment_commitment_result + .map(|segment_index| self.segment_headers_store.get_segment_header(segment_index)) + .collect()) } fn piece(&self, piece_index: PieceIndex) -> RpcResult>> { diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index ebb3e7558e..ca6cda1925 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -18,9 +18,10 @@ use crate::{ get_chain_constants, ArchivedSegmentNotification, BlockImportingNotification, SubspaceLink, SubspaceNotificationSender, }; -use codec::Encode; +use codec::{Decode, Encode}; use futures::StreamExt; use log::{debug, error, info, warn}; +use parking_lot::Mutex; use rand::prelude::*; use rand_chacha::ChaCha8Rng; use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun}; @@ -33,11 +34,113 @@ use sp_objects::ObjectsApi; use sp_runtime::generic::SignedBlock; use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Zero}; use std::future::Future; +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::objects::BlockObjectMapping; -use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader}; +use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex}; + +#[derive(Debug)] +struct SegmentHeadersStoreInner { + aux_store: Arc, + next_key_index: AtomicU16, + /// In-memory cache of segment headers + cache: Mutex>, +} + +/// Persistent storage of segment headers +#[derive(Debug)] +pub struct SegmentHeadersStore { + inner: Arc>, +} + +impl Clone for SegmentHeadersStore { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl SegmentHeadersStore +where + AS: AuxStore, +{ + const KEY_PREFIX: &[u8] = b"segment-headers"; + const INITIAL_CACHE_CAPACITY: usize = 1_000; + + /// Create new instance + pub fn new(aux_store: Arc) -> Result { + let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY); + let mut next_key_index = 0; + + debug!( + target: 
"subspace", + "Started loading segment headers into cache" + ); + while let Some(segment_headers) = + aux_store + .get_aux(&Self::key(next_key_index))? + .map(|segment_header| { + Vec::::decode(&mut segment_header.as_slice()) + .expect("Always correct segment header unless DB is corrupted; qed") + }) + { + cache.extend(segment_headers); + next_key_index += 1; + } + debug!( + target: "subspace", + "Finished loading segment headers into cache" + ); + + Ok(Self { + inner: Arc::new(SegmentHeadersStoreInner { + aux_store, + next_key_index: AtomicU16::new(next_key_index), + cache: Mutex::new(cache), + }), + }) + } + + /// Returns last observed segment index + pub fn max_segment_index(&self) -> SegmentIndex { + SegmentIndex::from(self.inner.cache.lock().len().saturating_sub(0) as u64) + } + + /// Add segment headers + pub fn add_segment_headers( + &self, + segment_headers: &[SegmentHeader], + ) -> Result<(), sp_blockchain::Error> { + // TODO: Check that segment headers are inserted sequentially + // TODO: Do compaction when we have too many keys: combine multiple segment headers into a + // single entry for faster retrievals and more compact storage + let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst); + let key = Self::key(key_index); + let value = segment_headers.encode(); + let insert_data = vec![(key.as_slice(), value.as_slice())]; + + self.inner.aux_store.insert_aux(&insert_data, &[])?; + self.inner.cache.lock().extend_from_slice(segment_headers); + + Ok(()) + } + + /// Get a single segment header + pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option { + self.inner + .cache + .lock() + .get(u64::from(segment_index) as usize) + .copied() + } + + fn key(key_index: u16) -> Vec { + (Self::KEY_PREFIX, key_index.to_le_bytes()).encode() + } +} /// How deep (in segments) should block be in order to be finalized. /// @@ -193,9 +296,10 @@ where best_archived_block: (Block::Hash, NumberFor), } -fn initialize_archiver( +fn initialize_archiver( best_block_hash: Block::Hash, best_block_number: NumberFor, + segment_headers_store: &SegmentHeadersStore, subspace_link: &SubspaceLink, client: &Client, kzg: Kzg, @@ -204,6 +308,7 @@ where Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, Client::Api: SubspaceApi + ObjectsApi, + AS: AuxStore, { let confirmation_depth_k = get_chain_constants(client) .expect("Must always be able to get chain constants") @@ -351,6 +456,11 @@ where older_archived_segments.extend(archived_segments); if !new_segment_headers.is_empty() { + if let Err(error) = + segment_headers_store.add_segment_headers(&new_segment_headers) + { + panic!("Failed to store segment headers: {error}"); + } // Set list of expected segment headers for the block where we expect segment // header extrinsic to be included subspace_link.segment_headers.lock().put( @@ -421,7 +531,8 @@ fn finalize_block( /// `store_segment_header` extrinsic). /// /// NOTE: Archiver is doing blocking operations and must run in a dedicated task. 
-pub fn create_subspace_archiver( +pub fn create_subspace_archiver( + segment_headers_store: SegmentHeadersStore, subspace_link: &SubspaceLink, client: Arc, telemetry: Option, @@ -439,6 +550,7 @@ where + Sync + 'static, Client::Api: SubspaceApi + ObjectsApi, + AS: AuxStore + Send + Sync + 'static, { let client_info = client.info(); let best_block_hash = client_info.best_hash; @@ -452,6 +564,7 @@ where } = initialize_archiver( best_block_hash, best_block_number, + &segment_headers_store, subspace_link, client.as_ref(), subspace_link.kzg.clone(), @@ -572,6 +685,14 @@ where } if !new_segment_headers.is_empty() { + if let Err(error) = segment_headers_store.add_segment_headers(&new_segment_headers) + { + error!( + target: "subspace", + "Failed to store segment headers: {error}" + ); + return; + } let maybe_block_number_to_finalize = { let mut segment_headers = segment_headers.lock(); segment_headers.put(block_number + One::one(), new_segment_headers); diff --git a/crates/sc-consensus-subspace/src/aux_schema.rs b/crates/sc-consensus-subspace/src/aux_schema.rs index 98d34b8e64..2485ee2996 100644 --- a/crates/sc-consensus-subspace/src/aux_schema.rs +++ b/crates/sc-consensus-subspace/src/aux_schema.rs @@ -21,7 +21,7 @@ use codec::{Decode, Encode}; use sc_client_api::backend::AuxStore; use sp_blockchain::{Error as ClientError, Result as ClientResult}; use sp_consensus_subspace::ChainConstants; -use subspace_core_primitives::{BlockWeight, SegmentCommitment, SegmentIndex}; +use subspace_core_primitives::BlockWeight; fn load_decode(backend: &B, key: &[u8]) -> ClientResult> where @@ -63,32 +63,6 @@ pub(crate) fn load_block_weight( load_decode(backend, block_weight_key(block_hash).as_slice()) } -/// The aux storage key used to store the segment commitment of the given segment. -fn segment_commitment_key(segment_index: SegmentIndex) -> Vec { - (b"segment_commitment", segment_index).encode() -} - -/// Write the cumulative segment commitment of a segment to aux storage. -pub(crate) fn write_segment_commitment( - segment_index: SegmentIndex, - segment_commitment: &SegmentCommitment, - write_aux: F, -) -> R -where - F: FnOnce(&[(Vec, &[u8])]) -> R, -{ - let key = segment_commitment_key(segment_index); - segment_commitment.using_encoded(|s| write_aux(&[(key, s)])) -} - -/// Load the cumulative chain-weight associated with a block. -pub(crate) fn load_segment_commitment( - backend: &B, - segment_index: SegmentIndex, -) -> ClientResult> { - load_decode(backend, segment_commitment_key(segment_index).as_slice()) -} - /// The aux storage key used to store the chain constants. 
fn chain_constants_key() -> Vec { b"chain_constants".encode() diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 87e7b67dca..ffcbcfdb21 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -31,7 +31,7 @@ use crate::archiver::FINALIZATION_DEPTH_IN_SEGMENTS; use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream}; use crate::slot_worker::SubspaceSlotWorker; pub use crate::slot_worker::SubspaceSyncOracle; -pub use archiver::create_subspace_archiver; +pub use archiver::{create_subspace_archiver, SegmentHeadersStore}; use futures::channel::mpsc; use futures::StreamExt; use log::{debug, info, trace, warn}; @@ -78,8 +78,8 @@ use std::sync::Arc; use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ - HistorySize, PublicKey, Randomness, SectorId, SegmentCommitment, SegmentHeader, SegmentIndex, - Solution, SolutionRange, + HistorySize, PublicKey, Randomness, SectorId, SegmentHeader, SegmentIndex, Solution, + SolutionRange, }; use subspace_proof_of_space::Table; use subspace_solving::REWARD_SIGNING_CONTEXT; @@ -348,7 +348,7 @@ where } /// Parameters for Subspace. -pub struct SubspaceParams +pub struct SubspaceParams where SO: SyncOracle + Send + Sync, { @@ -384,6 +384,9 @@ where /// The source of timestamps for relative slots pub subspace_link: SubspaceLink, + /// Persistent storage of segment headers + pub segment_headers_store: SegmentHeadersStore, + /// The proportion of the slot dedicated to proposing. /// /// The block proposing will be limited to this proportion of the slot from the starting of the @@ -400,7 +403,7 @@ where } /// Start the Subspace worker. -pub fn start_subspace( +pub fn start_subspace( SubspaceParams { client, select_chain, @@ -412,10 +415,11 @@ pub fn start_subspace force_authoring, backoff_authoring_blocks, subspace_link, + segment_headers_store, block_proposal_slot_portion, max_block_proposal_slot_portion, telemetry, - }: SubspaceParams, + }: SubspaceParams, ) -> Result where PosTable: Table, @@ -442,6 +446,7 @@ where CIDP: CreateInherentDataProviders + Send + Sync + 'static, CIDP::InherentDataProviders: InherentDataProviderExt + Send, BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, Error: std::error::Error + Send + From + From + 'static, { let worker = SubspaceSlotWorker { @@ -457,6 +462,7 @@ where block_proposal_slot_portion, max_block_proposal_slot_portion, telemetry, + segment_headers_store, _pos_table: PhantomData::, }; @@ -552,25 +558,6 @@ impl SubspaceLink { .cloned() .unwrap_or_default() } - - /// Get the first found segment commitment by segment index. - pub fn segment_commitment_by_segment_index( - &self, - segment_index: SegmentIndex, - ) -> Option { - self.segment_headers - .lock() - .iter() - .find_map(|(_block_number, segment_headers)| { - segment_headers.iter().find_map(|segment_header| { - if segment_header.segment_index() == segment_index { - Some(segment_header.segment_commitment()) - } else { - None - } - }) - }) - } } /// A verifier for Subspace blocks. @@ -798,18 +785,22 @@ where } /// A block-import handler for Subspace. 
-pub struct SubspaceBlockImport { +pub struct SubspaceBlockImport +where + Block: BlockT, +{ inner: I, client: Arc, block_importing_notification_sender: SubspaceNotificationSender>, subspace_link: SubspaceLink, create_inherent_data_providers: CIDP, + segment_headers_store: SegmentHeadersStore, _pos_table: PhantomData, } -impl Clone - for SubspaceBlockImport +impl Clone + for SubspaceBlockImport where Block: BlockT, I: Clone, @@ -822,18 +813,20 @@ where block_importing_notification_sender: self.block_importing_notification_sender.clone(), subspace_link: self.subspace_link.clone(), create_inherent_data_providers: self.create_inherent_data_providers.clone(), + segment_headers_store: self.segment_headers_store.clone(), _pos_table: PhantomData, } } } -impl SubspaceBlockImport +impl SubspaceBlockImport where PosTable: Table, Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, CIDP: CreateInherentDataProviders> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, { fn new( client: Arc, @@ -843,6 +836,7 @@ where >, subspace_link: SubspaceLink, create_inherent_data_providers: CIDP, + segment_headers_store: SegmentHeadersStore, ) -> Self { SubspaceBlockImport { client, @@ -850,6 +844,7 @@ where block_importing_notification_sender, subspace_link, create_inherent_data_providers, + segment_headers_store, _pos_table: PhantomData, } } @@ -974,34 +969,24 @@ where ); let segment_index = piece_index.segment_index(); - // This is not a very nice hack due to the fact that at the time first block is produced - // extrinsics with segment headers are not yet in runtime. - let maybe_segment_commitment = if block_number.is_one() { - self.subspace_link - .segment_headers - .lock() - .get(&One::one()) - .and_then(|segment_headers| { - segment_headers - .first() - .map(|segment_header| segment_header.segment_commitment()) - }) - } else { - aux_schema::load_segment_commitment(self.client.as_ref(), segment_index)? - }; - - let segment_commitment = - maybe_segment_commitment.ok_or(Error::SegmentCommitmentNotFound(segment_index))?; - let sector_expiration_check_segment_commitment = aux_schema::load_segment_commitment( - self.client.as_ref(), - subspace_digest_items - .pre_digest - .solution - .history_size - .sector_expiration_check(chain_constants.min_sector_lifetime()) - .ok_or(Error::InvalidHistorySize)? - .segment_index(), - )?; + let segment_commitment = self + .segment_headers_store + .get_segment_header(segment_index) + .map(|segment_header| segment_header.segment_commitment()) + .ok_or(Error::SegmentCommitmentNotFound(segment_index))?; + + let sector_expiration_check_segment_commitment = self + .segment_headers_store + .get_segment_header( + subspace_digest_items + .pre_digest + .solution + .history_size + .sector_expiration_check(chain_constants.min_sector_lifetime()) + .ok_or(Error::InvalidHistorySize)? + .segment_index(), + ) + .map(|segment_header| segment_header.segment_commitment()); // Piece is not checked during initial block verification because it requires access to // segment header and runtime, check it now. 
@@ -1078,8 +1063,8 @@ where } #[async_trait::async_trait] -impl BlockImport - for SubspaceBlockImport +impl BlockImport + for SubspaceBlockImport where PosTable: Table, Block: BlockT, @@ -1095,6 +1080,7 @@ where + Sync, Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, CIDP: CreateInherentDataProviders> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, { type Error = ConsensusError; type Transaction = TransactionFor; @@ -1164,23 +1150,21 @@ where }); for (&segment_index, segment_commitment) in &subspace_digest_items.segment_commitments { - if let Some(found_segment_commitment) = - aux_schema::load_segment_commitment(self.client.as_ref(), segment_index) - .map_err(|e| ConsensusError::ClientImport(e.to_string()))? - { - if &found_segment_commitment != segment_commitment { - return Err(ConsensusError::ClientImport( - Error::::DifferentSegmentCommitment(segment_index) - .to_string(), - )); - } - } + let found_segment_commitment = self + .segment_headers_store + .get_segment_header(segment_index) + .ok_or_else(|| { + ConsensusError::ClientImport(format!( + "Segment header for index {segment_index} not found" + )) + })? + .segment_commitment(); - aux_schema::write_segment_commitment(segment_index, segment_commitment, |values| { - block - .auxiliary - .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) - }); + if &found_segment_commitment != segment_commitment { + return Err(ConsensusError::ClientImport( + Error::::DifferentSegmentCommitment(segment_index).to_string(), + )); + } } // The fork choice rule is that we pick the heaviest chain (i.e. smallest solution @@ -1267,14 +1251,15 @@ where /// /// Also returns a link object used to correctly instantiate the import queue and background worker. #[allow(clippy::type_complexity)] -pub fn block_import( +pub fn block_import( slot_duration: SlotDuration, wrapped_block_import: I, client: Arc, kzg: Kzg, create_inherent_data_providers: CIDP, + segment_headers_store: SegmentHeadersStore, ) -> ClientResult<( - SubspaceBlockImport, + SubspaceBlockImport, SubspaceLink, )> where @@ -1283,6 +1268,7 @@ where Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, Client::Api: BlockBuilderApi + SubspaceApi, CIDP: CreateInherentDataProviders> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, { let (new_slot_notification_sender, new_slot_notification_stream) = notification::channel("subspace_new_slot_notification_stream"); @@ -1323,6 +1309,7 @@ where block_importing_notification_sender, link.clone(), create_inherent_data_providers, + segment_headers_store, ); Ok((import, link)) diff --git a/crates/sc-consensus-subspace/src/slot_worker.rs b/crates/sc-consensus-subspace/src/slot_worker.rs index f59da01971..dba55affa9 100644 --- a/crates/sc-consensus-subspace/src/slot_worker.rs +++ b/crates/sc-consensus-subspace/src/slot_worker.rs @@ -17,7 +17,7 @@ use crate::{ get_chain_constants, BlockImportingNotification, NewSlotInfo, NewSlotNotification, - RewardSigningNotification, SubspaceLink, + RewardSigningNotification, SegmentHeadersStore, SubspaceLink, }; use futures::channel::mpsc; use futures::{StreamExt, TryFutureExt}; @@ -92,7 +92,10 @@ where } } -pub(super) struct SubspaceSlotWorker { +pub(super) struct SubspaceSlotWorker +where + Block: BlockT, +{ pub(super) client: Arc, pub(super) block_import: I, pub(super) env: E, @@ -105,12 +108,13 @@ pub(super) struct SubspaceSlotWorker, pub(super) telemetry: Option, + pub(super) segment_headers_store: SegmentHeadersStore, pub(super) _pos_table: PhantomData, } 
#[async_trait::async_trait] -impl SimpleSlotWorker - for SubspaceSlotWorker +impl SimpleSlotWorker + for SubspaceSlotWorker where PosTable: Table, Block: BlockT, @@ -127,6 +131,7 @@ where L: JustificationSyncLink, BS: BackoffAuthoringBlocksStrategy> + Send + Sync, Error: std::error::Error + Send + From + From + 'static, + AS: AuxStore + Send + Sync + 'static, { type BlockImport = I; type SyncOracle = SO; @@ -264,17 +269,10 @@ where chain_constants.recent_history_fraction(), ) .segment_index(); - let mut maybe_segment_commitment = runtime_api - .segment_commitment(parent_hash, segment_index) - .ok()?; - - // This is not a very nice hack due to the fact that at the time first block is produced - // extrinsics with segment headers are not yet in runtime. - if maybe_segment_commitment.is_none() && parent_header.number().is_zero() { - maybe_segment_commitment = self - .subspace_link - .segment_commitment_by_segment_index(segment_index); - } + let maybe_segment_commitment = self + .segment_headers_store + .get_segment_header(segment_index) + .map(|segment_header| segment_header.segment_commitment()); let segment_commitment = match maybe_segment_commitment { Some(segment_commitment) => segment_commitment, @@ -451,8 +449,8 @@ where } } -impl - SubspaceSlotWorker +impl + SubspaceSlotWorker where PosTable: Table, Block: BlockT, @@ -469,6 +467,7 @@ where L: JustificationSyncLink, BS: BackoffAuthoringBlocksStrategy> + Send + Sync, Error: std::error::Error + Send + From + From + 'static, + AS: AuxStore + Send + Sync + 'static, { async fn create_vote( &self, diff --git a/crates/subspace-core-primitives/src/segments.rs b/crates/subspace-core-primitives/src/segments.rs index 8180769fe0..681bf5c7cc 100644 --- a/crates/subspace-core-primitives/src/segments.rs +++ b/crates/subspace-core-primitives/src/segments.rs @@ -24,6 +24,8 @@ use serde::{Deserialize, Serialize}; Eq, PartialEq, Hash, + From, + Into, Encode, Decode, Add, @@ -58,20 +60,6 @@ impl Step for SegmentIndex { } } -impl From for SegmentIndex { - #[inline] - fn from(original: u64) -> Self { - Self(original) - } -} - -impl From for u64 { - #[inline] - fn from(original: SegmentIndex) -> Self { - original.0 - } -} - impl SegmentIndex { /// Segment index 0. 
pub const ZERO: SegmentIndex = SegmentIndex(0); diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index fdda2d3920..00aa85ea42 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -3,7 +3,7 @@ pub(crate) mod node_rpc_client; use async_trait::async_trait; use futures::Stream; use std::pin::Pin; -use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentHeader, SegmentIndex}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, @@ -50,12 +50,6 @@ pub trait NodeClient: Clone + Send + Sync + 'static { &self, ) -> Result + Send + 'static>>, Error>; - /// Get segment commitments for the segments - async fn segment_commitments( - &self, - segment_indexes: Vec, - ) -> Result>, Error>; - /// Get segment headers for the segments async fn segment_headers( &self, diff --git a/crates/subspace-farmer/src/node_client/node_rpc_client.rs b/crates/subspace-farmer/src/node_client/node_rpc_client.rs index b74b36e2ca..a9db6e4e64 100644 --- a/crates/subspace-farmer/src/node_client/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_client/node_rpc_client.rs @@ -7,7 +7,7 @@ use jsonrpsee::rpc_params; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use std::pin::Pin; use std::sync::Arc; -use subspace_core_primitives::{Piece, PieceIndex, SegmentCommitment, SegmentHeader, SegmentIndex}; +use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex}; use subspace_rpc_primitives::{ FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, @@ -141,16 +141,6 @@ impl NodeClient for NodeRpcClient { ))) } - async fn segment_commitments( - &self, - segment_indexes: Vec, - ) -> Result>, RpcError> { - Ok(self - .client - .request("subspace_segmentCommitments", rpc_params![&segment_indexes]) - .await?) 
- } - async fn segment_headers( &self, segment_indexes: Vec, diff --git a/crates/subspace-farmer/src/utils/piece_validator.rs b/crates/subspace-farmer/src/utils/piece_validator.rs index 0ed0c7ca4a..089ac16a18 100644 --- a/crates/subspace-farmer/src/utils/piece_validator.rs +++ b/crates/subspace-farmer/src/utils/piece_validator.rs @@ -55,25 +55,21 @@ where let segment_commitment = match maybe_segment_commitment { Some(segment_commitment) => segment_commitment, None => { - let segment_commitments = match self - .node_client - .segment_commitments(vec![segment_index]) - .await - { - Ok(segment_commitments) => segment_commitments, - Err(error) => { - error!( - %piece_index, - ?error, - "Failed tor retrieve segment commitment from node" - ); - return None; - } - }; + let segment_headers = + match self.node_client.segment_headers(vec![segment_index]).await { + Ok(segment_headers) => segment_headers, + Err(error) => { + error!( + %piece_index, + ?error, + "Failed tor retrieve segment headers from node" + ); + return None; + } + }; - let segment_commitment = match segment_commitments.into_iter().next().flatten() - { - Some(segment_commitment) => segment_commitment, + let segment_commitment = match segment_headers.into_iter().next().flatten() { + Some(segment_header) => segment_header.segment_commitment(), None => { error!( %piece_index, diff --git a/crates/subspace-node/src/bin/subspace-node.rs b/crates/subspace-node/src/bin/subspace-node.rs index 80681e1bb4..0695ed7976 100644 --- a/crates/subspace-node/src/bin/subspace-node.rs +++ b/crates/subspace-node/src/bin/subspace-node.rs @@ -167,41 +167,6 @@ fn main() -> Result<(), Error> { )) })?; } - Some(Subcommand::ImportBlocksFromDsn(cmd)) => { - let runner = cli.create_runner(cmd)?; - set_default_ss58_version(&runner.config().chain_spec); - runner.async_run(|config| { - let PartialComponents { - client, - import_queue, - task_manager, - other: (_block_import, subspace_link, _telemetry, _bundle_validator), - .. - } = subspace_service::new_partial::( - &config, None, - )?; - - let subspace_archiver = sc_consensus_subspace::create_subspace_archiver( - &subspace_link, - client.clone(), - None, - ); - - task_manager - .spawn_essential_handle() - .spawn_essential_blocking( - "subspace-archiver", - None, - Box::pin(subspace_archiver), - ); - - Ok(( - cmd.run(client, import_queue, task_manager.spawn_essential_handle()) - .map_err(Error::SubstrateCli), - task_manager, - )) - })?; - } Some(Subcommand::PurgeChain(cmd)) => { // This is a compatibility layer to make sure we wipe old data from disks of our users if let Some(base_dir) = dirs::data_local_dir() { diff --git a/crates/subspace-node/src/import_blocks_from_dsn.rs b/crates/subspace-node/src/import_blocks_from_dsn.rs deleted file mode 100644 index a23f2d8ceb..0000000000 --- a/crates/subspace-node/src/import_blocks_from_dsn.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (C) 2021 Subspace Labs, Inc. -// SPDX-License-Identifier: GPL-3.0-or-later - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. 
- -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use clap::Parser; -use log::info; -use sc_cli::{CliConfiguration, ImportParams, SharedParams}; -use sc_client_api::{BlockBackend, HeaderBackend}; -use sp_core::traits::SpawnEssentialNamed; -use sp_runtime::traits::Block as BlockT; -use std::sync::Arc; -use subspace_networking::libp2p::Multiaddr; -use subspace_networking::{BootstrappedNetworkingParameters, Config, PieceByHashRequestHandler}; -use subspace_service::dsn::import_blocks::initial_block_import_from_dsn; - -/// The `import-blocks-from-network` command used to import blocks from Subspace Network DSN. -#[derive(Debug, Parser)] -pub struct ImportBlocksFromDsnCmd { - /// Multiaddrs of bootstrap nodes to connect to on startup, multiple are supported - #[arg(long)] - pub bootstrap_node: Vec, - - /// The default number of 64KB pages to ever allocate for Wasm execution. - /// - /// Don't alter this unless you know what you're doing. - #[arg(long, value_name = "COUNT")] - pub default_heap_pages: Option, - - #[allow(missing_docs)] - #[clap(flatten)] - pub shared_params: SharedParams, - - #[allow(missing_docs)] - #[clap(flatten)] - pub import_params: ImportParams, -} - -impl ImportBlocksFromDsnCmd { - /// Run the import-blocks command - pub async fn run( - &self, - client: Arc, - mut import_queue: IQ, - spawner: impl SpawnEssentialNamed, - ) -> sc_cli::Result<()> - where - C: HeaderBackend + BlockBackend + Send + Sync + 'static, - B: BlockT + for<'de> serde::Deserialize<'de>, - IQ: sc_service::ImportQueue + 'static, - { - let (node, mut node_runner) = subspace_networking::create(Config { - networking_parameters_registry: BootstrappedNetworkingParameters::new( - self.bootstrap_node.clone(), - ) - .boxed(), - allow_non_global_addresses_in_dht: true, - request_response_protocols: vec![PieceByHashRequestHandler::create( - move |_, _| async { None }, - )], - ..Config::default() - }) - .map_err(|error| sc_service::Error::Other(error.to_string()))?; - - spawner.spawn_essential( - "node-runner", - Some("subspace-networking"), - Box::pin(async move { - node_runner.run().await; - }), - ); - - let mut imported_blocks = 0; - - // Repeat until no new blocks are imported - loop { - let new_imported_blocks = - initial_block_import_from_dsn(&node, Arc::clone(&client), &mut import_queue, false) - .await?; - - if new_imported_blocks == 0 { - break; - } - - imported_blocks += new_imported_blocks; - - info!( - "🎉 Imported {} blocks from DSN, current best #{}/#{}", - imported_blocks, - client.info().best_number, - client.info().best_hash - ); - } - - info!( - "🎉 Imported {} blocks from DSN, best #{}/#{}, check against reliable sources to make \ - sure it is a block on canonical chain", - imported_blocks, - client.info().best_number, - client.info().best_hash - ); - - Ok(()) - } -} - -impl CliConfiguration for ImportBlocksFromDsnCmd { - fn shared_params(&self) -> &SharedParams { - &self.shared_params - } - - fn import_params(&self) -> Option<&ImportParams> { - Some(&self.import_params) - } -} diff --git a/crates/subspace-node/src/lib.rs b/crates/subspace-node/src/lib.rs index 8b74dd71fd..8337dff49d 100644 --- a/crates/subspace-node/src/lib.rs +++ b/crates/subspace-node/src/lib.rs @@ -19,9 +19,7 @@ mod chain_spec; mod chain_spec_utils; pub mod domain; -mod import_blocks_from_dsn; -pub use crate::import_blocks_from_dsn::ImportBlocksFromDsnCmd; use bytesize::ByteSize; use clap::Parser; use sc_cli::{RunCmd, SubstrateCli}; @@ -148,9 +146,6 @@ pub enum 
Subcommand { /// Import blocks. ImportBlocks(sc_cli::ImportBlocksCmd), - /// Import blocks from Subspace Network DSN. - ImportBlocksFromDsn(ImportBlocksFromDsnCmd), - /// Remove the whole chain. PurgeChain(PurgeChainCmd), diff --git a/crates/subspace-rpc-primitives/src/lib.rs b/crates/subspace-rpc-primitives/src/lib.rs index 7abde7c711..8f63a71525 100644 --- a/crates/subspace-rpc-primitives/src/lib.rs +++ b/crates/subspace-rpc-primitives/src/lib.rs @@ -22,8 +22,8 @@ use subspace_core_primitives::{ use subspace_farmer_components::FarmerProtocolInfo; use subspace_networking::libp2p::Multiaddr; -/// Defines a limit for segment indexes array. It affects storage access on the runtime side. -pub const MAX_SEGMENT_INDEXES_PER_REQUEST: usize = 300; +/// Defines a limit for number of segments that can be requested over RPC +pub const MAX_SEGMENT_HEADERS_PER_REQUEST: usize = 300; /// Information necessary for farmer application #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index 5deb275652..a747987094 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -3,10 +3,9 @@ pub mod node_provider_storage; use crate::dsn::node_provider_storage::NodeProviderStorage; use crate::piece_cache::PieceCache; -use crate::SegmentHeaderCache; use either::Either; use sc_client_api::AuxStore; -use sc_consensus_subspace_rpc::SegmentHeaderProvider; +use sc_consensus_subspace::SegmentHeadersStore; use std::num::NonZeroUsize; use std::path::PathBuf; use std::sync::Arc; @@ -88,7 +87,7 @@ pub(crate) fn create_dsn_instance( dsn_protocol_version: String, dsn_config: DsnConfig, piece_cache: PieceCache, - segment_header_cache: SegmentHeaderCache, + segment_headers_store: SegmentHeadersStore, ) -> Result<(Node, NodeRunner>), DsnConfigurationError> where AS: AuxStore + Sync + Send + 'static, @@ -200,7 +199,7 @@ where block_limit = ROOT_BLOCK_NUMBER_LIMIT; } - let max_segment_index = segment_header_cache.max_segment_index(); + let max_segment_index = segment_headers_store.max_segment_index(); // several last segment indexes (SegmentIndex::ZERO..=max_segment_index) @@ -210,21 +209,16 @@ where } }; - let internal_result = segment_indexes + let maybe_segment_headers = segment_indexes .iter() - .map(|segment_index| segment_header_cache.get_segment_header(*segment_index)) - .collect::>, _>>(); + .map(|segment_index| segment_headers_store.get_segment_header(*segment_index)) + .collect::>>(); - let result = match internal_result { - Ok(Some(segment_headers)) => Some(SegmentHeaderResponse { segment_headers }), - Ok(None) => { + let result = match maybe_segment_headers { + Some(segment_headers) => Some(SegmentHeaderResponse { segment_headers }), + None => { error!("Segment header collection contained empty segment headers."); - None - } - Err(error) => { - error!(%error, "Failed to get segment headers from cache"); - None } }; diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 000c259a49..67c1475ae5 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -21,7 +21,6 @@ pub mod dsn; mod metrics; pub mod piece_cache; pub mod rpc; -pub mod segment_headers; mod sync_from_dsn; pub mod tx_pre_validator; @@ -29,7 +28,6 @@ use crate::dsn::import_blocks::initial_block_import_from_dsn; use crate::dsn::{create_dsn_instance, DsnConfigurationError}; use crate::metrics::NodeMetrics; use crate::piece_cache::PieceCache; -use 
crate::segment_headers::{start_segment_header_archiver, SegmentHeaderCache}; use crate::tx_pre_validator::ConsensusChainTxPreValidator; use cross_domain_message_gossip::cdm_gossip_peers_set_config; use derive_more::{Deref, DerefMut, Into}; @@ -50,7 +48,8 @@ use sc_consensus_slots::SlotProportion; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::{ ArchivedSegmentNotification, BlockImportingNotification, NewSlotNotification, - RewardSigningNotification, SubspaceLink, SubspaceParams, SubspaceSyncOracle, + RewardSigningNotification, SegmentHeadersStore, SubspaceLink, SubspaceParams, + SubspaceSyncOracle, }; use sc_executor::{NativeElseWasmExecutor, NativeExecutionDispatch}; use sc_network::NetworkService; @@ -266,6 +265,7 @@ pub fn new_partial( Transaction = TransactionFor, Block>, >, SubspaceLink, + SegmentHeadersStore>, Option, BundleValidator>, ), @@ -371,10 +371,19 @@ where tx_pre_validator, ); + let segment_headers_store = SegmentHeadersStore::new(client.clone()) + .map_err(|error| ServiceError::Application(error.into()))?; let fraud_proof_block_import = sc_consensus_fraud_proof::block_import(client.clone(), client.clone(), proof_verifier); - let (block_import, subspace_link) = sc_consensus_subspace::block_import::( + let (block_import, subspace_link) = sc_consensus_subspace::block_import::< + PosTable, + _, + _, + _, + _, + _, + >( sc_consensus_subspace::slot_duration(&*client)?, fraud_proof_block_import, client.clone(), @@ -406,6 +415,7 @@ where } } }, + segment_headers_store.clone(), )?; let slot_duration = subspace_link.slot_duration(); @@ -434,7 +444,13 @@ where keystore_container, select_chain, transaction_pool, - other: (block_import, subspace_link, telemetry, bundle_validator), + other: ( + block_import, + subspace_link, + segment_headers_store, + telemetry, + bundle_validator, + ), }) } @@ -514,6 +530,7 @@ pub async fn new_full( ( I, SubspaceLink, + SegmentHeadersStore>, Option, BundleValidator>, ), @@ -556,13 +573,10 @@ where keystore_container, select_chain, transaction_pool, - other: (block_import, subspace_link, mut telemetry, mut bundle_validator), + other: + (block_import, subspace_link, segment_headers_store, mut telemetry, mut bundle_validator), } = partial_components; - let segment_header_cache = SegmentHeaderCache::new(client.clone()).map_err(|error| { - Error::Other(format!("Failed to instantiate segment header cache: {error}").into()) - })?; - let (node, bootstrap_nodes, piece_cache) = match config.subspace_networking.clone() { SubspaceNetworking::Reuse { node, @@ -622,7 +636,7 @@ where dsn_protocol_version, dsn_config.clone(), piece_cache.clone(), - segment_header_cache.clone(), + segment_headers_store.clone(), )?; info!("Subspace networking initialized: Node ID is {}", node.id()); @@ -656,21 +670,6 @@ where } }; - let segment_header_archiving_fut = start_segment_header_archiver( - segment_header_cache.clone(), - subspace_link - .archived_segment_notification_stream() - .subscribe(), - ); - - task_manager - .spawn_essential_handle() - .spawn_essential_blocking( - "segment-header-archiver", - Some("subspace-networking"), - Box::pin(segment_header_archiving_fut.in_current_span()), - ); - let dsn_bootstrap_nodes = { // Fall back to node itself as bootstrap node for DSN so farmer always has someone to // connect to @@ -719,6 +718,7 @@ where }; let subspace_archiver = sc_consensus_subspace::create_subspace_archiver( + segment_headers_store.clone(), &subspace_link, client.clone(), telemetry.as_ref().map(|telemetry| 
telemetry.handle()), @@ -933,13 +933,14 @@ where force_authoring: config.force_authoring, backoff_authoring_blocks, subspace_link: subspace_link.clone(), + segment_headers_store: segment_headers_store.clone(), block_proposal_slot_portion, max_block_proposal_slot_portion: None, telemetry: None, }; let subspace = - sc_consensus_subspace::start_subspace::( + sc_consensus_subspace::start_subspace::( subspace_config, )?; @@ -978,8 +979,7 @@ where archived_segment_notification_stream: archived_segment_notification_stream .clone(), dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(), - subspace_link: subspace_link.clone(), - segment_headers_provider: segment_header_cache.clone(), + segment_headers_store: segment_headers_store.clone(), piece_provider: piece_cache.clone(), sync_oracle: subspace_sync_oracle.clone(), }; diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index 5c3a229873..afa3319eb9 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ -23,15 +23,13 @@ use jsonrpsee::RpcModule; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; -use sc_client_api::BlockBackend; +use sc_client_api::{AuxStore, BlockBackend}; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::{ - ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceLink, - SubspaceSyncOracle, -}; -use sc_consensus_subspace_rpc::{ - PieceProvider, SegmentHeaderProvider, SubspaceRpc, SubspaceRpcApiServer, + ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, + SegmentHeadersStore, SubspaceSyncOracle, }; +use sc_consensus_subspace_rpc::{PieceProvider, SubspaceRpc, SubspaceRpcApiServer}; use sc_rpc::SubscriptionTaskExecutor; use sc_rpc_api::DenyUnsafe; use sc_rpc_spec_v2::chain_spec::{ChainSpec, ChainSpecApiServer}; @@ -48,7 +46,7 @@ use subspace_runtime_primitives::{AccountId, Balance, Index}; use substrate_frame_rpc_system::{System, SystemApiServer}; /// Full client dependencies. -pub struct FullDeps +pub struct FullDeps where SO: SyncOracle + Send + Sync + Clone, { @@ -72,10 +70,8 @@ where SubspaceNotificationStream, /// Bootstrap nodes for DSN. pub dsn_bootstrap_nodes: Vec, - /// SubspaceLink shared state. - pub subspace_link: SubspaceLink, /// Segment header provider. - pub segment_headers_provider: RBP, + pub segment_headers_store: SegmentHeadersStore, /// Provides pieces from piece cache. pub piece_provider: Option, /// Subspace sync oracle @@ -83,8 +79,8 @@ where } /// Instantiate all full RPC extensions. 
-pub fn create_full( - deps: FullDeps, +pub fn create_full( + deps: FullDeps, ) -> Result, Box> where C: ProvideRuntimeApi @@ -99,9 +95,9 @@ where + BlockBuilder + sp_consensus_subspace::SubspaceApi, P: TransactionPool + 'static, - RPB: SegmentHeaderProvider + Send + Sync + 'static, PP: PieceProvider + Send + Sync + 'static, SO: SyncOracle + Send + Sync + Clone + 'static, + AS: AuxStore + Send + Sync + 'static, { let mut module = RpcModule::new(()); let FullDeps { @@ -114,8 +110,7 @@ where reward_signing_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, - subspace_link, - segment_headers_provider, + segment_headers_store, piece_provider, sync_oracle, } = deps; @@ -136,8 +131,7 @@ where reward_signing_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, - subspace_link, - segment_headers_provider, + segment_headers_store, piece_provider, sync_oracle, ) diff --git a/crates/subspace-service/src/segment_headers.rs b/crates/subspace-service/src/segment_headers.rs deleted file mode 100644 index 6b8ba5521a..0000000000 --- a/crates/subspace-service/src/segment_headers.rs +++ /dev/null @@ -1,139 +0,0 @@ -use futures::{Stream, StreamExt}; -use parity_scale_codec::{Decode, Encode}; -use sc_client_api::backend::AuxStore; -use sc_consensus_subspace::ArchivedSegmentNotification; -use sc_consensus_subspace_rpc::SegmentHeaderProvider; -use std::error::Error; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use subspace_core_primitives::{SegmentHeader, SegmentIndex}; -use tracing::{debug, error, trace}; - -/// Start an archiver that will listen for archived segments and send segment header to the storage -pub(crate) async fn start_segment_header_archiver( - mut segment_header_cache: SegmentHeaderCache, - mut archived_segment_notification_stream: impl Stream + Unpin, -) { - trace!("Subspace segment header archiver started."); - - while let Some(ArchivedSegmentNotification { - archived_segment, .. - }) = archived_segment_notification_stream.next().await - { - let segment_index = archived_segment.segment_header.segment_index(); - let result = segment_header_cache.add_segment_header(archived_segment.segment_header); - - if let Err(err) = result { - error!(%segment_index, ?err, "Segment header archiving failed."); - } else { - debug!(%segment_index, "Segment header archived."); - } - } -} - -/// Cache of recently produced segment headers in aux storage -pub struct SegmentHeaderCache { - aux_store: Arc, - // TODO: Consider introducing and using global in-memory segment header cache (this comment is - // in multiple files) - max_segment_index: Arc, -} - -impl Clone for SegmentHeaderCache { - fn clone(&self) -> Self { - Self { - aux_store: self.aux_store.clone(), - max_segment_index: self.max_segment_index.clone(), - } - } -} - -impl SegmentHeaderCache -where - AS: AuxStore, -{ - const KEY_PREFIX: &[u8] = b"segment-headers-cache"; - - /// Create new instance. 
- pub fn new(aux_store: Arc) -> Result> { - let instance = Self { - aux_store, - max_segment_index: Default::default(), - }; - - // Crude way to find last segment index without checking every index - // TODO: Binary search would be more efficient in the inner loop, but I'm lazy right now - 'outer: for segment_index in (SegmentIndex::ZERO..).step_by(100) { - if instance.get_segment_header(segment_index)?.is_none() { - for segment_index in (SegmentIndex::ZERO..segment_index).rev() { - if instance.get_segment_header(segment_index)?.is_some() - || segment_index == SegmentIndex::ZERO - { - instance - .max_segment_index - .store(u64::from(segment_index), Ordering::Release); - break 'outer; - } - } - } - } - - Ok(instance) - } - - /// Returns last observed segment index. - pub fn max_segment_index(&self) -> SegmentIndex { - SegmentIndex::from(self.max_segment_index.load(Ordering::Acquire)) - } - - /// Add segment header to cache (likely as the result of archiving) - pub fn add_segment_header( - &mut self, - segment_header: SegmentHeader, - ) -> Result<(), Box> { - let key = Self::key(segment_header.segment_index()); - let value = segment_header.encode(); - let insert_data = vec![(key.as_slice(), value.as_slice())]; - - self.aux_store.insert_aux(&insert_data, &Vec::new())?; - self.max_segment_index - .store(u64::from(segment_header.segment_index()), Ordering::Release); - - Ok(()) - } - - /// Get segment header from storage - fn get_segment_header( - &self, - segment_index: SegmentIndex, - ) -> Result, Box> { - Ok(self - .aux_store - .get_aux(&Self::key(segment_index))? - .map(|segment_header| { - SegmentHeader::decode(&mut segment_header.as_slice()) - .expect("Always correct segment header unless DB is corrupted; qed") - })) - } - - fn key(segment_index: SegmentIndex) -> Vec { - Self::key_from_bytes(&u64::from(segment_index).to_le_bytes()) - } - - fn key_from_bytes(bytes: &[u8]) -> Vec { - (Self::KEY_PREFIX, bytes).encode() - } -} - -impl SegmentHeaderProvider for SegmentHeaderCache -where - AS: AuxStore, -{ - /// Get segment header from storage - fn get_segment_header( - &self, - segment_index: SegmentIndex, - ) -> Result, Box> { - self.get_segment_header(segment_index) - } -}
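For reference, below is a minimal, self-contained sketch of the keying and start-up loading scheme that the new `SegmentHeadersStore` in crates/sc-consensus-subspace/src/archiver.rs relies on: each `add_segment_headers` call writes one SCALE-encoded batch under the next sequential `u16` key index, and `new` rebuilds the in-memory cache by walking those keys until the first gap. This is an illustration only, not part of the patch: the `HashMap` stands in for the real `sc_client_api::AuxStore`, and plain `u64` values stand in for `SegmentHeader`s.

use parity_scale_codec::{Decode, Encode};
use std::collections::HashMap;

const KEY_PREFIX: &[u8] = b"segment-headers";

// Mirrors `SegmentHeadersStore::key`: SCALE-encode (prefix, little-endian u16 index)
fn key(key_index: u16) -> Vec<u8> {
    (KEY_PREFIX, key_index.to_le_bytes()).encode()
}

fn main() {
    // Stand-in for the aux store: one entry per `add_segment_headers` batch
    let mut aux: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();
    aux.insert(key(0), vec![0u64, 1].encode());
    aux.insert(key(1), vec![2u64].encode());

    // Start-up path of `SegmentHeadersStore::new`: walk sequential key indexes until
    // the first missing key, flattening every stored batch into the in-memory cache
    let mut cache = Vec::new();
    let mut next_key_index = 0u16;
    while let Some(bytes) = aux.get(&key(next_key_index)) {
        cache.extend(Vec::<u64>::decode(&mut bytes.as_slice()).expect("valid encoding"));
        next_key_index += 1;
    }

    assert_eq!(cache, vec![0, 1, 2]);
    // `get_segment_header` is then a plain cache lookup by segment index, and the last
    // observed segment index is `cache.len() - 1`
    assert_eq!(cache.len().saturating_sub(1), 2);
}

Appending whole batches under sequential keys keeps each archived segment down to a single `insert_aux` write and lets start-up recover the full history with a linear scan; the compaction TODO in the diff notes the trade-off once the number of keys grows.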