Skip to content
This repository has been archived by the owner on Feb 15, 2024. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
i1i1 committed Apr 4, 2023
1 parent 21d167f commit 1547775
Show file tree
Hide file tree
Showing 8 changed files with 879 additions and 563 deletions.
1,247 changes: 779 additions & 468 deletions Cargo.lock

Large diffs are not rendered by default.

123 changes: 62 additions & 61 deletions Cargo.toml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/dsn/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use derive_more::{Deref, DerefMut, Display, From};
use either::*;
use futures::prelude::*;
use libp2p_core::Multiaddr;
use sc_network_common::config::MultiaddrWithPeerId;
use sc_service::config::MultiaddrWithPeerId;
use serde::{Deserialize, Serialize};
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer_components::piece_caching::PieceMemoryCache;
Expand Down
4 changes: 2 additions & 2 deletions src/node/domains/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ impl CoreDomainNode {
domain_id: DomainId::CORE_PAYMENTS,
core_domain_config,
system_domain_client: system_domain_node.client.clone(),
system_domain_network: system_domain_node.network.clone(),
system_domain_sync_service: system_domain_node.sync_service.clone(),
primary_chain_client: primary_chain_node.client.clone(),
primary_network_sync_oracle: primary_chain_node.network.clone(),
primary_network_sync_oracle: primary_chain_node.sync_service.clone(),
select_chain: primary_chain_node.select_chain.clone(),
executor_streams,
gossip_message_sink,
Expand Down
4 changes: 2 additions & 2 deletions src/node/domains/eth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ impl EthDomainNode {
domain_id: DomainId::CORE_ETH_RELAY,
core_domain_config,
system_domain_client: system_domain_node.client.clone(),
system_domain_network: system_domain_node.network.clone(),
system_domain_sync_service: system_domain_node.sync_service.clone(),
primary_chain_client: primary_chain_node.client.clone(),
primary_network_sync_oracle: primary_chain_node.network.clone(),
primary_network_sync_oracle: primary_chain_node.sync_service.clone(),
select_chain: primary_chain_node.select_chain.clone(),
executor_streams,
gossip_message_sink,
Expand Down
5 changes: 3 additions & 2 deletions src/node/domains/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl SystemDomainNode {
let system_domain_node = domain_service::new_full_system(
system_domain_config,
primary_new_full.client.clone(),
primary_new_full.network.clone(),
primary_new_full.sync_service.clone(),
&primary_new_full.select_chain,
executor_streams,
gossip_msg_sink.clone(),
Expand Down Expand Up @@ -210,7 +210,8 @@ impl SystemDomainNode {

let cross_domain_message_gossip_worker =
cross_domain_message_gossip::GossipWorker::<Block>::new(
primary_new_full.network.clone(),
primary_new_full.network_service.clone(),
primary_new_full.sync_service.clone(),
domain_tx_pool_sinks,
);

Expand Down
49 changes: 26 additions & 23 deletions src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, Stream, StreamExt};
use sc_consensus_subspace_rpc::SegmentHeaderProvider;
use sc_network::network_state::NetworkState;
use sc_network::{NetworkService, NetworkStateInfo, NetworkStatusProvider, SyncState};
use sc_network_common::config::MultiaddrWithPeerId;
use sc_network::{NetworkService, NetworkStateInfo, SyncState};
use sc_rpc_api::state::StateApiClient;
use sc_service::config::MultiaddrWithPeerId;
use sp_consensus::SyncOracle;
use sp_core::H256;
use subspace_core_primitives::{PieceIndexHash, SegmentIndex};
Expand Down Expand Up @@ -195,8 +195,9 @@ impl Config {
client,
rpc_handlers,
network_starter,
network,
sync_service,
backend,
network_service,

select_chain: _,
reward_signing_notification_stream: _,
Expand Down Expand Up @@ -243,7 +244,8 @@ impl Config {
Ok(Node {
client,
system_domain,
network,
network_service,
sync_service,
name,
rpc_handle,
dsn,
Expand Down Expand Up @@ -299,7 +301,10 @@ pub struct Node {
#[derivative(Debug = "ignore")]
client: Arc<FullClient>,
#[derivative(Debug = "ignore")]
network: Arc<NetworkService<RuntimeBlock, Hash>>,
sync_service: Arc<sc_network_sync::service::chain_sync::SyncingService<RuntimeBlock>>,
#[derivative(Debug = "ignore")]
network_service: Arc<NetworkService<RuntimeBlock, Hash>>,

pub(crate) rpc_handle: crate::utils::Rpc,
pub(crate) stop_sender: mpsc::Sender<oneshot::Sender<()>>,
pub(crate) name: String,
Expand Down Expand Up @@ -444,8 +449,8 @@ impl Node {

/// Get listening addresses of the node
pub async fn listen_addresses(&self) -> anyhow::Result<Vec<MultiaddrWithPeerId>> {
let peer_id = self.network.local_peer_id();
self.network
let peer_id = self.network_service.local_peer_id();
self.network_service
.network_state()
.await
.map(|state| {
Expand Down Expand Up @@ -492,7 +497,7 @@ impl Node {
.build();

backoff::future::retry(check_offline_backoff, || {
futures::future::ready(if self.network.is_offline() {
futures::future::ready(if self.sync_service.is_offline() {
Err(backoff::Error::transient(()))
} else {
Ok(())
Expand All @@ -505,10 +510,10 @@ impl Node {
let inner = tokio_stream::wrappers::ReceiverStream::new(receiver);

let result = backoff::future::retry(check_synced_backoff.clone(), || {
self.network.status().map(|result| match result.map(|status| status.sync_state) {
self.sync_service.status().map(|result| match result.map(|status| status.state) {
Ok(SyncState::Importing { target }) => Ok((target, SyncStatus::Importing)),
Ok(SyncState::Downloading { target }) => Ok((target, SyncStatus::Downloading)),
_ if self.network.is_offline() =>
_ if self.sync_service.is_offline() =>
Err(backoff::Error::transient(Some(anyhow::anyhow!("Node went offline")))),
Err(()) => Err(backoff::Error::transient(Some(anyhow::anyhow!(
"Failed to fetch networking status"
Expand All @@ -532,24 +537,22 @@ impl Node {
.expect("We are holding receiver, so it will never panic");

tokio::spawn({
let network = Arc::clone(&self.network);
let sync = Arc::clone(&self.sync_service);
let client = Arc::clone(&self.client);
async move {
loop {
tokio::time::sleep(CHECK_SYNCED_EVERY).await;

let result = backoff::future::retry(check_synced_backoff.clone(), || {
network.status().map(|result| {
match result.map(|status| status.sync_state) {
Ok(SyncState::Importing { target }) =>
Ok(Ok((target, SyncStatus::Importing))),
Ok(SyncState::Downloading { target }) =>
Ok(Ok((target, SyncStatus::Downloading))),
Err(()) =>
Ok(Err(anyhow::anyhow!("Failed to fetch networking status"))),
Ok(SyncState::Idle | SyncState::Pending) =>
Err(backoff::Error::transient(())),
}
sync.status().map(|result| match result.map(|status| status.state) {
Ok(SyncState::Importing { target }) =>
Ok(Ok((target, SyncStatus::Importing))),
Ok(SyncState::Downloading { target }) =>
Ok(Ok((target, SyncStatus::Downloading))),
Err(()) =>
Ok(Err(anyhow::anyhow!("Failed to fetch networking status"))),
Ok(SyncState::Idle | SyncState::Pending) =>
Err(backoff::Error::transient(())),
})
})
.await;
Expand Down Expand Up @@ -608,7 +611,7 @@ impl Node {
/// Get node info
pub async fn get_info(&self) -> anyhow::Result<Info> {
let NetworkState { connected_peers, not_connected_peers, .. } = self
.network
.network_service
.network_state()
.await
.map_err(|()| anyhow::anyhow!("Failed to fetch node info: node already exited"))?;
Expand Down
8 changes: 4 additions & 4 deletions src/node/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use libp2p_core::Multiaddr;
use sc_executor::{WasmExecutionMethod, WasmtimeInstantiationStrategy};
use sc_network::config::{NodeKeyConfig, Secret};
use sc_network::ProtocolName;
use sc_network_common::config::{MultiaddrWithPeerId, NonDefaultSetConfig, TransportConfig};
use sc_service::config::{KeystoreConfig, NetworkConfiguration};
use sc_service::config::{
KeystoreConfig, MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, TransportConfig,
};
use sc_service::{BasePath, Configuration, DatabaseSource, TracingReceiver};
use serde::{Deserialize, Serialize};
pub use types::*;
Expand Down Expand Up @@ -225,7 +226,7 @@ impl Base {
network.default_peers_set.out_peers = 50;
// Full + Light clients
network.default_peers_set.in_peers = 25 + 100;
let (keystore_remote, keystore) = (None, KeystoreConfig::InMemory);
let keystore = KeystoreConfig::InMemory;

// HACK: Tricky way to add extra endpoints as we can't push into telemetry
// endpoints
Expand Down Expand Up @@ -255,7 +256,6 @@ impl Base {
tokio_handle: tokio::runtime::Handle::current(),
transaction_pool: Default::default(),
network,
keystore_remote,
keystore,
database: DatabaseSource::ParityDb { path: config_dir.join("paritydb").join("full") },
trie_cache_maximum_size: Some(67_108_864),
Expand Down

0 comments on commit 1547775

Please sign in to comment.