Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Epoch Sync] Change EpochSyncRequest/Response to direct network messages #12236

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions chain/client/src/sync/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,6 @@ impl EpochSync {
update.save_block_header_no_update_tree(
proof.current_epoch.second_last_block_header_in_prev_epoch.clone(),
)?;
tracing::info!(
"last final block of last past epoch: {:?}",
proof.past_epochs.last().unwrap().last_final_block_header.hash()
);
update.save_block_header_no_update_tree(
proof.past_epochs.last().unwrap().last_final_block_header.clone(),
)?;
Expand Down Expand Up @@ -745,7 +741,7 @@ impl Handler<EpochSyncRequestMessage> for ClientActorInner {
}
let store = self.client.chain.chain_store.store().clone();
let network_adapter = self.client.network_adapter.clone();
let route_back = msg.route_back;
let requester_peer_id = msg.from_peer;
let cache = self.client.epoch_sync.last_epoch_sync_response_cache.clone();
self.client.epoch_sync.async_computation_spawner.spawn(
"respond to epoch sync request",
Expand All @@ -758,7 +754,7 @@ impl Handler<EpochSyncRequestMessage> for ClientActorInner {
}
};
network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::EpochSyncResponse { route_back, proof },
NetworkRequests::EpochSyncResponse { peer_id: requester_peer_id, proof },
));
},
)
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub struct ChunkEndorsementMessage(pub ChunkEndorsement);
#[derive(actix::Message, Debug, Clone, PartialEq, Eq)]
#[rtype(result = "()")]
pub struct EpochSyncRequestMessage {
pub route_back: CryptoHash,
pub from_peer: PeerId,
}

#[derive(actix::Message, Debug, Clone, PartialEq, Eq)]
Expand Down
4 changes: 4 additions & 0 deletions chain/network/src/network_protocol/borsh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use near_primitives::transaction::SignedTransaction;
use near_primitives::types::ShardId;
use near_schema_checker_lib::ProtocolSchema;

use near_primitives::epoch_sync::CompressedEpochSyncProof;
use std::fmt;
use std::fmt::Formatter;

Expand Down Expand Up @@ -166,6 +167,9 @@ pub(super) enum PeerMessage {
StateRequestPart(ShardId, CryptoHash, u64),
VersionedStateResponse(StateResponseInfo),
SyncSnapshotHosts(SyncSnapshotHosts),

EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
}
#[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready
const _: () = assert!(std::mem::size_of::<PeerMessage>() <= 1500, "PeerMessage > 1500 bytes");
8 changes: 8 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
mem::PeerMessage::VersionedStateResponse(sri)
}
net::PeerMessage::SyncSnapshotHosts(ssh) => mem::PeerMessage::SyncSnapshotHosts(ssh),
net::PeerMessage::EpochSyncRequest => mem::PeerMessage::EpochSyncRequest,
net::PeerMessage::EpochSyncResponse(proof) => {
mem::PeerMessage::EpochSyncResponse(proof)
}
})
}
}
Expand Down Expand Up @@ -256,6 +260,10 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
net::PeerMessage::VersionedStateResponse(sri)
}
mem::PeerMessage::SyncSnapshotHosts(ssh) => net::PeerMessage::SyncSnapshotHosts(ssh),
mem::PeerMessage::EpochSyncRequest => net::PeerMessage::EpochSyncRequest,
mem::PeerMessage::EpochSyncResponse(proof) => {
net::PeerMessage::EpochSyncResponse(proof)
}
}
}
}
13 changes: 8 additions & 5 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ pub enum PeerMessage {
StateRequestHeader(ShardId, CryptoHash),
StateRequestPart(ShardId, CryptoHash, u64),
VersionedStateResponse(StateResponseInfo),

EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
}

impl fmt::Display for PeerMessage {
Expand Down Expand Up @@ -554,8 +557,9 @@ pub enum RoutedMessageBody {
PartialEncodedStateWitness(PartialEncodedStateWitness),
PartialEncodedStateWitnessForward(PartialEncodedStateWitness),
VersionedChunkEndorsement(ChunkEndorsement),
EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
/// Not used, but needed for borsh backward compatibility.
_UnusedEpochSyncRequest,
_UnusedEpochSyncResponse(CompressedEpochSyncProof),
StatePartRequest(StatePartRequest),
ChunkContractAccesses(ChunkContractAccesses),
ContractCodeRequest(ContractCodeRequest),
Expand Down Expand Up @@ -651,8 +655,8 @@ impl fmt::Debug for RoutedMessageBody {
RoutedMessageBody::VersionedChunkEndorsement(_) => {
write!(f, "VersionedChunkEndorsement")
}
RoutedMessageBody::EpochSyncRequest => write!(f, "EpochSyncRequest"),
RoutedMessageBody::EpochSyncResponse(_) => {
RoutedMessageBody::_UnusedEpochSyncRequest => write!(f, "EpochSyncRequest"),
RoutedMessageBody::_UnusedEpochSyncResponse(_) => {
write!(f, "EpochSyncResponse")
}
RoutedMessageBody::StatePartRequest(_) => write!(f, "StatePartRequest"),
Expand Down Expand Up @@ -747,7 +751,6 @@ impl RoutedMessage {
RoutedMessageBody::Ping(_)
| RoutedMessageBody::TxStatusRequest(_, _)
| RoutedMessageBody::PartialEncodedChunkRequest(_)
| RoutedMessageBody::EpochSyncRequest
)
}

Expand Down
9 changes: 9 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,12 @@ message SyncSnapshotHosts {
repeated SnapshotHostInfo hosts = 1;
}

// Request asking the receiving peer for an epoch sync proof.
// The message intentionally has no fields.
message EpochSyncRequest {}

// Response to an EpochSyncRequest.
message EpochSyncResponse {
// Raw bytes of the compressed epoch sync proof.
bytes compressed_proof = 1;
}

// PeerMessage is a wrapper of all message types exchanged between NEAR nodes.
// The wire format of a single message M consists of len(M)+4 bytes:
// <len(M)> : 4 bytes : little endian uint32
Expand Down Expand Up @@ -496,5 +502,8 @@ message PeerMessage {
StateRequestPart state_request_part = 30;
StateResponse state_response = 31;
SyncSnapshotHosts sync_snapshot_hosts = 32;

EpochSyncRequest epoch_sync_request = 34;
EpochSyncResponse epoch_sync_response = 35;
}
}
14 changes: 14 additions & 0 deletions chain/network/src/network_protocol/proto_conv/peer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use near_async::time::error::ComponentRange;
use near_primitives::block::{Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::transaction::SignedTransaction;
use near_primitives::utils::compression::CompressedData;
use protobuf::MessageField as MF;
use std::sync::Arc;

Expand Down Expand Up @@ -332,6 +333,15 @@ impl From<&PeerMessage> for proto::PeerMessage {
..Default::default()
})
}
PeerMessage::EpochSyncRequest => {
ProtoMT::EpochSyncRequest(proto::EpochSyncRequest { ..Default::default() })
}
PeerMessage::EpochSyncResponse(esp) => {
ProtoMT::EpochSyncResponse(proto::EpochSyncResponse {
compressed_proof: esp.as_slice().to_vec(),
..Default::default()
})
}
}),
..Default::default()
}
Expand Down Expand Up @@ -491,6 +501,10 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage {
ProtoMT::SyncSnapshotHosts(srh) => PeerMessage::SyncSnapshotHosts(
srh.try_into().map_err(Self::Error::SyncSnapshotHosts)?,
),
ProtoMT::EpochSyncRequest(_) => PeerMessage::EpochSyncRequest,
ProtoMT::EpochSyncResponse(esr) => PeerMessage::EpochSyncResponse(
CompressedData::from_boxed_slice(esr.compressed_proof.clone().into_boxed_slice()),
),
})
}
}
15 changes: 13 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::accounts_data::AccountDataError;
use crate::client::{
AnnounceAccountRequest, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse,
ProcessTxRequest, RecvChallenge, StateRequestHeader, StateRequestPart, StateResponseReceived,
EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest, RecvChallenge,
StateRequestHeader, StateRequestPart, StateResponseReceived,
};
use crate::concurrency::atomic_cell::AtomicCell;
use crate::concurrency::demux;
Expand Down Expand Up @@ -32,7 +33,7 @@ use crate::types::{
use actix::fut::future::wrap_future;
use actix::{Actor as _, ActorContext as _, ActorFutureExt as _, AsyncContext as _};
use lru::LruCache;
use near_async::messaging::SendAsync;
use near_async::messaging::{CanSend, SendAsync};
use near_async::time;
use near_crypto::Signature;
use near_o11y::{handler_debug_span, log_assert, WithSpanContext};
Expand Down Expand Up @@ -1110,6 +1111,16 @@ impl PeerActor {
.ok();
None
}
PeerMessage::EpochSyncRequest => {
network_state.client.send(EpochSyncRequestMessage { from_peer: peer_id });
None
}
PeerMessage::EpochSyncResponse(proof) => {
network_state
.client
.send(EpochSyncResponseMessage { from_peer: peer_id, proof });
None
}
msg => {
tracing::error!(target: "network", "Peer received unexpected type: {:?}", msg);
None
Expand Down
12 changes: 2 additions & 10 deletions chain/network/src/peer_manager/network_state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::accounts_data::{AccountDataCache, AccountDataError};
use crate::announce_accounts::AnnounceAccountCache;
use crate::client::{
BlockApproval, ChunkEndorsementMessage, ClientSenderForNetwork, EpochSyncRequestMessage,
EpochSyncResponseMessage, ProcessTxRequest, TxStatusRequest, TxStatusResponse,
BlockApproval, ChunkEndorsementMessage, ClientSenderForNetwork, ProcessTxRequest,
TxStatusRequest, TxStatusResponse,
};
use crate::concurrency::demux;
use crate::concurrency::runtime::Runtime;
Expand Down Expand Up @@ -779,14 +779,6 @@ impl NetworkState {
self.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok();
None
}
RoutedMessageBody::EpochSyncRequest => {
self.client.send(EpochSyncRequestMessage { route_back: msg_hash });
None
}
RoutedMessageBody::EpochSyncResponse(proof) => {
self.client.send(EpochSyncResponseMessage { from_peer: peer_id, proof });
None
}
RoutedMessageBody::StatePartRequest(request) => {
let mut queue = self.tier3_requests.lock();
if queue.len() < LIMIT_TIER3_REQUESTS {
Expand Down
34 changes: 7 additions & 27 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,40 +1151,20 @@ impl PeerManagerActor {
NetworkResponses::NoResponse
}
NetworkRequests::EpochSyncRequest { peer_id } => {
if self.state.send_message_to_peer(
&self.clock,
tcp::Tier::T2,
self.state.sign_message(
&self.clock,
RawRoutedMessage {
target: PeerIdOrHash::PeerId(peer_id),
body: RoutedMessageBody::EpochSyncRequest,
},
),
) {
if self.state.tier2.send_message(peer_id, PeerMessage::EpochSyncRequest.into()) {
NetworkResponses::NoResponse
} else {
NetworkResponses::RouteNotFound
}
}
NetworkRequests::EpochSyncResponse { route_back, proof } => {
if self.state.send_message_to_peer(
&self.clock,
tcp::Tier::T2,
self.state.sign_message(
&self.clock,
RawRoutedMessage {
target: PeerIdOrHash::Hash(route_back),
body: RoutedMessageBody::EpochSyncResponse(proof),
},
),
) {
NetworkRequests::EpochSyncResponse { peer_id, proof } => {
if self
.state
.tier2
.send_message(peer_id, PeerMessage::EpochSyncResponse(proof).into())
{
NetworkResponses::NoResponse
} else {
tracing::info!(
"Failed to send EpochSyncResponse to {}, route not found",
route_back
);
NetworkResponses::RouteNotFound
}
}
Expand Down
36 changes: 31 additions & 5 deletions chain/network/src/rate_limits/messages_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,18 @@ impl Config {

/// Returns a good preset of rate limit configuration valid for any type of node.
pub fn standard_preset() -> Self {
// TODO(trisfald): make preset
Self::default()
// TODO(trisfald): make presets for other message types
let mut config = Self::default();
// EpochSyncRequest is a very simple amplification attack vector, as it requires no arguments
// and the response is large. So we rate limit it to 1 request per 30 seconds. In practice,
// a peer should not need to epoch sync except when bootstrapping a node, so a request
// should be rarely received. We still set it to a reasonable rate limit so a bootstrapping
// node can retry without waiting for too long.
config.rate_limits.insert(
RateLimitedPeerMessageKey::EpochSyncRequest,
SingleMessageConfig::new(1, 1.0 / 30.0, None),
);
config
}

/// Applies rate limits configuration overrides to `self`. In practice, merges the two configurations
Expand Down Expand Up @@ -173,6 +183,7 @@ pub enum RateLimitedPeerMessageKey {
ChunkContractAccesses,
ContractCodeRequest,
ContractCodeResponse,
EpochSyncRequest,
}

/// Given a `PeerMessage` returns a tuple containing the `RateLimitedPeerMessageKey`
Expand Down Expand Up @@ -224,8 +235,8 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa
RoutedMessageBody::ContractCodeRequest(_) => Some((ContractCodeRequest, 1)),
RoutedMessageBody::ContractCodeResponse(_) => Some((ContractCodeResponse, 1)),
RoutedMessageBody::VersionedChunkEndorsement(_) => Some((ChunkEndorsement, 1)),
RoutedMessageBody::EpochSyncRequest => None,
RoutedMessageBody::EpochSyncResponse(_) => None,
RoutedMessageBody::_UnusedEpochSyncRequest => None,
RoutedMessageBody::_UnusedEpochSyncResponse(_) => None,
RoutedMessageBody::StatePartRequest(_) => None, // TODO
RoutedMessageBody::Ping(_)
| RoutedMessageBody::Pong(_)
Expand All @@ -244,6 +255,8 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa
PeerMessage::StateRequestHeader(_, _) => Some((StateRequestHeader, 1)),
PeerMessage::StateRequestPart(_, _, _) => Some((StateRequestPart, 1)),
PeerMessage::VersionedStateResponse(_) => Some((VersionedStateResponse, 1)),
PeerMessage::EpochSyncRequest => Some((EpochSyncRequest, 1)),
PeerMessage::EpochSyncResponse(_) => None,
PeerMessage::Tier1Handshake(_)
| PeerMessage::Tier2Handshake(_)
| PeerMessage::Tier3Handshake(_)
Expand All @@ -256,7 +269,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa

#[cfg(test)]
mod tests {
use near_async::time::Duration;
use near_async::time::{Duration, FakeClock};
use near_primitives::hash::CryptoHash;

use crate::network_protocol::{Disconnect, PeerMessage};
Expand Down Expand Up @@ -452,4 +465,17 @@ mod tests {
}});
assert!(serde_json::from_value::<OverrideConfig>(json).is_err());
}

#[test]
fn test_epoch_sync_rate_limit() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

let config = Config::standard_preset();
let clock = FakeClock::default();
let mut rate_limits = RateLimits::from_config(&config, clock.now());
assert!(rate_limits.is_allowed(&PeerMessage::EpochSyncRequest, clock.now()));
assert!(!rate_limits.is_allowed(&PeerMessage::EpochSyncRequest, clock.now()));
clock.advance(Duration::seconds(1));
assert!(!rate_limits.is_allowed(&PeerMessage::EpochSyncRequest, clock.now()));
clock.advance(Duration::seconds(30));
assert!(rate_limits.is_allowed(&PeerMessage::EpochSyncRequest, clock.now()));
}
}
11 changes: 6 additions & 5 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,16 @@ fn network_message_to_client_handler(
NetworkRequests::EpochSyncRequest { peer_id } => {
let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap();
assert_ne!(&peer_id, my_peer_id, "Sending message to self not supported.");
shared_state.senders_for_peer(&peer_id).client_sender.send(EpochSyncRequestMessage {
route_back: shared_state.generate_route_back(my_peer_id),
});
shared_state
.senders_for_peer(&peer_id)
.client_sender
.send(EpochSyncRequestMessage { from_peer: my_peer_id.clone() });
None
}
NetworkRequests::EpochSyncResponse { route_back, proof } => {
NetworkRequests::EpochSyncResponse { peer_id, proof } => {
let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap();
shared_state
.senders_for_route_back(&route_back)
.senders_for_peer(&peer_id)
.client_sender
.send(EpochSyncResponseMessage { from_peer: my_peer_id.clone(), proof });
None
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ pub enum NetworkRequests {
/// Requests an epoch sync
EpochSyncRequest { peer_id: PeerId },
/// Response to an epoch sync request
EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof },
EpochSyncResponse { peer_id: PeerId, proof: CompressedEpochSyncProof },
/// Message from chunk producer to chunk validators containing the code-hashes of contracts
/// accessed for the main state transition in the witness.
ChunkContractAccesses(Vec<AccountId>, ChunkContractAccesses),
Expand Down
Loading
Loading