Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - split outbound and inbound codecs encoded types #2410

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/behaviour/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<TSpec: EthSpec> BehaviourHandler<TSpec> {
pub enum BehaviourHandlerIn<TSpec: EthSpec> {
Delegate(DelegateIn<TSpec>),
/// Start the shutdown process.
Shutdown(Option<(RequestId, RPCRequest<TSpec>)>),
Shutdown(Option<(RequestId, OutboundRequest<TSpec>)>),
}

impl<TSpec: EthSpec> ProtocolsHandler for BehaviourHandler<TSpec> {
Expand Down
30 changes: 15 additions & 15 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
trace!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => %peer_id);

self.eth2_rpc
.send_request(peer_id, id, RPCRequest::Ping(ping));
.send_request(peer_id, id, OutboundRequest::Ping(ping));
}

/// Sends a Pong response to the peer.
Expand All @@ -610,7 +610,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {

/// Sends a METADATA request to a peer.
fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = RPCRequest::MetaData(PhantomData);
let event = OutboundRequest::MetaData(PhantomData);
self.eth2_rpc
.send_request(peer_id, RequestId::Behaviour, event);
}
Expand Down Expand Up @@ -749,17 +749,17 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
let peer_request_id = (handler_id, id);
match request {
/* Behaviour managed protocols: Ping and Metadata */
RPCRequest::Ping(ping) => {
InboundRequest::Ping(ping) => {
// inform the peer manager and send the response
self.peer_manager.ping_request(&peer_id, ping.data);
// send a ping response
self.pong(peer_request_id, peer_id);
}
RPCRequest::MetaData(_) => {
InboundRequest::MetaData(_) => {
// send the requested meta-data
self.send_meta_data_response((handler_id, id), peer_id);
}
RPCRequest::Goodbye(reason) => {
InboundRequest::Goodbye(reason) => {
// queue for disconnection without a goodbye message
debug!(
self.log, "Peer sent Goodbye";
Expand All @@ -775,18 +775,18 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
// inform the application layer early.
}
/* Protocols propagated to the Network */
RPCRequest::Status(msg) => {
InboundRequest::Status(msg) => {
// inform the peer manager that we have received a status from a peer
self.peer_manager.peer_statusd(&peer_id);
// propagate the STATUS message upwards
self.propagate_request(peer_request_id, peer_id, Request::Status(msg))
}
RPCRequest::BlocksByRange(req) => self.propagate_request(
InboundRequest::BlocksByRange(req) => self.propagate_request(
peer_request_id,
peer_id,
Request::BlocksByRange(req),
),
RPCRequest::BlocksByRoot(req) => {
InboundRequest::BlocksByRoot(req) => {
self.propagate_request(peer_request_id, peer_id, Request::BlocksByRoot(req))
}
}
Expand Down Expand Up @@ -834,7 +834,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
peer_id,
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(
reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))),
reason.map(|reason| (RequestId::Behaviour, OutboundRequest::Goodbye(reason))),
),
});
}
Expand Down Expand Up @@ -878,7 +878,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(Some((
RequestId::Behaviour,
RPCRequest::Goodbye(reason),
OutboundRequest::Goodbye(reason),
))),
});
}
Expand Down Expand Up @@ -1293,12 +1293,12 @@ pub enum Request {
BlocksByRoot(BlocksByRootRequest),
}

impl<TSpec: EthSpec> std::convert::From<Request> for RPCRequest<TSpec> {
fn from(req: Request) -> RPCRequest<TSpec> {
impl<TSpec: EthSpec> std::convert::From<Request> for OutboundRequest<TSpec> {
fn from(req: Request) -> OutboundRequest<TSpec> {
match req {
Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r),
Request::Status(s) => RPCRequest::Status(s),
Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r),
Request::BlocksByRange(r) => OutboundRequest::BlocksByRange(r),
Request::Status(s) => OutboundRequest::Status(s),
}
}
}
Expand Down
24 changes: 14 additions & 10 deletions beacon_node/eth2_libp2p/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This handles the various supported encoding mechanism for the Eth 2.0 RPC.

use crate::rpc::methods::ErrorType;
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BufMut;
use libp2p::bytes::BytesMut;
use std::marker::PhantomData;
Expand Down Expand Up @@ -47,7 +47,7 @@ where
// This deals with Decoding RPC Responses from other peers and encoding our requests
pub struct BaseOutboundCodec<TOutboundCodec, TSpec>
where
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
TSpec: EthSpec,
{
/// Inner codec for handling various encodings.
Expand All @@ -60,7 +60,7 @@ where
impl<TOutboundCodec, TSpec> BaseOutboundCodec<TOutboundCodec, TSpec>
where
TSpec: EthSpec,
TOutboundCodec: OutboundCodec<RPCRequest<TSpec>>,
TOutboundCodec: OutboundCodec<OutboundRequest<TSpec>>,
{
pub fn new(codec: TOutboundCodec) -> Self {
BaseOutboundCodec {
Expand Down Expand Up @@ -102,9 +102,9 @@ where
impl<TCodec, TSpec> Decoder for BaseInboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = RPCRequest<TSpec>>,
TCodec: Encoder<RPCCodedResponse<TSpec>> + Decoder<Item = InboundRequest<TSpec>>,
{
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = <TCodec as Decoder>::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -115,14 +115,18 @@ where
/* Base Outbound Codec */

// This Encodes RPC Requests sent to external peers
impl<TCodec, TSpec> Encoder<RPCRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
impl<TCodec, TSpec> Encoder<OutboundRequest<TSpec>> for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>> + Encoder<RPCRequest<TSpec>>,
TCodec: OutboundCodec<OutboundRequest<TSpec>> + Encoder<OutboundRequest<TSpec>>,
{
type Error = <TCodec as Encoder<RPCRequest<TSpec>>>::Error;
type Error = <TCodec as Encoder<OutboundRequest<TSpec>>>::Error;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
self.inner.encode(item, dst)
}
}
Expand All @@ -131,7 +135,7 @@ where
impl<TCodec, TSpec> Decoder for BaseOutboundCodec<TCodec, TSpec>
where
TSpec: EthSpec,
TCodec: OutboundCodec<RPCRequest<TSpec>, CodecErrorType = ErrorType>
TCodec: OutboundCodec<OutboundRequest<TSpec>, CodecErrorType = ErrorType>
+ Decoder<Item = RPCResponse<TSpec>>,
{
type Item = RPCCodedResponse<TSpec>;
Expand Down
12 changes: 8 additions & 4 deletions beacon_node/eth2_libp2p/src/rpc/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) mod ssz_snappy;
use self::base::{BaseInboundCodec, BaseOutboundCodec};
use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec};
use crate::rpc::protocol::RPCError;
use crate::rpc::{RPCCodedResponse, RPCRequest};
use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse};
use libp2p::bytes::BytesMut;
use tokio_util::codec::{Decoder, Encoder};
use types::EthSpec;
Expand All @@ -29,7 +29,7 @@ impl<T: EthSpec> Encoder<RPCCodedResponse<T>> for InboundCodec<T> {
}

impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand All @@ -39,10 +39,14 @@ impl<TSpec: EthSpec> Decoder for InboundCodec<TSpec> {
}
}

impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for OutboundCodec<TSpec> {
impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for OutboundCodec<TSpec> {
type Error = RPCError;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match self {
OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst),
}
Expand Down
49 changes: 27 additions & 22 deletions beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::rpc::methods::*;
use crate::rpc::{
codec::base::OutboundCodec,
protocol::{Encoding, Protocol, ProtocolId, RPCError, Version, ERROR_TYPE_MAX, ERROR_TYPE_MIN},
};
use crate::rpc::{RPCCodedResponse, RPCRequest, RPCResponse};
use crate::rpc::{methods::*, InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse};
use libp2p::bytes::BytesMut;
use snap::read::FrameDecoder;
use snap::write::FrameEncoder;
Expand Down Expand Up @@ -90,7 +89,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<

// Decoder for inbound streams: Decodes RPC requests from peers
impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
type Item = RPCRequest<TSpec>;
type Item = InboundRequest<TSpec>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -133,27 +132,29 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
// since we have already checked `length` above.
match self.protocol.message_name {
Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
&decoded_buffer,
)?))),
Version::V1 => Ok(Some(InboundRequest::Status(
StatusMessage::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::Goodbye => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Goodbye(
Version::V1 => Ok(Some(InboundRequest::Goodbye(
GoodbyeReason::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRange => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRange(
Version::V1 => Ok(Some(InboundRequest::BlocksByRange(
BlocksByRangeRequest::from_ssz_bytes(&decoded_buffer)?,
))),
},
Protocol::BlocksByRoot => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
}))),
Version::V1 => {
Ok(Some(InboundRequest::BlocksByRoot(BlocksByRootRequest {
block_roots: VariableList::from_ssz_bytes(&decoded_buffer)?,
})))
}
},
Protocol::Ping => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Ping(Ping {
Version::V1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(&decoded_buffer)?,
}))),
},
Expand All @@ -163,7 +164,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
if !decoded_buffer.is_empty() {
Err(RPCError::InvalidData)
} else {
Ok(Some(RPCRequest::MetaData(PhantomData)))
Ok(Some(InboundRequest::MetaData(PhantomData)))
}
}
},
Expand Down Expand Up @@ -201,17 +202,21 @@ impl<TSpec: EthSpec> SSZSnappyOutboundCodec<TSpec> {
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<TSpec: EthSpec> Encoder<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type Error = RPCError;

fn encode(&mut self, item: RPCRequest<TSpec>, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(
&mut self,
item: OutboundRequest<TSpec>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let bytes = match item {
RPCRequest::Status(req) => req.as_ssz_bytes(),
RPCRequest::Goodbye(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(),
RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
RPCRequest::Ping(req) => req.as_ssz_bytes(),
RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode
OutboundRequest::Status(req) => req.as_ssz_bytes(),
OutboundRequest::Goodbye(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
// SSZ encoded bytes should be within `max_packet_size`
if bytes.len() > self.max_packet_size {
Expand Down Expand Up @@ -318,7 +323,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
}
}

impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
impl<TSpec: EthSpec> OutboundCodec<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<TSpec> {
type CodecErrorType = ErrorType;

fn decode_error(
Expand Down
17 changes: 9 additions & 8 deletions beacon_node/eth2_libp2p/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
#![allow(clippy::cognitive_complexity)]

use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::protocol::{Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend};
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::{Sink, SinkExt};
Expand Down Expand Up @@ -90,7 +91,7 @@ where
events_out: SmallVec<[HandlerEvent<TSpec>; 4]>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>,
dial_queue: SmallVec<[(RequestId, OutboundRequest<TSpec>); 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
Expand Down Expand Up @@ -186,7 +187,7 @@ pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<NegotiatedSubstream, TSpec>>,
/// Keeps track of the actual request sent.
request: RPCRequest<TSpec>,
request: OutboundRequest<TSpec>,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<NegotiatedSubstream, TSpec>>),
Expand Down Expand Up @@ -221,7 +222,7 @@ where
}

/// Initiates the handler's shutdown process, sending an optional last message to the peer.
pub fn shutdown(&mut self, final_msg: Option<(RequestId, RPCRequest<TSpec>)>) {
pub fn shutdown(&mut self, final_msg: Option<(RequestId, OutboundRequest<TSpec>)>) {
if matches!(self.state, HandlerState::Active) {
if !self.dial_queue.is_empty() {
debug!(self.log, "Starting handler shutdown"; "unsent_queued_requests" => self.dial_queue.len());
Expand All @@ -247,7 +248,7 @@ where
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
fn send_request(&mut self, id: RequestId, req: OutboundRequest<TSpec>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
Expand Down Expand Up @@ -303,8 +304,8 @@ where
type OutEvent = HandlerEvent<TSpec>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
type OutboundProtocol = OutboundRequest<TSpec>;
type OutboundOpenInfo = (RequestId, OutboundRequest<TSpec>); // Keep track of the id and the request
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
Expand Down
Loading