Skip to content

Commit

Permalink
Merge pull request #26 from aiken-lang/miner-2.0
Browse files Browse the repository at this point in the history
chore: refactor miner spawn process
  • Loading branch information
cfcosta authored Aug 31, 2023
2 parents 0757883 + 2c9b054 commit 73b159d
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 183 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ uplc = { git = "https://github.com/aiken-lang/aiken.git" }
rand = "0.8.5"
serde = "1.0.163"
serde_json = "1.0.96"
sha2 = "0.10.6"
sha2 = { version = "0.10.6", features = ["asm"] }
thiserror = "1.0.40"
tokio = { version = "1.28.0", features = ["full"] }
num_cpus = "1.16.0"
276 changes: 95 additions & 181 deletions src/cmd/mine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{iter, time::Duration};
use std::{time::Duration};

use miette::IntoDiagnostic;
use naumachia::{
Expand All @@ -10,7 +10,10 @@ use naumachia::{
},
trireme_ledger_client::get_trireme_ledger_client_from_file,
};
use tokio::task::JoinSet;
use tokio::{
sync::{watch},
task::{self, JoinSet},
};

use crate::{
contract::{self, tuna_validators, MASTER_TOKEN_NAME},
Expand Down Expand Up @@ -66,133 +69,29 @@ impl From<TargetState> for PlutusData {
}
}

pub async fn thing() -> miette::Result<()> {
// 1. watcher to wait for latest datum
// 2. cancellable workers to calculate the next datum
// 3. submitter to listen for new datums
// two future to select on
let (sender, receiver) = tokio::sync::watch::channel::<Option<State>>(None);
pub async fn exec() -> miette::Result<()> {
let (sender, receiver) = watch::channel::<Option<State>>(None);
let mut tasks = JoinSet::new();

tasks.spawn(async move {
// do some chainsync in a loop now
sender.send(Some(State {
block_number: todo!(),
current_hash: todo!(),
leading_zeros: todo!(),
difficulty_number: todo!(),
epoch_time: todo!(),
current_time: todo!(),
extra: todo!(),
interlink: todo!(),
}))
});

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

for _ in 0..num_cpus::get() {
let tx = tx.clone();
let receiver = receiver.clone();

tasks.spawn_blocking(move || loop {
let thing = receiver.borrow();

if let Some(thing) = &*thing {
while !receiver.has_changed().unwrap() {
// try to find targetHash
//
tx.send(State {
block_number: todo!(),
current_hash: todo!(),
leading_zeros: todo!(),
difficulty_number: todo!(),
epoch_time: todo!(),
current_time: todo!(),
extra: todo!(),
interlink: todo!(),
});
}
};
});
}

tasks.spawn(async move {
while let Some(new_datum) = rx.recv().await {
todo!();
}

Ok(())
});

if let Some(_) = tasks.join_next().await {
// If any task finishes for some reason, we drop all of them cleanly.
tasks.abort_all();
tasks.spawn_local(spawn_miner(receiver.clone()));
}

Ok(())
}

pub async fn exec() -> miette::Result<()> {
let mut last_data = None;

let mut handle = None;

let ledger_client = get_trireme_ledger_client_from_file::<State, FortunaRedeemer>()
.await
.into_diagnostic()?;

let (tx, mut rx) = tokio::sync::mpsc::channel(1);

let datum_thread = tokio::spawn(async move {
let mut last_data = None;

let ledger_client = get_trireme_ledger_client_from_file::<State, FortunaRedeemer>()
.await
.unwrap();

loop {
// TODO: snooze if error and try again
let data = get_latest_datum(&ledger_client).await.unwrap();

match last_data {
Some(ld) if ld != data => {
tx.send(data.clone()).await.unwrap();
last_data = Some(data);
}
Some(_) => {
tokio::time::sleep(Duration::from_secs(10)).await;
}
None => {
tx.send(data.clone()).await.unwrap();
last_data = Some(data);
}
}
}
});

loop {
let data = get_latest_datum(&ledger_client).await?;

match last_data {
None => {
last_data = Some(data.clone());

handle = Some(tokio::spawn(async move { mine(data).await }).abort_handle());
}
Some(ld) if ld != data => {
if let Some(handle) = handle.take() {
handle.abort();
}

let worker_data = data.clone();

handle = Some(tokio::spawn(async move { mine(worker_data).await }).abort_handle());

last_data = Some(data)
match get_latest_datum(&ledger_client).await {
Ok(d) => sender.send(Some(d)).into_diagnostic()?,
Err(_) => {
tokio::time::sleep(Duration::from_secs(10)).await;
}
_ => (),
}
}

#[allow(unreachable_code)]
Ok(())
}

async fn get_latest_datum<LC>(ledger_client: &LC) -> miette::Result<State>
Expand Down Expand Up @@ -222,89 +121,104 @@ where
Ok(datum)
}

async fn mine(data: State) -> miette::Result<()> {
let block_number = data.block_number;
let difficulty_number = data.difficulty_number;
let leading_zeros = data.leading_zeros;
let epoch_time = data.epoch_time;
let current_time = data.current_time;
let interlink = data.interlink.clone();

let current_difficulty_hash = get_difficulty_hash(difficulty_number, leading_zeros);

let target_data_without_nonce: PlutusData = data.into();

let PlutusData::Constr(Constr { fields, .. }) = target_data_without_nonce else {
unreachable!()
};
async fn spawn_miner(receiver: watch::Receiver<Option<State>>) -> miette::Result<()> {
loop {
match &*receiver.borrow() {
Some(data) => {
let block_number = data.block_number;
let difficulty_number = data.difficulty_number;
let leading_zeros = data.leading_zeros;
let epoch_time = data.epoch_time;
let current_time = data.current_time;
let interlink = data.interlink.clone();

let fields: Vec<PlutusData> = fields.into_iter().take(5).collect();
let current_difficulty_hash = get_difficulty_hash(difficulty_number, leading_zeros);

let (nonce, new_hash) = tokio::task::spawn_blocking(move || {
let mut nonce: [u8; 32];
let target_data_without_nonce: PlutusData = data.clone().into();

let mut new_hash: Vec<u8>;
let PlutusData::Constr(Constr { fields, .. }) = target_data_without_nonce else {
unreachable!()
};

loop {
nonce = random::<[u8; 32]>();
let mut fields = fields.clone();
let fields: Vec<PlutusData> = fields.into_iter().take(5).collect();

fields.push(PlutusData::BoundedBytes(nonce.to_vec()));
let task = task::spawn_blocking(|| mine(current_difficulty_hash, fields));
let handle = task.abort_handle();

let target_bytes = PlutusData::Constr(Constr { constr: 0, fields }).bytes();
let receiver = receiver.clone();
task::spawn(async move {
while let Ok(false) = receiver.has_changed() {
tokio::time::sleep(Duration::from_millis(100)).await;
}

let hasher = Sha256::new_with_prefix(target_bytes);
handle.abort();
});

let (nonce, new_hash) = match task.await {
Ok(v) => v,
Err(_) => continue,
};

let redeemer = InputNonce {
nonce: nonce.to_vec(),
};

let utc: DateTime<Utc> = Utc::now();
let new_time_off_chain = utc.timestamp() as u64;
let new_time_on_chain = new_time_off_chain + ON_CHAIN_HALF_TIME_RANGE;
let new_slot_time = new_time_off_chain;

let mut new_state = State {
block_number: block_number + 1,
current_hash: new_hash,
leading_zeros,
difficulty_number,
epoch_time,
current_time: new_time_on_chain,
extra: 0,
interlink,
};

if block_number % EPOCH_NUMBER == 0 {
new_state.epoch_time = epoch_time + new_state.current_time - current_time;

change_difficulty(&mut new_state);
} else {
// get cardano slot time
new_state.epoch_time = epoch_time + new_state.current_time - current_time;
}

let hasher = Sha256::new_with_prefix(hasher.finalize());
calculate_interlink(&mut new_state, difficulty_number, leading_zeros);

new_hash = hasher.finalize().to_vec();
contract::mine(new_state, redeemer, new_slot_time)
.await
.into_diagnostic()?;

if new_hash.le(&current_difficulty_hash) {
break;
return Ok(());
}
None => continue,
}
}
}

(nonce, new_hash)
})
.await
.into_diagnostic()?;

let redeemer = InputNonce {
nonce: nonce.to_vec(),
};

let utc: DateTime<Utc> = Utc::now();
let new_time_off_chain = utc.timestamp() as u64;
let new_time_on_chain = new_time_off_chain + ON_CHAIN_HALF_TIME_RANGE;
let new_slot_time = new_time_off_chain;

let mut new_state = State {
block_number: block_number + 1,
current_hash: new_hash,
leading_zeros,
difficulty_number,
epoch_time,
current_time: new_time_on_chain,
extra: 0,
interlink,
};
pub fn mine(current_difficulty_hash: Vec<u8>, fields: Vec<PlutusData>) -> ([u8; 32], Vec<u8>) {
loop {
let nonce = random::<[u8; 32]>();
let mut fields = fields.clone();

if block_number % EPOCH_NUMBER == 0 {
new_state.epoch_time = epoch_time + new_state.current_time - current_time;
fields.push(PlutusData::BoundedBytes(nonce.to_vec()));

change_difficulty(&mut new_state);
} else {
// get cardano slot time
new_state.epoch_time = epoch_time + new_state.current_time - current_time;
}
let target_bytes = PlutusData::Constr(Constr { constr: 0, fields }).bytes();

calculate_interlink(&mut new_state, difficulty_number, leading_zeros);
let hasher = Sha256::new_with_prefix(target_bytes);
let hasher = Sha256::new_with_prefix(hasher.finalize());

contract::mine(new_state, redeemer, new_slot_time)
.await
.into_diagnostic()?;
let new_hash = hasher.finalize().to_vec();

Ok(())
if new_hash.le(&current_difficulty_hash) {
return (nonce, new_hash);
}
}
}

fn calculate_interlink(new_state: &mut State, difficulty_number: u16, leading_zeros: u8) {
Expand Down
2 changes: 1 addition & 1 deletion src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
redeemers::{FortunaRedeemer, InputNonce, MintingState},
};

const BLUEPRINT: &str = include_str!("../genesis/plutus.json");
const BLUEPRINT: &str = include_str!("../genesis/mainnet.json");
const SPEND_VALIDATOR_NAME: &str = "tuna.spend";
const MINT_VALIDATOR_NAME: &str = "tuna.mint";
pub const MASTER_TOKEN_NAME: &str = "lord tuna";
Expand Down

0 comments on commit 73b159d

Please sign in to comment.