Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: remove tier3_request_queue #12353

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions chain/client/src/test_utils/peer_manager_mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use near_network::types::{
PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent,
Tier3Request,
};

pub struct PeerManagerMock {
Expand Down Expand Up @@ -43,3 +44,8 @@ impl actix::Handler<StateSyncEvent> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {}
}

/// No-op `Tier3Request` handler: the mock peer manager silently discards
/// TIER3 requests, since tests built on this mock don't exercise TIER3 traffic.
impl actix::Handler<Tier3Request> for PeerManagerMock {
    type Result = ();

    fn handle(&mut self, _request: Tier3Request, _context: &mut Self::Context) {}
}
2 changes: 1 addition & 1 deletion chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ impl PeerActor {
)]
async fn receive_routed_message(
clock: &time::Clock,
network_state: &NetworkState,
network_state: &Arc<NetworkState>,
peer_id: PeerId,
msg_hash: CryptoHash,
body: RoutedMessageBody,
Expand Down
9 changes: 9 additions & 0 deletions chain/network/src/peer/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::state_witness::{
use crate::store;
use crate::tcp;
use crate::testonly::actix::ActixSystem;
use crate::types::{PeerManagerSenderForNetworkInput, PeerManagerSenderForNetworkMessage};
use near_async::messaging::{IntoMultiSender, Sender};
use near_async::time;
use near_o11y::WithSpanContextExt;
Expand Down Expand Up @@ -47,6 +48,7 @@ pub(crate) enum Event {
Client(ClientSenderForNetworkInput),
Network(peer_manager_actor::Event),
PartialWitness(PartialWitnessSenderForNetworkInput),
PeerManager(PeerManagerSenderForNetworkInput),
}

pub(crate) struct PeerHandle {
Expand Down Expand Up @@ -117,6 +119,12 @@ impl PeerHandle {
send.send(Event::Client(event.into_input()));
}
});
let peer_manager_sender = Sender::from_fn({
let send = send.clone();
move |event: PeerManagerSenderForNetworkMessage| {
send.send(Event::PeerManager(event.into_input()));
}
});
let shards_manager_sender = Sender::from_fn({
let send = send.clone();
move |event| {
Expand All @@ -136,6 +144,7 @@ impl PeerHandle {
network_cfg.verify().unwrap(),
cfg.chain.genesis_id.clone(),
client_sender.break_apart().into_multi_sender(),
peer_manager_sender.break_apart().into_multi_sender(),
shards_manager_sender,
state_witness_sender.break_apart().into_multi_sender(),
vec![],
Expand Down
39 changes: 14 additions & 25 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use crate::stats::metrics;
use crate::store;
use crate::tcp;
use crate::types::{
ChainInfo, PeerType, ReasonForBan, StatePartRequestBody, Tier3Request, Tier3RequestBody,
ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
Tier3Request, Tier3RequestBody,
};
use anyhow::Context;
use arc_swap::ArcSwap;
Expand All @@ -43,7 +44,6 @@ use near_primitives::network::PeerId;
use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
use near_primitives::types::AccountId;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
Expand All @@ -70,9 +70,6 @@ pub const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);
/// How long to wait between reconnection attempts to the same peer
pub(crate) const RECONNECT_ATTEMPT_INTERVAL: time::Duration = time::Duration::seconds(10);

/// Limit number of pending tier3 requests to avoid OOM.
pub(crate) const LIMIT_TIER3_REQUESTS: usize = 60;

impl WhitelistNode {
pub fn from_peer_info(pi: &PeerInfo) -> anyhow::Result<Self> {
Ok(Self {
Expand Down Expand Up @@ -111,6 +108,7 @@ pub(crate) struct NetworkState {
/// GenesisId of the chain.
pub genesis_id: GenesisId,
pub client: ClientSenderForNetwork,
pub peer_manager_adapter: PeerManagerSenderForNetwork,
pub shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
pub partial_witness_adapter: PartialWitnessSenderForNetwork,

Expand Down Expand Up @@ -154,9 +152,6 @@ pub(crate) struct NetworkState {
/// TODO(gprusak): consider removing it altogether.
pub tier1_route_back: Mutex<RouteBackCache>,

/// Queue of received requests to which a response should be made over TIER3.
pub tier3_requests: Mutex<VecDeque<Tier3Request>>,

/// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx`
/// messages since last block.
pub txns_since_last_block: AtomicUsize,
Expand Down Expand Up @@ -186,6 +181,7 @@ impl NetworkState {
config: config::VerifiedConfig,
genesis_id: GenesisId,
client: ClientSenderForNetwork,
peer_manager_adapter: PeerManagerSenderForNetwork,
shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
partial_witness_adapter: PartialWitnessSenderForNetwork,
whitelist_nodes: Vec<WhitelistNode>,
Expand All @@ -203,6 +199,7 @@ impl NetworkState {
})),
genesis_id,
client,
peer_manager_adapter,
shards_manager_adapter,
partial_witness_adapter,
chain_info: Default::default(),
Expand All @@ -219,7 +216,6 @@ impl NetworkState {
account_announcements: Arc::new(AnnounceAccountCache::new(store)),
tier2_route_back: Mutex::new(RouteBackCache::default()),
tier1_route_back: Mutex::new(RouteBackCache::default()),
tier3_requests: Mutex::new(VecDeque::<Tier3Request>::new()),
recent_routed_messages: Mutex::new(lru::LruCache::new(
NonZeroUsize::new(RECENT_ROUTED_MESSAGES_CACHE_SIZE).unwrap(),
)),
Expand Down Expand Up @@ -697,7 +693,7 @@ impl NetworkState {
}

pub async fn receive_routed_message(
&self,
self: &Arc<Self>,
clock: &time::Clock,
peer_id: PeerId,
msg_hash: CryptoHash,
Expand Down Expand Up @@ -781,21 +777,14 @@ impl NetworkState {
None
}
RoutedMessageBody::StatePartRequest(request) => {
let mut queue = self.tier3_requests.lock();
if queue.len() < LIMIT_TIER3_REQUESTS {
queue.push_back(Tier3Request {
peer_info: PeerInfo {
id: peer_id,
addr: Some(request.addr),
account_id: None,
},
body: Tier3RequestBody::StatePart(StatePartRequestBody {
shard_id: request.shard_id,
sync_hash: request.sync_hash,
part_id: request.part_id,
}),
});
}
self.peer_manager_adapter.send(Tier3Request {
peer_info: PeerInfo { id: peer_id, addr: Some(request.addr), account_id: None },
body: Tier3RequestBody::StatePart(StatePartRequestBody {
shard_id: request.shard_id,
sync_hash: request.sync_hash,
part_id: request.part_id,
}),
});
None
}
RoutedMessageBody::ChunkContractAccesses(accesses) => {
Expand Down
123 changes: 63 additions & 60 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::store;
use crate::tcp;
use crate::types::{
ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests,
NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType,
SetChainInfo, SnapshotHostInfo, StatePartRequestBody, StateSyncEvent, Tier3RequestBody,
NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse,
PeerManagerSenderForNetwork, PeerType, SetChainInfo, SnapshotHostInfo, StatePartRequestBody,
StateSyncEvent, Tier3Request, Tier3RequestBody,
};
use ::time::ext::InstantExt as _;
use actix::fut::future::wrap_future;
Expand Down Expand Up @@ -88,8 +89,6 @@ pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Durati
/// How often to poll the NetworkState for closed connections we'd like to re-establish.
pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1);

/// How often we check for and process pending Tier3 requests
const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(1);
/// The length of time that a Tier3 connection is allowed to idle before it is stopped
const TIER3_IDLE_TIMEOUT: time::Duration = time::Duration::seconds(15);

Expand Down Expand Up @@ -216,6 +215,7 @@ impl PeerManagerActor {
store: Arc<dyn near_store::db::Database>,
config: config::NetworkConfig,
client: ClientSenderForNetwork,
peer_manager_adapter: PeerManagerSenderForNetwork,
shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
partial_witness_adapter: PartialWitnessSenderForNetwork,
genesis_id: GenesisId,
Expand Down Expand Up @@ -247,6 +247,7 @@ impl PeerManagerActor {
config,
genesis_id,
client,
peer_manager_adapter,
shards_manager_adapter,
partial_witness_adapter,
whitelist_nodes,
Expand Down Expand Up @@ -345,62 +346,6 @@ impl PeerManagerActor {
}
}
});
// Periodically process pending Tier3 requests.
arbiter.spawn({
let clock = clock.clone();
let state = state.clone();
let arbiter = arbiter.clone();
let mut interval = time::Interval::new(clock.now(), PROCESS_TIER3_REQUESTS_INTERVAL);
async move {
loop {
interval.tick(&clock).await;

if let Some(request) = state.tier3_requests.lock().pop_front() {
arbiter.spawn({
let clock = clock.clone();
let state = state.clone();
async move {
let tier3_response = match request.body {
Tier3RequestBody::StatePart(StatePartRequestBody { shard_id, sync_hash, part_id }) => {
match state.client.send_async(StateRequestPart { shard_id, sync_hash, part_id }).await {
Ok(Some(client_response)) => {
PeerMessage::VersionedStateResponse(*client_response.0)
}
Ok(None) => {
tracing::debug!(target: "network", "client declined to respond to {:?}", request);
return;
}
Err(err) => {
tracing::error!(target: "network", ?err, "client failed to respond to {:?}", request);
return;
}
}
}
};

if !state.tier3.load().ready.contains_key(&request.peer_info.id) {
let result = async {
let stream = tcp::Stream::connect(
&request.peer_info,
tcp::Tier::T3,
&state.config.socket_options
).await.context("tcp::Stream::connect()")?;
PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?;
anyhow::Ok(())
}.await;

if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "tier3 failed to connect to {}", request.peer_info);
}
}

state.tier3.send_message(request.peer_info.id, Arc::new(tier3_response));
}
});
}
}
}
});
}
});
Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self {
Expand Down Expand Up @@ -1321,6 +1266,64 @@ impl actix::Handler<WithSpanContext<StateSyncEvent>> for PeerManagerActor {
}
}

/// Handles a `Tier3Request`: a request (currently only a state-part request)
/// that was received over the routed network and whose response must be
/// delivered to the requesting peer over a direct TIER3 connection.
impl actix::Handler<WithSpanContext<Tier3Request>> for PeerManagerActor {
type Result = ();
#[perf]
fn handle(
&mut self,
request: WithSpanContext<Tier3Request>,
ctx: &mut Self::Context,
) -> Self::Result {
let (_span, request) = handler_debug_span!(target: "network", request);
// Measure handling time, labeled by the request-body variant.
let _timer = metrics::PEER_MANAGER_TIER3_REQUEST_TIME
.with_label_values(&[(&request.body).into()])
.start_timer();

let state = self.state.clone();
let clock = self.clock.clone();
// Compute the response and (if needed) dial the peer on a spawned future,
// so the actor itself is not blocked on the client call or the TCP handshake.
ctx.spawn(wrap_future(
async move {
// Ask the client for the response payload. If the client declines
// (Ok(None)) or fails, log and return without sending anything.
let tier3_response = match request.body {
Tier3RequestBody::StatePart(StatePartRequestBody { shard_id, sync_hash, part_id }) => {
match state.client.send_async(StateRequestPart { shard_id, sync_hash, part_id }).await {
Ok(Some(client_response)) => {
PeerMessage::VersionedStateResponse(*client_response.0)
}
Ok(None) => {
tracing::debug!(target: "network", "client declined to respond to {:?}", request);
return;
}
Err(err) => {
tracing::error!(target: "network", ?err, "client failed to respond to {:?}", request);
return;
}
}
}
};

// Establish a tier3 connection if we don't have one already
if !state.tier3.load().ready.contains_key(&request.peer_info.id) {
let result = async {
let stream = tcp::Stream::connect(
&request.peer_info,
tcp::Tier::T3,
&state.config.socket_options
).await.context("tcp::Stream::connect()")?;
PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?;
anyhow::Ok(())
}.await;

// Connection failures are only logged; the send below is still
// attempted — presumably a no-op if the peer never connected
// (TODO confirm against tier3.send_message semantics).
if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "tier3 failed to connect to {}", request.peer_info);
}
}

state.tier3.send_message(request.peer_info.id, Arc::new(tier3_response));
}
));
}
}

impl actix::Handler<GetDebugStatus> for PeerManagerActor {
type Result = DebugStatus;
#[perf]
Expand Down
10 changes: 9 additions & 1 deletion chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::test_utils;
use crate::testonly::actix::ActixSystem;
use crate::types::{
AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest,
ReasonForBan,
PeerManagerSenderForNetworkInput, PeerManagerSenderForNetworkMessage, ReasonForBan,
};
use crate::PeerManagerActor;
use futures::FutureExt;
Expand Down Expand Up @@ -74,6 +74,7 @@ pub enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
PeerManager(PME),
PeerManagerSender(PeerManagerSenderForNetworkInput),
PartialWitness(PartialWitnessSenderForNetworkInput),
}

Expand Down Expand Up @@ -628,6 +629,12 @@ pub(crate) async fn start(
}
}
});
let peer_manager_sender = Sender::from_fn({
let send = send.clone();
move |event: PeerManagerSenderForNetworkMessage| {
send.send(Event::PeerManagerSender(event.into_input()));
}
});
let shards_manager_sender = Sender::from_fn({
let send = send.clone();
move |event| {
Expand All @@ -645,6 +652,7 @@ pub(crate) async fn start(
store,
cfg,
client_sender.break_apart().into_multi_sender(),
peer_manager_sender.break_apart().into_multi_sender(),
shards_manager_sender,
state_witness_sender.break_apart().into_multi_sender(),
genesis_id,
Expand Down
9 changes: 9 additions & 0 deletions chain/network/src/stats/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ pub(crate) static PEER_MANAGER_MESSAGES_TIME: LazyLock<HistogramVec> = LazyLock:
)
.unwrap()
});
/// Histogram of the time `PeerManagerActor` spends handling tier3 requests,
/// with one label ("request") for the kind of request body.
pub(crate) static PEER_MANAGER_TIER3_REQUEST_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
    // Exponential buckets starting at 0.1ms, doubling, 15 buckets (~1.6s top).
    let buckets = exponential_buckets(0.0001, 2., 15).unwrap();
    try_create_histogram_vec(
        "near_peer_manager_tier3_request_time",
        "Time that PeerManagerActor spends on handling tier3 requests",
        &["request"],
        Some(buckets),
    )
    .unwrap()
});
pub(crate) static ROUTED_MESSAGE_DROPPED: LazyLock<IntCounterVec> = LazyLock::new(|| {
try_create_int_counter_vec(
"near_routed_message_dropped",
Expand Down
Loading
Loading