Skip to content

Commit

Permalink
Fix clippy lints rpc (#1401)
Browse files Browse the repository at this point in the history
## Issue Addressed
#1388 partially (eth2_libp2p & network)

## Proposed Changes 
TLDR at the end
- *Complex types* are 3 on the handlers/Behaviours but the types are `Poll<ComplexType>` where `ComplexType` comes from the traits of libp2p. Those, I don't thing are worth an alias. A couple more were from using tokio combinators and were removed writing things the async way and using [`BoxFuture`](https://docs.rs/futures/0.3.5/futures/future/type.BoxFuture.html)
- The *cognitive complexity*.. I tried to address those before (they come from the poll functions too) and tbh they are cognitively simpler to understand the way they are now. Moving separate parts to functions doesn't add much since that code is not repeated and they all do early returns. If moved those returns would now need to be wrapped in an Option, probably, and checked to be returned again. I would leave them like that but that's just preference.
- *Too many arguments*: They are not easily put together in a wrapping struct since the parameters don't relate semantically (Ex: fn new with a log, a reference to the chain, a peer, etc) but some may differ.
- *Needless returns* were indeed needless

## Additional Info
TLDR: removed needless return, used BoxFuture and async, left the rest untouched since those lgtm
  • Loading branch information
divagant-martian committed Jul 28, 2020
1 parent edf250c commit 9ae9df8
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 54 deletions.
95 changes: 44 additions & 51 deletions beacon_node/eth2_libp2p/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::type_complexity)]

use super::methods::*;
use crate::rpc::{
codec::{
Expand All @@ -11,15 +9,14 @@ use crate::rpc::{
methods::ResponseTermination,
MaxRequestBlocks, MAX_REQUEST_BLOCKS,
};
use futures::future::Ready;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::prelude::{AsyncRead, AsyncWrite};
use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use ssz::Encode;
use ssz_types::VariableList;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::Duration;
use tokio_io_timeout::TimeoutStream;
use tokio_util::{
Expand Down Expand Up @@ -206,13 +203,6 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> =
Framed<TimeoutStream<Compat<TSocket>>, InboundCodec<TSpec>>;
type FnAndThen<TSocket, TSpec> = fn(
(
Option<Result<RPCRequest<TSpec>, RPCError>>,
InboundFramed<TSocket, TSpec>,
),
) -> Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>;
type FnMapErr = fn(tokio::time::Elapsed) -> RPCError;

impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec>
where
Expand All @@ -221,45 +211,48 @@ where
{
type Output = InboundOutput<TSocket, TSpec>;
type Error = RPCError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
let protocol_name = protocol.message_name;
// convert the socket to tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =
BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZSnappy(ssz_snappy_codec)
}
Encoding::SSZ => {
let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZ(ssz_codec)
}
};
let mut timed_socket = TimeoutStream::new(socket);
timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT)));

let socket = Framed::new(timed_socket, codec);

// MetaData requests should be empty, return the stream
Box::pin(match protocol_name {
Protocol::MetaData => {
future::Either::Left(future::ok((RPCRequest::MetaData(PhantomData), socket)))
async move {
let protocol_name = protocol.message_name;
// convert the socket to tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =
BaseInboundCodec::new(SSZSnappyInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZSnappy(ssz_snappy_codec)
}
Encoding::SSZ => {
let ssz_codec =
BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE));
InboundCodec::SSZ(ssz_codec)
}
};
let mut timed_socket = TimeoutStream::new(socket);
timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT)));

let socket = Framed::new(timed_socket, codec);

// MetaData requests should be empty, return the stream
match protocol_name {
Protocol::MetaData => Ok((RPCRequest::MetaData(PhantomData), socket)),
_ => {
match tokio::time::timeout(
Duration::from_secs(REQUEST_TIMEOUT),
socket.into_future(),
)
.await
{
Err(e) => Err(RPCError::from(e)),
Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
Ok((Some(Err(_)), _)) | Ok((None, _)) => Err(RPCError::IncompleteStream),
}
}
}

_ => future::Either::Right(
tokio::time::timeout(Duration::from_secs(REQUEST_TIMEOUT), socket.into_future())
.map_err(RPCError::from as FnMapErr)
.and_then({
|(req, stream)| match req {
Some(Ok(request)) => future::ok((request, stream)),
Some(Err(_)) | None => future::err(RPCError::IncompleteStream),
}
} as FnAndThen<TSocket, TSpec>),
),
})
}
.boxed()
}
}

Expand Down Expand Up @@ -375,7 +368,7 @@ where
{
type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
// convert to a tokio compatible socket
Expand All @@ -395,12 +388,12 @@ where

let mut socket = Framed::new(socket, codec);

let future = async {
async {
socket.send(self).await?;
socket.close().await?;
Ok(socket)
};
Box::pin(future)
}
.boxed()
}
}

Expand Down
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
info!(self.log, "Sync state updated"; "old_state" => format!("{}", old_state), "new_state" => format!("{}",new_state));
}
}

/* Processing State Functions */
// These functions are called in the main poll function to transition the state of the sync
// manager

#[allow(clippy::needless_return)]
/// A new block has been received for a parent lookup query, process it.
fn process_parent_request(&mut self, mut parent_request: ParentRequests<T::EthSpec>) {
// verify the last added block is the parent of the last requested block
Expand Down Expand Up @@ -658,7 +658,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
// add the block back to the queue and continue the search
parent_request.downloaded_blocks.push(newest_block);
self.request_parent(parent_request);
return;
}
Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => {
spawn_block_processor(
Expand All @@ -685,7 +684,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
parent_request.last_submitted_peer,
PeerAction::MidToleranceError,
);
return;
}
}
}
Expand Down

0 comments on commit 9ae9df8

Please sign in to comment.