Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment headers store #1692

Merged
merged 7 commits into from
Jul 26, 2023
128 changes: 22 additions & 106 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use jsonrpsee::types::SubscriptionResult;
use jsonrpsee::SubscriptionSink;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use sc_client_api::BlockBackend;
use sc_client_api::{AuxStore, BlockBackend};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::{
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceLink,
SubspaceSyncOracle,
ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification,
SegmentHeadersStore, SubspaceSyncOracle,
};
use sc_rpc::SubscriptionTaskExecutor;
use sc_utils::mpsc::TracingUnboundedSender;
Expand All @@ -41,21 +41,20 @@ use sp_consensus_slots::Slot;
use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi as SubspaceRuntimeApi};
use sp_core::crypto::ByteArray;
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Zero};
use sp_runtime::traits::Block as BlockT;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::error::Error;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{
Piece, PieceIndex, SegmentCommitment, SegmentHeader, SegmentIndex, Solution,
};
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex, Solution};
use subspace_farmer_components::FarmerProtocolInfo;
use subspace_networking::libp2p::Multiaddr;
use subspace_rpc_primitives::{
FarmerAppInfo, NodeSyncStatus, RewardSignatureResponse, RewardSigningInfo, SlotInfo,
SolutionResponse, MAX_SEGMENT_INDEXES_PER_REQUEST,
SolutionResponse, MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -108,12 +107,6 @@ pub trait SubspaceRpcApi {
)]
fn subscribe_node_sync_status_change(&self);

#[method(name = "subspace_segmentCommitments")]
async fn segment_commitments(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentCommitment>>>;

#[method(name = "subspace_segmentHeaders")]
async fn segment_headers(
&self,
Expand Down Expand Up @@ -148,13 +141,6 @@ struct BlockSignatureSenders {
senders: Vec<async_oneshot::Sender<RewardSignatureResponse>>,
}

pub trait SegmentHeaderProvider {
fn get_segment_header(
&self,
segment_index: SegmentIndex,
) -> Result<Option<SegmentHeader>, Box<dyn Error>>;
}

pub trait PieceProvider {
fn get_piece_by_index(
&self,
Expand All @@ -163,10 +149,9 @@ pub trait PieceProvider {
}

/// Implements the [`SubspaceRpcApiServer`] trait for interacting with Subspace.
pub struct SubspaceRpc<Block, Client, RBP, PP, SO>
pub struct SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
RBP: SegmentHeaderProvider,
PP: PieceProvider,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
Expand All @@ -178,13 +163,13 @@ where
solution_response_senders: Arc<Mutex<SolutionResponseSenders>>,
reward_signature_senders: Arc<Mutex<BlockSignatureSenders>>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
segment_header_provider: RBP,
segment_headers_store: SegmentHeadersStore<AS>,
piece_provider: Option<PP>,
archived_segment_acknowledgement_senders:
Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
next_subscription_id: AtomicU64,
sync_oracle: SubspaceSyncOracle<SO>,
_block: PhantomData<Block>,
}

/// [`SubspaceRpc`] is used for notifying subscribers about arrival of new slots and for
Expand All @@ -194,12 +179,12 @@ where
/// every subscriber, after which RPC server waits for the same number of
/// `subspace_submitSolutionResponse` requests with `SolutionResponse` in them or until
/// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored.
impl<Block, Client, RBP, PP, SO> SubspaceRpc<Block, Client, RBP, PP, SO>
impl<Block, Client, PP, SO, AS> SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
RBP: SegmentHeaderProvider,
PP: PieceProvider,
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
#[allow(clippy::too_many_arguments)]
/// Creates a new instance of the `SubspaceRpc` handler.
Expand All @@ -212,8 +197,7 @@ where
ArchivedSegmentNotification,
>,
dsn_bootstrap_nodes: Vec<Multiaddr>,
subspace_link: SubspaceLink<Block>,
segment_header_provider: RBP,
segment_headers_store: SegmentHeadersStore<AS>,
piece_provider: Option<PP>,
sync_oracle: SubspaceSyncOracle<SO>,
) -> Self {
Expand All @@ -226,18 +210,18 @@ where
solution_response_senders: Arc::default(),
reward_signature_senders: Arc::default(),
dsn_bootstrap_nodes,
subspace_link,
segment_header_provider,
segment_headers_store,
piece_provider,
archived_segment_acknowledgement_senders: Arc::default(),
next_subscription_id: AtomicU64::default(),
sync_oracle,
_block: PhantomData,
}
}
}

#[async_trait]
impl<Block, Client, RBP, PP, SO> SubspaceRpcApiServer for SubspaceRpc<Block, Client, RBP, PP, SO>
impl<Block, Client, PP, SO, AS> SubspaceRpcApiServer for SubspaceRpc<Block, Client, PP, SO, AS>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
Expand All @@ -247,9 +231,9 @@ where
+ Sync
+ 'static,
Client::Api: SubspaceRuntimeApi<Block, FarmerPublicKey>,
RBP: SegmentHeaderProvider + Send + Sync + 'static,
PP: PieceProvider + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
AS: AuxStore + Send + Sync + 'static,
{
fn get_farmer_app_info(&self) -> RpcResult<FarmerAppInfo> {
let best_hash = self.client.info().best_hash;
Expand Down Expand Up @@ -653,93 +637,25 @@ where
Ok(())
}

// TODO: Remove as unnecessary, `segment_headers` can be used instead
async fn segment_commitments(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentCommitment>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}"
)));
};

let runtime_api = self.client.runtime_api();
let best_hash = self.client.info().best_hash;
let best_block_number = self.client.info().best_number;

let segment_commitment_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
.into_iter()
.map(|segment_index| {
let api_result = runtime_api
.segment_commitment(best_hash, segment_index)
.map_err(|_| {
JsonRpseeError::Custom(
"Internal error during `segment_commitment` call".to_string(),
)
});

api_result.map(|maybe_segment_commitment| {
// This is not a very nice hack due to the fact that at the time first block is
// produced extrinsics with segment headers are not yet in runtime.
if maybe_segment_commitment.is_none() && best_block_number.is_zero() {
self.subspace_link
.segment_commitment_by_segment_index(segment_index)
} else {
maybe_segment_commitment
}
})
})
.collect();

if let Err(ref err) = segment_commitment_result {
error!(
"Failed to get data from runtime API (segment_commitment): {}",
err
);
}

segment_commitment_result
}

async fn segment_headers(
&self,
segment_indexes: Vec<SegmentIndex>,
) -> RpcResult<Vec<Option<SegmentHeader>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
if segment_indexes.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {MAX_SEGMENT_INDEXES_PER_REQUEST}"
"segment_indexes length exceed the limit {MAX_SEGMENT_HEADERS_PER_REQUEST}"
)));
};

let segment_commitment_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
Ok(segment_indexes
.into_iter()
.map(|segment_index| {
self.segment_header_provider
.get_segment_header(segment_index)
.map_err(|_| {
JsonRpseeError::Custom(
"Internal error during `segment_headers` call".to_string(),
)
})
})
.collect();

if let Err(err) = &segment_commitment_result {
error!(?err, "Failed to get segment headers.");
}

segment_commitment_result
.map(|segment_index| self.segment_headers_store.get_segment_header(segment_index))
.collect())
}

fn piece(&self, piece_index: PieceIndex) -> RpcResult<Option<Vec<u8>>> {
Expand Down
Loading