diff --git a/Cargo.lock b/Cargo.lock index be1af2e31a..647c2ecfe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4696,6 +4696,8 @@ dependencies = [ "crossterm", "futures 0.3.15", "log 0.4.14", + "opentelemetry", + "opentelemetry-jaeger", "qrcode", "rand 0.8.4", "rpassword", @@ -4716,6 +4718,9 @@ dependencies = [ "thiserror", "tokio 0.2.25", "tonic", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "tui", "unicode-segmentation", "unicode-width", diff --git a/applications/tari_base_node/src/main.rs b/applications/tari_base_node/src/main.rs index 7e4261d7ee..0a0f4dcf1f 100644 --- a/applications/tari_base_node/src/main.rs +++ b/applications/tari_base_node/src/main.rs @@ -98,10 +98,11 @@ mod utils; use crate::command_handler::CommandHandler; use futures::{pin_mut, FutureExt}; use log::*; -use opentelemetry::{self, global}; +use opentelemetry::{self, global, KeyValue}; use parser::Parser; use rustyline::{config::OutputStreamType, error::ReadlineError, CompletionType, Config, EditMode, Editor}; use std::{ + env, net::SocketAddr, process, sync::Arc, @@ -153,27 +154,7 @@ fn main_inner() -> Result<(), ExitCodes> { /// Sets up the base node and runs the cli_loop async fn run_node(node_config: Arc, bootstrap: ConfigBootstrap) -> Result<(), ExitCodes> { - if bootstrap.tracing_enabled { - global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("tari") - // TODO: uncomment when using tokio 1 - // .install_batch(opentelemetry::runtime::Tokio) - .install_simple() - .unwrap(); - - // let opentelemetry = tracing_opentelemetry::subscriber().with_tracer(tracer); - // tracing_subscriber::registry().with(opentelemetry).try_init()?; - - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - // Use the tracing subscriber `Registry`, or any other subscriber - // that impls `LookupSpan` - let subscriber = Registry::default().with(telemetry); - - // TODO: this should be called in the executable instead of here as suggested in https://docs.rs/tracing/0.1.26/tracing/ - tracing::subscriber::set_global_default(subscriber).unwrap(); - } + enable_tracing_if_specified(&bootstrap); // Load or create the Node identity let node_identity = setup_node_identity( &node_config.base_node_identity_file, @@ -266,6 +247,25 @@ async fn run_node(node_config: Arc, bootstrap: ConfigBootstrap) -> Ok(()) } +fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) { + if bootstrap.tracing_enabled { + // To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \ + // jaegertracing/all-in-one:latest + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("tari::base_node") + .with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())]) + // TODO: uncomment when using tokio 1 + // .install_batch(opentelemetry::runtime::Tokio) + .install_simple() + .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = Registry::default().with(telemetry); + tracing::subscriber::set_global_default(subscriber) + .expect("Tracing could not be set. Try running without `--tracing-enabled`"); + } +} + /// Runs the gRPC server async fn run_grpc( grpc: crate::grpc::base_node_grpc_server::BaseNodeGrpcServer, diff --git a/applications/tari_console_wallet/Cargo.toml b/applications/tari_console_wallet/Cargo.toml index 53939552f3..f8f2e24104 100644 --- a/applications/tari_console_wallet/Cargo.toml +++ b/applications/tari_console_wallet/Cargo.toml @@ -34,6 +34,15 @@ tokio = { version="0.2.10", features = ["signal"] } thiserror = "1.0.20" tonic = "0.2" +tracing = "0.1.26" +tracing-opentelemetry = "0.15.0" +tracing-subscriber = "0.2.20" + +# network tracing, rt-tokio for async batch export +opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] } +opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]} + + [dependencies.tari_core] path = "../../base_layer/core" version = "^0.9" diff --git a/applications/tari_console_wallet/src/main.rs b/applications/tari_console_wallet/src/main.rs index 25f749034e..4042eb27eb 100644 --- a/applications/tari_console_wallet/src/main.rs +++ b/applications/tari_console_wallet/src/main.rs @@ -19,12 +19,14 @@ use init::{ WalletBoot, }; use log::*; +use opentelemetry::{self, global, KeyValue}; use recovery::prompt_private_key_from_seed_words; -use std::process; +use std::{env, process}; use tari_app_utilities::{consts, initialization::init_configuration, utilities::ExitCodes}; use tari_common::{configuration::bootstrap::ApplicationType, ConfigBootstrap}; use tari_core::transactions::types::PrivateKey; use tari_shutdown::Shutdown; +use tracing_subscriber::{layer::SubscriberExt, Registry}; use wallet_modes::{command_mode, grpc_mode, recovery_mode, script_mode, tui_mode, WalletMode}; pub const LOG_TARGET: &str = "wallet::console_wallet::main"; @@ -90,6 +92,7 @@ fn main_inner() -> Result<(), ExitCodes> { info!(target: LOG_TARGET, "Default configuration created. Done."); } + enable_tracing_if_specified(&bootstrap); // get command line password if provided let arg_password = bootstrap.password.clone(); let seed_words_file_name = bootstrap.seed_words_file_name.clone(); @@ -185,3 +188,22 @@ fn get_recovery_master_key( Ok(None) } } + +fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) { + if bootstrap.tracing_enabled { + // To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \ + // jaegertracing/all-in-one:latest + global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("tari::console_wallet") + .with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())]) + // TODO: uncomment when using tokio 1 + // .install_batch(opentelemetry::runtime::Tokio) + .install_simple() + .unwrap(); + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = Registry::default().with(telemetry); + tracing::subscriber::set_global_default(subscriber) + .expect("Tracing could not be set. Try running without `--tracing-enabled`"); + } +} diff --git a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs index fccafbfcc1..689c9a8316 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/block_sync.rs @@ -27,7 +27,7 @@ use crate::{ sync::BlockSynchronizer, BaseNodeStateMachine, }, - chain_storage::BlockchainBackend, + chain_storage::{BlockAddResult, BlockchainBackend}, }; use log::*; use std::time::Instant; @@ -71,14 +71,14 @@ impl BlockSync { bootstrapped, state_info: StateInfo::BlockSyncStarting, }); - // let local_nci = shared.local_node_interface.clone(); + let local_nci = shared.local_node_interface.clone(); synchronizer.on_progress(move |block, remote_tip_height, sync_peers| { let local_height = block.height(); - // local_nci.publish_block_event(BlockEvent::ValidBlockAdded( - // block.block().clone().into(), - // BlockAddResult::Ok(block), - // false.into(), - // )); + local_nci.publish_block_event(BlockEvent::ValidBlockAdded( + block.block().clone().into(), + BlockAddResult::Ok(block), + false.into(), + )); let _ = status_event_sender.broadcast(StatusInfo { bootstrapped, diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index 390a52086a..69c44fdf36 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -87,7 +87,7 @@ impl BlockSynchronizer { self.hooks.add_on_complete_hook(hook); } - #[tracing::instrument(level = "info", skip(self), err)] + #[tracing::instrument(skip(self), err)] pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> { let peer_conn = self.get_next_sync_peer().await?; let node_id = peer_conn.peer_node_id().clone(); diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs index 908e34df35..38ec1ac116 100644 --- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs @@ -254,7 +254,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> { Ok(()) } - #[tracing::instrument(level = "info", skip(self, conn), err)] + #[tracing::instrument(skip(self, conn), err)] async fn attempt_sync(&mut self, mut conn: PeerConnection) -> Result<(), BlockHeaderSyncError> { let peer = conn.peer_node_id().clone(); let mut client = conn.connect_rpc::().await?; diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index 97637e9b88..776c2b7e01 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -118,7 +118,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ const BATCH_SIZE: usize = 4; let (mut tx, rx) = mpsc::channel(BATCH_SIZE); - let span = span!(Level::INFO, "sync_rpc::block_sync::inner_worker"); + let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker"); task::spawn( async move { let iter = NonOverlappingIntegerPairIter::new(start, end + 1, BATCH_SIZE); @@ -210,7 +210,7 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ ); let (mut tx, rx) = mpsc::channel(chunk_size); - let span = span!(Level::INFO, "sync_rpc::sync_headers::inner_worker"); + let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker"); task::spawn( async move { let iter = NonOverlappingIntegerPairIter::new( diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 769ac64cfd..3014a240a0 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -1560,8 +1560,8 @@ fn handle_possible_reorg( }, // We want a warning if the number of removed blocks is at least 2. "Chain reorg required from {} to {} (accum_diff:{}, hash:{}) to (accum_diff:{}, hash:{}). Number of \ blocks to remove: {}, to add: {}.", - tip_header.header(), - fork_header.header(), + tip_header.header().height, + fork_header.header().height, tip_header.accumulated_data().total_accumulated_difficulty, tip_header.accumulated_data().hash.to_hex(), fork_header.accumulated_data().total_accumulated_difficulty, diff --git a/comms/src/connection_manager/dialer.rs b/comms/src/connection_manager/dialer.rs index e9c96fff2e..538ffe89bd 100644 --- a/comms/src/connection_manager/dialer.rs +++ b/comms/src/connection_manager/dialer.rs @@ -283,7 +283,7 @@ where let noise_config = self.noise_config.clone(); let config = self.config.clone(); - let span = span!(Level::INFO, "handle_dial_peer_request_inner1"); + let span = span!(Level::TRACE, "handle_dial_peer_request_inner1"); let dial_fut = async move { let (dial_state, dial_result) = Self::dial_peer_with_retry(dial_state, noise_config, transport, backoff, &config).await; diff --git a/comms/src/connection_manager/listener.rs b/comms/src/connection_manager/listener.rs index d07381ba33..60ae3c2d12 100644 --- a/comms/src/connection_manager/listener.rs +++ b/comms/src/connection_manager/listener.rs @@ -245,7 +245,7 @@ where let liveness_session_count = self.liveness_session_count.clone(); let shutdown_signal = self.shutdown_signal.clone(); - let span = span!(Level::INFO, "connection_mann::listener::inbound_task",); + let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",); let inbound_fut = async move { match Self::read_wire_format(&mut socket, config.time_to_first_byte).await { Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => { diff --git a/comms/src/connection_manager/manager.rs b/comms/src/connection_manager/manager.rs index fdf3e33583..928b53611f 100644 --- a/comms/src/connection_manager/manager.rs +++ b/comms/src/connection_manager/manager.rs @@ -358,7 +358,7 @@ where reply_tx, tracing_id: _tracing, } => { - let span = span!(Level::INFO, "connection_manager::handle_request"); + let span = span!(Level::TRACE, "connection_manager::handle_request"); // This causes a panic for some reason? // span.follows_from(tracing_id); self.dial_peer(node_id, reply_tx).instrument(span).await diff --git a/comms/src/connection_manager/peer_connection.rs b/comms/src/connection_manager/peer_connection.rs index 4398168ebe..6f0d90da5d 100644 --- a/comms/src/connection_manager/peer_connection.rs +++ b/comms/src/connection_manager/peer_connection.rs @@ -373,7 +373,7 @@ impl PeerConnectionActor { reply_tx, tracing_id, } => { - let span = span!(Level::INFO, "handle_request"); + let span = span!(Level::TRACE, "handle_request"); span.follows_from(tracing_id); let result = self.open_negotiated_protocol_stream(protocol_id).instrument(span).await; log_if_error_fmt!( diff --git a/comms/src/connection_manager/requester.rs b/comms/src/connection_manager/requester.rs index 7cfdd2de11..1f3f5cc887 100644 --- a/comms/src/connection_manager/requester.rs +++ b/comms/src/connection_manager/requester.rs @@ -78,7 +78,7 @@ impl ConnectionManagerRequester { } /// Attempt to connect to a remote peer - #[tracing::instrument(level = "info", skip(self), err)] + #[tracing::instrument(skip(self), err)] pub async fn dial_peer(&mut self, node_id: NodeId) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); self.send_dial_peer(node_id, Some(reply_tx)).await?; @@ -97,7 +97,7 @@ impl ConnectionManagerRequester { } /// Send instruction to ConnectionManager to dial a peer and return the result on the given oneshot - #[tracing::instrument(level = "info", skip(self, reply_tx), err)] + #[tracing::instrument(skip(self, reply_tx), err)] pub(crate) async fn send_dial_peer( &mut self, node_id: NodeId, @@ -124,7 +124,7 @@ impl ConnectionManagerRequester { } /// Send instruction to ConnectionManager to dial a peer without waiting for a result. - #[tracing::instrument(level = "info", skip(self), err)] + #[tracing::instrument(skip(self), err)] pub(crate) async fn send_dial_peer_no_reply(&mut self, node_id: NodeId) -> Result<(), ConnectionManagerError> { self.send_dial_peer(node_id, None).await?; Ok(()) diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs index 85c28fff94..05d9e44e13 100644 --- a/comms/src/connectivity/manager.rs +++ b/comms/src/connectivity/manager.rs @@ -157,7 +157,7 @@ impl ConnectivityManagerActor { task::spawn(Self::run(self)) } - #[tracing::instrument(level = "info", name = "connectivity_manager_actor::run", skip(self))] + #[tracing::instrument(name = "connectivity_manager_actor::run", skip(self))] pub async fn run(mut self) { info!(target: LOG_TARGET, "ConnectivityManager started"); let mut shutdown_signal = self @@ -223,7 +223,7 @@ impl ConnectivityManagerActor { reply_tx, tracing_id, } => { - let span = span!(Level::INFO, "handle_request"); + let span = span!(Level::TRACE, "handle_request"); // let _e = span.enter(); span.follows_from(tracing_id); async move { @@ -450,7 +450,7 @@ impl ConnectivityManagerActor { Ok(conns.into_iter().cloned().collect()) } - #[tracing::instrument(level = "info", skip(self))] + #[tracing::instrument(skip(self))] async fn add_managed_peers(&mut self, node_ids: Vec) { let pool = &mut self.pool; let mut should_update_connectivity = false; diff --git a/comms/src/connectivity/requester.rs b/comms/src/connectivity/requester.rs index ab295dd971..740c8a6c81 100644 --- a/comms/src/connectivity/requester.rs +++ b/comms/src/connectivity/requester.rs @@ -128,7 +128,7 @@ impl ConnectivityRequester { self.event_tx.clone() } - #[tracing::instrument(level = "info", skip(self), err)] + #[tracing::instrument(skip(self), err)] pub async fn dial_peer(&mut self, peer: NodeId) -> Result { let mut num_cancels = 0; loop { diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs index bc771a4198..1ba104ce04 100644 --- a/comms/src/multiplexing/yamux.rs +++ b/comms/src/multiplexing/yamux.rs @@ -30,10 +30,9 @@ use futures::{ Stream, StreamExt, }; -use log::*; use std::{future::Future, io, pin::Pin, sync::Arc, task::Poll}; use tari_shutdown::{Shutdown, ShutdownSignal}; -use tracing::{self, event, Level}; +use tracing::{self, debug, error, event, Level}; use yamux::Mode; type IncomingRx = mpsc::Receiver; @@ -261,7 +260,7 @@ where S: Stream> + Unpin while let Some(result) = mux_stream.next().await { match result { Ok(stream) => { - event!(Level::INFO, "yamux::stream received {}", stream); + event!(Level::TRACE, "yamux::stream received {}", stream); if self.sender.send(stream).await.is_err() { debug!( target: LOG_TARGET, diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs index 74073f405c..9d47895338 100644 --- a/comms/src/protocol/messaging/outbound.rs +++ b/comms/src/protocol/messaging/outbound.rs @@ -86,7 +86,7 @@ impl OutboundMessaging { match self.run_inner().await { Ok(_) => { event!( - Level::INFO, + Level::DEBUG, "Outbound messaging for peer has stopped because the stream was closed" ); @@ -127,7 +127,7 @@ impl OutboundMessaging { let substream = loop { match self.try_establish().await { Ok(substream) => { - event!(Level::INFO, "Substream established"); + event!(Level::DEBUG, "Substream established"); break substream; }, Err(err) => { @@ -290,8 +290,7 @@ impl OutboundMessaging { stream .map(|msg| { msg.map(|mut out_msg| { - event!(Level::INFO, "Message buffered for sending {}", out_msg); - trace!(target: LOG_TARGET, "Message buffered for sending {}", out_msg); + event!(Level::DEBUG, "Message buffered for sending {}", out_msg); out_msg.reply_success(); out_msg.body }) diff --git a/comms/src/protocol/rpc/client.rs b/comms/src/protocol/rpc/client.rs index 9b4ca8349f..2806366f9f 100644 --- a/comms/src/protocol/rpc/client.rs +++ b/comms/src/protocol/rpc/client.rs @@ -86,7 +86,7 @@ impl RpcClient { let (ready_tx, ready_rx) = oneshot::channel(); let tracing_id = tracing::Span::current().id(); task::spawn({ - let span = span!(Level::INFO, "start_rpc_worker"); + let span = span!(Level::TRACE, "start_rpc_worker"); span.follows_from(tracing_id); RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name) @@ -515,7 +515,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send let resp = match self.read_reply().await { Ok(resp) => { let latency = start.elapsed(); - event!(Level::INFO, "Message received"); + event!(Level::TRACE, "Message received"); trace!( target: LOG_TARGET, "Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}", diff --git a/comms/src/protocol/rpc/handshake.rs b/comms/src/protocol/rpc/handshake.rs index 9b67f243c2..3abd62cef6 100644 --- a/comms/src/protocol/rpc/handshake.rs +++ b/comms/src/protocol/rpc/handshake.rs @@ -23,11 +23,10 @@ use crate::{framing::CanonicalFraming, message::MessageExt, proto, protocol::rpc::error::HandshakeRejectReason}; use bytes::BytesMut; use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; -use log::*; use prost::{DecodeError, Message}; use std::{io, time::Duration}; use tokio::time; -use tracing::{event, span, Instrument, Level}; +use tracing::{debug, error, event, span, warn, Instrument, Level}; const LOG_TARGET: &str = "comms::rpc::handshake"; @@ -78,7 +77,7 @@ where T: AsyncRead + AsyncWrite + Unpin pub async fn perform_server_handshake(&mut self) -> Result { match self.recv_next_frame().await { Ok(Some(Ok(msg))) => { - event!(Level::INFO, "Handshake bytes received"); + event!(Level::DEBUG, "Handshake bytes received"); let msg = proto::rpc::RpcSession::decode(&mut msg.freeze())?; let version = SUPPORTED_RPC_VERSIONS .iter() diff --git a/comms/src/protocol/rpc/server/mod.rs b/comms/src/protocol/rpc/server/mod.rs index 0572396ef3..b2b7dbf76f 100644 --- a/comms/src/protocol/rpc/server/mod.rs +++ b/comms/src/protocol/rpc/server/mod.rs @@ -54,7 +54,6 @@ use crate::{ Bytes, }; use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt}; -use log::*; use prost::Message; use std::{ borrow::Cow, @@ -64,7 +63,7 @@ use std::{ use tokio::time; use tower::Service; use tower_make::MakeService; -use tracing::{instrument, span, Instrument, Level}; +use tracing::{debug, error, instrument, span, trace, warn, Instrument, Level}; const LOG_TARGET: &str = "comms::rpc"; @@ -650,7 +649,7 @@ where async fn log_timing>(request_id: u32, tag: &str, fut: F) -> R { let t = Instant::now(); - let span = span!(Level::INFO, "rpc::internal::timing::{}::{}", request_id, tag); + let span = span!(Level::TRACE, "rpc::internal::timing::{}::{}", request_id, tag); let ret = fut.instrument(span).await; let elapsed = t.elapsed(); trace!(