diff --git a/chain/chunks/src/test/basic.rs b/chain/chunks/src/test/basic.rs index e2c32a4819b..bcebb48534e 100644 --- a/chain/chunks/src/test/basic.rs +++ b/chain/chunks/src/test/basic.rs @@ -42,6 +42,12 @@ struct TestData { network_events: Vec, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + impl TestData { fn new(shards_manager: ShardsManager, chain: MockChainForShardsManager) -> Self { Self { shards_manager, chain, client_events: vec![], network_events: vec![] } @@ -177,7 +183,7 @@ fn test_chunk_forward() { test.register_handler(forward_client_request_to_shards_manager().widen()); test.register_handler(forward_network_request_to_shards_manager().widen()); test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen()); - test.register_handler(handle_adhoc_events()); + test.register_handler(handle_adhoc_events::().widen()); // We'll produce a single chunk whose next chunk producer is a chunk-only // producer, so that we can test that the chunk is forwarded to the next diff --git a/chain/chunks/src/test/multi.rs b/chain/chunks/src/test/multi.rs index aad1dc5b015..24e8bf84dd3 100644 --- a/chain/chunks/src/test/multi.rs +++ b/chain/chunks/src/test/multi.rs @@ -41,6 +41,12 @@ struct TestData { account_id: AccountId, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(EnumTryInto, Debug, EnumFrom)] enum TestEvent { Adhoc(AdhocEvent), @@ -99,7 +105,7 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop { .collect::>(); let mut test = builder.build(data); for idx in 0..test.data.len() { - test.register_handler(handle_adhoc_events().for_index(idx)); + test.register_handler(handle_adhoc_events::().widen().for_index(idx)); test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx)); test.register_handler(capture_events::().widen().for_index(idx)); diff --git a/chain/client/src/test_utils/client_actions_test_utils.rs b/chain/client/src/test_utils/client_actions_test_utils.rs index 31ce0fe9973..7f06bd79ad7 100644 --- a/chain/client/src/test_utils/client_actions_test_utils.rs +++ b/chain/client/src/test_utils/client_actions_test_utils.rs @@ -2,6 +2,46 @@ use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForC use crate::sync_jobs_actions::ClientSenderForSyncJobsMessage; use near_async::test_loop::event_handler::LoopEventHandler; use near_chunks::client::ShardsManagerResponse; +use near_network::client::ClientSenderForNetworkMessage; + +pub fn forward_client_messages_from_network_to_client_actions( +) -> LoopEventHandler { + LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| { + match msg { + ClientSenderForNetworkMessage::_state_response(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block_approval(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_transaction(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_block_headers(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_challenge(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_network_info(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_chunk_state_witness(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + ClientSenderForNetworkMessage::_chunk_endorsement(msg) => { + (msg.callback)(Ok(client_actions.handle(msg.message))); + } + _ => { + return Err(msg); + } + } + Ok(()) + }) +} pub fn forward_client_messages_from_client_to_client_actions( ) -> LoopEventHandler { diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs index 3a53b754538..f35cccfd612 100644 --- a/chain/network/src/test_loop.rs +++ b/chain/network/src/test_loop.rs @@ -5,6 +5,7 @@ use near_primitives::types::AccountId; /// This trait is just a helper for looking up the index. pub trait SupportsRoutingLookup { fn index_for_account(&self, account: &AccountId) -> usize; + fn num_accounts(&self) -> usize; } impl> SupportsRoutingLookup for Vec { @@ -13,4 +14,8 @@ impl> SupportsRoutingLookup for Vec { .position(|data| data.as_ref() == account) .unwrap_or_else(|| panic!("Account not found: {}", account)) } + + fn num_accounts(&self) -> usize { + self.len() + } } diff --git a/core/async/src/examples/actix_component_test.rs b/core/async/src/examples/actix_component_test.rs index a5463cd9fde..a3917305399 100644 --- a/core/async/src/examples/actix_component_test.rs +++ b/core/async/src/examples/actix_component_test.rs @@ -48,7 +48,7 @@ fn test_actix_component() { dummy: (), example: ExampleComponent::new(builder.sender().into_sender()), outer: OuterComponent::new( - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), ), periodic_requests_captured: vec![], }; @@ -66,7 +66,7 @@ fn test_actix_component() { test.register_handler(example_handler().widen()); // We need to redo whatever the ExampleActor does in its `started` method. - test.data.example.start(&mut test.delayed_action_runner()); + test.data.example.start(&mut test.sender().into_delayed_action_runner()); // Send some requests; this can be done in the asynchronous context. test.future_spawner().spawn("wait for 5", { let res = test.data.outer.call_example_component_for_response(5); diff --git a/core/async/src/examples/async_component_test.rs b/core/async/src/examples/async_component_test.rs index bd51cd492e4..d8fec354ffa 100644 --- a/core/async/src/examples/async_component_test.rs +++ b/core/async/src/examples/async_component_test.rs @@ -51,7 +51,7 @@ fn inner_request_handler( fn test_async_component() { let builder = TestLoopBuilder::::new(); let sender = builder.sender(); - let future_spawner = builder.future_spawner(); + let future_spawner = builder.sender().into_future_spawner(); let mut test = builder.build(TestData { dummy: (), output: vec![], diff --git a/core/async/src/examples/sum_numbers_test.rs b/core/async/src/examples/sum_numbers_test.rs index b01eca9e36e..428c8db81d0 100644 --- a/core/async/src/examples/sum_numbers_test.rs +++ b/core/async/src/examples/sum_numbers_test.rs @@ -12,12 +12,18 @@ use crate::{ use super::sum_numbers::{ReportSumMsg, SumNumbersComponent, SumRequest}; -#[derive(derive_more::AsMut, derive_more::AsRef)] +#[derive(derive_more::AsMut)] struct TestData { summer: SumNumbersComponent, sums: Vec, } +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + #[derive(Debug, EnumTryInto, EnumFrom)] enum TestEvent { Request(SumRequest), @@ -71,7 +77,7 @@ fn test_simple_with_adhoc() { let mut test = builder.build(data); test.register_handler(forward_sum_request().widen()); test.register_handler(capture_events::().widen()); - test.register_handler(handle_adhoc_events()); + test.register_handler(handle_adhoc_events::().widen()); // It is preferrable to put as much setup logic as possible into an adhoc // event (queued by .run below), so that as much logic as possible is diff --git a/core/async/src/test_loop.rs b/core/async/src/test_loop.rs index 173e9122a79..a99c90279c9 100644 --- a/core/async/src/test_loop.rs +++ b/core/async/src/test_loop.rs @@ -68,16 +68,10 @@ pub mod multi_instance; use self::{ delay_sender::DelaySender, event_handler::LoopEventHandler, - futures::{ - TestLoopDelayedActionEvent, TestLoopDelayedActionRunner, TestLoopFutureSpawner, - TestLoopTask, - }, -}; -use crate::{break_apart::BreakApart, time}; -use crate::{ - messaging::{IntoMultiSender, IntoSender}, - test_loop::event_handler::LoopHandlerContext, + futures::{TestLoopFutureSpawner, TestLoopTask}, }; +use crate::test_loop::event_handler::LoopHandlerContext; +use crate::time; use near_o11y::{testonly::init_test_logger, tracing::log::info}; use serde::Serialize; use std::{ @@ -192,40 +186,11 @@ impl TestLoopBuilder { self.pending_events_sender.clone() } - /// A shortcut for a common use case, where we use an enum message to - /// represent all the possible messages that a multisender may be used to - /// send. - /// - /// This assumes that S is a multisender with the derive - /// `#[derive(MultiSendMessage, ...)]`, which creates the enum - /// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct - /// being derived from). - /// - /// To use, first include in the test loop event enum a case for - /// `MyMultiSenderMessage`. Then, call this function to get a multisender, - /// like - /// `builder.wrapped_multi_sender()`. - pub fn wrapped_multi_sender(&self) -> S - where - DelaySender: IntoSender, - BreakApart: IntoMultiSender, - { - self.sender().into_sender().break_apart().into_multi_sender() - } - /// Returns a clock that will always return the current virtual time. pub fn clock(&self) -> time::Clock { self.clock.clock() } - /// Returns a FutureSpawner that can be used to spawn futures into the loop. - pub fn future_spawner(&self) -> TestLoopFutureSpawner - where - Event: From>, - { - self.sender().narrow() - } - pub fn build(self, data: Data) -> TestLoop { TestLoop::new(self.pending_events, self.pending_events_sender, self.clock, data) } @@ -356,13 +321,6 @@ impl TestLoop { { self.sender().narrow() } - - pub fn delayed_action_runner(&self) -> TestLoopDelayedActionRunner - where - Event: From>, - { - TestLoopDelayedActionRunner { sender: self.sender().narrow() } - } } impl Drop for TestLoop { diff --git a/core/async/src/test_loop/adhoc.rs b/core/async/src/test_loop/adhoc.rs index f8e51e3ee26..dee459cffa0 100644 --- a/core/async/src/test_loop/adhoc.rs +++ b/core/async/src/test_loop/adhoc.rs @@ -1,7 +1,4 @@ -use super::{ - delay_sender::DelaySender, - event_handler::{LoopEventHandler, TryIntoOrSelf}, -}; +use super::{delay_sender::DelaySender, event_handler::LoopEventHandler}; use crate::messaging::CanSend; use crate::time; use std::fmt::Debug; @@ -54,10 +51,8 @@ impl> + 'static> AdhocEventSender>>( -) -> LoopEventHandler { - LoopEventHandler::new(|event: Event, data, _ctx| { - let event = event.try_into_or_self()?; +pub fn handle_adhoc_events() -> LoopEventHandler> { + LoopEventHandler::new(|event: AdhocEvent, data, _ctx| { (event.handler)(data); Ok(()) }) diff --git a/core/async/src/test_loop/delay_sender.rs b/core/async/src/test_loop/delay_sender.rs index cc9ec5f3b7a..205166afa2a 100644 --- a/core/async/src/test_loop/delay_sender.rs +++ b/core/async/src/test_loop/delay_sender.rs @@ -1,7 +1,12 @@ +use crate::break_apart::BreakApart; use crate::messaging; +use crate::messaging::{IntoMultiSender, IntoSender}; +use crate::test_loop::futures::{TestLoopDelayedActionEvent, TestLoopDelayedActionRunner}; use crate::time; use std::sync::Arc; +use super::futures::{TestLoopFutureSpawner, TestLoopTask}; + /// Interface to send an event with a delay (in virtual time). It can be /// converted to a Sender for any message type that can be converted into /// the event type, so that a DelaySender given by the test loop may be passed @@ -23,6 +28,14 @@ impl DelaySender { self.0(event, delay); } + pub fn with_additional_delay(&self, delay: time::Duration) -> DelaySender + where + Event: 'static, + { + let f = self.0.clone(); + Self(Arc::new(move |event, other_delay| f(event, delay + other_delay))) + } + pub fn narrow(self) -> DelaySender where Event: From + 'static, @@ -31,6 +44,42 @@ impl DelaySender { self.send_with_delay(event.into(), delay) }) } + + /// A shortcut for a common use case, where we use an enum message to + /// represent all the possible messages that a multisender may be used to + /// send. + /// + /// This assumes that S is a multisender with the derive + /// `#[derive(MultiSendMessage, ...)]`, which creates the enum + /// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct + /// being derived from). + /// + /// To use, first include in the test loop event enum a case for + /// `MyMultiSenderMessage`. Then, call this function to get a multisender, + /// like + /// `builder.wrapped_multi_sender()`. + pub fn into_wrapped_multi_sender(self) -> S + where + Self: IntoSender, + BreakApart: IntoMultiSender, + { + self.into_sender().break_apart().into_multi_sender() + } + + pub fn into_delayed_action_runner(self) -> TestLoopDelayedActionRunner + where + Event: From> + 'static, + { + TestLoopDelayedActionRunner { sender: self.narrow() } + } + + /// Returns a FutureSpawner that can be used to spawn futures into the loop. + pub fn into_future_spawner(self) -> TestLoopFutureSpawner + where + Event: From> + 'static, + { + self.narrow() + } } impl DelaySender<(usize, Event)> { diff --git a/core/chain-configs/src/updateable_config.rs b/core/chain-configs/src/updateable_config.rs index b53394a54c8..833315ab160 100644 --- a/core/chain-configs/src/updateable_config.rs +++ b/core/chain-configs/src/updateable_config.rs @@ -98,5 +98,6 @@ pub struct UpdateableClientConfig { pub resharding_config: ReshardingConfig, /// Time limit for adding transactions in produce_chunk() + #[serde(with = "near_async::time::serde_opt_duration_as_std")] pub produce_chunk_add_transactions_time_limit: Option, } diff --git a/integration-tests/src/tests/client/features.rs b/integration-tests/src/tests/client/features.rs index ccc2ad93033..c6e8c0cf4a4 100644 --- a/integration-tests/src/tests/client/features.rs +++ b/integration-tests/src/tests/client/features.rs @@ -15,6 +15,7 @@ mod increase_deployment_cost; mod increase_storage_compute_cost; mod limit_contract_functions_number; mod lower_storage_key_limit; +mod multinode_test_loop_example; mod nearvm; #[cfg(feature = "protocol_feature_nonrefundable_transfer_nep491")] mod nonrefundable_transfer; diff --git a/integration-tests/src/tests/client/features/multinode_test_loop_example.rs b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs new file mode 100644 index 00000000000..0e514145ccb --- /dev/null +++ b/integration-tests/src/tests/client/features/multinode_test_loop_example.rs @@ -0,0 +1,350 @@ +use derive_enum_from_into::{EnumFrom, EnumTryInto}; +use near_async::messaging::{noop, IntoMultiSender, IntoSender, MessageWithCallback, SendAsync}; +use near_async::test_loop::adhoc::{handle_adhoc_events, AdhocEvent, AdhocEventSender}; +use near_async::test_loop::event_handler::{ + ignore_events, LoopEventHandler, LoopHandlerContext, TryIntoOrSelf, +}; +use near_async::test_loop::futures::{ + drive_delayed_action_runners, drive_futures, TestLoopDelayedActionEvent, TestLoopTask, +}; +use near_async::test_loop::TestLoopBuilder; +use near_async::time::Duration; +use near_chain::chunks_store::ReadOnlyChunksStore; +use near_chain::ChainGenesis; +use near_chain_configs::{ClientConfig, Genesis, GenesisConfig, GenesisRecords}; +use near_chunks::adapter::ShardsManagerRequestFromClient; +use near_chunks::client::ShardsManagerResponse; +use near_chunks::test_loop::forward_client_request_to_shards_manager; +use near_chunks::ShardsManager; +use near_client::client_actions::{ + ClientActions, ClientSenderForClientMessage, SyncJobsSenderForClientMessage, +}; +use near_client::sync_jobs_actions::{ + ClientSenderForSyncJobsMessage, SyncJobsActions, SyncJobsSenderForSyncJobsMessage, +}; +use near_client::test_utils::client_actions_test_utils::{ + forward_client_messages_from_client_to_client_actions, + forward_client_messages_from_network_to_client_actions, + forward_client_messages_from_shards_manager, + forward_client_messages_from_sync_jobs_to_client_actions, +}; +use near_client::test_utils::sync_jobs_test_utils::forward_sync_jobs_messages_from_client_to_sync_jobs_actions; +use near_client::{Client, SyncAdapter, SyncMessage}; +use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; +use near_epoch_manager::EpochManager; +use near_network::client::{ + BlockApproval, BlockResponse, ClientSenderForNetwork, ClientSenderForNetworkMessage, +}; +use near_network::test_loop::SupportsRoutingLookup; +use near_network::types::{ + NetworkRequests, PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, +}; +use near_primitives::network::PeerId; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::state_record::StateRecord; +use near_primitives::test_utils::{create_test_signer, create_user_test_signer}; +use near_primitives::types::{AccountId, AccountInfo}; +use near_primitives::version::PROTOCOL_VERSION; +use near_primitives_core::account::{AccessKey, Account}; +use near_primitives_core::hash::CryptoHash; +use near_store::genesis::initialize_genesis_state; +use near_store::test_utils::create_test_store; +use nearcore::NightshadeRuntime; +use std::path::Path; +use std::sync::{Arc, RwLock}; + +#[derive(derive_more::AsMut, derive_more::AsRef)] +struct TestData { + pub dummy: (), + pub account: AccountId, + pub client: ClientActions, + pub sync_jobs: SyncJobsActions, + pub shards_manager: ShardsManager, +} + +impl AsMut for TestData { + fn as_mut(&mut self) -> &mut Self { + self + } +} + +#[derive(EnumTryInto, Debug, EnumFrom)] +#[allow(clippy::large_enum_variant)] +enum TestEvent { + Task(Arc), + Adhoc(AdhocEvent), + ClientDelayedActions(TestLoopDelayedActionEvent), + ClientEventFromNetwork(ClientSenderForNetworkMessage), + ClientEventFromClient(ClientSenderForClientMessage), + ClientEventFromSyncJobs(ClientSenderForSyncJobsMessage), + ClientEventFromShardsManager(ShardsManagerResponse), + SyncJobsEventFromClient(SyncJobsSenderForClientMessage), + SyncJobsEventFromSyncJobs(SyncJobsSenderForSyncJobsMessage), + ShardsManagerRequestFromClient(ShardsManagerRequestFromClient), + ClientEventFromStateSyncAdapter(SyncMessage), + NetworkMessage(PeerManagerMessageRequest), + NetworkMessageForResult( + MessageWithCallback, + ), + SetChainInfo(SetChainInfo), +} + +const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; + +// TODO(robin-near): Complete this test so that it will actually run a chain. +// TODO(robin-near): Make this a multi-node test. +// TODO(robin-near): Make the network layer send messages. +#[test] +fn test_client_with_multi_test_loop() { + const NUM_CLIENTS: usize = 4; + let builder = TestLoopBuilder::<(usize, TestEvent)>::new(); + + let validator_stake = 1000000 * ONE_NEAR; + let initial_balance = 10000 * ONE_NEAR; + let accounts = + (0..100).map(|i| format!("account{}", i).parse().unwrap()).collect::>(); + + // TODO: Make some builder for genesis. + let mut genesis_config = GenesisConfig { + protocol_version: PROTOCOL_VERSION, + genesis_height: 10000, + shard_layout: ShardLayout::v1( + vec!["account3", "account5", "account7"] + .into_iter() + .map(|a| a.parse().unwrap()) + .collect(), + None, + 1, + ), + min_gas_price: 0, + max_gas_price: 0, + gas_limit: 100000000000000, + transaction_validity_period: 1000, + validators: (0..NUM_CLIENTS) + .map(|idx| AccountInfo { + account_id: accounts[idx].clone(), + amount: validator_stake, + public_key: create_test_signer(accounts[idx].as_str()).public_key(), + }) + .collect(), + epoch_length: 10, + protocol_treasury_account: accounts[NUM_CLIENTS].clone(), + num_block_producer_seats: 4, + minimum_validators_per_shard: 1, + num_block_producer_seats_per_shard: vec![4, 4, 4, 4], + ..Default::default() + }; + let mut records = Vec::new(); + for (i, account) in accounts.iter().enumerate() { + // The staked amount must be consistent with validators from genesis. + let staked = if i < NUM_CLIENTS { validator_stake } else { 0 }; + records.push(StateRecord::Account { + account_id: account.clone(), + account: Account::new( + initial_balance, + staked, + 0, + CryptoHash::default(), + 0, + PROTOCOL_VERSION, + ), + }); + records.push(StateRecord::AccessKey { + account_id: account.clone(), + public_key: create_user_test_signer(&account).public_key, + access_key: AccessKey::full_access(), + }); + // The total supply must be correct to pass validation. + genesis_config.total_supply += initial_balance + staked; + } + let genesis = Genesis::new(genesis_config, GenesisRecords(records)).unwrap(); + + let mut datas = Vec::new(); + for idx in 0..NUM_CLIENTS { + let mut client_config = ClientConfig::test(true, 600, 2000, 4, false, true, false, false); + client_config.max_block_wait_delay = Duration::seconds(6); + let store = create_test_store(); + initialize_genesis_state(store.clone(), &genesis, None); + + let sync_jobs_actions = SyncJobsActions::new( + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + ); + let chain_genesis = ChainGenesis::new(&genesis.config); + let epoch_manager = EpochManager::new_arc_handle(store.clone(), &genesis.config); + let shard_tracker = ShardTracker::new(TrackedConfig::AllShards, epoch_manager.clone()); + let state_sync_adapter = Arc::new(RwLock::new(SyncAdapter::new( + builder.sender().for_index(idx).into_sender(), + builder.sender().for_index(idx).into_sender(), + ))); + let runtime_adapter = NightshadeRuntime::test( + Path::new("."), + store.clone(), + &genesis.config, + epoch_manager.clone(), + ); + + let client = Client::new( + builder.clock(), + client_config.clone(), + chain_genesis, + epoch_manager.clone(), + shard_tracker.clone(), + state_sync_adapter, + runtime_adapter, + builder.sender().for_index(idx).into_multi_sender(), + builder.sender().for_index(idx).into_sender(), + Some(Arc::new(create_test_signer(accounts[idx].as_str()))), + true, + [0; 32], + None, + ) + .unwrap(); + + let shards_manager = ShardsManager::new( + builder.clock(), + Some(accounts[idx].clone()), + epoch_manager, + shard_tracker, + builder.sender().for_index(idx).into_sender(), + builder.sender().for_index(idx).into_sender(), + ReadOnlyChunksStore::new(store), + client.chain.head().unwrap(), + client.chain.header_head().unwrap(), + ); + + let client_actions = ClientActions::new( + builder.clock(), + client, + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + client_config, + PeerId::random(), + builder.sender().for_index(idx).into_multi_sender(), + None, + noop().into_sender(), + None, + Default::default(), + None, + builder + .sender() + .for_index(idx) + .into_wrapped_multi_sender::(), + Box::new(builder.sender().for_index(idx).into_future_spawner()), + ) + .unwrap(); + + let data = TestData { + dummy: (), + account: accounts[idx].clone(), + client: client_actions, + sync_jobs: sync_jobs_actions, + shards_manager, + }; + datas.push(data); + } + + let mut test = builder.build(datas); + for idx in 0..NUM_CLIENTS { + test.register_handler(handle_adhoc_events::().widen().for_index(idx)); + test.register_handler( + forward_client_messages_from_network_to_client_actions().widen().for_index(idx), + ); + test.register_handler( + forward_client_messages_from_client_to_client_actions().widen().for_index(idx), + ); + test.register_handler( + forward_client_messages_from_sync_jobs_to_client_actions().widen().for_index(idx), + ); + test.register_handler(forward_client_messages_from_shards_manager().widen().for_index(idx)); + test.register_handler( + forward_sync_jobs_messages_from_client_to_sync_jobs_actions( + test.sender().for_index(idx).into_future_spawner(), + ) + .widen() + .for_index(idx), + ); + test.register_handler(drive_futures().widen().for_index(idx)); + test.register_handler( + drive_delayed_action_runners::().widen().for_index(idx), + ); + test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx)); + test.register_handler(ignore_events::().widen().for_index(idx)); + } + test.register_handler(route_network_messages_to_client(Duration::milliseconds(10))); + + for idx in 0..NUM_CLIENTS { + let mut delayed_action_runner = test.sender().for_index(idx).into_delayed_action_runner(); + test.sender().for_index(idx).send_adhoc_event("start_client", move |data| { + data.client.start(&mut delayed_action_runner); + }); + } + test.run_for(Duration::seconds(10)); +} + +/// Handles outgoing network messages, and turns them into incoming client messages. +pub fn route_network_messages_to_client< + Data: SupportsRoutingLookup, + Event: TryIntoOrSelf + + From + + From, +>( + network_delay: Duration, +) -> LoopEventHandler { + // let mut route_back_lookup: HashMap = HashMap::new(); + // let mut next_hash: u64 = 0; + LoopEventHandler::new( + move |event: (usize, Event), + data: &mut Data, + context: &LoopHandlerContext<(usize, Event)>| { + let (idx, event) = event; + let message = event.try_into_or_self().map_err(|event| (idx, event.into()))?; + let PeerManagerMessageRequest::NetworkRequests(request) = message else { + return Err((idx, message.into())); + }; + + let client_senders = (0..data.num_accounts()) + .map(|idx| { + context + .sender + .with_additional_delay(network_delay) + .for_index(idx) + .into_wrapped_multi_sender::() + }) + .collect::>(); + + match request { + NetworkRequests::Block { block } => { + for other_idx in 0..data.num_accounts() { + if other_idx != idx { + drop(client_senders[other_idx].send_async(BlockResponse { + block: block.clone(), + peer_id: PeerId::random(), + was_requested: false, + })); + } + } + } + NetworkRequests::Approval { approval_message } => { + let other_idx = data.index_for_account(&approval_message.target); + drop( + client_senders[other_idx] + .send_async(BlockApproval(approval_message.approval, PeerId::random())), + ); + } + // TODO: Support more network message types as we expand the test. + _ => return Err((idx, PeerManagerMessageRequest::NetworkRequests(request).into())), + } + + Ok(()) + }, + ) +} diff --git a/integration-tests/src/tests/client/features/simple_test_loop_example.rs b/integration-tests/src/tests/client/features/simple_test_loop_example.rs index 5169b5148fe..28453a3e5c3 100644 --- a/integration-tests/src/tests/client/features/simple_test_loop_example.rs +++ b/integration-tests/src/tests/client/features/simple_test_loop_example.rs @@ -76,8 +76,8 @@ const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; fn test_client_with_simple_test_loop() { let builder = TestLoopBuilder::::new(); let sync_jobs_actions = SyncJobsActions::new( - builder.wrapped_multi_sender::(), - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), ); let client_config = ClientConfig::test( true, @@ -196,7 +196,7 @@ fn test_client_with_simple_test_loop() { let client_actions = ClientActions::new( builder.clock(), client, - builder.wrapped_multi_sender::(), + builder.sender().into_wrapped_multi_sender::(), client_config, PeerId::random(), noop().into_multi_sender(), @@ -205,8 +205,8 @@ fn test_client_with_simple_test_loop() { None, Default::default(), None, - builder.wrapped_multi_sender::(), - Box::new(builder.future_spawner()), + builder.sender().into_wrapped_multi_sender::(), + Box::new(builder.sender().into_future_spawner()), ) .unwrap(); @@ -222,14 +222,17 @@ fn test_client_with_simple_test_loop() { test.register_handler(forward_client_messages_from_sync_jobs_to_client_actions().widen()); test.register_handler(forward_client_messages_from_shards_manager().widen()); test.register_handler( - forward_sync_jobs_messages_from_client_to_sync_jobs_actions(test.future_spawner()).widen(), + forward_sync_jobs_messages_from_client_to_sync_jobs_actions( + test.sender().into_future_spawner(), + ) + .widen(), ); test.register_handler(drive_futures().widen()); test.register_handler(drive_delayed_action_runners::().widen()); test.register_handler(forward_client_request_to_shards_manager().widen()); // TODO: handle additional events. - test.delayed_action_runner::().run_later( + test.sender().into_delayed_action_runner::().run_later( "start_client", Duration::ZERO, |client, runner| {