Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Introduce ChainSyncInterface (#12489)
Browse files Browse the repository at this point in the history
* Introduce `ChainSyncInterface`

`ChainSyncInterface` provides an asynchronous interface for other
subsystems to submit calls to `ChainSync`. This allows `NetworkService`
to delegate calls to `ChainSync` while still providing the same API
for other subsystems (for now). This makes it possible to move the
syncing code in piecemeal fashion out of `protocol.rs` as the calls
are just forwarded to `ChainSync`.

* Apply review comments

* Fix tests
  • Loading branch information
altonen authored Oct 17, 2022
1 parent 6f453b5 commit 30a7a5b
Show file tree
Hide file tree
Showing 17 changed files with 301 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,14 @@ pub trait ChainSync<Block: BlockT>: Send {

/// Decode implementation-specific state response.
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;

/// Advance the state of `ChainSync`
///
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
}
4 changes: 4 additions & 0 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use sc_network_common::{

pub use libp2p::{build_multiaddr, core::PublicKey, identity};

use crate::ChainSyncInterface;
use core::{fmt, iter};
use libp2p::{
identity::{ed25519, Keypair},
Expand Down Expand Up @@ -91,6 +92,9 @@ where
/// Instance of chain sync implementation.
pub chain_sync: Box<dyn ChainSync<B>>,

/// Interface that can be used to delegate syncing-related function calls to `ChainSync`
pub chain_sync_service: Box<dyn ChainSyncInterface<B>>,

/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,

Expand Down
12 changes: 12 additions & 0 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ pub use service::{
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender,
NotificationSenderReady, OutboundFailure, PublicKey,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

pub use sc_peerset::ReputationChange;

Expand All @@ -293,3 +294,14 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2;

/// The maximum number of concurrent established connections that were incoming.
const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000;

/// Abstraction over syncing-related services
pub trait ChainSyncInterface<B: BlockT>:
NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
{
}

impl<T, B: BlockT> ChainSyncInterface<B> for T where
T: NetworkSyncForkRequest<B::Hash, NumberFor<B>> + Send + Sync
{
}
19 changes: 5 additions & 14 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,18 +947,6 @@ where
self.chain_sync.clear_justification_requests();
}

/// Request syncing for the given block from given set of peers.
/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
/// requests.
pub fn set_sync_fork_request(
&mut self,
peers: Vec<PeerId>,
hash: &B::Hash,
number: NumberFor<B>,
) {
self.chain_sync.set_sync_fork_request(peers, hash, number)
}

/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the importqueue, with or without
/// errors.
Expand Down Expand Up @@ -1461,8 +1449,11 @@ where
self.pending_messages.push_back(event);
}

// Check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) {
// Advance the state of `ChainSync`
//
// Process any received requests received from `NetworkService` and
// check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.chain_sync.poll(cx) {
match self.process_block_announce_validation_result(result) {
CustomMessageOutcome::None => {},
outcome => self.pending_messages.push_back(outcome),
Expand Down
13 changes: 5 additions & 8 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready},
transport, ReputationChange,
transport, ChainSyncInterface, ReputationChange,
};

use futures::{channel::oneshot, prelude::*};
Expand Down Expand Up @@ -121,6 +121,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B>>,
/// Interface that can be used to delegate calls to `ChainSync`
chain_sync_service: Box<dyn ChainSyncInterface<B>>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
Expand Down Expand Up @@ -433,6 +435,7 @@ where
local_peer_id,
local_identity,
to_worker,
chain_sync_service: params.chain_sync_service,
peers_notifications_sinks: peers_notifications_sinks.clone(),
notifications_sizes_metric: metrics
.as_ref()
Expand Down Expand Up @@ -814,7 +817,7 @@ where
/// a stale fork missing.
/// Passing empty `peers` set effectively removes the sync request.
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number));
self.chain_sync_service.set_sync_fork_request(peers, hash, number);
}
}

Expand Down Expand Up @@ -1219,7 +1222,6 @@ enum ServiceToWorkerMsg<B: BlockT> {
RemoveSetReserved(ProtocolName, PeerId),
AddToPeersSet(ProtocolName, PeerId),
RemoveFromPeersSet(ProtocolName, PeerId),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(out_events::Sender),
Request {
target: PeerId,
Expand Down Expand Up @@ -1380,11 +1382,6 @@ where
.behaviour_mut()
.user_protocol_mut()
.remove_from_peers_set(protocol, peer_id),
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) => this
.network_service
.behaviour_mut()
.user_protocol_mut()
.set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender),
ServiceToWorkerMsg::Request {
target,
Expand Down
56 changes: 46 additions & 10 deletions client/network/src/service/chainsync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{config, NetworkWorker};
use crate::{config, ChainSyncInterface, NetworkWorker};

use futures::prelude::*;
use libp2p::PeerId;
Expand All @@ -35,7 +35,7 @@ use sc_network_common::{
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler, mock::MockChainSync,
state_request_handler::StateRequestHandler,
service::mock::MockChainSyncInterface, state_request_handler::StateRequestHandler,
};
use sp_core::H256;
use sp_runtime::{
Expand All @@ -56,6 +56,7 @@ const PROTOCOL_NAME: &str = "/foo";

fn make_network(
chain_sync: Box<dyn ChainSyncT<substrate_test_runtime_client::runtime::Block>>,
chain_sync_service: Box<dyn ChainSyncInterface<substrate_test_runtime_client::runtime::Block>>,
client: Arc<substrate_test_runtime_client::TestClient>,
) -> (TestNetworkWorker, Arc<substrate_test_runtime_client::TestClient>) {
let network_config = config::NetworkConfiguration {
Expand Down Expand Up @@ -174,6 +175,7 @@ fn make_network(
fork_id,
import_queue,
chain_sync,
chain_sync_service,
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
Expand All @@ -193,7 +195,7 @@ fn set_default_expecations_no_peers(
chain_sync.expect_state_request().returning(|| None);
chain_sync.expect_justification_requests().returning(|| Box::new(iter::empty()));
chain_sync.expect_warp_sync_request().returning(|| None);
chain_sync.expect_poll_block_announce_validation().returning(|_| Poll::Pending);
chain_sync.expect_poll().returning(|_| Poll::Pending);
chain_sync.expect_status().returning(|| SyncStatus {
state: SyncState::Idle,
best_seen_block: None,
Expand All @@ -207,11 +209,18 @@ fn set_default_expecations_no_peers(
#[async_std::test]
async fn normal_network_poll_no_peers() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSync` and set default expectations for it
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
set_default_expecations_no_peers(&mut chain_sync);

let (mut network, _) = make_network(chain_sync, client);
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// poll the network once
futures::future::poll_fn(|cx| {
Expand All @@ -224,6 +233,13 @@ async fn normal_network_poll_no_peers() {
#[async_std::test]
async fn request_justification() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `request_justification()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

Expand All @@ -237,7 +253,7 @@ async fn request_justification() {
.returning(|_, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "request justifiction" message and poll the network
network.service().request_justification(&hash, number);
Expand All @@ -252,13 +268,20 @@ async fn request_justification() {
#[async_std::test]
async fn clear_justification_requests(&mut self) {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `clear_justification_requests()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

chain_sync.expect_clear_justification_requests().once().returning(|| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "request justifiction" message and poll the network
network.service().clear_justification_requests();
Expand All @@ -273,24 +296,31 @@ async fn clear_justification_requests(&mut self) {
#[async_std::test]
async fn set_sync_fork_request() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

// build `ChainSync` and set default expectations for it
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());
set_default_expecations_no_peers(&mut chain_sync);

// build `ChainSyncInterface` provider and verify that the `set_sync_fork_request()`
// call is delegated to `ChainSyncInterface` (which eventually forwards it to `ChainSync`)
let mut chain_sync_service =
MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new();

let hash = H256::random();
let number = 1337u64;
let peers = (0..3).map(|_| PeerId::random()).collect::<Vec<_>>();
let copy_peers = peers.clone();

chain_sync
chain_sync_service
.expect_set_sync_fork_request()
.withf(move |in_peers, in_hash, in_number| {
&peers == in_peers && &hash == in_hash && &number == in_number
})
.once()
.returning(|_, _, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, Box::new(chain_sync_service), client);

// send "set sync fork request" message and poll the network
network.service().set_sync_fork_request(copy_peers, hash, number);
Expand All @@ -305,6 +335,12 @@ async fn set_sync_fork_request() {
#[async_std::test]
async fn on_block_finalized() {
let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);
// build `ChainSyncInterface` provider and set no expecations for it (i.e., it cannot be
// called)
let chain_sync_service =
Box::new(MockChainSyncInterface::<substrate_test_runtime_client::runtime::Block>::new());

// build `ChainSync` and verify that call to `on_block_finalized()` is made
let mut chain_sync =
Box::new(MockChainSync::<substrate_test_runtime_client::runtime::Block>::new());

Expand All @@ -326,7 +362,7 @@ async fn on_block_finalized() {
.returning(|_, _| ());

set_default_expecations_no_peers(&mut chain_sync);
let (mut network, _) = make_network(chain_sync, client);
let (mut network, _) = make_network(chain_sync, chain_sync_service, client);

// send "set sync fork request" message and poll the network
network.on_block_finalized(hash, header);
Expand Down
3 changes: 2 additions & 1 deletion client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn build_test_full_node(
protocol_config
};

let chain_sync = ChainSync::new(
let (chain_sync, chain_sync_service) = ChainSync::new(
match network_config.sync_mode {
config::SyncMode::Full => sc_network_common::sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
Expand Down Expand Up @@ -172,6 +172,7 @@ fn build_test_full_node(
fork_id,
import_queue,
chain_sync: Box::new(chain_sync),
chain_sync_service,
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
Expand Down
2 changes: 2 additions & 0 deletions client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" }
sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" }
sc-network-common = { version = "0.10.0-dev", path = "../common" }
sc-peerset = { version = "4.0.0-dev", path = "../../peerset" }
sc-utils = { version = "4.0.0-dev", path = "../../utils" }
sp-arithmetic = { version = "5.0.0", path = "../../../primitives/arithmetic" }
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" }
Expand All @@ -42,6 +43,7 @@ sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/final
sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" }

[dev-dependencies]
async-std = { version = "1.11.0", features = ["attributes"] }
quickcheck = { version = "1.0.3", default-features = false }
sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" }
sp-test-primitives = { version = "2.0.0", path = "../../../primitives/test-primitives" }
Expand Down
Loading

0 comments on commit 30a7a5b

Please sign in to comment.