Skip to content

Commit

Permalink
[stateless_validation] Rename StateWitnessDistributionActor to StateW…
Browse files Browse the repository at this point in the history
…itnessActor (#11089)

Part 3

Rename StateWitnessDistributionActor and equivalent classes and remove
"Distribution" from the name.

Should be pretty straight forward. No logical change in code.

NOTE: I couldn't create stacked PRs. Please review [these
commits](85d64ef)
only.
  • Loading branch information
Shreyan Gupta authored Apr 18, 2024
1 parent f4c358c commit 0542c38
Show file tree
Hide file tree
Showing 25 changed files with 160 additions and 213 deletions.
15 changes: 5 additions & 10 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::debug::BlockProductionTracker;
use crate::debug::PRODUCTION_TIMES_CACHE_SIZE;
use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker;
use crate::stateless_validation::chunk_validator::ChunkValidator;
use crate::stateless_validation::state_witness_distribution_actor::StateWitnessDistributionSenderForClient;
use crate::stateless_validation::state_witness_tracker::ChunkStateWitnessTracker;
use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient;
use crate::sync::adapter::SyncShardInfo;
use crate::sync::block::BlockSync;
use crate::sync::epoch::EpochSync;
Expand Down Expand Up @@ -189,11 +188,8 @@ pub struct Client {
pub chunk_inclusion_tracker: ChunkInclusionTracker,
/// Tracks chunk endorsements received from chunk validators. Used to filter out chunks ready for inclusion
pub chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
pub state_witness_tracker: ChunkStateWitnessTracker,
/// Adapter to send request to state_witness_distribution_actor to distribute state witness.
pub state_witness_distribution_adapter: StateWitnessDistributionSenderForClient,

/// Adapter to send request to state_witness_actor to distribute state witness.
pub state_witness_adapter: StateWitnessSenderForClient,
// Optional value used for the Chunk Distribution Network Feature.
chunk_distribution_network: Option<ChunkDistributionNetwork>,
}
Expand Down Expand Up @@ -255,7 +251,7 @@ impl Client {
rng_seed: RngSeed,
snapshot_callbacks: Option<SnapshotCallbacks>,
async_computation_spawner: Arc<dyn AsyncComputationSpawner>,
state_witness_distribution_adapter: StateWitnessDistributionSenderForClient,
state_witness_adapter: StateWitnessSenderForClient,
) -> Result<Self, Error> {
let doomslug_threshold_mode = if enable_doomslug {
DoomslugThresholdMode::TwoThirds
Expand Down Expand Up @@ -410,8 +406,7 @@ impl Client {
chunk_validator,
chunk_inclusion_tracker: ChunkInclusionTracker::new(),
chunk_endorsement_tracker,
state_witness_tracker: ChunkStateWitnessTracker::new(clock),
state_witness_distribution_adapter,
state_witness_adapter,
chunk_distribution_network,
})
}
Expand Down
6 changes: 3 additions & 3 deletions chain/client/src/client_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::debug::new_network_info_view;
use crate::info::{display_sync_status, InfoHelper};
use crate::sync::adapter::{SyncMessage, SyncShardInfo};
use crate::sync::state::{StateSync, StateSyncResult};
use crate::{metrics, DistributeChunkStateWitnessRequest, StatusResponse};
use crate::{metrics, DistributeStateWitnessRequest, StatusResponse};
use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt, FutureSpawner};
use near_async::messaging::{CanSend, Sender};
use near_async::time::{Clock, Utc};
Expand Down Expand Up @@ -95,8 +95,8 @@ pub struct SyncJobsSenderForClient {
pub resharding: Sender<ReshardingRequest>,
}

pub struct StateWitnessDistributionSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeChunkStateWitnessRequest>,
pub struct StateWitnessSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeStateWitnessRequest>,
}

pub struct ClientActions {
Expand Down
6 changes: 3 additions & 3 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;

use crate::client_actions::{ClientActionHandler, ClientActions, ClientSenderForClient};
use crate::stateless_validation::state_witness_distribution_actor::StateWitnessDistributionSenderForClient;
use crate::stateless_validation::state_witness_actor::StateWitnessSenderForClient;
use crate::sync_jobs_actions::SyncJobsActions;
use crate::sync_jobs_actor::SyncJobsActor;
use crate::{metrics, Client, ConfigUpdater, SyncAdapter};
Expand Down Expand Up @@ -196,7 +196,7 @@ pub fn start_client(
sender: Option<broadcast::Sender<()>>,
adv: crate::adversarial::Controls,
config_updater: Option<ConfigUpdater>,
state_witness_distribution_adapter: StateWitnessDistributionSenderForClient,
state_witness_adapter: StateWitnessSenderForClient,
) -> (Addr<ClientActor>, ArbiterHandle, ReshardingHandle) {
let client_arbiter = Arbiter::new();
let client_arbiter_handle = client_arbiter.handle();
Expand All @@ -217,7 +217,7 @@ pub fn start_client(
random_seed_from_thread(),
snapshot_callbacks,
Arc::new(RayonAsyncComputationSpawner),
state_witness_distribution_adapter,
state_witness_adapter,
)
.unwrap();
let resharding_handle = client.chain.resharding_handle.clone();
Expand Down
7 changes: 3 additions & 4 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ pub use near_network::client::{
BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo,
};
pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter};
pub use stateless_validation::state_witness_distribution_actions::StateWitnessDistributionActions;
pub use stateless_validation::state_witness_distribution_actor::{
DistributeChunkStateWitnessRequest, StateWitnessDistributionActor,
StateWitnessDistributionSenderForClientMessage,
pub use stateless_validation::state_witness_actions::StateWitnessActions;
pub use stateless_validation::state_witness_actor::{
DistributeStateWitnessRequest, StateWitnessActor, StateWitnessSenderForClientMessage,
};

pub mod adapter;
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/stateless_validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod chunk_endorsement_tracker;
pub mod chunk_validator;
pub mod processing_tracker;
mod shadow_validate;
pub mod state_witness_distribution_actions;
pub mod state_witness_distribution_actor;
pub mod state_witness_actions;
pub mod state_witness_actor;
mod state_witness_producer;
pub mod state_witness_tracker;
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use near_primitives::validator_signer::ValidatorSigner;

use crate::metrics;

use super::state_witness_distribution_actor::DistributeChunkStateWitnessRequest;
use super::state_witness_actor::DistributeStateWitnessRequest;
use super::state_witness_tracker::ChunkStateWitnessTracker;

pub struct StateWitnessDistributionActions {
pub struct StateWitnessActions {
/// Adapter to send messages to the network.
network_adapter: PeerManagerAdapter,
/// Validator signer to sign the state witness.
Expand All @@ -24,7 +24,7 @@ pub struct StateWitnessDistributionActions {
state_witness_tracker: ChunkStateWitnessTracker,
}

impl StateWitnessDistributionActions {
impl StateWitnessActions {
pub fn new(
clock: Clock,
network_adapter: PeerManagerAdapter,
Expand All @@ -37,11 +37,11 @@ impl StateWitnessDistributionActions {
}
}

pub fn handle_distribute_chunk_state_witness_request(
pub fn handle_distribute_state_witness_request(
&mut self,
msg: DistributeChunkStateWitnessRequest,
msg: DistributeStateWitnessRequest,
) -> Result<(), Error> {
let DistributeChunkStateWitnessRequest { chunk_validators, state_witness } = msg;
let DistributeStateWitnessRequest { chunk_validators, state_witness } = msg;

let signed_witness = create_signed_witness(&state_witness, self.my_signer.as_ref())?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,72 +4,68 @@ use actix::Actor;
use near_async::messaging::Sender;
use near_async::time::Clock;
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_network::state_witness_distribution::ChunkStateWitnessAckMessage;
use near_network::state_witness::ChunkStateWitnessAckMessage;
use near_network::types::PeerManagerAdapter;
use near_o11y::{handler_debug_span, WithSpanContext};
use near_performance_metrics_macros::perf;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::AccountId;
use near_primitives::validator_signer::ValidatorSigner;

use super::state_witness_distribution_actions::StateWitnessDistributionActions;
use super::state_witness_actions::StateWitnessActions;

pub struct StateWitnessDistributionActor {
pub actions: StateWitnessDistributionActions,
pub struct StateWitnessActor {
pub actions: StateWitnessActions,
}

impl StateWitnessDistributionActor {
impl StateWitnessActor {
pub fn spawn(
clock: Clock,
network_adapter: PeerManagerAdapter,
my_signer: Arc<dyn ValidatorSigner>,
) -> (actix::Addr<Self>, actix::ArbiterHandle) {
let arbiter = actix::Arbiter::new().handle();
let addr = Self::start_in_arbiter(&arbiter, |_ctx| Self {
actions: StateWitnessDistributionActions::new(clock, network_adapter, my_signer),
actions: StateWitnessActions::new(clock, network_adapter, my_signer),
});
(addr, arbiter)
}
}

impl actix::Actor for StateWitnessDistributionActor {
impl actix::Actor for StateWitnessActor {
type Context = actix::Context<Self>;
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct DistributeChunkStateWitnessRequest {
pub struct DistributeStateWitnessRequest {
pub chunk_validators: Vec<AccountId>,
pub state_witness: ChunkStateWitness,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
#[multi_send_message_derive(Debug)]
pub struct StateWitnessDistributionSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeChunkStateWitnessRequest>,
pub struct StateWitnessSenderForClient {
pub distribute_chunk_state_witness: Sender<DistributeStateWitnessRequest>,
}

impl actix::Handler<WithSpanContext<DistributeChunkStateWitnessRequest>>
for StateWitnessDistributionActor
{
impl actix::Handler<WithSpanContext<DistributeStateWitnessRequest>> for StateWitnessActor {
type Result = ();

#[perf]
fn handle(
&mut self,
msg: WithSpanContext<DistributeChunkStateWitnessRequest>,
msg: WithSpanContext<DistributeStateWitnessRequest>,
_: &mut Self::Context,
) -> Self::Result {
let (_span, msg) = handler_debug_span!(target: "stateless_validation", msg);
if let Err(err) = self.actions.handle_distribute_chunk_state_witness_request(msg) {
if let Err(err) = self.actions.handle_distribute_state_witness_request(msg) {
tracing::error!(target: "stateless_validation", ?err, "Failed to handle distribute chunk state witness request");
}
}
}

impl actix::Handler<WithSpanContext<ChunkStateWitnessAckMessage>>
for StateWitnessDistributionActor
{
impl actix::Handler<WithSpanContext<ChunkStateWitnessAckMessage>> for StateWitnessActor {
type Result = ();

fn handle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use near_primitives::stateless_validation::{
use near_primitives::types::{AccountId, EpochId};

use crate::stateless_validation::chunk_validator::send_chunk_endorsement_to_block_producers;
use crate::stateless_validation::state_witness_distribution_actor::DistributeChunkStateWitnessRequest;
use crate::stateless_validation::state_witness_actor::DistributeStateWitnessRequest;
use crate::Client;

impl Client {
Expand Down Expand Up @@ -66,8 +66,8 @@ impl Client {
// Remove ourselves from the list of chunk validators. Network can't send messages to ourselves.
chunk_validators.retain(|validator| validator != my_signer.validator_id());

self.state_witness_distribution_adapter
.send(DistributeChunkStateWitnessRequest { chunk_validators, state_witness });
self.state_witness_adapter
.send(DistributeStateWitnessRequest { chunk_validators, state_witness });
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod block_stats;
pub mod client;
pub mod peer_manager_mock;
pub mod setup;
mod state_witness_distribution_mock;
mod synchronous_state_witness_adapter;
pub mod test_env;
pub mod test_env_builder;
pub mod test_loop;
Expand Down
20 changes: 8 additions & 12 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use super::block_stats::BlockStats;
use super::peer_manager_mock::PeerManagerMock;
use crate::stateless_validation::state_witness_distribution_actor::{
StateWitnessDistributionActor, StateWitnessDistributionSenderForClient,
use crate::stateless_validation::state_witness_actor::{
StateWitnessActor, StateWitnessSenderForClient,
};
use crate::{start_view_client, Client, ClientActor, SyncAdapter, SyncStatus, ViewClientActor};
use actix::{Actor, Addr, AsyncContext, Context};
Expand Down Expand Up @@ -174,13 +174,9 @@ pub fn setup(
let state_sync_adapter =
Arc::new(RwLock::new(SyncAdapter::new(noop().into_sender(), noop().into_sender())));

let (state_witness_distribution_addr, _) = StateWitnessDistributionActor::spawn(
clock.clone(),
network_adapter.clone(),
signer.clone(),
);
let state_witness_distribution_adapter =
state_witness_distribution_addr.with_auto_span_context();
let (state_witness_addr, _) =
StateWitnessActor::spawn(clock.clone(), network_adapter.clone(), signer.clone());
let state_witness_adapter = state_witness_addr.with_auto_span_context();

let client = Client::new(
clock.clone(),
Expand All @@ -197,7 +193,7 @@ pub fn setup(
TEST_SEED,
None,
Arc::new(RayonAsyncComputationSpawner),
state_witness_distribution_adapter.into_multi_sender(),
state_witness_adapter.into_multi_sender(),
)
.unwrap();
let client_actor = ClientActor::new(
Expand Down Expand Up @@ -975,7 +971,7 @@ pub fn setup_client_with_runtime(
archive: bool,
save_trie_changes: bool,
snapshot_callbacks: Option<SnapshotCallbacks>,
state_witness_distribution_adapter: StateWitnessDistributionSenderForClient,
state_witness_adapter: StateWitnessSenderForClient,
validator_signer: Arc<dyn ValidatorSigner>,
) -> Client {
let mut config = ClientConfig::test(
Expand Down Expand Up @@ -1006,7 +1002,7 @@ pub fn setup_client_with_runtime(
rng_seed,
snapshot_callbacks,
Arc::new(RayonAsyncComputationSpawner),
state_witness_distribution_adapter,
state_witness_adapter,
)
.unwrap();
client.sync_status = SyncStatus::NoSync;
Expand Down
23 changes: 0 additions & 23 deletions chain/client/src/test_utils/state_witness_distribution_mock.rs

This file was deleted.

23 changes: 23 additions & 0 deletions chain/client/src/test_utils/synchronous_state_witness_adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::sync::{Arc, Mutex};

use near_async::messaging::CanSend;

use crate::stateless_validation::state_witness_actions::StateWitnessActions;
use crate::stateless_validation::state_witness_actor::DistributeStateWitnessRequest;

pub struct SynchronousStateWitnessAdapter {
actions: Arc<Mutex<StateWitnessActions>>,
}

impl SynchronousStateWitnessAdapter {
pub fn new(actions: StateWitnessActions) -> Self {
Self { actions: Arc::new(Mutex::new(actions)) }
}
}

impl CanSend<DistributeStateWitnessRequest> for SynchronousStateWitnessAdapter {
fn send(&self, msg: DistributeStateWitnessRequest) {
let mut actions = self.actions.lock().unwrap();
actions.handle_distribute_state_witness_request(msg).unwrap();
}
}
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl TestEnv {
self.archive,
self.save_trie_changes,
None,
self.clients[idx].state_witness_distribution_adapter.clone(),
self.clients[idx].state_witness_adapter.clone(),
self.clients[idx].validator_signer.clone().unwrap(),
)
}
Expand Down
Loading

0 comments on commit 0542c38

Please sign in to comment.