Skip to content

Commit

Permalink
tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Aug 26, 2021
1 parent b827fc5 commit b04968b
Show file tree
Hide file tree
Showing 21 changed files with 94 additions and 62 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 22 additions & 22 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ mod utils;
use crate::command_handler::CommandHandler;
use futures::{pin_mut, FutureExt};
use log::*;
use opentelemetry::{self, global};
use opentelemetry::{self, global, KeyValue};
use parser::Parser;
use rustyline::{config::OutputStreamType, error::ReadlineError, CompletionType, Config, EditMode, Editor};
use std::{
env,
net::SocketAddr,
process,
sync::Arc,
Expand Down Expand Up @@ -153,27 +154,7 @@ fn main_inner() -> Result<(), ExitCodes> {

/// Sets up the base node and runs the cli_loop
async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) -> Result<(), ExitCodes> {
if bootstrap.tracing_enabled {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("tari")
// TODO: uncomment when using tokio 1
// .install_batch(opentelemetry::runtime::Tokio)
.install_simple()
.unwrap();

// let opentelemetry = tracing_opentelemetry::subscriber().with_tracer(tracer);
// tracing_subscriber::registry().with(opentelemetry).try_init()?;

let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

// Use the tracing subscriber `Registry`, or any other subscriber
// that impls `LookupSpan`
let subscriber = Registry::default().with(telemetry);

// TODO: this should be called in the executable instead of here as suggested in https://docs.rs/tracing/0.1.26/tracing/
tracing::subscriber::set_global_default(subscriber).unwrap();
}
enable_tracing_if_specified(&bootstrap);
// Load or create the Node identity
let node_identity = setup_node_identity(
&node_config.base_node_identity_file,
Expand Down Expand Up @@ -266,6 +247,25 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->
Ok(())
}

fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) {
if bootstrap.tracing_enabled {
// To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \
// jaegertracing/all-in-one:latest
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("tari::base_node")
.with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())])
// TODO: uncomment when using tokio 1
// .install_batch(opentelemetry::runtime::Tokio)
.install_simple()
.unwrap();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber)
.expect("Tracing could not be set. Try running without `--tracing-enabled`");
}
}

/// Runs the gRPC server
async fn run_grpc(
grpc: crate::grpc::base_node_grpc_server::BaseNodeGrpcServer,
Expand Down
9 changes: 9 additions & 0 deletions applications/tari_console_wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ tokio = { version="0.2.10", features = ["signal"] }
thiserror = "1.0.20"
tonic = "0.2"

tracing = "0.1.26"
tracing-opentelemetry = "0.15.0"
tracing-subscriber = "0.2.20"

# network tracing, rt-tokio for async batch export
opentelemetry = { version = "0.16", default-features = false, features = ["trace","rt-tokio"] }
opentelemetry-jaeger = { version="0.15", features=["rt-tokio"]}


[dependencies.tari_core]
path = "../../base_layer/core"
version = "^0.9"
Expand Down
24 changes: 23 additions & 1 deletion applications/tari_console_wallet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use init::{
WalletBoot,
};
use log::*;
use opentelemetry::{self, global, KeyValue};
use recovery::prompt_private_key_from_seed_words;
use std::process;
use std::{env, process};
use tari_app_utilities::{consts, initialization::init_configuration, utilities::ExitCodes};
use tari_common::{configuration::bootstrap::ApplicationType, ConfigBootstrap};
use tari_core::transactions::types::PrivateKey;
use tari_shutdown::Shutdown;
use tracing_subscriber::{layer::SubscriberExt, Registry};
use wallet_modes::{command_mode, grpc_mode, recovery_mode, script_mode, tui_mode, WalletMode};

pub const LOG_TARGET: &str = "wallet::console_wallet::main";
Expand Down Expand Up @@ -90,6 +92,7 @@ fn main_inner() -> Result<(), ExitCodes> {
info!(target: LOG_TARGET, "Default configuration created. Done.");
}

enable_tracing_if_specified(&bootstrap);
// get command line password if provided
let arg_password = bootstrap.password.clone();
let seed_words_file_name = bootstrap.seed_words_file_name.clone();
Expand Down Expand Up @@ -185,3 +188,22 @@ fn get_recovery_master_key(
Ok(None)
}
}

fn enable_tracing_if_specified(bootstrap: &ConfigBootstrap) {
if bootstrap.tracing_enabled {
// To run: docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 \
// jaegertracing/all-in-one:latest
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("tari::console_wallet")
.with_tags(vec![KeyValue::new("pid", process::id().to_string()), KeyValue::new("current_exe", env::current_exe().unwrap().to_str().unwrap_or_default().to_owned())])
// TODO: uncomment when using tokio 1
// .install_batch(opentelemetry::runtime::Tokio)
.install_simple()
.unwrap();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber)
.expect("Tracing could not be set. Try running without `--tracing-enabled`");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
sync::BlockSynchronizer,
BaseNodeStateMachine,
},
chain_storage::BlockchainBackend,
chain_storage::{BlockAddResult, BlockchainBackend},
};
use log::*;
use std::time::Instant;
Expand Down Expand Up @@ -71,14 +71,14 @@ impl BlockSync {
bootstrapped,
state_info: StateInfo::BlockSyncStarting,
});
// let local_nci = shared.local_node_interface.clone();
let local_nci = shared.local_node_interface.clone();
synchronizer.on_progress(move |block, remote_tip_height, sync_peers| {
let local_height = block.height();
// local_nci.publish_block_event(BlockEvent::ValidBlockAdded(
// block.block().clone().into(),
// BlockAddResult::Ok(block),
// false.into(),
// ));
local_nci.publish_block_event(BlockEvent::ValidBlockAdded(
block.block().clone().into(),
BlockAddResult::Ok(block),
false.into(),
));

let _ = status_event_sender.broadcast(StatusInfo {
bootstrapped,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<B: BlockchainBackend + 'static> BlockSynchronizer<B> {
self.hooks.add_on_complete_hook(hook);
}

#[tracing::instrument(level = "info", skip(self), err)]
#[tracing::instrument(skip(self), err)]
pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
let peer_conn = self.get_next_sync_peer().await?;
let node_id = peer_conn.peer_node_id().clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
Ok(())
}

#[tracing::instrument(level = "info", skip(self, conn), err)]
#[tracing::instrument(skip(self, conn), err)]
async fn attempt_sync(&mut self, mut conn: PeerConnection) -> Result<(), BlockHeaderSyncError> {
let peer = conn.peer_node_id().clone();
let mut client = conn.connect_rpc::<rpc::BaseNodeSyncRpcClient>().await?;
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
const BATCH_SIZE: usize = 4;
let (mut tx, rx) = mpsc::channel(BATCH_SIZE);

let span = span!(Level::INFO, "sync_rpc::block_sync::inner_worker");
let span = span!(Level::TRACE, "sync_rpc::block_sync::inner_worker");
task::spawn(
async move {
let iter = NonOverlappingIntegerPairIter::new(start, end + 1, BATCH_SIZE);
Expand Down Expand Up @@ -210,7 +210,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
);

let (mut tx, rx) = mpsc::channel(chunk_size);
let span = span!(Level::INFO, "sync_rpc::sync_headers::inner_worker");
let span = span!(Level::TRACE, "sync_rpc::sync_headers::inner_worker");
task::spawn(
async move {
let iter = NonOverlappingIntegerPairIter::new(
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1560,8 +1560,8 @@ fn handle_possible_reorg<T: BlockchainBackend>(
}, // We want a warning if the number of removed blocks is at least 2.
"Chain reorg required from {} to {} (accum_diff:{}, hash:{}) to (accum_diff:{}, hash:{}). Number of \
blocks to remove: {}, to add: {}.",
tip_header.header(),
fork_header.header(),
tip_header.header().height,
fork_header.header().height,
tip_header.accumulated_data().total_accumulated_difficulty,
tip_header.accumulated_data().hash.to_hex(),
fork_header.accumulated_data().total_accumulated_difficulty,
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ where
let noise_config = self.noise_config.clone();
let config = self.config.clone();

let span = span!(Level::INFO, "handle_dial_peer_request_inner1");
let span = span!(Level::TRACE, "handle_dial_peer_request_inner1");
let dial_fut = async move {
let (dial_state, dial_result) =
Self::dial_peer_with_retry(dial_state, noise_config, transport, backoff, &config).await;
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ where
let liveness_session_count = self.liveness_session_count.clone();
let shutdown_signal = self.shutdown_signal.clone();

let span = span!(Level::INFO, "connection_mann::listener::inbound_task",);
let span = span!(Level::TRACE, "connection_mann::listener::inbound_task",);
let inbound_fut = async move {
match Self::read_wire_format(&mut socket, config.time_to_first_byte).await {
Ok(WireMode::Comms(byte)) if byte == config.network_info.network_byte => {
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ where
reply_tx,
tracing_id: _tracing,
} => {
let span = span!(Level::INFO, "connection_manager::handle_request");
let span = span!(Level::TRACE, "connection_manager::handle_request");
// This causes a panic for some reason?
// span.follows_from(tracing_id);
self.dial_peer(node_id, reply_tx).instrument(span).await
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ impl PeerConnectionActor {
reply_tx,
tracing_id,
} => {
let span = span!(Level::INFO, "handle_request");
let span = span!(Level::TRACE, "handle_request");
span.follows_from(tracing_id);
let result = self.open_negotiated_protocol_stream(protocol_id).instrument(span).await;
log_if_error_fmt!(
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connection_manager/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ConnectionManagerRequester {
}

/// Attempt to connect to a remote peer
#[tracing::instrument(level = "info", skip(self), err)]
#[tracing::instrument(skip(self), err)]
pub async fn dial_peer(&mut self, node_id: NodeId) -> Result<PeerConnection, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send_dial_peer(node_id, Some(reply_tx)).await?;
Expand All @@ -97,7 +97,7 @@ impl ConnectionManagerRequester {
}

/// Send instruction to ConnectionManager to dial a peer and return the result on the given oneshot
#[tracing::instrument(level = "info", skip(self, reply_tx), err)]
#[tracing::instrument(skip(self, reply_tx), err)]
pub(crate) async fn send_dial_peer(
&mut self,
node_id: NodeId,
Expand All @@ -124,7 +124,7 @@ impl ConnectionManagerRequester {
}

/// Send instruction to ConnectionManager to dial a peer without waiting for a result.
#[tracing::instrument(level = "info", skip(self), err)]
#[tracing::instrument(skip(self), err)]
pub(crate) async fn send_dial_peer_no_reply(&mut self, node_id: NodeId) -> Result<(), ConnectionManagerError> {
self.send_dial_peer(node_id, None).await?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions comms/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl ConnectivityManagerActor {
task::spawn(Self::run(self))
}

#[tracing::instrument(level = "info", name = "connectivity_manager_actor::run", skip(self))]
#[tracing::instrument(name = "connectivity_manager_actor::run", skip(self))]
pub async fn run(mut self) {
info!(target: LOG_TARGET, "ConnectivityManager started");
let mut shutdown_signal = self
Expand Down Expand Up @@ -223,7 +223,7 @@ impl ConnectivityManagerActor {
reply_tx,
tracing_id,
} => {
let span = span!(Level::INFO, "handle_request");
let span = span!(Level::TRACE, "handle_request");
// let _e = span.enter();
span.follows_from(tracing_id);
async move {
Expand Down Expand Up @@ -450,7 +450,7 @@ impl ConnectivityManagerActor {
Ok(conns.into_iter().cloned().collect())
}

#[tracing::instrument(level = "info", skip(self))]
#[tracing::instrument(skip(self))]
async fn add_managed_peers(&mut self, node_ids: Vec<NodeId>) {
let pool = &mut self.pool;
let mut should_update_connectivity = false;
Expand Down
2 changes: 1 addition & 1 deletion comms/src/connectivity/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ConnectivityRequester {
self.event_tx.clone()
}

#[tracing::instrument(level = "info", skip(self), err)]
#[tracing::instrument(skip(self), err)]
pub async fn dial_peer(&mut self, peer: NodeId) -> Result<PeerConnection, ConnectivityError> {
let mut num_cancels = 0;
loop {
Expand Down
5 changes: 2 additions & 3 deletions comms/src/multiplexing/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ use futures::{
Stream,
StreamExt,
};
use log::*;
use std::{future::Future, io, pin::Pin, sync::Arc, task::Poll};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tracing::{self, event, Level};
use tracing::{self, debug, error, event, Level};
use yamux::Mode;

type IncomingRx = mpsc::Receiver<yamux::Stream>;
Expand Down Expand Up @@ -261,7 +260,7 @@ where S: Stream<Item = Result<yamux::Stream, yamux::ConnectionError>> + Unpin
while let Some(result) = mux_stream.next().await {
match result {
Ok(stream) => {
event!(Level::INFO, "yamux::stream received {}", stream);
event!(Level::TRACE, "yamux::stream received {}", stream);
if self.sender.send(stream).await.is_err() {
debug!(
target: LOG_TARGET,
Expand Down
7 changes: 3 additions & 4 deletions comms/src/protocol/messaging/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl OutboundMessaging {
match self.run_inner().await {
Ok(_) => {
event!(
Level::INFO,
Level::DEBUG,
"Outbound messaging for peer has stopped because the stream was closed"
);

Expand Down Expand Up @@ -127,7 +127,7 @@ impl OutboundMessaging {
let substream = loop {
match self.try_establish().await {
Ok(substream) => {
event!(Level::INFO, "Substream established");
event!(Level::DEBUG, "Substream established");
break substream;
},
Err(err) => {
Expand Down Expand Up @@ -290,8 +290,7 @@ impl OutboundMessaging {
stream
.map(|msg| {
msg.map(|mut out_msg| {
event!(Level::INFO, "Message buffered for sending {}", out_msg);
trace!(target: LOG_TARGET, "Message buffered for sending {}", out_msg);
event!(Level::DEBUG, "Message buffered for sending {}", out_msg);
out_msg.reply_success();
out_msg.body
})
Expand Down
4 changes: 2 additions & 2 deletions comms/src/protocol/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl RpcClient {
let (ready_tx, ready_rx) = oneshot::channel();
let tracing_id = tracing::Span::current().id();
task::spawn({
let span = span!(Level::INFO, "start_rpc_worker");
let span = span!(Level::TRACE, "start_rpc_worker");
span.follows_from(tracing_id);

RpcClientWorker::new(config, request_rx, framed, ready_tx, protocol_name)
Expand Down Expand Up @@ -515,7 +515,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send
let resp = match self.read_reply().await {
Ok(resp) => {
let latency = start.elapsed();
event!(Level::INFO, "Message received");
event!(Level::TRACE, "Message received");
trace!(
target: LOG_TARGET,
"Received response ({} byte(s)) from request #{} (protocol = {}, method={}) in {:.0?}",
Expand Down
Loading

0 comments on commit b04968b

Please sign in to comment.