Skip to content

Commit

Permalink
Merge branch 'development' into core-output-features-canonical-encoding
Browse files Browse the repository at this point in the history
* development:
  fix: ensure that accumulated orphan chain data is committed before header validation (tari-project#3462)
  fix: remove is_synced check for transaction validation (tari-project#3459)
  feat: improve logging for tari_mining_node (tari-project#3449)
  • Loading branch information
sdbondi committed Oct 18, 2021
2 parents 90e21f5 + 80f7c78 commit bc2c9e7
Show file tree
Hide file tree
Showing 30 changed files with 1,033 additions and 405 deletions.
83 changes: 56 additions & 27 deletions applications/tari_mining_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ mod utils;

use crate::{
miner::MiningReport,
stratum::{stratum_controller::controller::Controller, stratum_miner::miner::StratumMiner},
stratum::{
stratum_controller::controller::Controller,
stratum_miner::miner::StratumMiner,
stratum_statistics::stats::Statistics,
},
};
use errors::{err_empty, MinerError};
use miner::Miner;
Expand All @@ -42,6 +46,7 @@ use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
RwLock,
},
thread,
time::Instant,
Expand All @@ -61,14 +66,17 @@ use tokio::{runtime::Runtime, time::sleep};
use tonic::transport::Channel;
use utils::{coinbase_request, extract_outputs_and_kernels};

pub const LOG_TARGET: &str = "tari_mining_node::miner::main";
pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::main";

/// Application entry point
fn main() {
let rt = Runtime::new().expect("Failed to start tokio runtime");
match rt.block_on(main_inner()) {
Ok(_) => std::process::exit(0),
Err(exit_code) => {
eprintln!("Fatal error: {:?}", exit_code);
error!("Exiting with code: {:?}", exit_code);
error!(target: LOG_TARGET, "Exiting with code: {:?}", exit_code);
std::process::exit(exit_code.as_i32())
},
}
Expand All @@ -83,8 +91,8 @@ async fn main_inner() -> Result<(), ExitCodes> {
config.mining_worker_name = global.mining_worker_name.clone();
config.mining_wallet_address = global.mining_wallet_address.clone();
config.mining_pool_address = global.mining_pool_address.clone();
debug!("{:?}", bootstrap);
debug!("{:?}", config);
debug!(target: LOG_TARGET_FILE, "{:?}", bootstrap);
debug!(target: LOG_TARGET_FILE, "{:?}", config);

if !config.mining_wallet_address.is_empty() && !config.mining_pool_address.is_empty() {
let url = config.mining_pool_address.clone();
Expand All @@ -94,18 +102,25 @@ async fn main_inner() -> Result<(), ExitCodes> {
if !config.mining_worker_name.is_empty() {
miner_address += &format!("{}{}", ".", &config.mining_worker_name);
}
let mut mc = Controller::new().unwrap_or_else(|e| {
let stats = Arc::new(RwLock::new(Statistics::default()));
let mut mc = Controller::new(stats.clone()).unwrap_or_else(|e| {
debug!(target: LOG_TARGET_FILE, "Error loading mining controller: {}", e);
panic!("Error loading mining controller: {}", e);
});
let cc = stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone())
.unwrap_or_else(|e| {
panic!("Error loading stratum client controller: {:?}", e);
});
let cc =
stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone(), stats.clone())
.unwrap_or_else(|e| {
debug!(
target: LOG_TARGET_FILE,
"Error loading stratum client controller: {:?}", e
);
panic!("Error loading stratum client controller: {:?}", e);
});
let miner_stopped = Arc::new(AtomicBool::new(false));
let client_stopped = Arc::new(AtomicBool::new(false));

mc.set_client_tx(cc.tx.clone());
let mut miner = StratumMiner::new(config);
let mut miner = StratumMiner::new(config, stats);
if let Err(e) = miner.start_solvers() {
println!("Error. Please check logs for further info.");
println!("Error details:");
Expand All @@ -118,7 +133,7 @@ async fn main_inner() -> Result<(), ExitCodes> {
.name("mining_controller".to_string())
.spawn(move || {
if let Err(e) = mc.run(miner) {
error!("Error. Please check logs for further info: {:?}", e);
error!(target: LOG_TARGET, "Error. Please check logs for further info: {:?}", e);
return;
}
miner_stopped_internal.store(true, Ordering::Relaxed);
Expand All @@ -142,19 +157,21 @@ async fn main_inner() -> Result<(), ExitCodes> {
Ok(())
} else {
config.mine_on_tip_only = global.mine_on_tip_only;
debug!("mine_on_tip_only is {}", config.mine_on_tip_only);

debug!(
target: LOG_TARGET_FILE,
"mine_on_tip_only is {}", config.mine_on_tip_only
);
let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?;

let mut blocks_found: u64 = 0;
loop {
debug!("Starting new mining cycle");
debug!(target: LOG_TARGET_FILE, "Starting new mining cycle");
match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await {
err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => {
// Any GRPC error we will try to reconnect with a standard delay
error!("Connection error: {:?}", err);
error!(target: LOG_TARGET, "Connection error: {:?}", err);
loop {
debug!("Holding for {:?}", config.wait_timeout());
debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout());
sleep(config.wait_timeout()).await;
match connect(&config, &global).await {
Ok((nc, wc)) => {
Expand All @@ -163,22 +180,28 @@ async fn main_inner() -> Result<(), ExitCodes> {
break;
},
Err(err) => {
error!("Connection error: {:?}", err);
error!(target: LOG_TARGET, "Connection error: {:?}", err);
continue;
},
}
}
},
Err(MinerError::MineUntilHeightReached(h)) => {
info!("Prescribed blockchain height {} reached. Aborting ...", h);
info!(
target: LOG_TARGET,
"Prescribed blockchain height {} reached. Aborting ...", h
);
return Ok(());
},
Err(MinerError::MinerLostBlock(h)) => {
info!("Height {} already mined by other node. Restarting ...", h);
info!(
target: LOG_TARGET,
"Height {} already mined by other node. Restarting ...", h
);
},
Err(err) => {
error!("Error: {:?}", err);
debug!("Holding for {:?}", config.wait_timeout());
error!(target: LOG_TARGET, "Error: {:?}", err);
debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout());
sleep(config.wait_timeout()).await;
},
Ok(submitted) => {
Expand All @@ -201,10 +224,10 @@ async fn connect(
global: &GlobalConfig,
) -> Result<(BaseNodeClient<Channel>, WalletClient<Channel>), MinerError> {
let base_node_addr = config.base_node_addr(global);
info!("Connecting to base node at {}", base_node_addr);
info!(target: LOG_TARGET, "Connecting to base node at {}", base_node_addr);
let node_conn = BaseNodeClient::connect(base_node_addr.clone()).await?;
let wallet_addr = config.wallet_addr(global);
info!("Connecting to wallet at {}", wallet_addr);
info!(target: LOG_TARGET, "Connecting to wallet at {}", wallet_addr);
let wallet_conn = WalletClient::connect(wallet_addr.clone()).await?;

Ok((node_conn, wallet_conn))
Expand Down Expand Up @@ -267,26 +290,31 @@ async fn mining_cycle(
if report.difficulty < min_diff {
submit = false;
debug!(
"Mined difficulty {} below minimum difficulty {}. Not submitting.",
report.difficulty, min_diff
target: LOG_TARGET_FILE,
"Mined difficulty {} below minimum difficulty {}. Not submitting.", report.difficulty, min_diff
);
}
}
if let Some(max_diff) = bootstrap.miner_max_diff {
if report.difficulty > max_diff {
submit = false;
debug!(
target: LOG_TARGET_FILE,
"Mined difficulty {} greater than maximum difficulty {}. Not submitting.",
report.difficulty, max_diff
report.difficulty,
max_diff
);
}
}
if submit {
// Mined a block fitting the difficulty
let block_header = BlockHeader::try_from(header.clone()).map_err(MinerError::Conversion)?;
info!(
target: LOG_TARGET,
"Miner {} found block header {} with difficulty {:?}",
report.miner, block_header, report.difficulty,
report.miner,
block_header,
report.difficulty,
);
let mut mined_block = block.clone();
mined_block.header = Some(header);
Expand All @@ -313,6 +341,7 @@ async fn mining_cycle(
async fn display_report(report: &MiningReport, config: &MinerConfig) {
let hashrate = report.hashes as f64 / report.elapsed.as_micros() as f64;
debug!(
target: LOG_TARGET_FILE,
"Miner {} reported {:.2}MH/s with total {:.2}MH/s over {} threads. Height: {}. Target: {})",
report.miner,
hashrate,
Expand Down
26 changes: 14 additions & 12 deletions applications/tari_mining_node/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use std::{
use tari_app_grpc::{conversions::timestamp, tari_rpc::BlockHeader};
use thread::JoinHandle;

pub const LOG_TARGET: &str = "tari_mining_node::miner::standalone";

// Identify how often mining thread is reporting / checking context
// ~400_000 hashes per second
const REPORTING_FREQUENCY: u64 = 3_000_000;
Expand Down Expand Up @@ -107,20 +109,20 @@ impl Stream for Miner {
type Item = MiningReport;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
trace!("Polling Miner");
trace!(target: LOG_TARGET, "Polling Miner");
// First poll would start all the threads passing async context waker
if self.threads.is_empty() && self.num_threads > 0 {
debug!(
"Starting {} mining threads for target difficulty {}",
self.num_threads, self.target_difficulty
target: LOG_TARGET,
"Starting {} mining threads for target difficulty {}", self.num_threads, self.target_difficulty
);
self.start_threads(ctx);
return Poll::Pending;
} else if self.num_threads == 0 {
error!("Cannot mine: no mining threads");
error!(target: LOG_TARGET, "Cannot mine: no mining threads");
return Poll::Ready(None);
} else if self.channels.is_empty() {
debug!("Finished mining");
debug!(target: LOG_TARGET, "Finished mining");
return Poll::Ready(None);
}

Expand Down Expand Up @@ -167,14 +169,14 @@ pub fn mining_task(
let mut hasher = BlockHeaderSha3::new(header).unwrap();
hasher.random_nonce();
// We're mining over here!
info!("Mining thread {} started", miner);
info!(target: LOG_TARGET, "Mining thread {} started", miner);
// Mining work
loop {
let difficulty = hasher.difficulty();
if difficulty >= target_difficulty {
debug!(
"Miner {} found nonce {} with matching difficulty {}",
miner, hasher.nonce, difficulty
target: LOG_TARGET,
"Miner {} found nonce {} with matching difficulty {}", miner, hasher.nonce, difficulty
);
if let Err(err) = sender.try_send(MiningReport {
miner,
Expand All @@ -186,10 +188,10 @@ pub fn mining_task(
header: Some(hasher.into_header()),
target_difficulty,
}) {
error!("Miner {} failed to send report: {}", miner, err);
error!(target: LOG_TARGET, "Miner {} failed to send report: {}", miner, err);
}
waker.wake();
info!("Mining thread {} stopped", miner);
info!(target: LOG_TARGET, "Mining thread {} stopped", miner);
return;
}
if hasher.nonce % REPORTING_FREQUENCY == 0 {
Expand All @@ -204,9 +206,9 @@ pub fn mining_task(
target_difficulty,
});
waker.clone().wake();
trace!("Reporting from {} result {:?}", miner, res);
trace!(target: LOG_TARGET, "Reporting from {} result {:?}", miner, res);
if let Err(TrySendError::Disconnected(_)) = res {
info!("Mining thread {} disconnected", miner);
info!(target: LOG_TARGET, "Mining thread {} disconnected", miner);
return;
}
hasher.set_forward_timestamp(timestamp().seconds as u64);
Expand Down
Loading

0 comments on commit bc2c9e7

Please sign in to comment.