From 5dc10c2b31d9cff02ef38b858a5f427b4ac241d0 Mon Sep 17 00:00:00 2001 From: Mike the Tike Date: Fri, 27 Aug 2021 18:40:58 +0200 Subject: [PATCH] test: add tracing to comms via --tracing-enabled (#3238) Description --- Add tracing to comms to debug timings via the `--tracing-enabled` flag Motivation and Context --- It's currently difficult to understand the timings of network calls and errors in the application. How Has This Been Tested? --- Tested manually --- Cargo.lock | 137 +++++++++-- applications/tari_base_node/Cargo.toml | 7 + applications/tari_base_node/src/builder.rs | 1 + applications/tari_base_node/src/main.rs | 26 +- applications/tari_console_wallet/Cargo.toml | 9 + applications/tari_console_wallet/src/main.rs | 24 +- base_layer/core/Cargo.toml | 3 + .../states/block_sync.rs | 6 +- .../states/events_and_states.rs | 11 +- .../base_node/sync/block_sync/synchronizer.rs | 2 + .../sync/header_sync/synchronizer.rs | 2 + .../core/src/base_node/sync/rpc/service.rs | 170 +++++++------ .../src/chain_storage/blockchain_database.rs | 4 +- common/Cargo.toml | 7 + common/src/configuration/bootstrap.rs | 3 + comms/Cargo.toml | 8 +- comms/src/bounded_executor.rs | 7 +- comms/src/connection_manager/dialer.rs | 8 +- comms/src/connection_manager/listener.rs | 5 +- comms/src/connection_manager/manager.rs | 17 +- .../src/connection_manager/peer_connection.rs | 32 ++- comms/src/connection_manager/requester.rs | 31 ++- comms/src/connectivity/manager.rs | 58 +++-- comms/src/connectivity/requester.rs | 14 +- comms/src/multiplexing/yamux.rs | 16 +- comms/src/noise/config.rs | 1 + comms/src/protocol/identity.rs | 2 + comms/src/protocol/messaging/outbound.rs | 226 +++++++++++------- comms/src/protocol/messaging/protocol.rs | 1 + comms/src/protocol/rpc/client.rs | 24 +- comms/src/protocol/rpc/handshake.rs | 46 +++- comms/src/protocol/rpc/server/mod.rs | 8 +- comms/src/socks/client.rs | 3 + .../test_utils/mocks/connection_manager.rs | 6 +- .../test_utils/mocks/connectivity_manager.rs | 10 +- comms/src/test_utils/mocks/peer_connection.rs | 11 +- 36 files changed, 691 insertions(+), 255 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93f24f8ead..638e5824f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2060,6 +2060,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "integer-encoding" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" + [[package]] name = "iovec" version = "0.1.4" @@ -2890,6 +2896,49 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22" +dependencies = [ + "async-trait", + "crossbeam-channel 0.5.1", + "futures 0.3.15", + "js-sys", + "lazy_static 1.4.0", + "percent-encoding 2.1.0", + "pin-project 1.0.7", + "rand 0.8.4", + "thiserror", + "tokio 1.9.0", + "tokio-stream", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db22f492873ea037bc267b35a0e8e4fb846340058cb7c864efe3d0bf23684593" +dependencies = [ + "async-trait", + "lazy_static 1.4.0", + "opentelemetry", + "opentelemetry-semantic-conventions", + "thiserror", + "thrift", + "tokio 1.9.0", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffeac823339e8b0f27b961f4385057bf9f97f2863bc745bd015fd6091f2270e9" +dependencies = [ + "opentelemetry", +] + [[package]] name = "ordered-float" version = "1.1.1" @@ -4438,6 +4487,8 @@ dependencies = [ "config", "futures 0.3.15", "log 0.4.14", + "opentelemetry", + "opentelemetry-jaeger", "regex", "rustyline", "rustyline-derive", @@ -4458,6 +4509,9 @@ dependencies = [ "thiserror", "tokio 0.2.25", "tonic", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -4491,6 +4545,8 @@ dependencies = [ "git2", "log 0.4.14", "log4rs 1.0.0", + "opentelemetry", + "opentelemetry-jaeger", "parity-multiaddr", "path-clean", "prost-build", @@ -4502,6 +4558,9 @@ dependencies = [ "tari_test_utils", "tempfile", "toml 0.5.8", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -4536,6 +4595,8 @@ dependencies = [ "log 0.4.14", "nom 5.1.2", "openssl", + "opentelemetry", + "opentelemetry-jaeger", "parity-multiaddr", "pin-project 0.4.28", "prost", @@ -4554,9 +4615,11 @@ dependencies = [ "thiserror", "tokio 0.2.25", "tokio-macros", - "tokio-util 0.2.0", + "tokio-util 0.3.1", "tower", "tower-make", + "tracing", + "tracing-futures", "yamux", ] @@ -4633,6 +4696,8 @@ dependencies = [ "crossterm", "futures 0.3.15", "log 0.4.14", + "opentelemetry", + "opentelemetry-jaeger", "qrcode", "rand 0.8.4", "regex", @@ -4654,6 +4719,9 @@ dependencies = [ "thiserror", "tokio 0.2.25", "tonic", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "tui", "unicode-segmentation", "unicode-width", @@ -4705,6 +4773,9 @@ dependencies = [ "thiserror", "tokio 0.2.25", "tokio-macros", + "tracing", + "tracing-attributes", + "tracing-futures", "ttl_cache", "uint", ] @@ -5172,6 +5243,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +dependencies = [ + "byteorder", + "integer-encoding", + "log 0.4.14", + "ordered-float 1.1.1", + "threadpool", +] + [[package]] name = "time" version = "0.1.44" @@ -5277,6 +5370,17 @@ dependencies = [ "tokio 1.9.0", ] +[[package]] +name = "tokio-stream" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +dependencies = [ + "futures-core", + "pin-project-lite 0.2.7", + "tokio 1.9.0", +] + [[package]] name = "tokio-test" version = "0.2.1" @@ -5298,20 +5402,6 @@ dependencies = [ "tokio 0.2.25", ] -[[package]] -name = "tokio-util" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log 0.4.14", - "pin-project-lite 0.1.12", - "tokio 0.2.25", -] - [[package]] name = "tokio-util" version = "0.3.1" @@ -5646,6 +5736,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "599f388ecb26b28d9c1b2e4437ae019a7b336018b45ed911458cd9ebf91129f6" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "tracing-serde" version = "0.1.2" @@ -5658,9 +5761,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab69019741fca4d98be3c62d2b75254528b5432233fd8a4d2739fec20278de48" +checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe" dependencies = [ "ansi_term 0.12.1", "chrono", diff --git a/applications/tari_base_node/Cargo.toml b/applications/tari_base_node/Cargo.toml index 7ac4914c50..db832f8a9a 100644 --- a/applications/tari_base_node/Cargo.toml +++ b/applications/tari_base_node/Cargo.toml @@ -35,6 +35,13 @@ strum = "^0.19" strum_macros = "0.18.0" thiserror = "^1.0.20" tonic = "0.2" +tracing = "0.1.26" +tracing-opentelemetry = "0.15.0" +tracing-subscriber = "0.2.20" + +# network tracing, rt-tokio for async batch export +opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } +opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} [features] avx2 = ["tari_core/avx2", "tari_crypto/avx2", "tari_p2p/avx2", "tari_wallet/avx2", "tari_comms/avx2", "tari_comms_dht/avx2"] diff --git a/applications/tari_base_node/src/builder.rs b/applications/tari_base_node/src/builder.rs index 87efdaf9bf..dc36f64f59 100644 --- a/applications/tari_base_node/src/builder.rs +++ b/applications/tari_base_node/src/builder.rs @@ -67,6 +67,7 @@ pub struct BaseNodeContext { impl BaseNodeContext { /// Starts the node container. This entails the base node state machine. /// This call consumes the NodeContainer instance. + #[tracing::instrument(name = "base_node::run", skip(self))] pub async fn run(self) { info!(target: LOG_TARGET, "Tari base node has STARTED"); diff --git a/applications/tari_base_node/src/main.rs b/applications/tari_base_node/src/main.rs index e40f3f9cc4..8ef2c9b68a 100644 --- a/applications/tari_base_node/src/main.rs +++ b/applications/tari_base_node/src/main.rs @@ -98,9 +98,11 @@ mod utils; use crate::command_handler::{CommandHandler, StatusOutput}; use futures::{future::Fuse, pin_mut, FutureExt}; use log::*; +use opentelemetry::{self, global, KeyValue}; use parser::Parser; use rustyline::{config::OutputStreamType, error::ReadlineError, CompletionType, Config, EditMode, Editor}; use std::{ + env, net::SocketAddr, process, sync::Arc, @@ -120,6 +122,7 @@ use tokio::{ time::{self, Delay}, }; use tonic::transport::Server; +use tracing_subscriber::{layer::SubscriberExt, Registry}; const LOG_TARGET: &str = "base_node::app"; /// Application entry point @@ -148,12 +151,14 @@ fn main_inner() -> Result<(), ExitCodes> { })?; rt.block_on(run_node(node_config.into(), bootstrap))?; - + // Shutdown and send any traces + global::shutdown_tracer_provider(); Ok(()) } /// Sets up the base node and runs the cli_loop async fn run_node(node_config: Arc, bootstrap: ConfigBootstrap) -> Result<(), ExitCodes> { + enable_tracing_if_specified(&bootstrap); // Load or create the Node identity let node_identity = setup_node_identity( &node_config.base_node_identity_file, @@ -247,6 +252,25 @@ async fn run_node(node_config: Arc, bootstrap: ConfigBootstrap) -> Ok(()) } +fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) { + if bootstrap.tracing_enabled { + // To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \ + // jaegertracing/all-in-one:latest + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("tari::base_node") + .with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())]) + // TODO: uncomment when using tokio 1 + // .install_batch(opentelemetry::runtime::Tokio) + .install_simple() + .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = Registry::default().with(telemetry); + tracing::subscriber::set_global_default(subscriber) + .expect("Tracing could not be set. Try running without `--tracing-enabled`"); + } +} + /// Runs the gRPC server async fn run_grpc( grpc: crate::grpc::base_node_grpc_server::BaseNodeGrpcServer, diff --git a/applications/tari_console_wallet/Cargo.toml b/applications/tari_console_wallet/Cargo.toml index 5f43a75189..5163910d37 100644 --- a/applications/tari_console_wallet/Cargo.toml +++ b/applications/tari_console_wallet/Cargo.toml @@ -35,6 +35,15 @@ tokio = { version="0.2.10", features = ["signal"] } thiserror = "1.0.20" tonic = "0.2" +tracing = "0.1.26" +tracing-opentelemetry = "0.15.0" +tracing-subscriber = "0.2.20" + +# network tracing, rt-tokio for async batch export +opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } +opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} + + [dependencies.tari_core] path = "../../base_layer/core" version = "^0.9" diff --git a/applications/tari_console_wallet/src/main.rs b/applications/tari_console_wallet/src/main.rs index 25f749034e..4042eb27eb 100644 --- a/applications/tari_console_wallet/src/main.rs +++ b/applications/tari_console_wallet/src/main.rs @@ -19,12 +19,14 @@ use init::{ WalletBoot, }; use log::*; +use opentelemetry::{self, global, KeyValue}; use recovery::prompt_private_key_from_seed_words; -use std::process; +use std::{env, process}; use tari_app_utilities::{consts, initialization::init_configuration, utilities::ExitCodes}; use tari_common::{configuration::bootstrap::ApplicationType, ConfigBootstrap}; use tari_core::transactions::types::PrivateKey; use tari_shutdown::Shutdown; +use tracing_subscriber::{layer::SubscriberExt, Registry}; use wallet_modes::{command_mode, grpc_mode, recovery_mode, script_mode, tui_mode, WalletMode}; pub const LOG_TARGET: &str = "wallet::console_wallet::main"; @@ -90,6 +92,7 @@ fn main_inner() -> Result<(), ExitCodes> { info!(target: LOG_TARGET, "Default configuration created. Done."); } + enable_tracing_if_specified(&bootstrap); // get command line password if provided let arg_password = bootstrap.password.clone(); let seed_words_file_name = bootstrap.seed_words_file_name.clone(); @@ -185,3 +188,22 @@ fn get_recovery_master_key( Ok(None) } } + +fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) { + if bootstrap.tracing_enabled { + // To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \ + // jaegertracing/all-in-one:latest + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("tari::console_wallet") + .with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())]) + // TODO: uncomment when using tokio 1 + // .install_batch(opentelemetry::runtime::Tokio) + .install_simple() + .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = Registry::default().with(telemetry); + tracing::subscriber::set_global_default(subscriber) + .expect("Tracing could not be set. Try running without `--tracing-enabled`"); + } +} diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml index 6e802b7fbc..1212b2a40c 100644 --- a/base_layer/core/Cargo.toml +++ b/base_layer/core/Cargo.toml @@ -59,6 +59,9 @@ tokio = { version="^0.2", features = ["blocking", "time", "sync"] } ttl_cache = "0.5.1" uint = { version = "0.9", default-features = false } num-format = "0.4.0" +tracing = "0.1.26" +tracing-futures="*" +tracing-attributes="*" [dev-dependencies] tari_p2p = { version = "^0.9", path = "../../base_layer/p2p", features=["test-mocks"]} diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index cc596d61d4..689c9a8316 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -66,8 +66,12 @@ impl BlockSync { ); let status_event_sender = shared.status_event_sender.clone(); - let local_nci = shared.local_node_interface.clone(); let bootstrapped = shared.is_bootstrapped(); + let _ = status_event_sender.broadcast(StatusInfo { + bootstrapped, + state_info: StateInfo::BlockSyncStarting, + }); + let local_nci = shared.local_node_interface.clone(); synchronizer.on_progress(move |block, remote_tip_height, sync_peers| { let local_height = block.height(); local_nci.publish_block_event(BlockEvent::ValidBlockAdded( diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index 8c65886792..4e864624bc 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -162,6 +162,7 @@ pub enum StateInfo { StartUp, HeaderSync(BlockSyncInfo), HorizonSync(HorizonSyncInfo), + BlockSyncStarting, BlockSync(BlockSyncInfo), Listening(ListeningInfo), } @@ -193,12 +194,17 @@ impl StateInfo { HorizonSyncStatus::Finalizing => "Finalizing horizon sync".to_string(), }, Self::BlockSync(info) => format!( - "Syncing blocks: {}/{} ({:.0}%)", + "Syncing blocks with {}: {}/{} ({:.0}%) ", + info.sync_peers + .first() + .map(|s| s.short_str()) + .unwrap_or_else(|| "".to_string()), info.local_height, info.tip_height, info.local_height as f64 / info.tip_height as f64 * 100.0 ), Self::Listening(_) => "Listening".to_string(), + Self::BlockSyncStarting => "Starting block sync".to_string(), } } @@ -212,7 +218,7 @@ impl StateInfo { pub fn is_synced(&self) -> bool { use StateInfo::*; match self { - StartUp | HeaderSync(_) | HorizonSync(_) | BlockSync(_) => false, + StartUp | HeaderSync(_) | HorizonSync(_) | BlockSync(_) | BlockSyncStarting => false, Listening(info) => info.is_synced(), } } @@ -226,6 +232,7 @@ impl Display for StateInfo { Self::HorizonSync(info) => write!(f, "Synchronizing horizon state: {}", info), Self::BlockSync(info) => write!(f, "Synchronizing blocks: {}", info), Self::Listening(info) => write!(f, "Listening: {}", info), + Self::BlockSyncStarting => write!(f, "Synchronizing blocks: Starting"), } } } diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index c2898962c8..69c44fdf36 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -46,6 +46,7 @@ use tari_comms::{ PeerConnection, }; use tokio::task; +use tracing; const LOG_TARGET: &str = "c::bn::block_sync"; @@ -86,6 +87,7 @@ impl BlockSynchronizer { self.hooks.add_on_complete_hook(hook); } + #[tracing::instrument(skip(self), err)] pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> { let peer_conn = self.get_next_sync_peer().await?; let node_id = peer_conn.peer_node_id().clone(); diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 8ff487a8e5..b7483ff70e 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -48,6 +48,7 @@ use tari_comms::{ protocol::rpc::{RpcError, RpcHandshakeError}, PeerConnection, }; +use tracing; const LOG_TARGET: &str = "c::bn::header_sync"; @@ -253,6 +254,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { Ok(()) } + #[tracing::instrument(skip(self, conn), err)] async fn attempt_sync(&mut self, mut conn: PeerConnection) -> Result<(), BlockHeaderSyncError> { let peer = conn.peer_node_id().clone(); let mut client = conn.connect_rpc::().await?; diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index fc2d621113..776c2b7e01 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -41,6 +41,7 @@ use std::cmp; use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_crypto::tari_utilities::hex::Hex; use tokio::task; +use tracing::{instrument, span, Instrument, Level}; const LOG_TARGET: &str = "c::base_node::sync_rpc"; @@ -61,6 +62,7 @@ impl BaseNodeSyncRpcService { #[tari_comms::async_trait] impl BaseNodeSyncService for BaseNodeSyncRpcService { + #[instrument(name = "sync_rpc::sync_blocks", skip(self), err)] async fn sync_blocks( &self, request: Request, @@ -116,56 +118,61 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ const BATCH_SIZE: usize = 4; let (mut tx, rx) = mpsc::channel(BATCH_SIZE); - task::spawn(async move { - let iter = NonOverlappingIntegerPairIter::new(start, end + 1, BATCH_SIZE); - for (start, end) in iter { - if tx.is_closed() { - break; - } - - debug!(target: LOG_TARGET, "Sending blocks #{} - #{}", start, end); - let blocks = db - .fetch_blocks(start..=end) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET)); - - match blocks { - Ok(blocks) if blocks.is_empty() => { + let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker"); + task::spawn( + async move { + let iter = NonOverlappingIntegerPairIter::new(start, end + 1, BATCH_SIZE); + for (start, end) in iter { + if tx.is_closed() { break; - }, - Ok(blocks) => { - let mut blocks = stream::iter( - blocks - .into_iter() - .map(|hb| hb.try_into_block().map_err(RpcStatus::log_internal_error(LOG_TARGET))) - .map(|block| match block { - Ok(b) => Ok(proto::base_node::BlockBodyResponse::from(b)), - Err(err) => Err(err), - }) - .map(Ok), - ); + } - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send_all(&mut blocks).await.is_err() { + debug!(target: LOG_TARGET, "Sending blocks #{} - #{}", start, end); + let blocks = db + .fetch_blocks(start..=end) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET)); + + match blocks { + Ok(blocks) if blocks.is_empty() => { break; - } - }, - Err(err) => { - let _ = tx.send(Err(err)).await; - break; - }, + }, + Ok(blocks) => { + let mut blocks = stream::iter( + blocks + .into_iter() + .map(|hb| hb.try_into_block().map_err(RpcStatus::log_internal_error(LOG_TARGET))) + .map(|block| match block { + Ok(b) => Ok(proto::base_node::BlockBodyResponse::from(b)), + Err(err) => Err(err), + }) + .map(Ok), + ); + + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send_all(&mut blocks).await.is_err() { + break; + } + }, + Err(err) => { + let _ = tx.send(Err(err)).await; + break; + }, + } } - } - debug!( - target: LOG_TARGET, - "Block sync round complete for peer `{}`.", peer_node_id, - ); - }); + debug!( + target: LOG_TARGET, + "Block sync round complete for peer `{}`.", peer_node_id, + ); + } + .instrument(span), + ); Ok(Streaming::new(rx)) } + #[instrument(name = "sync_rpc::sync_headers", skip(self), err)] async fn sync_headers( &self, request: Request, @@ -203,50 +210,55 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ ); let (mut tx, rx) = mpsc::channel(chunk_size); - task::spawn(async move { - let iter = NonOverlappingIntegerPairIter::new( - start_header.height + 1, - start_header.height.saturating_add(count).saturating_add(1), - chunk_size, - ); - for (start, end) in iter { - if tx.is_closed() { - break; - } - debug!(target: LOG_TARGET, "Sending headers #{} - #{}", start, end); - let headers = db - .fetch_headers(start..=end) - .await - .map_err(RpcStatus::log_internal_error(LOG_TARGET)); - - match headers { - Ok(headers) if headers.is_empty() => { + let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker"); + task::spawn( + async move { + let iter = NonOverlappingIntegerPairIter::new( + start_header.height + 1, + start_header.height.saturating_add(count).saturating_add(1), + chunk_size, + ); + for (start, end) in iter { + if tx.is_closed() { break; - }, - Ok(headers) => { - let mut headers = - stream::iter(headers.into_iter().map(proto::core::BlockHeader::from).map(Ok).map(Ok)); - // Ensure task stops if the peer prematurely stops their RPC session - if tx.send_all(&mut headers).await.is_err() { + } + debug!(target: LOG_TARGET, "Sending headers #{} - #{}", start, end); + let headers = db + .fetch_headers(start..=end) + .await + .map_err(RpcStatus::log_internal_error(LOG_TARGET)); + + match headers { + Ok(headers) if headers.is_empty() => { break; - } - }, - Err(err) => { - let _ = tx.send(Err(err)).await; - break; - }, + }, + Ok(headers) => { + let mut headers = + stream::iter(headers.into_iter().map(proto::core::BlockHeader::from).map(Ok).map(Ok)); + // Ensure task stops if the peer prematurely stops their RPC session + if tx.send_all(&mut headers).await.is_err() { + break; + } + }, + Err(err) => { + let _ = tx.send(Err(err)).await; + break; + }, + } } - } - debug!( - target: LOG_TARGET, - "Header sync round complete for peer `{}`.", peer_node_id, - ); - }); + debug!( + target: LOG_TARGET, + "Header sync round complete for peer `{}`.", peer_node_id, + ); + } + .instrument(span), + ); Ok(Streaming::new(rx)) } + #[instrument(skip(self), err)] async fn get_header_by_height( &self, request: Request, @@ -262,6 +274,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ Ok(Response::new(header.into())) } + #[instrument(skip(self), err)] async fn find_chain_split( &self, request: Request, @@ -324,6 +337,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ } } + #[instrument(skip(self), err)] async fn get_chain_metadata(&self, _: Request<()>) -> Result, RpcStatus> { let chain_metadata = self .db() @@ -333,6 +347,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ Ok(Response::new(chain_metadata.into())) } + #[instrument(skip(self), err)] async fn sync_kernels( &self, request: Request, @@ -401,6 +416,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ Ok(Streaming::new(rx)) } + #[instrument(skip(self), err)] async fn sync_utxos(&self, request: Request) -> Result, RpcStatus> { let req = request.message(); let peer = request.context().peer_node_id(); diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 769ac64cfd..3014a240a0 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -1560,8 +1560,8 @@ fn handle_possible_reorg( }, // We want a warning if the number of removed blocks is at least 2. "Chain reorg required from {} to {} (accum_diff:{}, hash:{}) to (accum_diff:{}, hash:{}). Number of \ blocks to remove: {}, to add: {}.", - tip_header.header(), - fork_header.header(), + tip_header.header().height, + fork_header.header().height, tip_header.accumulated_data().total_accumulated_difficulty, tip_header.accumulated_data().hash.to_hex(), fork_header.accumulated_data().total_accumulated_difficulty, diff --git a/common/Cargo.toml b/common/Cargo.toml index 87d98a6163..aa9d646654 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -26,6 +26,13 @@ multiaddr={package="parity-multiaddr", version = "0.11.0"} sha2 = "0.9.5" path-clean = "0.1.0" tari_storage = { version = "^0.9", path = "../infrastructure/storage"} +tracing = "0.1.26" +tracing-opentelemetry = "0.15.0" +tracing-subscriber = "0.2.20" + +# network tracing, rt-tokio for async batch export +opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } +opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} anyhow = { version = "1.0", optional = true } git2 = { version = "0.8", optional = true } diff --git a/common/src/configuration/bootstrap.rs b/common/src/configuration/bootstrap.rs index 22c801b9fd..6e0a668541 100644 --- a/common/src/configuration/bootstrap.rs +++ b/common/src/configuration/bootstrap.rs @@ -146,6 +146,8 @@ pub struct ConfigBootstrap { pub miner_min_diff: Option, #[structopt(long, alias = "max-difficulty")] pub miner_max_diff: Option, + #[structopt(long, alias = "tracing")] + pub tracing_enabled: bool, } fn normalize_path(path: PathBuf) -> PathBuf { @@ -180,6 +182,7 @@ impl Default for ConfigBootstrap { miner_max_blocks: None, miner_min_diff: None, miner_max_diff: None, + tracing_enabled: false, } } } diff --git a/comms/Cargo.toml b/comms/Cargo.toml index 4ae5e3ae2e..71b8eec12f 100644 --- a/comms/Cargo.toml +++ b/comms/Cargo.toml @@ -38,10 +38,16 @@ serde_derive = "1.0.119" snow = {version="=0.8.0", features=["default-resolver"]} thiserror = "1.0.20" tokio = {version="~0.2.19", features=["blocking", "time", "tcp", "dns", "sync", "stream", "signal"]} -tokio-util = {version="0.2.0", features=["codec"]} +tokio-util = {version="0.3.1", features=["codec"]} tower= "0.3.1" +tracing = "0.1.26" +tracing-futures = "0.2.5" yamux = "=0.9.0" +# network tracing, rt-tokio for async batch export +opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } +opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} + # RPC dependencies tower-make = {version="0.3.0", optional=true} anyhow = "1.0.32" diff --git a/comms/src/bounded_executor.rs b/comms/src/bounded_executor.rs index f82b4a9f84..ee65e68476 100644 --- a/comms/src/bounded_executor.rs +++ b/comms/src/bounded_executor.rs @@ -28,6 +28,7 @@ use tokio::{ sync::{OwnedSemaphorePermit, Semaphore}, task::JoinHandle, }; +use tracing::{span, Instrument, Level}; /// Error emitted from [`try_spawn`](self::BoundedExecutor::try_spawn) when there are no tasks available #[derive(Debug)] @@ -143,7 +144,8 @@ impl BoundedExecutor { F: Future + Send + 'static, F::Output: Send + 'static, { - let permit = self.semaphore.clone().acquire_owned().await; + let span = span!(Level::TRACE, "bounded_executor::waiting_time"); + let permit = self.semaphore.clone().acquire_owned().instrument(span).await; self.do_spawn(permit, future) } @@ -153,7 +155,8 @@ impl BoundedExecutor { F::Output: Send + 'static, { self.inner.spawn(async move { - let ret = future.await; + let span = span!(Level::TRACE, "bounded_executor::do_work"); + let ret = future.instrument(span).await; // Task is finished, release the permit drop(permit); ret diff --git a/comms/src/connection_manager/dialer.rs b/comms/src/connection_manager/dialer.rs index b8fffb3029..538ffe89bd 100644 --- a/comms/src/connection_manager/dialer.rs +++ b/comms/src/connection_manager/dialer.rs @@ -55,6 +55,7 @@ use log::*; use std::{collections::HashMap, sync::Arc, time::Duration}; use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::{task::JoinHandle, time}; +use tracing::{self, span, Instrument, Level}; const LOG_TARGET: &str = "comms::connection_manager::dialer"; @@ -254,6 +255,7 @@ where }); } + #[tracing::instrument(skip(self, pending_dials, reply_tx))] fn handle_dial_peer_request( &mut self, pending_dials: &mut DialFuturesUnordered, @@ -281,6 +283,7 @@ where let noise_config = self.noise_config.clone(); let config = self.config.clone(); + let span = span!(Level::TRACE, "handle_dial_peer_request_inner1"); let dial_fut = async move { let (dial_state, dial_result) = Self::dial_peer_with_retry(dial_state, noise_config, transport, backoff, &config).await; @@ -314,7 +317,8 @@ where }, Err(err) => (dial_state, Err(err)), } - }; + } + .instrument(span); pending_dials.push(dial_fut.boxed()); } @@ -335,6 +339,7 @@ where } #[allow(clippy::too_many_arguments)] + #[tracing::instrument(skip(peer_manager, socket, conn_man_notifier, config, cancel_signal), err)] async fn perform_socket_upgrade_procedure( peer_manager: Arc, node_identity: Arc, @@ -419,6 +424,7 @@ where ) } + #[tracing::instrument(skip(dial_state, noise_config, transport, backoff, config))] async fn dial_peer_with_retry( dial_state: DialState, noise_config: NoiseConfig, diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index 5984ffcca2..60ae3c2d12 100644 --- a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -71,6 +71,7 @@ use std::{ use tari_crypto::tari_utilities::hex::Hex; use tari_shutdown::ShutdownSignal; use tokio::time; +use tracing::{span, Instrument, Level}; const LOG_TARGET: &str = "comms::connection_manager::listener"; @@ -244,6 +245,7 @@ where let liveness_session_count = self.liveness_session_count.clone(); let shutdown_signal = self.shutdown_signal.clone(); + let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",); let inbound_fut = async move { match Self::read_wire_format(&mut socket, config.time_to_first_byte).await { Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { @@ -331,7 +333,8 @@ where ); }, } - }; + } + .instrument(span); // This will block (asynchronously) if we have reached the maximum simultaneous connections, creating // back-pressure on nodes connecting to this node diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs index d1019f33d4..928b53611f 100644 --- a/comms/src/connection_manager/manager.rs +++ b/comms/src/connection_manager/manager.rs @@ -50,6 +50,7 @@ use std::{fmt, sync::Arc}; use tari_shutdown::{Shutdown, ShutdownSignal}; use time::Duration; use tokio::{sync::broadcast, task, time}; +use tracing::{span, Instrument, Level}; const LOG_TARGET: &str = "comms::connection_manager::manager"; @@ -121,7 +122,7 @@ impl Default for ConnectionManagerConfig { .expect("DEFAULT_LISTENER_ADDRESS is malformed"), #[cfg(test)] listener_address: "/memory/0".parse().unwrap(), - max_dial_attempts: 3, + max_dial_attempts: 1, max_simultaneous_inbound_connects: 100, network_info: Default::default(), #[cfg(not(test))] @@ -258,6 +259,8 @@ where } pub async fn run(mut self) { + let span = span!(Level::DEBUG, "comms::connection_manager::run"); + let _enter = span.enter(); let mut shutdown = self .shutdown_signal .take() @@ -350,7 +353,16 @@ where use ConnectionManagerRequest::*; trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request); match request { - DialPeer(node_id, reply) => self.dial_peer(node_id, reply).await, + DialPeer { + node_id, + reply_tx, + tracing_id: _tracing, + } => { + let span = span!(Level::TRACE, "connection_manager::handle_request"); + // This causes a panic for some reason? + // span.follows_from(tracing_id); + self.dial_peer(node_id, reply_tx).instrument(span).await + }, CancelDial(node_id) => { if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await { error!( @@ -432,6 +444,7 @@ where let _ = self.connection_manager_events_tx.send(Arc::new(event)); } + #[tracing::instrument(skip(self, reply))] async fn dial_peer( &mut self, node_id: NodeId, diff --git a/comms/src/connection_manager/peer_connection.rs b/comms/src/connection_manager/peer_connection.rs index 260befaeee..6f0d90da5d 100644 --- a/comms/src/connection_manager/peer_connection.rs +++ b/comms/src/connection_manager/peer_connection.rs @@ -58,6 +58,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::time; +use tracing::{self, span, Instrument, Level, Span}; const LOG_TARGET: &str = "comms::connection_manager::peer_connection"; @@ -111,10 +112,11 @@ pub fn create( #[derive(Debug)] pub enum PeerConnectionRequest { /// Open a new substream and negotiate the given protocol - OpenSubstream( - ProtocolId, - oneshot::Sender, PeerConnectionError>>, - ), + OpenSubstream { + protocol_id: ProtocolId, + reply_tx: oneshot::Sender, PeerConnectionError>>, + tracing_id: Option, + }, /// Disconnect all substreams and close the transport connection Disconnect(bool, oneshot::Sender>), } @@ -188,19 +190,25 @@ impl PeerConnection { self.substream_counter.get() } + #[tracing::instrument("peer_connection::open_substream", skip(self), err)] pub async fn open_substream( &mut self, protocol_id: &ProtocolId, ) -> Result, PeerConnectionError> { let (reply_tx, reply_rx) = oneshot::channel(); self.request_tx - .send(PeerConnectionRequest::OpenSubstream(protocol_id.clone(), reply_tx)) + .send(PeerConnectionRequest::OpenSubstream { + protocol_id: protocol_id.clone(), + reply_tx, + tracing_id: Span::current().id(), + }) .await?; reply_rx .await .map_err(|_| PeerConnectionError::InternalReplyCancelled)? } + #[tracing::instrument("peer_connection::open_framed_substream", skip(self), err)] pub async fn open_framed_substream( &mut self, protocol_id: &ProtocolId, @@ -211,12 +219,14 @@ impl PeerConnection { } #[cfg(feature = "rpc")] + #[tracing::instrument("peer_connection::connect_rpc", skip(self), fields(peer_node_id = self.peer_node_id.to_string().as_str()), err)] pub async fn connect_rpc(&mut self) -> Result where T: From + NamedProtocolService { self.connect_rpc_using_builder(Default::default()).await } #[cfg(feature = "rpc")] + #[tracing::instrument("peer_connection::connect_rpc_with_builder", skip(self, builder), err)] pub async fn connect_rpc_using_builder(&mut self, builder: RpcClientBuilder) -> Result where T: From + NamedProtocolService { let protocol = ProtocolId::from_static(T::PROTOCOL_NAME); @@ -358,8 +368,14 @@ impl PeerConnectionActor { async fn handle_request(&mut self, request: PeerConnectionRequest) { use PeerConnectionRequest::*; match request { - OpenSubstream(proto, reply_tx) => { - let result = self.open_negotiated_protocol_stream(proto).await; + OpenSubstream { + protocol_id, + reply_tx, + tracing_id, + } => { + let span = span!(Level::TRACE, "handle_request"); + span.follows_from(tracing_id); + let result = self.open_negotiated_protocol_stream(protocol_id).instrument(span).await; log_if_error_fmt!( target: LOG_TARGET, reply_tx.send(result), @@ -380,6 +396,7 @@ impl PeerConnectionActor { } } + #[tracing::instrument(skip(self, stream), err, fields(comms.direction="inbound"))] async fn handle_incoming_substream(&mut self, mut stream: Substream) -> Result<(), PeerConnectionError> { let selected_protocol = ProtocolNegotiation::new(&mut stream) .negotiate_protocol_inbound(&self.our_supported_protocols) @@ -395,6 +412,7 @@ impl PeerConnectionActor { Ok(()) } + #[tracing::instrument(skip(self), err)] async fn open_negotiated_protocol_stream( &mut self, protocol: ProtocolId, diff --git a/comms/src/connection_manager/requester.rs b/comms/src/connection_manager/requester.rs index 3b86a88bc2..1f3f5cc887 100644 --- a/comms/src/connection_manager/requester.rs +++ b/comms/src/connection_manager/requester.rs @@ -36,7 +36,11 @@ use tokio::sync::broadcast; #[derive(Debug)] pub enum ConnectionManagerRequest { /// Dial a given peer by node id. - DialPeer(NodeId, oneshot::Sender>), + DialPeer { + node_id: NodeId, + reply_tx: oneshot::Sender>, + tracing_id: Option, + }, /// Cancels a pending dial if one exists CancelDial(NodeId), /// Register a oneshot to get triggered when the node is listening, or has failed to listen @@ -74,9 +78,10 @@ impl ConnectionManagerRequester { } /// Attempt to connect to a remote peer + #[tracing::instrument(skip(self), err)] pub async fn dial_peer(&mut self, node_id: NodeId) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); - self.send_dial_peer(node_id, reply_tx).await?; + self.send_dial_peer(node_id, Some(reply_tx)).await?; reply_rx .await .map_err(|_| ConnectionManagerError::ActorRequestCanceled)? @@ -92,22 +97,36 @@ impl ConnectionManagerRequester { } /// Send instruction to ConnectionManager to dial a peer and return the result on the given oneshot + #[tracing::instrument(skip(self, reply_tx), err)] pub(crate) async fn send_dial_peer( &mut self, node_id: NodeId, - reply_tx: oneshot::Sender>, + reply_tx: Option>>, ) -> Result<(), ConnectionManagerError> { + let tracing_id; + let reply_tx = if let Some(r) = reply_tx { + tracing_id = tracing::Span::current().id(); + r + } else { + let (tx, _) = oneshot::channel(); + tracing_id = None; + tx + }; self.sender - .send(ConnectionManagerRequest::DialPeer(node_id, reply_tx)) + .send(ConnectionManagerRequest::DialPeer { + node_id, + reply_tx, + tracing_id, + }) .await .map_err(|_| ConnectionManagerError::SendToActorFailed)?; Ok(()) } /// Send instruction to ConnectionManager to dial a peer without waiting for a result. + #[tracing::instrument(skip(self), err)] pub(crate) async fn send_dial_peer_no_reply(&mut self, node_id: NodeId) -> Result<(), ConnectionManagerError> { - let (reply_tx, _) = oneshot::channel(); - self.send_dial_peer(node_id, reply_tx).await?; + self.send_dial_peer(node_id, None).await?; Ok(()) } diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs index b692c8c5af..35f37627c4 100644 --- a/comms/src/connectivity/manager.rs +++ b/comms/src/connectivity/manager.rs @@ -53,6 +53,7 @@ use std::{ }; use tari_shutdown::ShutdownSignal; use tokio::{sync::broadcast, task::JoinHandle, time}; +use tracing::{span, Instrument, Level}; const LOG_TARGET: &str = "comms::connectivity::manager"; @@ -156,6 +157,7 @@ impl ConnectivityManagerActor { task::spawn(Self::run(self)) } + #[tracing::instrument(name = "connectivity_manager_actor::run", skip(self))] pub async fn run(mut self) { info!(target: LOG_TARGET, "ConnectivityManager started"); let mut shutdown_signal = self @@ -216,28 +218,41 @@ impl ConnectivityManagerActor { GetConnectivityStatus(reply) => { let _ = reply.send(self.status); }, - DialPeer(node_id, reply) => match self.pool.get(&node_id) { - Some(state) if state.is_connected() => { - debug!( - target: LOG_TARGET, - "Found existing connection for peer `{}`", - node_id.short_str() - ); - let _ = reply.send(Ok(state.connection().cloned().expect("Already checked"))); - }, - _ => { - debug!( - target: LOG_TARGET, - "No existing connection found for peer `{}`. Dialing...", - node_id.short_str() - ); - if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply).await { - error!( - target: LOG_TARGET, - "Failed to send dial request to connection manager: {:?}", err - ); + DialPeer { + node_id, + reply_tx, + tracing_id, + } => { + let span = span!(Level::TRACE, "handle_request"); + // let _e = span.enter(); + span.follows_from(tracing_id); + async move { + match self.pool.get(&node_id) { + Some(state) if state.is_connected() => { + debug!( + target: LOG_TARGET, + "Found existing connection for peer `{}`", + node_id.short_str() + ); + let _ = reply_tx.send(Ok(state.connection().cloned().expect("Already checked"))); + }, + _ => { + debug!( + target: LOG_TARGET, + "No existing connection found for peer `{}`. Dialing...", + node_id.short_str() + ); + if let Err(err) = self.connection_manager.send_dial_peer(node_id, Some(reply_tx)).await { + error!( + target: LOG_TARGET, + "Failed to send dial request to connection manager: {:?}", err + ); + } + }, } - }, + } + .instrument(span) + .await }, AddManagedPeers(node_ids) => { self.add_managed_peers(node_ids).await; @@ -435,6 +450,7 @@ impl ConnectivityManagerActor { Ok(conns.into_iter().cloned().collect()) } + #[tracing::instrument(skip(self))] async fn add_managed_peers(&mut self, node_ids: Vec) { let pool = &mut self.pool; let mut should_update_connectivity = false; diff --git a/comms/src/connectivity/requester.rs b/comms/src/connectivity/requester.rs index b092b80c4d..740c8a6c81 100644 --- a/comms/src/connectivity/requester.rs +++ b/comms/src/connectivity/requester.rs @@ -44,6 +44,7 @@ use std::{ }; use tokio::{sync::broadcast, time}; const LOG_TARGET: &str = "comms::connectivity::requester"; +use tracing; pub type ConnectivityEventRx = broadcast::Receiver>; pub type ConnectivityEventTx = broadcast::Sender>; @@ -90,7 +91,11 @@ impl fmt::Display for ConnectivityEvent { #[derive(Debug)] pub enum ConnectivityRequest { WaitStarted(oneshot::Sender<()>), - DialPeer(NodeId, oneshot::Sender>), + DialPeer { + node_id: NodeId, + reply_tx: oneshot::Sender>, + tracing_id: Option, + }, GetConnectivityStatus(oneshot::Sender), AddManagedPeers(Vec), RemovePeer(NodeId), @@ -123,12 +128,17 @@ impl ConnectivityRequester { self.event_tx.clone() } + #[tracing::instrument(skip(self), err)] pub async fn dial_peer(&mut self, peer: NodeId) -> Result { let mut num_cancels = 0; loop { let (reply_tx, reply_rx) = oneshot::channel(); self.sender - .send(ConnectivityRequest::DialPeer(peer.clone(), reply_tx)) + .send(ConnectivityRequest::DialPeer { + node_id: peer.clone(), + reply_tx, + tracing_id: tracing::Span::current().id(), + }) .await .map_err(|_| ConnectivityError::ActorDisconnected)?; diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs index 4c78b38174..1ba104ce04 100644 --- a/comms/src/multiplexing/yamux.rs +++ b/comms/src/multiplexing/yamux.rs @@ -30,9 +30,9 @@ use futures::{ Stream, StreamExt, }; -use log::*; use std::{future::Future, io, pin::Pin, sync::Arc, task::Poll}; use tari_shutdown::{Shutdown, ShutdownSignal}; +use tracing::{self, debug, error, event, Level}; use yamux::Mode; type IncomingRx = mpsc::Receiver; @@ -62,10 +62,7 @@ impl Yamux { }; let mut config = yamux::Config::default(); - // Use OnRead mode instead of OnReceive mode to provide back pressure to the sending side. - // Caveat: the OnRead mode has the risk of deadlock, where both sides send data larger than - // receive window and don't read before finishing writes. - // This should never happen as the window size should be large enough for all protocol messages. + config.set_window_update_mode(yamux::WindowUpdateMode::OnRead); // Because OnRead mode increases the RTT of window update, bigger buffer size and receive // window size perform better. @@ -257,11 +254,13 @@ where S: Stream> + Unpin } } + #[tracing::instrument(name = "yamux::incoming_worker::run", skip(self))] pub async fn run(mut self) { let mut mux_stream = self.inner.take_until(&mut self.shutdown_signal); while let Some(result) = mux_stream.next().await { match result { Ok(stream) => { + event!(Level::TRACE, "yamux::stream received {}", stream); if self.sender.send(stream).await.is_err() { debug!( target: LOG_TARGET, @@ -272,7 +271,12 @@ where S: Stream> + Unpin } }, Err(err) => { - debug!( + event!( + Level::ERROR, + "Incoming peer substream task received an error because '{}'", + err + ); + error!( target: LOG_TARGET, "Incoming peer substream task received an error because '{}'", err ); diff --git a/comms/src/noise/config.rs b/comms/src/noise/config.rs index 7776ade335..30cab07c48 100644 --- a/comms/src/noise/config.rs +++ b/comms/src/noise/config.rs @@ -60,6 +60,7 @@ impl NoiseConfig { /// Upgrades the given socket to using the noise protocol. The upgraded socket and the peer's static key /// is returned. + #[tracing::instrument(name = "noise::upgrade_socket", skip(self, socket), err)] pub async fn upgrade_socket( &self, socket: TSocket, diff --git a/comms/src/protocol/identity.rs b/comms/src/protocol/identity.rs index df2900e1af..2c4eba1db5 100644 --- a/comms/src/protocol/identity.rs +++ b/comms/src/protocol/identity.rs @@ -34,10 +34,12 @@ use std::{io, time::Duration}; use thiserror::Error; use tokio::time; use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use tracing; pub static IDENTITY_PROTOCOL: ProtocolId = ProtocolId::from_static(b"t/identity/1.0"); const LOG_TARGET: &str = "comms::protocol::identity"; +#[tracing::instrument(skip(socket, our_supported_protocols), err)] pub async fn identity_exchange<'p, TSocket, P>( node_identity: &NodeIdentity, direction: ConnectionDirection, diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs index 377f7a2d8a..9d47895338 100644 --- a/comms/src/protocol/messaging/outbound.rs +++ b/comms/src/protocol/messaging/outbound.rs @@ -36,6 +36,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::stream as tokio_stream; +use tracing::{event, span, Instrument, Level}; const LOG_TARGET: &str = "comms::protocol::messaging::outbound"; /// The number of times to retry sending a failed message before publishing a SendMessageFailed event. @@ -69,49 +70,69 @@ impl OutboundMessaging { } pub async fn run(self) { - debug!( - target: LOG_TARGET, - "Attempting to dial peer '{}' if required", - self.peer_node_id.short_str() + let span = span!( + Level::DEBUG, + "comms::messaging::outbound", + node_id = self.peer_node_id.to_string().as_str() ); - let peer_node_id = self.peer_node_id.clone(); - let mut messaging_events_tx = self.messaging_events_tx.clone(); - match self.run_inner().await { - Ok(_) => { - debug!( - target: LOG_TARGET, - "Outbound messaging for peer '{}' has stopped because the stream was closed", - peer_node_id.short_str() - ); - }, - Err(MessagingProtocolError::Inactivity) => { - debug!( - target: LOG_TARGET, - "Outbound messaging for peer '{}' has stopped because it was inactive", - peer_node_id.short_str() - ); - }, - Err(err) => { - debug!(target: LOG_TARGET, "Outbound messaging substream failed: {}", err); - }, - } + async move { + debug!( + target: LOG_TARGET, + "Attempting to dial peer '{}' if required", + self.peer_node_id.short_str() + ); + let peer_node_id = self.peer_node_id.clone(); + let mut messaging_events_tx = self.messaging_events_tx.clone(); + match self.run_inner().await { + Ok(_) => { + event!( + Level::DEBUG, + "Outbound messaging for peer has stopped because the stream was closed" + ); - let _ = messaging_events_tx - .send(MessagingEvent::OutboundProtocolExited(peer_node_id)) - .await; + debug!( + target: LOG_TARGET, + "Outbound messaging for peer '{}' has stopped because the stream was closed", + peer_node_id.short_str() + ); + }, + Err(MessagingProtocolError::Inactivity) => { + event!( + Level::ERROR, + "Outbound messaging for peer has stopped because it was inactive" + ); + debug!( + target: LOG_TARGET, + "Outbound messaging for peer '{}' has stopped because it was inactive", + peer_node_id.short_str() + ); + }, + Err(err) => { + event!(Level::ERROR, "Outbound messaging substream failed:{}", err); + debug!(target: LOG_TARGET, "Outbound messaging substream failed: {}", err); + }, + } + + let _ = messaging_events_tx + .send(MessagingEvent::OutboundProtocolExited(peer_node_id)) + .await; + } + .instrument(span) + .await } async fn run_inner(mut self) -> Result<(), MessagingProtocolError> { let mut attempts = 0; + let substream = loop { match self.try_establish().await { - Ok(substream) => break substream, + Ok(substream) => { + event!(Level::DEBUG, "Substream established"); + break substream; + }, Err(err) => { - assert!( - attempts <= MAX_SEND_RETRIES, - "Attempt count was greater than the maximum" - ); - if attempts == MAX_SEND_RETRIES { + event!(Level::ERROR, "Error establishing messaging protocol"); + if attempts >= MAX_SEND_RETRIES { debug!( target: LOG_TARGET, "Error establishing messaging protocol: {}. Aborting because maximum retries reached.", err @@ -133,77 +154,110 @@ impl OutboundMessaging { } async fn try_dial_peer(&mut self) -> Result { - loop { - match self.connectivity.dial_peer(self.peer_node_id.clone()).await { - Ok(conn) => break Ok(conn), - Err(ConnectivityError::DialCancelled) => { - debug!( - target: LOG_TARGET, - "Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \ - Retrying...", - self.peer_node_id.short_str(), - ); - continue; - }, - Err(err) => { - debug!( - target: LOG_TARGET, - "MessagingProtocol failed to dial peer '{}' because '{:?}'", - self.peer_node_id.short_str(), - err - ); + let span = span!( + Level::DEBUG, + "dial_peer", + node_id = self.peer_node_id.to_string().as_str() + ); + async move { + loop { + match self.connectivity.dial_peer(self.peer_node_id.clone()).await { + Ok(conn) => break Ok(conn), + Err(ConnectivityError::DialCancelled) => { + debug!( + target: LOG_TARGET, + "Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \ + Retrying...", + self.peer_node_id.short_str(), + ); + continue; + }, + Err(err) => { + debug!( + target: LOG_TARGET, + "MessagingProtocol failed to dial peer '{}' because '{:?}'", + self.peer_node_id.short_str(), + err + ); - break Err(MessagingProtocolError::PeerDialFailed); - }, + break Err(MessagingProtocolError::PeerDialFailed); + }, + } } } + .instrument(span) + .await } async fn try_establish(&mut self) -> Result, MessagingProtocolError> { - debug!( - target: LOG_TARGET, - "Attempting to establish messaging protocol connection to peer `{}`", - self.peer_node_id.short_str() + let span = span!( + Level::DEBUG, + "establish_connection", + node_id = self.peer_node_id.to_string().as_str() ); - let start = Instant::now(); - let conn = self.try_dial_peer().await?; - debug!( - target: LOG_TARGET, - "Connection succeeded for peer `{}` in {:.0?}", - self.peer_node_id.short_str(), - start.elapsed() - ); - let substream = self.try_open_substream(conn).await?; - debug!( - target: LOG_TARGET, - "Substream established for peer `{}`", - self.peer_node_id.short_str(), - ); - Ok(substream) + async move { + debug!( + target: LOG_TARGET, + "Attempting to establish messaging protocol connection to peer `{}`", + self.peer_node_id.short_str() + ); + let start = Instant::now(); + let conn = self.try_dial_peer().await?; + debug!( + target: LOG_TARGET, + "Connection succeeded for peer `{}` in {:.0?}", + self.peer_node_id.short_str(), + start.elapsed() + ); + let substream = self.try_open_substream(conn).await?; + debug!( + target: LOG_TARGET, + "Substream established for peer `{}`", + self.peer_node_id.short_str(), + ); + Ok(substream) + } + .instrument(span) + .await } async fn try_open_substream( &mut self, mut conn: PeerConnection, ) -> Result, MessagingProtocolError> { - match conn.open_substream(&MESSAGING_PROTOCOL).await { - Ok(substream) => Ok(substream), - Err(err) => { - debug!( - target: LOG_TARGET, - "MessagingProtocol failed to open a substream to peer '{}' because '{}'", - self.peer_node_id.short_str(), - err - ); - Err(err.into()) - }, + let span = span!( + Level::DEBUG, + "open_substream", + node_id = self.peer_node_id.to_string().as_str() + ); + async move { + match conn.open_substream(&MESSAGING_PROTOCOL).await { + Ok(substream) => Ok(substream), + Err(err) => { + debug!( + target: LOG_TARGET, + "MessagingProtocol failed to open a substream to peer '{}' because '{}'", + self.peer_node_id.short_str(), + err + ); + Err(err.into()) + }, + } } + .instrument(span) + .await } async fn start_forwarding_messages( self, substream: NegotiatedSubstream, ) -> Result<(), MessagingProtocolError> { + let span = span!( + Level::DEBUG, + "start_forwarding_messages", + node_id = self.peer_node_id.to_string().as_str() + ); + let _enter = span.enter(); debug!( target: LOG_TARGET, "Starting direct message forwarding for peer `{}`", @@ -236,7 +290,7 @@ impl OutboundMessaging { stream .map(|msg| { msg.map(|mut out_msg| { - trace!(target: LOG_TARGET, "Message buffered for sending {}", out_msg); + event!(Level::DEBUG, "Message buffered for sending {}", out_msg); out_msg.reply_success(); out_msg.body }) diff --git a/comms/src/protocol/messaging/protocol.rs b/comms/src/protocol/messaging/protocol.rs index 6e53e80e6c..1f6fe029ab 100644 --- a/comms/src/protocol/messaging/protocol.rs +++ b/comms/src/protocol/messaging/protocol.rs @@ -247,6 +247,7 @@ impl MessagingProtocol { Ok(()) } + // #[tracing::instrument(skip(self, out_msg), err)] async fn send_message(&mut self, out_msg: OutboundMessage) -> Result<(), MessagingProtocolError> { let peer_node_id = out_msg.peer_node_id.clone(); let sender = loop { diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 79cd5540a7..2806366f9f 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -62,6 +62,7 @@ use std::{ }; use tokio::time; use tower::{Service, ServiceExt}; +use tracing::{event, span, Instrument, Level}; const LOG_TARGET: &str = "comms::rpc::client"; @@ -83,7 +84,15 @@ impl RpcClient { let (request_tx, request_rx) = mpsc::channel(1); let connector = ClientConnector::new(request_tx); let (ready_tx, ready_rx) = oneshot::channel(); - task::spawn(RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name).run()); + let tracing_id = tracing::Span::current().id(); + task::spawn({ + let span = span!(Level::TRACE, "start_rpc_worker"); + span.follows_from(tracing_id); + + RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name) + .run() + .instrument(span) + }); ready_rx .await .expect("ready_rx oneshot is never dropped without a reply")?; @@ -364,6 +373,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send String::from_utf8_lossy(&self.protocol_id) } + #[tracing::instrument(name = "rpc_client_worker run", skip(self), fields(next_request_id= self.next_request_id))] async fn run(mut self) { debug!( target: LOG_TARGET, @@ -472,6 +482,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send Ok(()) } + #[tracing::instrument(name = "rpc_do_request_response", skip(self, reply), err)] async fn do_request_response( &mut self, request: BaseRequest, @@ -494,14 +505,17 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let (mut response_tx, response_rx) = mpsc::channel(10); if reply.send(response_rx).is_err() { - debug!(target: LOG_TARGET, "Client request was cancelled."); + event!(Level::WARN, "Client request was cancelled"); + warn!(target: LOG_TARGET, "Client request was cancelled."); response_tx.close_channel(); + // TODO: Should this not exit here? } loop { let resp = match self.read_reply().await { Ok(resp) => { let latency = start.elapsed(); + event!(Level::TRACE, "Message received"); trace!( target: LOG_TARGET, "Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}", @@ -522,11 +536,15 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send method, start.elapsed() ); + event!(Level::ERROR, "Response timed out"); let _ = response_tx.send(Err(RpcStatus::timed_out("Response timed out"))).await; response_tx.close_channel(); break; }, - Err(err) => return Err(err), + Err(err) => { + event!(Level::ERROR, "Errored:{}", err); + return Err(err); + }, }; match Self::convert_to_result(resp, request_id) { diff --git a/comms/src/protocol/rpc/handshake.rs b/comms/src/protocol/rpc/handshake.rs index 7c4dca1ae9..3abd62cef6 100644 --- a/comms/src/protocol/rpc/handshake.rs +++ b/comms/src/protocol/rpc/handshake.rs @@ -23,10 +23,10 @@ use crate::{framing::CanonicalFraming, message::MessageExt, proto, protocol::rpc::error::HandshakeRejectReason}; use bytes::BytesMut; use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; -use log::*; use prost::{DecodeError, Message}; use std::{io, time::Duration}; use tokio::time; +use tracing::{debug, error, event, span, warn, Instrument, Level}; const LOG_TARGET: &str = "comms::rpc::handshake"; @@ -73,30 +73,48 @@ where T: AsyncRead + AsyncWrite + Unpin } /// Server-side handshake protocol + #[tracing::instrument(name = "rpc::server::perform_server_handshake", skip(self), err, fields(comms.direction="inbound"))] pub async fn perform_server_handshake(&mut self) -> Result { match self.recv_next_frame().await { Ok(Some(Ok(msg))) => { + event!(Level::DEBUG, "Handshake bytes received"); let msg = proto::rpc::RpcSession::decode(&mut msg.freeze())?; let version = SUPPORTED_RPC_VERSIONS .iter() .find(|v| msg.supported_versions.contains(v)); if let Some(version) = version { + event!(Level::INFO, version = version, "Server accepted version"); debug!(target: LOG_TARGET, "Server accepted version {}", version); let reply = proto::rpc::RpcSessionReply { session_result: Some(proto::rpc::rpc_session_reply::SessionResult::AcceptedVersion(*version)), ..Default::default() }; - self.framed.send(reply.to_encoded_bytes().into()).await?; + let span = span!(Level::INFO, "rpc::server::handshake::send_accept_version_reply"); + self.framed + .send(reply.to_encoded_bytes().into()) + .instrument(span) + .await?; return Ok(*version); } + let span = span!(Level::INFO, "rpc::server::handshake::send_rejection"); self.reject_with_reason(HandshakeRejectReason::UnsupportedVersion) + .instrument(span) .await?; Err(RpcHandshakeError::ClientNoSupportedVersion) }, - Ok(Some(Err(err))) => Err(err.into()), - Ok(None) => Err(RpcHandshakeError::ClientClosed), - Err(_elapsed) => Err(RpcHandshakeError::TimedOut), + Ok(Some(Err(err))) => { + event!(Level::ERROR, "Error: {}", err); + Err(err.into()) + }, + Ok(None) => { + event!(Level::ERROR, "Client closed request"); + Err(RpcHandshakeError::ClientClosed) + }, + Err(_elapsed) => { + event!(Level::ERROR, "Timed out"); + Err(RpcHandshakeError::TimedOut) + }, } } @@ -112,6 +130,7 @@ where T: AsyncRead + AsyncWrite + Unpin } /// Client-side handshake protocol + #[tracing::instrument(name = "rpc::client::perform_client_handshake", skip(self), err, fields(comms.direction="outbound"))] pub async fn perform_client_handshake(&mut self) -> Result<(), RpcHandshakeError> { let msg = proto::rpc::RpcSession { supported_versions: SUPPORTED_RPC_VERSIONS.to_vec(), @@ -129,15 +148,26 @@ where T: AsyncRead + AsyncWrite + Unpin Ok(Some(Ok(msg))) => { let msg = proto::rpc::RpcSessionReply::decode(&mut msg.freeze())?; let version = msg.result()?; + event!(Level::INFO, "Server accepted version: {}", version); debug!(target: LOG_TARGET, "Server accepted version {}", version); Ok(()) }, - Ok(Some(Err(err))) => Err(err.into()), - Ok(None) => Err(RpcHandshakeError::ServerClosedRequest), - Err(_) => Err(RpcHandshakeError::TimedOut), + Ok(Some(Err(err))) => { + event!(Level::ERROR, "Error: {}", err); + Err(err.into()) + }, + Ok(None) => { + event!(Level::ERROR, "Server closed request"); + Err(RpcHandshakeError::ServerClosedRequest) + }, + Err(_) => { + event!(Level::ERROR, "Timed out"); + Err(RpcHandshakeError::TimedOut) + }, } } + #[tracing::instrument(name = "rpc::receive_handshake_reply", skip(self), err)] async fn recv_next_frame(&mut self) -> Result>, time::Elapsed> { match self.timeout { Some(timeout) => time::timeout(timeout, self.framed.next()).await, diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 7615b1497e..b2b7dbf76f 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -54,7 +54,6 @@ use crate::{ Bytes, }; use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt}; -use log::*; use prost::Message; use std::{ borrow::Cow, @@ -64,6 +63,7 @@ use std::{ use tokio::time; use tower::Service; use tower_make::MakeService; +use tracing::{debug, error, instrument, span, trace, warn, Instrument, Level}; const LOG_TARGET: &str = "comms::rpc"; @@ -286,6 +286,7 @@ where } } + #[tracing::instrument(name = "rpc::server::new_client_connection", skip(self, notification), err)] async fn handle_protocol_notification( &mut self, notification: ProtocolNotification, @@ -312,6 +313,7 @@ where Ok(()) } + #[tracing::instrument(name = "rpc::server::try_initiate_service", skip(self, framed), err)] async fn try_initiate_service( &mut self, protocol: ProtocolId, @@ -436,6 +438,7 @@ where Ok(()) } + #[instrument(name = "rpc::server::handle_req", skip(self), err)] async fn handle(&mut self, mut request: Bytes) -> Result<(), RpcServerError> { let decoded_msg = proto::rpc::RpcRequest::decode(&mut request)?; @@ -646,7 +649,8 @@ where async fn log_timing>(request_id: u32, tag: &str, fut: F) -> R { let t = Instant::now(); - let ret = fut.await; + let span = span!(Level::TRACE, "rpc::internal::timing::{}::{}", request_id, tag); + let ret = fut.instrument(span).await; let elapsed = t.elapsed(); trace!( target: LOG_TARGET, diff --git a/comms/src/socks/client.rs b/comms/src/socks/client.rs index b9d7dc255d..f155f2f420 100644 --- a/comms/src/socks/client.rs +++ b/comms/src/socks/client.rs @@ -104,6 +104,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin /// Connects to a address through a SOCKS5 proxy and returns the 'upgraded' socket. This consumes the /// `Socks5Client` as once connected, the socks protocol does not recognise any further commands. + #[tracing::instrument(name = "socks::connect", skip(self), err)] pub async fn connect(mut self, address: &Multiaddr) -> Result<(TSocket, Multiaddr)> { let address = self.execute_command(Command::Connect, address).await?; Ok((self.protocol.socket, address)) @@ -111,6 +112,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin /// Requests the tor proxy to resolve a DNS address is resolved into an IP address. /// This operation only works with the tor SOCKS proxy. + #[tracing::instrument(name = "socks:tor_resolve", skip(self), err)] pub async fn tor_resolve(&mut self, address: &Multiaddr) -> Result { // Tor resolve does not return the port back let (dns, rest) = multiaddr_split_first(&address); @@ -124,6 +126,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin /// Requests the tor proxy to reverse resolve an IP address into a DNS address if it is able. /// This operation only works with the tor SOCKS proxy. + #[tracing::instrument(name = "socks::tor_resolve_ptr", skip(self), err)] pub async fn tor_resolve_ptr(&mut self, address: &Multiaddr) -> Result { self.execute_command(Command::TorResolvePtr, address).await } diff --git a/comms/src/test_utils/mocks/connection_manager.rs b/comms/src/test_utils/mocks/connection_manager.rs index 1637074ff7..cc489af60e 100644 --- a/comms/src/test_utils/mocks/connection_manager.rs +++ b/comms/src/test_utils/mocks/connection_manager.rs @@ -131,7 +131,11 @@ impl ConnectionManagerMock { self.state.inc_call_count(); self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer(node_id, reply_tx) => { + DialPeer { + node_id, + reply_tx, + tracing_id: _, + } => { // Send Ok(conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses) let _ = reply_tx.send( self.state diff --git a/comms/src/test_utils/mocks/connectivity_manager.rs b/comms/src/test_utils/mocks/connectivity_manager.rs index c3479006d2..122a60127b 100644 --- a/comms/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/src/test_utils/mocks/connectivity_manager.rs @@ -204,15 +204,19 @@ impl ConnectivityManagerMock { use ConnectivityRequest::*; self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer(node_id, reply) => { + DialPeer { + node_id, + reply_tx, + tracing_id: _, + } => { // Send Ok(conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses) self.state .with_state(|state| match state.pending_conns.get_mut(&node_id) { Some(replies) => { - replies.push(reply); + replies.push(reply_tx); }, None => { - let _ = reply.send( + let _ = reply_tx.send( state .active_conns .get(&node_id) diff --git a/comms/src/test_utils/mocks/peer_connection.rs b/comms/src/test_utils/mocks/peer_connection.rs index 00756cda1c..13b0c77bbf 100644 --- a/comms/src/test_utils/mocks/peer_connection.rs +++ b/comms/src/test_utils/mocks/peer_connection.rs @@ -190,9 +190,16 @@ impl PeerConnectionMock { use PeerConnectionRequest::*; self.state.inc_call_count(); match req { - OpenSubstream(protocol, reply_tx) => match self.state.open_substream().await { + OpenSubstream { + protocol_id, + reply_tx, + tracing_id: _, + } => match self.state.open_substream().await { Ok(stream) => { - let negotiated_substream = NegotiatedSubstream { protocol, stream }; + let negotiated_substream = NegotiatedSubstream { + protocol: protocol_id, + stream, + }; reply_tx.send(Ok(negotiated_substream)).unwrap(); }, Err(err) => {