From 3261eff0bf61e1aea1a04270cab801c8bcf32464 Mon Sep 17 00:00:00 2001 From: divma Date: Thu, 17 Jun 2021 00:40:16 +0000 Subject: [PATCH] split outbound and inbound codecs encoded types (#2410) Splits the inbound and outbound requests, for maintainability. --- .../eth2_libp2p/src/behaviour/handler/mod.rs | 2 +- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 30 +-- beacon_node/eth2_libp2p/src/rpc/codec/base.rs | 24 ++- beacon_node/eth2_libp2p/src/rpc/codec/mod.rs | 12 +- .../eth2_libp2p/src/rpc/codec/ssz_snappy.rs | 49 ++--- beacon_node/eth2_libp2p/src/rpc/handler.rs | 17 +- beacon_node/eth2_libp2p/src/rpc/mod.rs | 17 +- beacon_node/eth2_libp2p/src/rpc/outbound.rs | 177 ++++++++++++++++++ beacon_node/eth2_libp2p/src/rpc/protocol.rs | 124 ++++-------- .../eth2_libp2p/src/rpc/rate_limiter.rs | 6 +- 10 files changed, 304 insertions(+), 154 deletions(-) create mode 100644 beacon_node/eth2_libp2p/src/rpc/outbound.rs diff --git a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs index 93a953e6a74..d587ea6549a 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs @@ -38,7 +38,7 @@ impl BehaviourHandler { pub enum BehaviourHandlerIn { Delegate(DelegateIn), /// Start the shutdown process. - Shutdown(Option<(RequestId, RPCRequest)>), + Shutdown(Option<(RequestId, OutboundRequest)>), } impl ProtocolsHandler for BehaviourHandler { diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 02468285a6a..fd37030e1be 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -595,7 +595,7 @@ impl Behaviour { trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id); self.eth2_rpc - .send_request(peer_id, id, RPCRequest::Ping(ping)); + .send_request(peer_id, id, OutboundRequest::Ping(ping)); } /// Sends a Pong response to the peer. @@ -610,7 +610,7 @@ impl Behaviour { /// Sends a METADATA request to a peer. fn send_meta_data_request(&mut self, peer_id: PeerId) { - let event = RPCRequest::MetaData(PhantomData); + let event = OutboundRequest::MetaData(PhantomData); self.eth2_rpc .send_request(peer_id, RequestId::Behaviour, event); } @@ -749,17 +749,17 @@ impl Behaviour { let peer_request_id = (handler_id, id); match request { /* Behaviour managed protocols: Ping and Metadata */ - RPCRequest::Ping(ping) => { + InboundRequest::Ping(ping) => { // inform the peer manager and send the response self.peer_manager.ping_request(&peer_id, ping.data); // send a ping response self.pong(peer_request_id, peer_id); } - RPCRequest::MetaData(_) => { + InboundRequest::MetaData(_) => { // send the requested meta-data self.send_meta_data_response((handler_id, id), peer_id); } - RPCRequest::Goodbye(reason) => { + InboundRequest::Goodbye(reason) => { // queue for disconnection without a goodbye message debug!( self.log, "Peer sent Goodbye"; @@ -775,18 +775,18 @@ impl Behaviour { // inform the application layer early. } /* Protocols propagated to the Network */ - RPCRequest::Status(msg) => { + InboundRequest::Status(msg) => { // inform the peer manager that we have received a status from a peer self.peer_manager.peer_statusd(&peer_id); // propagate the STATUS message upwards self.propagate_request(peer_request_id, peer_id, Request::Status(msg)) } - RPCRequest::BlocksByRange(req) => self.propagate_request( + InboundRequest::BlocksByRange(req) => self.propagate_request( peer_request_id, peer_id, Request::BlocksByRange(req), ), - RPCRequest::BlocksByRoot(req) => { + InboundRequest::BlocksByRoot(req) => { self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req)) } } @@ -834,7 +834,7 @@ impl Behaviour { peer_id, handler: NotifyHandler::Any, event: BehaviourHandlerIn::Shutdown( - reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))), + reason.map(|reason| (RequestId::Behaviour, OutboundRequest::Goodbye(reason))), ), }); } @@ -878,7 +878,7 @@ impl Behaviour { handler: NotifyHandler::Any, event: BehaviourHandlerIn::Shutdown(Some(( RequestId::Behaviour, - RPCRequest::Goodbye(reason), + OutboundRequest::Goodbye(reason), ))), }); } @@ -1293,12 +1293,12 @@ pub enum Request { BlocksByRoot(BlocksByRootRequest), } -impl std::convert::From for RPCRequest { - fn from(req: Request) -> RPCRequest { +impl std::convert::From for OutboundRequest { + fn from(req: Request) -> OutboundRequest { match req { - Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r), - Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r), - Request::Status(s) => RPCRequest::Status(s), + Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r), + Request::BlocksByRange(r) => OutboundRequest::BlocksByRange(r), + Request::Status(s) => OutboundRequest::Status(s), } } } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs index 0e97157fd34..ff158067aab 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/base.rs @@ -1,7 +1,7 @@ //! This handles the various supported encoding mechanism for the Eth 2.0 RPC. use crate::rpc::methods::ErrorType; -use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; +use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use std::marker::PhantomData; @@ -47,7 +47,7 @@ where // This deals with Decoding RPC Responses from other peers and encoding our requests pub struct BaseOutboundCodec where - TOutboundCodec: OutboundCodec>, + TOutboundCodec: OutboundCodec>, TSpec: EthSpec, { /// Inner codec for handling various encodings. @@ -60,7 +60,7 @@ where impl BaseOutboundCodec where TSpec: EthSpec, - TOutboundCodec: OutboundCodec>, + TOutboundCodec: OutboundCodec>, { pub fn new(codec: TOutboundCodec) -> Self { BaseOutboundCodec { @@ -102,9 +102,9 @@ where impl Decoder for BaseInboundCodec where TSpec: EthSpec, - TCodec: Encoder> + Decoder>, + TCodec: Encoder> + Decoder>, { - type Item = RPCRequest; + type Item = InboundRequest; type Error = ::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -115,14 +115,18 @@ where /* Base Outbound Codec */ // This Encodes RPC Requests sent to external peers -impl Encoder> for BaseOutboundCodec +impl Encoder> for BaseOutboundCodec where TSpec: EthSpec, - TCodec: OutboundCodec> + Encoder>, + TCodec: OutboundCodec> + Encoder>, { - type Error = >>::Error; + type Error = >>::Error; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode( + &mut self, + item: OutboundRequest, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { self.inner.encode(item, dst) } } @@ -131,7 +135,7 @@ where impl Decoder for BaseOutboundCodec where TSpec: EthSpec, - TCodec: OutboundCodec, CodecErrorType = ErrorType> + TCodec: OutboundCodec, CodecErrorType = ErrorType> + Decoder>, { type Item = RPCCodedResponse; diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs index ae8e2abd157..05de328857d 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/mod.rs @@ -4,7 +4,7 @@ pub(crate) mod ssz_snappy; use self::base::{BaseInboundCodec, BaseOutboundCodec}; use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; use crate::rpc::protocol::RPCError; -use crate::rpc::{RPCCodedResponse, RPCRequest}; +use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse}; use libp2p::bytes::BytesMut; use tokio_util::codec::{Decoder, Encoder}; use types::EthSpec; @@ -29,7 +29,7 @@ impl Encoder> for InboundCodec { } impl Decoder for InboundCodec { - type Item = RPCRequest; + type Item = InboundRequest; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -39,10 +39,14 @@ impl Decoder for InboundCodec { } } -impl Encoder> for OutboundCodec { +impl Encoder> for OutboundCodec { type Error = RPCError; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode( + &mut self, + item: OutboundRequest, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { match self { OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 33dc49733c9..4e84d354bc0 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -1,9 +1,8 @@ -use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, }; -use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse}; +use crate::rpc::{methods::*, InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -90,7 +89,7 @@ impl Encoder> for SSZSnappyInboundCodec< // Decoder for inbound streams: Decodes RPC requests from peers impl Decoder for SSZSnappyInboundCodec { - type Item = RPCRequest; + type Item = InboundRequest; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -133,27 +132,29 @@ impl Decoder for SSZSnappyInboundCodec { // since we have already checked `length` above. match self.protocol.message_name { Protocol::Status => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( - &decoded_buffer, - )?))), + Version::V1 => Ok(Some(InboundRequest::Status( + StatusMessage::from_ssz_bytes(&decoded_buffer)?, + ))), }, Protocol::Goodbye => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Goodbye( + Version::V1 => Ok(Some(InboundRequest::Goodbye( GoodbyeReason::from_ssz_bytes(&decoded_buffer)?, ))), }, Protocol::BlocksByRange => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRange( + Version::V1 => Ok(Some(InboundRequest::BlocksByRange( BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?, ))), }, Protocol::BlocksByRoot => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest { - block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, - }))), + Version::V1 => { + Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest { + block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?, + }))) + } }, Protocol::Ping => match self.protocol.version { - Version::V1 => Ok(Some(RPCRequest::Ping(Ping { + Version::V1 => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(&decoded_buffer)?, }))), }, @@ -163,7 +164,7 @@ impl Decoder for SSZSnappyInboundCodec { if !decoded_buffer.is_empty() { Err(RPCError::InvalidData) } else { - Ok(Some(RPCRequest::MetaData(PhantomData))) + Ok(Some(InboundRequest::MetaData(PhantomData))) } } }, @@ -201,17 +202,21 @@ impl SSZSnappyOutboundCodec { } // Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder> for SSZSnappyOutboundCodec { +impl Encoder> for SSZSnappyOutboundCodec { type Error = RPCError; - fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode( + &mut self, + item: OutboundRequest, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { let bytes = match item { - RPCRequest::Status(req) => req.as_ssz_bytes(), - RPCRequest::Goodbye(req) => req.as_ssz_bytes(), - RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), - RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), - RPCRequest::Ping(req) => req.as_ssz_bytes(), - RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode + OutboundRequest::Status(req) => req.as_ssz_bytes(), + OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), + OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(), + OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), + OutboundRequest::Ping(req) => req.as_ssz_bytes(), + OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; // SSZ encoded bytes should be within `max_packet_size` if bytes.len() > self.max_packet_size { @@ -318,7 +323,7 @@ impl Decoder for SSZSnappyOutboundCodec { } } -impl OutboundCodec> for SSZSnappyOutboundCodec { +impl OutboundCodec> for SSZSnappyOutboundCodec { type CodecErrorType = ErrorType; fn decode_error( diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 6f7aff78b70..4761d859948 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -2,9 +2,10 @@ #![allow(clippy::cognitive_complexity)] use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination}; -use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; +use super::protocol::{Protocol, RPCError, RPCProtocol}; use super::{RPCReceived, RPCSend}; -use crate::rpc::protocol::{InboundFramed, OutboundFramed}; +use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; +use crate::rpc::protocol::InboundFramed; use fnv::FnvHashMap; use futures::prelude::*; use futures::{Sink, SinkExt}; @@ -90,7 +91,7 @@ where events_out: SmallVec<[HandlerEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[(RequestId, RPCRequest); 4]>, + dial_queue: SmallVec<[(RequestId, OutboundRequest); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, @@ -186,7 +187,7 @@ pub enum OutboundSubstreamState { /// The framed negotiated substream. substream: Box>, /// Keeps track of the actual request sent. - request: RPCRequest, + request: OutboundRequest, }, /// Closing an outbound substream> Closing(Box>), @@ -221,7 +222,7 @@ where } /// Initiates the handler's shutdown process, sending an optional last message to the peer. - pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest)>) { + pub fn shutdown(&mut self, final_msg: Option<(RequestId, OutboundRequest)>) { if matches!(self.state, HandlerState::Active) { if !self.dial_queue.is_empty() { debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len()); @@ -247,7 +248,7 @@ where } /// Opens an outbound substream with a request. - fn send_request(&mut self, id: RequestId, req: RPCRequest) { + fn send_request(&mut self, id: RequestId, req: OutboundRequest) { match self.state { HandlerState::Active => { self.dial_queue.push((id, req)); @@ -303,8 +304,8 @@ where type OutEvent = HandlerEvent; type Error = RPCError; type InboundProtocol = RPCProtocol; - type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = (RequestId, RPCRequest); // Keep track of the id and the request + type OutboundProtocol = OutboundRequest; + type OutboundOpenInfo = (RequestId, OutboundRequest); // Keep track of the id and the request type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index a1f4fac0336..b91ca71fa34 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -21,35 +21,37 @@ use types::EthSpec; pub(crate) use handler::HandlerErr; pub(crate) use methods::{MetaData, Ping, RPCCodedResponse, RPCResponse}; -pub(crate) use protocol::{RPCProtocol, RPCRequest}; +pub(crate) use protocol::{InboundRequest, RPCProtocol}; pub use handler::SubstreamId; pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, MaxRequestBlocks, RPCResponseErrorCode, RequestId, ResponseTermination, StatusMessage, MAX_REQUEST_BLOCKS, }; +pub(crate) use outbound::OutboundRequest; pub use protocol::{Protocol, RPCError}; pub(crate) mod codec; mod handler; pub mod methods; +mod outbound; mod protocol; mod rate_limiter; /// RPC events sent from Lighthouse. #[derive(Debug, Clone)] -pub enum RPCSend { +pub enum RPCSend { /// A request sent from Lighthouse. /// /// The `RequestId` is given by the application making the request. These /// go over *outbound* connections. - Request(RequestId, RPCRequest), + Request(RequestId, OutboundRequest), /// A response sent from Lighthouse. /// /// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the /// peer. The second parameter is a single chunk of a response. These go over *inbound* /// connections. - Response(SubstreamId, RPCCodedResponse), + Response(SubstreamId, RPCCodedResponse), } /// RPC events received from outside Lighthouse. @@ -59,7 +61,7 @@ pub enum RPCReceived { /// /// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the /// *inbound* substream over which it is managed. - Request(SubstreamId, RPCRequest), + Request(SubstreamId, InboundRequest), /// A response received from the outside. /// /// The `RequestId` corresponds to the application given ID of the original request sent to the @@ -150,7 +152,7 @@ impl RPC { &mut self, peer_id: PeerId, request_id: RequestId, - event: RPCRequest, + event: OutboundRequest, ) { self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, @@ -188,7 +190,8 @@ where fn inject_connected(&mut self, peer_id: &PeerId) { // find the peer's meta-data debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id); - let rpc_event = RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); + let rpc_event = + RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData)); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: *peer_id, handler: NotifyHandler::Any, diff --git a/beacon_node/eth2_libp2p/src/rpc/outbound.rs b/beacon_node/eth2_libp2p/src/rpc/outbound.rs new file mode 100644 index 00000000000..b9dbd08b523 --- /dev/null +++ b/beacon_node/eth2_libp2p/src/rpc/outbound.rs @@ -0,0 +1,177 @@ +use std::marker::PhantomData; + +use super::methods::*; +use super::protocol::Protocol; +use super::protocol::ProtocolId; +use super::RPCError; +use crate::rpc::protocol::Encoding; +use crate::rpc::protocol::Version; +use crate::rpc::{ + codec::{base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec}, + methods::ResponseTermination, +}; +use futures::future::BoxFuture; +use futures::prelude::{AsyncRead, AsyncWrite}; +use futures::{FutureExt, SinkExt}; +use libp2p::core::{OutboundUpgrade, UpgradeInfo}; +use tokio_util::{ + codec::Framed, + compat::{Compat, FuturesAsyncReadCompatExt}, +}; +use types::EthSpec; +/* Outbound request */ + +// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and +// `OutboundUpgrade` + +#[derive(Debug, Clone, PartialEq)] +pub enum OutboundRequest { + Status(StatusMessage), + Goodbye(GoodbyeReason), + BlocksByRange(BlocksByRangeRequest), + BlocksByRoot(BlocksByRootRequest), + Ping(Ping), + MetaData(PhantomData), +} + +impl UpgradeInfo for OutboundRequest { + type Info = ProtocolId; + type InfoIter = Vec; + + // add further protocols as we support more encodings/versions + fn protocol_info(&self) -> Self::InfoIter { + self.supported_protocols() + } +} + +/// Implements the encoding per supported protocol for `RPCRequest`. +impl OutboundRequest { + pub fn supported_protocols(&self) -> Vec { + match self { + // add more protocols when versions/encodings are supported + OutboundRequest::Status(_) => vec![ProtocolId::new( + Protocol::Status, + Version::V1, + Encoding::SSZSnappy, + )], + OutboundRequest::Goodbye(_) => vec![ProtocolId::new( + Protocol::Goodbye, + Version::V1, + Encoding::SSZSnappy, + )], + OutboundRequest::BlocksByRange(_) => vec![ProtocolId::new( + Protocol::BlocksByRange, + Version::V1, + Encoding::SSZSnappy, + )], + OutboundRequest::BlocksByRoot(_) => vec![ProtocolId::new( + Protocol::BlocksByRoot, + Version::V1, + Encoding::SSZSnappy, + )], + OutboundRequest::Ping(_) => vec![ProtocolId::new( + Protocol::Ping, + Version::V1, + Encoding::SSZSnappy, + )], + OutboundRequest::MetaData(_) => vec![ProtocolId::new( + Protocol::MetaData, + Version::V1, + Encoding::SSZSnappy, + )], + } + } + + /* These functions are used in the handler for stream management */ + + /// Number of responses expected for this request. + pub fn expected_responses(&self) -> u64 { + match self { + OutboundRequest::Status(_) => 1, + OutboundRequest::Goodbye(_) => 0, + OutboundRequest::BlocksByRange(req) => req.count, + OutboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + OutboundRequest::Ping(_) => 1, + OutboundRequest::MetaData(_) => 1, + } + } + + /// Gives the corresponding `Protocol` to this request. + pub fn protocol(&self) -> Protocol { + match self { + OutboundRequest::Status(_) => Protocol::Status, + OutboundRequest::Goodbye(_) => Protocol::Goodbye, + OutboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, + OutboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + OutboundRequest::Ping(_) => Protocol::Ping, + OutboundRequest::MetaData(_) => Protocol::MetaData, + } + } + + /// Returns the `ResponseTermination` type associated with the request if a stream gets + /// terminated. + pub fn stream_termination(&self) -> ResponseTermination { + match self { + // this only gets called after `multiple_responses()` returns true. Therefore, only + // variants that have `multiple_responses()` can have values. + OutboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, + OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + OutboundRequest::Status(_) => unreachable!(), + OutboundRequest::Goodbye(_) => unreachable!(), + OutboundRequest::Ping(_) => unreachable!(), + OutboundRequest::MetaData(_) => unreachable!(), + } + } +} + +/* RPC Response type - used for outbound upgrades */ + +/* Outbound upgrades */ + +pub type OutboundFramed = Framed, OutboundCodec>; + +impl OutboundUpgrade for OutboundRequest +where + TSpec: EthSpec + Send + 'static, + TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + type Output = OutboundFramed; + type Error = RPCError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { + // convert to a tokio compatible socket + let socket = socket.compat(); + let codec = match protocol.encoding { + Encoding::SSZSnappy => { + let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new( + protocol, + usize::max_value(), + )); + OutboundCodec::SSZSnappy(ssz_snappy_codec) + } + }; + + let mut socket = Framed::new(socket, codec); + + async { + socket.send(self).await?; + socket.close().await?; + Ok(socket) + } + .boxed() + } +} + +impl std::fmt::Display for OutboundRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutboundRequest::Status(status) => write!(f, "Status Message: {}", status), + OutboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), + OutboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), + OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), + OutboundRequest::MetaData(_) => write!(f, "MetaData request"), + } + } +} diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index de74e98dd7a..81804c82a75 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -1,17 +1,13 @@ use super::methods::*; use crate::rpc::{ - codec::{ - base::{BaseInboundCodec, BaseOutboundCodec}, - ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}, - InboundCodec, OutboundCodec, - }, + codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec}, methods::{MaxErrorLen, ResponseTermination, MAX_ERROR_LEN}, MaxRequestBlocks, MAX_REQUEST_BLOCKS, }; use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; -use futures::{FutureExt, SinkExt, StreamExt}; -use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use futures::{FutureExt, StreamExt}; +use libp2p::core::{InboundUpgrade, ProtocolName, UpgradeInfo}; use ssz::Encode; use ssz_types::VariableList; use std::io; @@ -78,7 +74,7 @@ const TTFB_TIMEOUT: u64 = 5; const REQUEST_TIMEOUT: u64 = 15; /// Protocol names to be used. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Protocol { /// The Status protocol name. Status, @@ -276,7 +272,7 @@ impl ProtocolName for ProtocolId { // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. -pub type InboundOutput = (RPCRequest, InboundFramed); +pub type InboundOutput = (InboundRequest, InboundFramed); pub type InboundFramed = Framed>>>, InboundCodec>; @@ -308,7 +304,7 @@ where // MetaData requests should be empty, return the stream match protocol_name { - Protocol::MetaData => Ok((RPCRequest::MetaData(PhantomData), socket)), + Protocol::MetaData => Ok((InboundRequest::MetaData(PhantomData), socket)), _ => { match tokio::time::timeout( Duration::from_secs(REQUEST_TIMEOUT), @@ -328,13 +324,8 @@ where } } -/* Outbound request */ - -// Combines all the RPC requests into a single enum to implement `UpgradeInfo` and -// `OutboundUpgrade` - #[derive(Debug, Clone, PartialEq)] -pub enum RPCRequest { +pub enum InboundRequest { Status(StatusMessage), Goodbye(GoodbyeReason), BlocksByRange(BlocksByRangeRequest), @@ -343,7 +334,7 @@ pub enum RPCRequest { MetaData(PhantomData), } -impl UpgradeInfo for RPCRequest { +impl UpgradeInfo for InboundRequest { type Info = ProtocolId; type InfoIter = Vec; @@ -354,36 +345,36 @@ impl UpgradeInfo for RPCRequest { } /// Implements the encoding per supported protocol for `RPCRequest`. -impl RPCRequest { +impl InboundRequest { pub fn supported_protocols(&self) -> Vec { match self { // add more protocols when versions/encodings are supported - RPCRequest::Status(_) => vec![ProtocolId::new( + InboundRequest::Status(_) => vec![ProtocolId::new( Protocol::Status, Version::V1, Encoding::SSZSnappy, )], - RPCRequest::Goodbye(_) => vec![ProtocolId::new( + InboundRequest::Goodbye(_) => vec![ProtocolId::new( Protocol::Goodbye, Version::V1, Encoding::SSZSnappy, )], - RPCRequest::BlocksByRange(_) => vec![ProtocolId::new( + InboundRequest::BlocksByRange(_) => vec![ProtocolId::new( Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy, )], - RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new( + InboundRequest::BlocksByRoot(_) => vec![ProtocolId::new( Protocol::BlocksByRoot, Version::V1, Encoding::SSZSnappy, )], - RPCRequest::Ping(_) => vec![ProtocolId::new( + InboundRequest::Ping(_) => vec![ProtocolId::new( Protocol::Ping, Version::V1, Encoding::SSZSnappy, )], - RPCRequest::MetaData(_) => vec![ProtocolId::new( + InboundRequest::MetaData(_) => vec![ProtocolId::new( Protocol::MetaData, Version::V1, Encoding::SSZSnappy, @@ -396,24 +387,24 @@ impl RPCRequest { /// Number of responses expected for this request. pub fn expected_responses(&self) -> u64 { match self { - RPCRequest::Status(_) => 1, - RPCRequest::Goodbye(_) => 0, - RPCRequest::BlocksByRange(req) => req.count, - RPCRequest::BlocksByRoot(req) => req.block_roots.len() as u64, - RPCRequest::Ping(_) => 1, - RPCRequest::MetaData(_) => 1, + InboundRequest::Status(_) => 1, + InboundRequest::Goodbye(_) => 0, + InboundRequest::BlocksByRange(req) => req.count, + InboundRequest::BlocksByRoot(req) => req.block_roots.len() as u64, + InboundRequest::Ping(_) => 1, + InboundRequest::MetaData(_) => 1, } } /// Gives the corresponding `Protocol` to this request. pub fn protocol(&self) -> Protocol { match self { - RPCRequest::Status(_) => Protocol::Status, - RPCRequest::Goodbye(_) => Protocol::Goodbye, - RPCRequest::BlocksByRange(_) => Protocol::BlocksByRange, - RPCRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, - RPCRequest::Ping(_) => Protocol::Ping, - RPCRequest::MetaData(_) => Protocol::MetaData, + InboundRequest::Status(_) => Protocol::Status, + InboundRequest::Goodbye(_) => Protocol::Goodbye, + InboundRequest::BlocksByRange(_) => Protocol::BlocksByRange, + InboundRequest::BlocksByRoot(_) => Protocol::BlocksByRoot, + InboundRequest::Ping(_) => Protocol::Ping, + InboundRequest::MetaData(_) => Protocol::MetaData, } } @@ -423,53 +414,18 @@ impl RPCRequest { match self { // this only gets called after `multiple_responses()` returns true. Therefore, only // variants that have `multiple_responses()` can have values. - RPCRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, - RPCRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, - RPCRequest::Status(_) => unreachable!(), - RPCRequest::Goodbye(_) => unreachable!(), - RPCRequest::Ping(_) => unreachable!(), - RPCRequest::MetaData(_) => unreachable!(), + InboundRequest::BlocksByRange(_) => ResponseTermination::BlocksByRange, + InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + InboundRequest::Status(_) => unreachable!(), + InboundRequest::Goodbye(_) => unreachable!(), + InboundRequest::Ping(_) => unreachable!(), + InboundRequest::MetaData(_) => unreachable!(), } } } /* RPC Response type - used for outbound upgrades */ -/* Outbound upgrades */ - -pub type OutboundFramed = Framed, OutboundCodec>; - -impl OutboundUpgrade for RPCRequest -where - TSpec: EthSpec + Send + 'static, - TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Output = OutboundFramed; - type Error = RPCError; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { - // convert to a tokio compatible socket - let socket = socket.compat(); - let codec = match protocol.encoding { - Encoding::SSZSnappy => { - let ssz_snappy_codec = - BaseOutboundCodec::new(SSZSnappyOutboundCodec::new(protocol, MAX_RPC_SIZE)); - OutboundCodec::SSZSnappy(ssz_snappy_codec) - } - }; - - let mut socket = Framed::new(socket, codec); - - async { - socket.send(self).await?; - socket.close().await?; - Ok(socket) - } - .boxed() - } -} - /// Error in RPC Encoding/Decoding. #[derive(Debug, Clone, PartialEq, AsStaticStr)] #[strum(serialize_all = "snake_case")] @@ -556,15 +512,15 @@ impl std::error::Error for RPCError { } } -impl std::fmt::Display for RPCRequest { +impl std::fmt::Display for InboundRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - RPCRequest::Status(status) => write!(f, "Status Message: {}", status), - RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), - RPCRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), - RPCRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), - RPCRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), - RPCRequest::MetaData(_) => write!(f, "MetaData request"), + InboundRequest::Status(status) => write!(f, "Status Message: {}", status), + InboundRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), + InboundRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), + InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), + InboundRequest::MetaData(_) => write!(f, "MetaData request"), } } } diff --git a/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs index 07aa6330f36..5e1b533c600 100644 --- a/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs +++ b/beacon_node/eth2_libp2p/src/rpc/rate_limiter.rs @@ -1,4 +1,4 @@ -use crate::rpc::{Protocol, RPCRequest}; +use crate::rpc::{InboundRequest, Protocol}; use fnv::FnvHashMap; use libp2p::PeerId; use std::convert::TryInto; @@ -185,7 +185,7 @@ impl RPCRateLimiter { pub fn allows( &mut self, peer_id: &PeerId, - request: &RPCRequest, + request: &InboundRequest, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); let mut tokens = request.expected_responses().max(1); @@ -207,7 +207,7 @@ impl RPCRateLimiter { // 9 | 4 // 10 | 5 - if let RPCRequest::BlocksByRange(bbr_req) = request { + if let InboundRequest::BlocksByRange(bbr_req) = request { let penalty_factor = (bbr_req.step as f64 / 5.0).powi(2) as u64 + 1; tokens *= penalty_factor; }