Skip to content

Commit

Permalink
Convert to extension traits
Browse files Browse the repository at this point in the history
  • Loading branch information
lithp committed Oct 19, 2021
1 parent 743e299 commit 8ecb3b9
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 169 deletions.
6 changes: 4 additions & 2 deletions ethportal-peertest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use log::info;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use trin_core::locks::RwLoggingExt;
use trin_core::portalnet::utp::UtpListener;
use trin_core::portalnet::{
discovery::Discovery,
overlay::{OverlayConfig, OverlayProtocol},
types::{PortalnetConfig, ProtocolKind},
Enr, U256,
};
use trin_core::rw_read;
use trin_core::utils::setup_overlay_db;

#[tokio::main]
Expand All @@ -28,7 +28,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap()));
discovery.write().await.start().await.unwrap();

let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
let db = Arc::new(setup_overlay_db(
discovery.read_with_warn().await.local_enr().node_id(),
));

let overlay = Arc::new(
OverlayProtocol::new(
Expand Down
9 changes: 5 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use tokio::sync::RwLock;

use trin_core::jsonrpc::handlers::JsonRpcHandler;
use trin_core::jsonrpc::types::PortalJsonRpcRequest;
use trin_core::locks::RwLoggingExt;
use trin_core::portalnet::events::PortalnetEvents;
use trin_core::rw_read;
use trin_core::rw_write;
use trin_core::{
cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK},
jsonrpc::service::launch_jsonrpc_server,
Expand Down Expand Up @@ -54,10 +53,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discovery = Arc::new(RwLock::new(
Discovery::new(portalnet_config.clone()).unwrap(),
));
rw_write!(discovery).start().await.unwrap();
discovery.write_with_warn().await.start().await.unwrap();

// Setup Overlay database
let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
let db = Arc::new(setup_overlay_db(
discovery.read_with_warn().await.local_enr().node_id(),
));

debug!("Selected networks to spawn: {:?}", trin_config.networks);
// Initialize state sub-network service and event handlers, if selected
Expand Down
7 changes: 3 additions & 4 deletions trin-core/src/jsonrpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use tokio::sync::RwLock;

use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint};
use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest};
use crate::locks::RwLoggingExt;
use crate::portalnet::discovery::Discovery;
use crate::rw_read;
use crate::rw_write;

type Responder<T, E> = mpsc::UnboundedSender<Result<T, E>>;

Expand All @@ -27,9 +26,9 @@ impl JsonRpcHandler {
while let Some(request) = self.portal_jsonrpc_rx.recv().await {
let response: Value = match request.endpoint {
TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint {
Discv5Endpoint::NodeInfo => rw_read!(self.discovery).node_info(),
Discv5Endpoint::NodeInfo => self.discovery.read_with_warn().await.node_info(),
Discv5Endpoint::RoutingTableInfo => {
rw_write!(self.discovery).routing_table_info()
self.discovery.write_with_warn().await.routing_table_info()
}
},
TrinEndpoint::HistoryEndpoint(endpoint) => {
Expand Down
130 changes: 1 addition & 129 deletions trin-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,135 +3,7 @@ extern crate lazy_static;

pub mod cli;
pub mod jsonrpc;
pub mod locks;
pub mod portalnet;
pub mod socket;
pub mod utils;

pub const ACQUIRE_TIMEOUT_MS: u64 = 100;
pub const HOLD_TIMEOUT_MS: u64 = 100;

pub struct TimedGuard<T> {
pub inner: T,
pub acquisition_line: u32,
pub acquisition_file: &'static str,
pub acquisition_time: std::time::Instant,
pub sleep_task: tokio::task::JoinHandle<()>,
}

impl<T> TimedGuard<T> {
pub async fn new(
inner: T,
acquisition_line: u32,
acquisition_file: &'static str,
) -> TimedGuard<T> {
let now = std::time::Instant::now();
let move_line = acquisition_line;
let move_file = acquisition_file;
let handle = tokio::spawn(async move {
sleep_then_log(move_file, move_line).await;
});

TimedGuard {
inner,
acquisition_line,
acquisition_file,
acquisition_time: now,
sleep_task: handle,
}
}
}

impl<T> std::ops::Deref for TimedGuard<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> std::ops::DerefMut for TimedGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<T> Drop for TimedGuard<T> {
fn drop(&mut self) {
self.sleep_task.abort();
let held_for = self.acquisition_time.elapsed().as_millis();
if held_for > HOLD_TIMEOUT_MS.into() {
log::warn!(
"[{}:{}] lock held for too long: {}ms",
self.acquisition_file,
self.acquisition_line,
held_for,
)
}
}
}

async fn sleep_then_log(file: &'static str, line: u32) {
tokio::time::sleep(std::time::Duration::from_millis(HOLD_TIMEOUT_MS)).await;
log::warn!(
"[{}:{}] lock held for over {}ms, not yet released",
file,
line,
HOLD_TIMEOUT_MS.to_string()
);
}

#[macro_export]
macro_rules! inner_rw {
($meth: ident, $lock_name:expr) => {{
let acquire_timeout = std::time::Duration::from_millis($crate::ACQUIRE_TIMEOUT_MS);
let sleep = tokio::time::sleep(acquire_timeout);
tokio::pin!(sleep);

let mut did_log: bool = false;
let now = std::time::Instant::now();

loop {
tokio::select! {
_ = &mut sleep, if !did_log => {
log::warn!(
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
std::file!(),
std::line!(),
$crate::ACQUIRE_TIMEOUT_MS,
);
did_log = true;
}
guard = $lock_name.$meth() => {
if did_log {
let wait_time = now.elapsed().as_millis();
log::warn!(
"[{}:{}] waited {}ms to acquire lock",
std::file!(),
std::line!(),
wait_time,
);
}

let wrapped = $crate::TimedGuard::new(
guard, std::line!(), std::file!(),
).await;
break wrapped;
}
}
}
}};
}

#[macro_export]
macro_rules! rw_write {
($lock_name:expr) => {{
$crate::inner_rw!(write, $lock_name)
}}
}

#[macro_export]
macro_rules! rw_read {
($lock_name:expr) => {{
$crate::inner_rw!(read, $lock_name)
}}
}
153 changes: 153 additions & 0 deletions trin-core/src/locks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use futures::future::FutureExt;
use std::future::Future;
use std::marker::Sync;
use std::ops::Deref;
use std::ops::DerefMut;
use std::panic::Location;
use std::pin::Pin;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use tokio::task::JoinHandle;

const ACQUIRE_TIMEOUT_MS: u64 = 100;
const HOLD_TIMEOUT_MS: u64 = 100;

/// Tries to look exactly like a T, by implementing Deref and DerefMut, but emits
/// a warning if drop() is not called soon enough.
pub struct TimedGuard<T> {
inner: T,
acquisition_line: u32,
acquisition_file: &'static str,
acquisition_time: Instant,
sleep_task: JoinHandle<()>,
}

impl<T> TimedGuard<T> {
fn new(inner: T, acquisition_line: u32, acquisition_file: &'static str) -> TimedGuard<T> {
let now = Instant::now();
let move_line = acquisition_line;
let move_file = acquisition_file;
let handle = tokio::spawn(async move {
sleep_then_log(move_file, move_line).await;
});

TimedGuard {
inner,
acquisition_line,
acquisition_file,
acquisition_time: now,
sleep_task: handle,
}
}
}

impl<T> Deref for TimedGuard<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<T> DerefMut for TimedGuard<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<T> Drop for TimedGuard<T> {
fn drop(&mut self) {
self.sleep_task.abort();
let held_for = self.acquisition_time.elapsed().as_millis();
if held_for > HOLD_TIMEOUT_MS.into() {
log::warn!(
"[{}:{}] lock held for too long: {}ms",
self.acquisition_file,
self.acquisition_line,
held_for,
)
}
}
}

async fn sleep_then_log(file: &'static str, line: u32) {
tokio::time::sleep(Duration::from_millis(HOLD_TIMEOUT_MS)).await;
log::warn!(
"[{}:{}] lock held for over {}ms, not yet released",
file,
line,
HOLD_TIMEOUT_MS.to_string()
);
}

async fn try_lock<T, Fut>(fut: Fut, file: &'static str, line: u32) -> TimedGuard<T>
where
Fut: Future<Output = T>,
{
let acquire_timeout = Duration::from_millis(ACQUIRE_TIMEOUT_MS);
let sleep = tokio::time::sleep(acquire_timeout).fuse();
let fused = fut.fuse();

futures::pin_mut!(sleep, fused);

let now = Instant::now();

futures::select! {
_ = sleep => {
log::warn!(
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
file, line, ACQUIRE_TIMEOUT_MS,
);
},
guard = fused => {
return TimedGuard::new(guard, line, file);
}
}

let guard = fused.await;
let wait_time = now.elapsed().as_millis();
log::warn!("[{}:{}] waited {}ms to acquire lock", file, line, wait_time);

TimedGuard::new(guard, line, file)
}

// this is a workaround:
// - Rust does not support async in traits
// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
// - async_trait does not give us enough flexibility to implement #[track_caller]
//
// So we manually desugar the async functions and have them return futures
type Async<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// These methods should be used in favor of the stock read() and write() methods.
///
/// These methods emit warnings when the lock takes too long to acquire (meaning it's
/// likely some other user is holding onto the lock for too long).
///
/// They also emit warnings when the returned TimedGuard is kept alive for too long.
/// (The lock is held until the returned TimedGuard is dropped, so it should be dropped
/// as soon as possible!)
pub trait RwLoggingExt<T> {
#[track_caller]
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>>;

#[track_caller]
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>>;
}

impl<T: Send + Sync> RwLoggingExt<T> for RwLock<T> {
#[track_caller]
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>> {
let loc = Location::caller();
Box::pin(try_lock(self.read(), loc.file(), loc.line()))
}

#[track_caller]
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>> {
let loc = Location::caller();
Box::pin(try_lock(self.write(), loc.file(), loc.line()))
}
}
6 changes: 4 additions & 2 deletions trin-core/src/portalnet/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::{
utp::{UtpListener, UTP_PROTOCOL},
};
use crate::cli::{HISTORY_NETWORK, STATE_NETWORK};
use crate::rw_write;
use crate::locks::RwLoggingExt;
use std::collections::HashMap;
use std::convert::TryInto;

Expand All @@ -28,7 +28,9 @@ impl PortalnetEvents {
history_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
) -> Self {
let protocol_receiver = rw_write!(discovery)
let protocol_receiver = discovery
.write_with_warn()
.await
.discv5
.event_stream()
.await
Expand Down
Loading

0 comments on commit 8ecb3b9

Please sign in to comment.