From 9341819c9d2c8076841957e1dc25e979718b28be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cain=C3=A3=20Costa?= Date: Wed, 30 Aug 2023 23:21:37 -0300 Subject: [PATCH 1/5] chore: refactor miner spawn process --- src/cmd/mine.rs | 274 +++++++++++++++++------------------------------- 1 file changed, 94 insertions(+), 180 deletions(-) diff --git a/src/cmd/mine.rs b/src/cmd/mine.rs index 8bfc828..10cd6ba 100644 --- a/src/cmd/mine.rs +++ b/src/cmd/mine.rs @@ -10,7 +10,10 @@ use naumachia::{ }, trireme_ledger_client::get_trireme_ledger_client_from_file, }; -use tokio::task::JoinSet; +use tokio::{ + sync::{mpsc::UnboundedSender, oneshot, watch}, + task::{self, AbortHandle, JoinHandle, JoinSet}, +}; use crate::{ contract::{self, tuna_validators, MASTER_TOKEN_NAME}, @@ -66,133 +69,29 @@ impl From for PlutusData { } } -pub async fn thing() -> miette::Result<()> { - // 1. watcher to wait for latest datum - // 2. cancellable workers to calculate the next datum - // 3. submitter to listen for new datums - // two future to select on - let (sender, receiver) = tokio::sync::watch::channel::>(None); +pub async fn exec() -> miette::Result<()> { + let (sender, receiver) = watch::channel::>(None); let mut tasks = JoinSet::new(); - tasks.spawn(async move { - // do some chainsync in a loop now - sender.send(Some(State { - block_number: todo!(), - current_hash: todo!(), - leading_zeros: todo!(), - difficulty_number: todo!(), - epoch_time: todo!(), - current_time: todo!(), - extra: todo!(), - interlink: todo!(), - })) - }); - - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - for _ in 0..num_cpus::get() { - let tx = tx.clone(); - let receiver = receiver.clone(); - - tasks.spawn_blocking(move || loop { - let thing = receiver.borrow(); - - if let Some(thing) = &*thing { - while !receiver.has_changed().unwrap() { - // try to find targetHash - // - tx.send(State { - block_number: todo!(), - current_hash: todo!(), - leading_zeros: todo!(), - difficulty_number: todo!(), - epoch_time: todo!(), - current_time: todo!(), - extra: todo!(), - interlink: todo!(), - }); - } - }; - }); - } - - tasks.spawn(async move { - while let Some(new_datum) = rx.recv().await { - todo!(); - } - - Ok(()) - }); - - if let Some(_) = tasks.join_next().await { - // If any task finishes for some reason, we drop all of them cleanly. - tasks.abort_all(); + tasks.spawn_local(spawn_miner(receiver.clone())); } - Ok(()) -} - -pub async fn exec() -> miette::Result<()> { - let mut last_data = None; - - let mut handle = None; - let ledger_client = get_trireme_ledger_client_from_file::() .await .into_diagnostic()?; - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - - let datum_thread = tokio::spawn(async move { - let mut last_data = None; - - let ledger_client = get_trireme_ledger_client_from_file::() - .await - .unwrap(); - - loop { - // TODO: snooze if error and try again - let data = get_latest_datum(&ledger_client).await.unwrap(); - - match last_data { - Some(ld) if ld != data => { - tx.send(data.clone()).await.unwrap(); - last_data = Some(data); - } - Some(_) => { - tokio::time::sleep(Duration::from_secs(10)).await; - } - None => { - tx.send(data.clone()).await.unwrap(); - last_data = Some(data); - } - } - } - }); - loop { - let data = get_latest_datum(&ledger_client).await?; - - match last_data { - None => { - last_data = Some(data.clone()); - - handle = Some(tokio::spawn(async move { mine(data).await }).abort_handle()); - } - Some(ld) if ld != data => { - if let Some(handle) = handle.take() { - handle.abort(); - } - - let worker_data = data.clone(); - - handle = Some(tokio::spawn(async move { mine(worker_data).await }).abort_handle()); - - last_data = Some(data) + match get_latest_datum(&ledger_client).await { + Ok(d) => sender.send(Some(d)).into_diagnostic()?, + Err(_) => { + tokio::time::sleep(Duration::from_secs(10)); } - _ => (), } } + + #[allow(unreachable_code)] + Ok(()) } async fn get_latest_datum(ledger_client: &LC) -> miette::Result @@ -222,89 +121,104 @@ where Ok(datum) } -async fn mine(data: State) -> miette::Result<()> { - let block_number = data.block_number; - let difficulty_number = data.difficulty_number; - let leading_zeros = data.leading_zeros; - let epoch_time = data.epoch_time; - let current_time = data.current_time; - let interlink = data.interlink.clone(); - - let current_difficulty_hash = get_difficulty_hash(difficulty_number, leading_zeros); - - let target_data_without_nonce: PlutusData = data.into(); - - let PlutusData::Constr(Constr { fields, .. }) = target_data_without_nonce else { - unreachable!() - }; +async fn spawn_miner(receiver: watch::Receiver>) -> miette::Result<()> { + loop { + match &*receiver.borrow() { + Some(data) => { + let block_number = data.block_number; + let difficulty_number = data.difficulty_number; + let leading_zeros = data.leading_zeros; + let epoch_time = data.epoch_time; + let current_time = data.current_time; + let interlink = data.interlink.clone(); - let fields: Vec = fields.into_iter().take(5).collect(); + let current_difficulty_hash = get_difficulty_hash(difficulty_number, leading_zeros); - let (nonce, new_hash) = tokio::task::spawn_blocking(move || { - let mut nonce: [u8; 32]; + let target_data_without_nonce: PlutusData = data.clone().into(); - let mut new_hash: Vec; + let PlutusData::Constr(Constr { fields, .. }) = target_data_without_nonce else { + unreachable!() + }; - loop { - nonce = random::<[u8; 32]>(); - let mut fields = fields.clone(); + let fields: Vec = fields.into_iter().take(5).collect(); - fields.push(PlutusData::BoundedBytes(nonce.to_vec())); + let task = task::spawn_blocking(|| mine(current_difficulty_hash, fields)); + let handle = task.abort_handle(); - let target_bytes = PlutusData::Constr(Constr { constr: 0, fields }).bytes(); + let receiver = receiver.clone(); + task::spawn(async move { + while let Ok(false) = receiver.has_changed() { + tokio::time::sleep(Duration::from_millis(100)); + } - let hasher = Sha256::new_with_prefix(target_bytes); + handle.abort(); + }); + + let (nonce, new_hash) = match task.await { + Ok(v) => v, + Err(_) => continue, + }; + + let redeemer = InputNonce { + nonce: nonce.to_vec(), + }; + + let utc: DateTime = Utc::now(); + let new_time_off_chain = utc.timestamp() as u64; + let new_time_on_chain = new_time_off_chain + ON_CHAIN_HALF_TIME_RANGE; + let new_slot_time = new_time_off_chain; + + let mut new_state = State { + block_number: block_number + 1, + current_hash: new_hash, + leading_zeros, + difficulty_number, + epoch_time, + current_time: new_time_on_chain, + extra: 0, + interlink, + }; + + if block_number % EPOCH_NUMBER == 0 { + new_state.epoch_time = epoch_time + new_state.current_time - current_time; + + change_difficulty(&mut new_state); + } else { + // get cardano slot time + new_state.epoch_time = epoch_time + new_state.current_time - current_time; + } - let hasher = Sha256::new_with_prefix(hasher.finalize()); + calculate_interlink(&mut new_state, difficulty_number, leading_zeros); - new_hash = hasher.finalize().to_vec(); + contract::mine(new_state, redeemer, new_slot_time) + .await + .into_diagnostic()?; - if new_hash.le(¤t_difficulty_hash) { - break; + return Ok(()); } + None => continue, } + } +} - (nonce, new_hash) - }) - .await - .into_diagnostic()?; - - let redeemer = InputNonce { - nonce: nonce.to_vec(), - }; - - let utc: DateTime = Utc::now(); - let new_time_off_chain = utc.timestamp() as u64; - let new_time_on_chain = new_time_off_chain + ON_CHAIN_HALF_TIME_RANGE; - let new_slot_time = new_time_off_chain; - - let mut new_state = State { - block_number: block_number + 1, - current_hash: new_hash, - leading_zeros, - difficulty_number, - epoch_time, - current_time: new_time_on_chain, - extra: 0, - interlink, - }; +pub fn mine(current_difficulty_hash: Vec, fields: Vec) -> ([u8; 32], Vec) { + loop { + let nonce = random::<[u8; 32]>(); + let mut fields = fields.clone(); - if block_number % EPOCH_NUMBER == 0 { - new_state.epoch_time = epoch_time + new_state.current_time - current_time; + fields.push(PlutusData::BoundedBytes(nonce.to_vec())); - change_difficulty(&mut new_state); - } else { - // get cardano slot time - new_state.epoch_time = epoch_time + new_state.current_time - current_time; - } + let target_bytes = PlutusData::Constr(Constr { constr: 0, fields }).bytes(); - calculate_interlink(&mut new_state, difficulty_number, leading_zeros); + let hasher = Sha256::new_with_prefix(target_bytes); + let hasher = Sha256::new_with_prefix(hasher.finalize()); - contract::mine(new_state, redeemer, new_slot_time) - .await - .into_diagnostic()?; + let new_hash = hasher.finalize().to_vec(); - Ok(()) + if new_hash.le(¤t_difficulty_hash) { + return (nonce, new_hash); + } + } } fn calculate_interlink(new_state: &mut State, difficulty_number: u16, leading_zeros: u8) { From 4159093f886ade5d7e6897077031539501bcd752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cain=C3=A3=20Costa?= Date: Wed, 30 Aug 2023 23:27:52 -0300 Subject: [PATCH 2/5] fix: add missing awaits --- src/cmd/mine.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmd/mine.rs b/src/cmd/mine.rs index 10cd6ba..540bcde 100644 --- a/src/cmd/mine.rs +++ b/src/cmd/mine.rs @@ -85,7 +85,7 @@ pub async fn exec() -> miette::Result<()> { match get_latest_datum(&ledger_client).await { Ok(d) => sender.send(Some(d)).into_diagnostic()?, Err(_) => { - tokio::time::sleep(Duration::from_secs(10)); + tokio::time::sleep(Duration::from_secs(10)).await; } } } @@ -148,7 +148,7 @@ async fn spawn_miner(receiver: watch::Receiver>) -> miette::Result let receiver = receiver.clone(); task::spawn(async move { while let Ok(false) = receiver.has_changed() { - tokio::time::sleep(Duration::from_millis(100)); + tokio::time::sleep(Duration::from_millis(100)).await; } handle.abort(); From c65865a5ef6ce0935e8988244d8d4432fbec2b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cain=C3=A3=20Costa?= Date: Wed, 30 Aug 2023 23:28:01 -0300 Subject: [PATCH 3/5] chore: read from mainnet.json --- src/contract.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/contract.rs b/src/contract.rs index 6ab08d8..bde6083 100644 --- a/src/contract.rs +++ b/src/contract.rs @@ -23,7 +23,7 @@ use crate::{ redeemers::{FortunaRedeemer, InputNonce, MintingState}, }; -const BLUEPRINT: &str = include_str!("../genesis/plutus.json"); +const BLUEPRINT: &str = include_str!("../genesis/mainnet.json"); const SPEND_VALIDATOR_NAME: &str = "tuna.spend"; const MINT_VALIDATOR_NAME: &str = "tuna.mint"; pub const MASTER_TOKEN_NAME: &str = "lord tuna"; From 0cd3a1f6607de44c9baa4dbd24158b2a7a578975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cain=C3=A3=20Costa?= Date: Wed, 30 Aug 2023 23:28:07 -0300 Subject: [PATCH 4/5] chore: enable asm feature for sha2 (faster) --- Cargo.lock | 10 ++++++++++ Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3fb8c41..172bdba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2256,6 +2256,16 @@ dependencies = [ "cfg-if", "cpufeatures", "digest 0.10.7", + "sha2-asm", +] + +[[package]] +name = "sha2-asm" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27ba7066011e3fb30d808b51affff34f0a66d3a03a58edd787c6e420e40e44e" +dependencies = [ + "cc", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a5712b6..7abb7dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ uplc = { git = "https://github.com/aiken-lang/aiken.git" } rand = "0.8.5" serde = "1.0.163" serde_json = "1.0.96" -sha2 = "0.10.6" +sha2 = { version = "0.10.6", features = ["asm"] } thiserror = "1.0.40" tokio = { version = "1.28.0", features = ["full"] } num_cpus = "1.16.0" From 2c9b054233bea07153b7dc9fd0fa0aaa2ae7d017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cain=C3=A3=20Costa?= Date: Wed, 30 Aug 2023 23:28:32 -0300 Subject: [PATCH 5/5] chore: remove unused imports --- src/cmd/mine.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmd/mine.rs b/src/cmd/mine.rs index 540bcde..83ec1ca 100644 --- a/src/cmd/mine.rs +++ b/src/cmd/mine.rs @@ -1,4 +1,4 @@ -use std::{iter, time::Duration}; +use std::{time::Duration}; use miette::IntoDiagnostic; use naumachia::{ @@ -11,8 +11,8 @@ use naumachia::{ trireme_ledger_client::get_trireme_ledger_client_from_file, }; use tokio::{ - sync::{mpsc::UnboundedSender, oneshot, watch}, - task::{self, AbortHandle, JoinHandle, JoinSet}, + sync::{watch}, + task::{self, JoinSet}, }; use crate::{