Skip to content

Commit

Permalink
[TestLoop Refactoring] Basic multi-node Client test based on TestLoop (
Browse files Browse the repository at this point in the history
…#10717)

An integration test written in TestLoop that runs 4 validators while
propagating network messages Block and Approval. We can see from the
visualizer that the four nodes produce blocks together:

<img width="1274" alt="image"
src="https://github.com/near/nearcore/assets/111538878/62529763-de2d-46a2-8106-75c93390aedb">

<img width="1285" alt="image"
src="https://github.com/near/nearcore/assets/111538878/c410dc7b-309d-429c-962b-1073b1a97b5e">

Now, there's an issue - the blocks don't have any chunks. And that is
because the chunks never finish processing, since processing of the
chunks happen in a rayon thread that isn't supported by test loop yet.
The blocks are still produced anyway after waiting long enough (in
virtual time).

The test completes in 0.19s.

Btw, I'm not attempting to make the test modular or clean yet. My goal
right now is just to work towards a functional integration test that
does state sync and later in-memory trie.
  • Loading branch information
robin-near authored Mar 7, 2024
1 parent fb500f5 commit d50e30c
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 67 deletions.
8 changes: 7 additions & 1 deletion chain/chunks/src/test/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ struct TestData {
network_events: Vec<PeerManagerMessageRequest>,
}

impl AsMut<TestData> for TestData {
fn as_mut(&mut self) -> &mut Self {
self
}
}

impl TestData {
fn new(shards_manager: ShardsManager, chain: MockChainForShardsManager) -> Self {
Self { shards_manager, chain, client_events: vec![], network_events: vec![] }
Expand Down Expand Up @@ -177,7 +183,7 @@ fn test_chunk_forward() {
test.register_handler(forward_client_request_to_shards_manager().widen());
test.register_handler(forward_network_request_to_shards_manager().widen());
test.register_handler(periodically_resend_chunk_requests(CHUNK_REQUEST_RETRY).widen());
test.register_handler(handle_adhoc_events());
test.register_handler(handle_adhoc_events::<TestData>().widen());

// We'll produce a single chunk whose next chunk producer is a chunk-only
// producer, so that we can test that the chunk is forwarded to the next
Expand Down
8 changes: 7 additions & 1 deletion chain/chunks/src/test/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ struct TestData {
account_id: AccountId,
}

impl AsMut<TestData> for TestData {
fn as_mut(&mut self) -> &mut Self {
self
}
}

#[derive(EnumTryInto, Debug, EnumFrom)]
enum TestEvent {
Adhoc(AdhocEvent<TestData>),
Expand Down Expand Up @@ -99,7 +105,7 @@ fn basic_setup(config: BasicSetupConfig) -> ShardsManagerTestLoop {
.collect::<Vec<_>>();
let mut test = builder.build(data);
for idx in 0..test.data.len() {
test.register_handler(handle_adhoc_events().for_index(idx));
test.register_handler(handle_adhoc_events::<TestData>().widen().for_index(idx));
test.register_handler(forward_client_request_to_shards_manager().widen().for_index(idx));
test.register_handler(forward_network_request_to_shards_manager().widen().for_index(idx));
test.register_handler(capture_events::<ShardsManagerResponse>().widen().for_index(idx));
Expand Down
40 changes: 40 additions & 0 deletions chain/client/src/test_utils/client_actions_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,46 @@ use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForC
use crate::sync_jobs_actions::ClientSenderForSyncJobsMessage;
use near_async::test_loop::event_handler::LoopEventHandler;
use near_chunks::client::ShardsManagerResponse;
use near_network::client::ClientSenderForNetworkMessage;

pub fn forward_client_messages_from_network_to_client_actions(
) -> LoopEventHandler<ClientActions, ClientSenderForNetworkMessage> {
LoopEventHandler::new(|msg, client_actions: &mut ClientActions, _| {
match msg {
ClientSenderForNetworkMessage::_state_response(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_block_approval(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_transaction(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_block(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_block_headers(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_challenge(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_network_info(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_chunk_state_witness(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
ClientSenderForNetworkMessage::_chunk_endorsement(msg) => {
(msg.callback)(Ok(client_actions.handle(msg.message)));
}
_ => {
return Err(msg);
}
}
Ok(())
})
}

pub fn forward_client_messages_from_client_to_client_actions(
) -> LoopEventHandler<ClientActions, ClientSenderForClientMessage> {
Expand Down
5 changes: 5 additions & 0 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use near_primitives::types::AccountId;
/// This trait is just a helper for looking up the index.
pub trait SupportsRoutingLookup {
fn index_for_account(&self, account: &AccountId) -> usize;
fn num_accounts(&self) -> usize;
}

impl<InnerData: AsRef<AccountId>> SupportsRoutingLookup for Vec<InnerData> {
Expand All @@ -13,4 +14,8 @@ impl<InnerData: AsRef<AccountId>> SupportsRoutingLookup for Vec<InnerData> {
.position(|data| data.as_ref() == account)
.unwrap_or_else(|| panic!("Account not found: {}", account))
}

fn num_accounts(&self) -> usize {
self.len()
}
}
4 changes: 2 additions & 2 deletions core/async/src/examples/actix_component_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn test_actix_component() {
dummy: (),
example: ExampleComponent::new(builder.sender().into_sender()),
outer: OuterComponent::new(
builder.wrapped_multi_sender::<ExampleComponentAdapterMessage, _>(),
builder.sender().into_wrapped_multi_sender::<ExampleComponentAdapterMessage, _>(),
),
periodic_requests_captured: vec![],
};
Expand All @@ -66,7 +66,7 @@ fn test_actix_component() {
test.register_handler(example_handler().widen());

// We need to redo whatever the ExampleActor does in its `started` method.
test.data.example.start(&mut test.delayed_action_runner());
test.data.example.start(&mut test.sender().into_delayed_action_runner());
// Send some requests; this can be done in the asynchronous context.
test.future_spawner().spawn("wait for 5", {
let res = test.data.outer.call_example_component_for_response(5);
Expand Down
2 changes: 1 addition & 1 deletion core/async/src/examples/async_component_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn inner_request_handler(
fn test_async_component() {
let builder = TestLoopBuilder::<TestEvent>::new();
let sender = builder.sender();
let future_spawner = builder.future_spawner();
let future_spawner = builder.sender().into_future_spawner();
let mut test = builder.build(TestData {
dummy: (),
output: vec![],
Expand Down
10 changes: 8 additions & 2 deletions core/async/src/examples/sum_numbers_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ use crate::{

use super::sum_numbers::{ReportSumMsg, SumNumbersComponent, SumRequest};

#[derive(derive_more::AsMut, derive_more::AsRef)]
#[derive(derive_more::AsMut)]
struct TestData {
summer: SumNumbersComponent,
sums: Vec<ReportSumMsg>,
}

impl AsMut<TestData> for TestData {
fn as_mut(&mut self) -> &mut Self {
self
}
}

#[derive(Debug, EnumTryInto, EnumFrom)]
enum TestEvent {
Request(SumRequest),
Expand Down Expand Up @@ -71,7 +77,7 @@ fn test_simple_with_adhoc() {
let mut test = builder.build(data);
test.register_handler(forward_sum_request().widen());
test.register_handler(capture_events::<ReportSumMsg>().widen());
test.register_handler(handle_adhoc_events());
test.register_handler(handle_adhoc_events::<TestData>().widen());

// It is preferrable to put as much setup logic as possible into an adhoc
// event (queued by .run below), so that as much logic as possible is
Expand Down
48 changes: 3 additions & 45 deletions core/async/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,10 @@ pub mod multi_instance;
use self::{
delay_sender::DelaySender,
event_handler::LoopEventHandler,
futures::{
TestLoopDelayedActionEvent, TestLoopDelayedActionRunner, TestLoopFutureSpawner,
TestLoopTask,
},
};
use crate::{break_apart::BreakApart, time};
use crate::{
messaging::{IntoMultiSender, IntoSender},
test_loop::event_handler::LoopHandlerContext,
futures::{TestLoopFutureSpawner, TestLoopTask},
};
use crate::test_loop::event_handler::LoopHandlerContext;
use crate::time;
use near_o11y::{testonly::init_test_logger, tracing::log::info};
use serde::Serialize;
use std::{
Expand Down Expand Up @@ -192,40 +186,11 @@ impl<Event: Debug + Send + 'static> TestLoopBuilder<Event> {
self.pending_events_sender.clone()
}

/// A shortcut for a common use case, where we use an enum message to
/// represent all the possible messages that a multisender may be used to
/// send.
///
/// This assumes that S is a multisender with the derive
/// `#[derive(MultiSendMessage, ...)]`, which creates the enum
/// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct
/// being derived from).
///
/// To use, first include in the test loop event enum a case for
/// `MyMultiSenderMessage`. Then, call this function to get a multisender,
/// like
/// `builder.wrapped_multi_sender<MyMultiSenderMessage, MyMultiSender>()`.
pub fn wrapped_multi_sender<M: 'static, S: 'static>(&self) -> S
where
DelaySender<Event>: IntoSender<M>,
BreakApart<M>: IntoMultiSender<S>,
{
self.sender().into_sender().break_apart().into_multi_sender()
}

/// Returns a clock that will always return the current virtual time.
pub fn clock(&self) -> time::Clock {
self.clock.clock()
}

/// Returns a FutureSpawner that can be used to spawn futures into the loop.
pub fn future_spawner(&self) -> TestLoopFutureSpawner
where
Event: From<Arc<TestLoopTask>>,
{
self.sender().narrow()
}

pub fn build<Data>(self, data: Data) -> TestLoop<Data, Event> {
TestLoop::new(self.pending_events, self.pending_events_sender, self.clock, data)
}
Expand Down Expand Up @@ -356,13 +321,6 @@ impl<Data, Event: Debug + Send + 'static> TestLoop<Data, Event> {
{
self.sender().narrow()
}

pub fn delayed_action_runner<InnerData>(&self) -> TestLoopDelayedActionRunner<InnerData>
where
Event: From<TestLoopDelayedActionEvent<InnerData>>,
{
TestLoopDelayedActionRunner { sender: self.sender().narrow() }
}
}

impl<Data: 'static, Event: Debug + Send + 'static> Drop for TestLoop<Data, Event> {
Expand Down
11 changes: 3 additions & 8 deletions core/async/src/test_loop/adhoc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use super::{
delay_sender::DelaySender,
event_handler::{LoopEventHandler, TryIntoOrSelf},
};
use super::{delay_sender::DelaySender, event_handler::LoopEventHandler};
use crate::messaging::CanSend;
use crate::time;
use std::fmt::Debug;
Expand Down Expand Up @@ -54,10 +51,8 @@ impl<Data: 'static, Event: From<AdhocEvent<Data>> + 'static> AdhocEventSender<Da
}

/// Handler to handle adhoc events.
pub fn handle_adhoc_events<Data: 'static, Event: TryIntoOrSelf<AdhocEvent<Data>>>(
) -> LoopEventHandler<Data, Event> {
LoopEventHandler::new(|event: Event, data, _ctx| {
let event = event.try_into_or_self()?;
pub fn handle_adhoc_events<Data: 'static>() -> LoopEventHandler<Data, AdhocEvent<Data>> {
LoopEventHandler::new(|event: AdhocEvent<Data>, data, _ctx| {
(event.handler)(data);
Ok(())
})
Expand Down
49 changes: 49 additions & 0 deletions core/async/src/test_loop/delay_sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::break_apart::BreakApart;
use crate::messaging;
use crate::messaging::{IntoMultiSender, IntoSender};
use crate::test_loop::futures::{TestLoopDelayedActionEvent, TestLoopDelayedActionRunner};
use crate::time;
use std::sync::Arc;

use super::futures::{TestLoopFutureSpawner, TestLoopTask};

/// Interface to send an event with a delay (in virtual time). It can be
/// converted to a Sender for any message type that can be converted into
/// the event type, so that a DelaySender given by the test loop may be passed
Expand All @@ -23,6 +28,14 @@ impl<Event> DelaySender<Event> {
self.0(event, delay);
}

pub fn with_additional_delay(&self, delay: time::Duration) -> DelaySender<Event>
where
Event: 'static,
{
let f = self.0.clone();
Self(Arc::new(move |event, other_delay| f(event, delay + other_delay)))
}

pub fn narrow<InnerEvent>(self) -> DelaySender<InnerEvent>
where
Event: From<InnerEvent> + 'static,
Expand All @@ -31,6 +44,42 @@ impl<Event> DelaySender<Event> {
self.send_with_delay(event.into(), delay)
})
}

/// A shortcut for a common use case, where we use an enum message to
/// represent all the possible messages that a multisender may be used to
/// send.
///
/// This assumes that S is a multisender with the derive
/// `#[derive(MultiSendMessage, ...)]`, which creates the enum
/// `MyMultiSenderMessage` (where `MyMultiSender` is the name of the struct
/// being derived from).
///
/// To use, first include in the test loop event enum a case for
/// `MyMultiSenderMessage`. Then, call this function to get a multisender,
/// like
/// `builder.wrapped_multi_sender<MyMultiSenderMessage, MyMultiSender>()`.
pub fn into_wrapped_multi_sender<M: 'static, S: 'static>(self) -> S
where
Self: IntoSender<M>,
BreakApart<M>: IntoMultiSender<S>,
{
self.into_sender().break_apart().into_multi_sender()
}

pub fn into_delayed_action_runner<InnerData>(self) -> TestLoopDelayedActionRunner<InnerData>
where
Event: From<TestLoopDelayedActionEvent<InnerData>> + 'static,
{
TestLoopDelayedActionRunner { sender: self.narrow() }
}

/// Returns a FutureSpawner that can be used to spawn futures into the loop.
pub fn into_future_spawner(self) -> TestLoopFutureSpawner
where
Event: From<Arc<TestLoopTask>> + 'static,
{
self.narrow()
}
}

impl<Event: 'static> DelaySender<(usize, Event)> {
Expand Down
1 change: 1 addition & 0 deletions core/chain-configs/src/updateable_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,6 @@ pub struct UpdateableClientConfig {
pub resharding_config: ReshardingConfig,

/// Time limit for adding transactions in produce_chunk()
#[serde(with = "near_async::time::serde_opt_duration_as_std")]
pub produce_chunk_add_transactions_time_limit: Option<Duration>,
}
1 change: 1 addition & 0 deletions integration-tests/src/tests/client/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod increase_deployment_cost;
mod increase_storage_compute_cost;
mod limit_contract_functions_number;
mod lower_storage_key_limit;
mod multinode_test_loop_example;
mod nearvm;
#[cfg(feature = "protocol_feature_nonrefundable_transfer_nep491")]
mod nonrefundable_transfer;
Expand Down
Loading

0 comments on commit d50e30c

Please sign in to comment.