Skip to content

Commit

Permalink
Merge pull request #1692 from subspace/segment-headers-store
Browse files Browse the repository at this point in the history
Segment headers store
  • Loading branch information
nazar-pc authored Jul 26, 2023
2 parents 229c59e + fd85a02 commit 1e12a6e
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 649 deletions.
128 changes: 22 additions & 106 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::SubscriptionSink;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_client_api::BlockBackend;
use sc_client_api::{AuxStore, BlockBackend};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::{
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceLink,
SubspaceSyncOracle,
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification,
SegmentHeadersStore, SubspaceSyncOracle,
};
use sc_rpc::SubscriptionTaskExecutor;
use sc_utils::mpsc::TracingUnboundedSender;
Expand All @@ -41,21 +41,20 @@ use sp_consensus_slots::Slot;
use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi as SubspaceRuntimeApi};
use sp_core::crypto::ByteArray;
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Zero};
use sp_runtime::traits::Block as BlockT;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::error::Error;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{
Piece, PieceIndex, SegmentCommitment, SegmentHeader, SegmentIndex, Solution,
};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex, Solution};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse, MAX_SEGMENT_INDEXES_PER_REQUEST,
SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -108,12 +107,6 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_node_sync_status_change(&self);

#[method(name = "subspace_segmentCommitments")]
async fn segment_commitments(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentCommitment>>>;

#[method(name = "subspace_segmentHeaders")]
async fn segment_headers(
&self,
Expand Down Expand Up @@ -148,13 +141,6 @@ struct BlockSignatureSenders {
senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}

pub trait SegmentHeaderProvider {
fn get_segment_header(
&self,
segment_index: SegmentIndex,
) -> Result<Option<SegmentHeader>, Box<dyn Error>>;
}

pub trait PieceProvider {
fn get_piece_by_index(
&self,
Expand All @@ -163,10 +149,9 @@ pub trait PieceProvider {
}

/// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace.
pub struct SubspaceRpc<Block, Client, RBP, PP, SO>
pub struct SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
RBP: SegmentHeaderProvider,
PP: PieceProvider,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
Expand All @@ -178,13 +163,13 @@ where
solution_response_senders: Arc<Mutex<SolutionResponseSenders>>,
reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
segment_header_provider: RBP,
segment_headers_store: SegmentHeadersStore<AS>,
piece_provider: Option<PP>,
archived_segment_acknowledgement_senders:
Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
next_subscription_id: AtomicU64,
sync_oracle: SubspaceSyncOracle<SO>,
_block: PhantomData<Block>,
}

/// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for
Expand All @@ -194,12 +179,12 @@ where
/// every subscriber, after which RPC server waits for the same number of
/// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until
/// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored.
impl<Block, Client, RBP, PP, SO> SubspaceRpc<Block, Client, RBP, PP, SO>
impl<Block, Client, PP, SO, AS> SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
RBP: SegmentHeaderProvider,
PP: PieceProvider,
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
/// Creates a new instance of the `SubspaceRpc` handler.
Expand All @@ -212,8 +197,7 @@ where
ArchivedSegmentNotification,
>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
segment_header_provider: RBP,
segment_headers_store: SegmentHeadersStore<AS>,
piece_provider: Option<PP>,
sync_oracle: SubspaceSyncOracle<SO>,
) -> Self {
Expand All @@ -226,18 +210,18 @@ where
solution_response_senders: Arc::default(),
reward_signature_senders: Arc::default(),
dsn_bootstrap_nodes,
subspace_link,
segment_header_provider,
segment_headers_store,
piece_provider,
archived_segment_acknowledgement_senders: Arc::default(),
next_subscription_id: AtomicU64::default(),
sync_oracle,
_block: PhantomData,
}
}
}

#[async_trait]
impl<Block, Client, RBP, PP, SO> SubspaceRpcApiServer for SubspaceRpc<Block, Client, RBP, PP, SO>
impl<Block, Client, PP, SO, AS> SubspaceRpcApiServer for SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
Expand All @@ -247,9 +231,9 @@ where
+ Sync
+ 'static,
Client::Api: SubspaceRuntimeApi<Block, FarmerPublicKey>,
RBP: SegmentHeaderProvider + Send + Sync + 'static,
PP: PieceProvider + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
fn get_farmer_app_info(&self) -> RpcResult<FarmerAppInfo> {
let best_hash = self.client.info().best_hash;
Expand Down Expand Up @@ -653,93 +637,25 @@ where
Ok(())
}

// TODO: Remove as unnecessary, `segment_headers` can be used instead
async fn segment_commitments(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentCommitment>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}"
)));
};

let runtime_api = self.client.runtime_api();
let best_hash = self.client.info().best_hash;
let best_block_number = self.client.info().best_number;

let segment_commitment_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
.into_iter()
.map(|segment_index| {
let api_result = runtime_api
.segment_commitment(best_hash, segment_index)
.map_err(|_| {
JsonRpseeError::Custom(
"Internal error during `segment_commitment` call".to_string(),
)
});

api_result.map(|maybe_segment_commitment| {
// This is not a very nice hack due to the fact that at the time first block is
// produced extrinsics with segment headers are not yet in runtime.
if maybe_segment_commitment.is_none() && best_block_number.is_zero() {
self.subspace_link
.segment_commitment_by_segment_index(segment_index)
} else {
maybe_segment_commitment
}
})
})
.collect();

if let Err(ref err) = segment_commitment_result {
error!(
"Failed to get data from runtime API (segment_commitment): {}",
err
);
}

segment_commitment_result
}

async fn segment_headers(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentHeader>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
if segment_indexes.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}"
"segment_indexes length exceed the limit {MAX_SEGMENT_HEADERS_PER_REQUEST}"
)));
};

let segment_commitment_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
Ok(segment_indexes
.into_iter()
.map(|segment_index| {
self.segment_header_provider
.get_segment_header(segment_index)
.map_err(|_| {
JsonRpseeError::Custom(
"Internal error during `segment_headers` call".to_string(),
)
})
})
.collect();

if let Err(err) = &segment_commitment_result {
error!(?err, "Failed to get segment headers.");
}

segment_commitment_result
.map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
.collect())
}

fn piece(&self, piece_index: PieceIndex) -> RpcResult<Option<Vec<u8>>> {
Expand Down
Loading

0 comments on commit 1e12a6e

Please sign in to comment.