From 5a47014296bd79f99acc6d659126cebe8a912fa1 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Mon, 20 May 2024 19:12:59 +0800 Subject: [PATCH] WIP --- Cargo.lock | 23 +- Cargo.toml | 4 +- app/Cargo.toml | 2 +- app/app.rs | 11 +- lib/archive.rs | 646 +++++++++++++++++++++------ lib/miner.rs | 21 +- lib/net/mod.rs | 36 +- lib/net/peer.rs | 869 +++++++++++++++++++++++++++++-------- lib/node/mainchain_task.rs | 226 ++++++---- lib/node/mod.rs | 58 ++- lib/node/net_task.rs | 347 +++++++-------- lib/state.rs | 29 +- lib/types/mod.rs | 36 +- lib/util.rs | 26 ++ 14 files changed, 1678 insertions(+), 656 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14afc9b..6d5cfb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,7 +576,7 @@ dependencies = [ [[package]] name = "bip300301" version = "0.1.1" -source = "git+https://github.com/Ash-L2L/bip300301.git?rev=b74c5bbf6022701cbdfbd9a412327577e115ceda#b74c5bbf6022701cbdfbd9a412327577e115ceda" +source = "git+https://github.com/Ash-L2L/bip300301.git?rev=eb0b8e41dbe1dd8eb397aa5bfb3b793b46cafc30#eb0b8e41dbe1dd8eb397aa5bfb3b793b46cafc30" dependencies = [ "base64 0.21.7", "bitcoin", @@ -4532,7 +4532,7 @@ dependencies = [ [[package]] name = "thunder" -version = "0.8.14" +version = "0.9.0" dependencies = [ "anyhow", "bincode", @@ -4570,7 +4570,7 @@ dependencies = [ [[package]] name = "thunder_app" -version = "0.8.14" +version = "0.9.0" dependencies = [ "anyhow", "base64 0.21.7", @@ -4607,7 +4607,7 @@ dependencies = [ [[package]] name = "thunder_app_cli" -version = "0.8.14" +version = "0.9.0" dependencies = [ "anyhow", "bip300301", @@ -4622,7 +4622,7 @@ dependencies = [ [[package]] name = "thunder_app_rpc_api" -version = "0.8.14" +version = "0.9.0" dependencies = [ "bip300301", "jsonrpsee", @@ -4900,6 +4900,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -4910,12 +4920,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f29cd7c..3735fc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ authors = [ "Nikita Chashchinskii " ] edition = "2021" -version = "0.8.14" +version = "0.9.0" [workspace.dependencies.bip300301] git = "https://github.com/Ash-L2L/bip300301.git" -rev = "b74c5bbf6022701cbdfbd9a412327577e115ceda" +rev = "eb0b8e41dbe1dd8eb397aa5bfb3b793b46cafc30" [workspace.dependencies.rustreexo] git = "https://github.com/Ash-L2L/rustreexo.git" diff --git a/app/Cargo.toml b/app/Cargo.toml index 7b92437..5a40074 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -39,7 +39,7 @@ tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.10", features = ["rt"] } tracing = "0.1.40" tracing-appender = "0.2.3" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.18", features = ["json"] } utoipa = "4.2.3" [[bin]] diff --git a/app/app.rs b/app/app.rs index d4f339f..27358e8 100644 --- a/app/app.rs +++ b/app/app.rs @@ -181,9 +181,14 @@ impl App { .await?; // miner_write.generate().await?; tracing::trace!("confirming bmm..."); - if let Some((header, body)) = miner_write.confirm_bmm().await? 
{ - tracing::trace!("confirmed bmm, submitting block"); - self.node.submit_block(&header, &body).await?; + if let Some((main_hash, header, body)) = + miner_write.confirm_bmm().await? + { + tracing::trace!( + "confirmed bmm, submitting block {}", + header.hash() + ); + self.node.submit_block(main_hash, &header, &body).await?; } drop(miner_write); self.update_wallet()?; diff --git a/lib/archive.rs b/lib/archive.rs index cb0b35d..5672d1e 100644 --- a/lib/archive.rs +++ b/lib/archive.rs @@ -1,13 +1,16 @@ -use std::{cmp::Ordering, collections::HashSet}; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, +}; use bip300301::{ bitcoin::{self, hashes::Hash}, DepositInfo, Header as BitcoinHeader, }; -use fallible_iterator::FallibleIterator; +use fallible_iterator::{FallibleIterator, IteratorExt}; use heed::{types::SerdeBincode, Database, RoTxn, RwTxn}; -use crate::types::{Accumulator, BlockHash, Body, Header}; +use crate::types::{Accumulator, BlockHash, BmmResult, Body, Header, Tip}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -30,16 +33,20 @@ pub enum Error { }, #[error("unknown block hash: {0}")] NoBlockHash(BlockHash), + #[error("no BMM result with block {0}")] + NoBmmResult(BlockHash), #[error("no block body with hash {0}")] NoBody(BlockHash), - #[error("no BMM verification result with for block {0}")] - NoBmmVerification(BlockHash), #[error("no deposits info for block {0}")] NoDepositsInfo(bitcoin::BlockHash), #[error("no header with hash {0}")] NoHeader(BlockHash), #[error("no height info for block hash {0}")] NoHeight(BlockHash), + #[error("unknown mainchain block hash: {0}")] + NoMainBlockHash(bitcoin::BlockHash), + #[error("no BMM commitments data for mainchain block {0}")] + NoMainBmmCommitments(bitcoin::BlockHash), #[error("no mainchain header with hash {0}")] NoMainHeader(bitcoin::BlockHash), #[error("no height info for mainchain block hash {0}")] @@ -50,10 +57,15 @@ pub enum Error { pub struct Archive { accumulators: Database, SerdeBincode>, block_hash_to_height: Database, SerdeBincode>, - /// BMM verification status for each header. - /// A status of false indicates that verification failed. + /// BMM results for each header. /// All ancestors of any block should always be present. - bmm_verifications: Database, SerdeBincode>, + /// All relevant mainchain headers should exist in `main_headers`. + /// Note that it is possible for a block to have BMM commitments in several + /// different mainchain blocks, if there are any mainchain forks. + bmm_results: Database< + SerdeBincode, + SerdeBincode>, + >, bodies: Database, SerdeBincode>, /// Deposits by mainchain block, sorted first-to-last in each block deposits: Database< @@ -82,21 +94,38 @@ pub struct Archive { headers: Database, SerdeBincode
>, main_block_hash_to_height: Database, SerdeBincode>, + /// BMM commitments in each mainchain block. + /// All ancestors must be present. + /// Mainchain blocks MUST be present in `main_headers`, but not all + /// mainchain headers will be present, if the blocks are not available. + /// BMM commitments do not imply existence of a sidechain block header. + /// BMM commitments do not imply BMM validity of a sidechain block, + /// as BMM commitments for ancestors may not exist. + main_bmm_commitments: Database< + SerdeBincode, + SerdeBincode>, + >, /// Mainchain headers. All ancestors of any header should always be present main_headers: Database, SerdeBincode>, + /// Mainchain successor blocks. ALL known block hashes, INCLUDING the zero hash, + /// MUST be present. + main_successors: Database< + SerdeBincode, + SerdeBincode>, + >, /// Successor blocks. ALL known block hashes, INCLUDING the zero hash, /// MUST be present. successors: Database, SerdeBincode>>, - /// Total work for mainchain headers. + /// Total work for mainchain headers with BMM verifications. /// All ancestors of any block should always be present total_work: Database, SerdeBincode>, } impl Archive { - pub const NUM_DBS: u32 = 12; + pub const NUM_DBS: u32 = 14; pub fn new(env: &heed::Env) -> Result { let mut rwtxn = env.write_txn()?; @@ -104,8 +133,8 @@ impl Archive { env.create_database(&mut rwtxn, Some("accumulators"))?; let block_hash_to_height = env.create_database(&mut rwtxn, Some("hash_to_height"))?; - let bmm_verifications = - env.create_database(&mut rwtxn, Some("bmm_verifications"))?; + let bmm_results = + env.create_database(&mut rwtxn, Some("bmm_results"))?; let bodies = env.create_database(&mut rwtxn, Some("bodies"))?; let deposits = env.create_database(&mut rwtxn, Some("deposits"))?; let exponential_ancestors = @@ -115,8 +144,22 @@ impl Archive { let headers = env.create_database(&mut rwtxn, Some("headers"))?; let main_block_hash_to_height = env.create_database(&mut rwtxn, Some("main_hash_to_height"))?; + let main_bmm_commitments = + env.create_database(&mut rwtxn, Some("main_bmm_commitments"))?; let main_headers = env.create_database(&mut rwtxn, Some("main_headers"))?; + let main_successors = + env.create_database(&mut rwtxn, Some("main_successors"))?; + if main_successors + .get(&rwtxn, &bitcoin::BlockHash::all_zeros())? + .is_none() + { + main_successors.put( + &mut rwtxn, + &bitcoin::BlockHash::all_zeros(), + &HashSet::new(), + )?; + } let successors = env.create_database(&mut rwtxn, Some("successors"))?; if successors.get(&rwtxn, &BlockHash::default())?.is_none() { successors.put( @@ -130,14 +173,16 @@ impl Archive { Ok(Self { accumulators, block_hash_to_height, - bmm_verifications, + bmm_results, bodies, deposits, exponential_ancestors, exponential_main_ancestors, headers, + main_bmm_commitments, main_block_hash_to_height, main_headers, + main_successors, successors, total_work, }) @@ -188,27 +233,37 @@ impl Archive { .ok_or(Error::NoHeight(block_hash)) } - pub fn try_get_bmm_verification( + pub fn get_bmm_results( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result, Error> { - if block_hash == BlockHash::default() { - Ok(Some(true)) - } else { - self.bmm_verifications - .get(rotxn, &block_hash) - .map_err(Error::from) - } + ) -> Result, Error> { + let results = self + .bmm_results + .get(rotxn, &block_hash) + .map_err(Error::from)? 
+ .unwrap_or_default(); + Ok(results) } - pub fn get_bmm_verification( + pub fn try_get_bmm_result( &self, rotxn: &RoTxn, block_hash: BlockHash, - ) -> Result { - self.try_get_bmm_verification(rotxn, block_hash)? - .ok_or(Error::NoBmmVerification(block_hash)) + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + let results = self.get_bmm_results(rotxn, block_hash)?; + Ok(results.get(&main_hash).copied()) + } + + pub fn get_bmm_result( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + main_hash: bitcoin::BlockHash, + ) -> Result { + self.try_get_bmm_result(rotxn, block_hash, main_hash)? + .ok_or(Error::NoBmmResult(block_hash)) } pub fn try_get_body( @@ -265,6 +320,24 @@ impl Archive { .ok_or(Error::NoHeader(block_hash)) } + pub fn try_get_main_bmm_commitment( + &self, + rotxn: &RoTxn, + main_hash: bitcoin::BlockHash, + ) -> Result>, Error> { + let commitments = self.main_bmm_commitments.get(rotxn, &main_hash)?; + Ok(commitments) + } + + pub fn get_main_bmm_commitment( + &self, + rotxn: &RoTxn, + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + self.try_get_main_bmm_commitment(rotxn, main_hash)? + .ok_or(Error::NoMainBmmCommitments(main_hash)) + } + pub fn try_get_main_height( &self, rotxn: &RoTxn, @@ -306,6 +379,24 @@ impl Archive { .ok_or(Error::NoMainHeader(block_hash)) } + pub fn try_get_main_successors( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result>, Error> { + let successors = self.main_successors.get(rotxn, &block_hash)?; + Ok(successors) + } + + pub fn get_main_successors( + &self, + rotxn: &RoTxn, + block_hash: bitcoin::BlockHash, + ) -> Result, Error> { + self.try_get_main_successors(rotxn, block_hash)? + .ok_or(Error::NoMainBlockHash(block_hash)) + } + pub fn try_get_successors( &self, rotxn: &RoTxn, @@ -342,6 +433,36 @@ impl Archive { .ok_or(Error::NoMainHeader(block_hash)) } + /// Try to get the best valid mainchain verification for the specified block. + pub fn try_get_best_main_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result, Error> { + let verifications = self.get_bmm_results(rotxn, block_hash)?; + verifications + .into_iter() + .filter_map(|(main_hash, bmm_result)| { + if bmm_result == BmmResult::Verified { + Some(Ok(main_hash)) + } else { + None + } + }) + .transpose_into_fallible() + .max_by_key(|main_hash| self.get_total_work(rotxn, *main_hash)) + } + + /// Try to get the best valid mainchain verification for the specified block. + pub fn get_best_main_verification( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result { + self.try_get_best_main_verification(rotxn, block_hash)? 
+ .ok_or(Error::NoBmmResult(block_hash)) + } + pub fn get_nth_ancestor( &self, rotxn: &RoTxn, @@ -405,6 +526,24 @@ impl Archive { Ok(block_hash) } + /// Get block locator for the specified block hash + pub fn get_block_locator( + &self, + rotxn: &RoTxn, + block_hash: BlockHash, + ) -> Result, Error> { + if block_hash == BlockHash::default() { + return Ok(Vec::new()); + } + let header = self.get_header(rotxn, block_hash)?; + let mut res = + self.exponential_ancestors.get(rotxn, &block_hash)?.unwrap(); + res.reverse(); + res.push(header.prev_side_hash); + res.reverse(); + Ok(res) + } + /// Returns true if the second specified block is a descendant of the first /// specified block /// Returns an error if either of the specified block headers do not exist @@ -470,18 +609,6 @@ impl Archive { Ok(()) } - /// Store a BMM verification result - pub fn put_bmm_verification( - &self, - rwtxn: &mut RwTxn, - block_hash: BlockHash, - verification_result: bool, - ) -> Result<(), Error> { - self.bmm_verifications - .put(rwtxn, &block_hash, &verification_result)?; - Ok(()) - } - /// Store a block body. The header must already exist. pub fn put_body( &self, @@ -515,6 +642,12 @@ impl Archive { Ok(()) } + /// Store a header. + /// + /// The following predicates MUST be met before calling this function: + /// * Ancestor headers MUST be stored + /// * BMM commitments MUST be stored for mainchain header where + /// `main_header.prev_blockhash == header.prev_main_hash` pub fn put_header( &self, rwtxn: &mut RwTxn, @@ -540,7 +673,13 @@ impl Archive { &pred_successors, )?; } - self.successors.put(rwtxn, &block_hash, &HashSet::new())?; + // Store successors + { + let successors = self + .try_get_successors(rwtxn, block_hash)? + .unwrap_or_default(); + self.successors.put(rwtxn, &block_hash, &successors)?; + } // populate exponential ancestors let mut exponential_ancestors = Vec::::new(); if height >= 2 { @@ -563,6 +702,119 @@ impl Archive { &block_hash, &exponential_ancestors, )?; + // Populate BMM verifications + { + let mut bmm_results = self.get_bmm_results(rwtxn, block_hash)?; + let parent_bmm_results = + self.get_bmm_results(rwtxn, header.prev_side_hash)?; + let main_blocks = + self.get_main_successors(rwtxn, header.prev_main_hash)?; + for main_block in main_blocks { + let Some(commitment) = + self.get_main_bmm_commitment(rwtxn, main_block)? 
+ else { + tracing::trace!(%block_hash, "Failed BMM @ {main_block}: missing commitment"); + bmm_results.insert(main_block, BmmResult::Failed); + continue; + }; + if commitment != block_hash { + tracing::trace!(%block_hash, "Failed BMM @ {main_block}: commitment to other block ({commitment})"); + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + let main_header = self.get_main_header(rwtxn, main_block)?; + if header.prev_main_hash != main_header.prev_blockhash { + tracing::trace!(%block_hash, "Failed BMM @ {main_block}: should be impossible?"); + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + if header.prev_side_hash == BlockHash::default() { + tracing::trace!(%block_hash, "Verified BMM @ {main_block}: no parent"); + bmm_results.insert(main_block, BmmResult::Verified); + continue; + } + // Check if there is a valid BMM commitment to the parent in the + // main ancestry + let main_ancestry_contains_valid_bmm_commitment_to_parent = + parent_bmm_results + .iter() + .map(Ok) + .transpose_into_fallible() + .any(|(bmm_block, bmm_result)| { + let parent_verified = *bmm_result + == BmmResult::Verified + && self.is_main_descendant( + rwtxn, *bmm_block, main_block, + )?; + Result::::Ok(parent_verified) + })?; + if main_ancestry_contains_valid_bmm_commitment_to_parent { + tracing::trace!(%block_hash, "Verified BMM @ {main_block}: verified parent"); + bmm_results.insert(main_block, BmmResult::Verified); + continue; + } else { + tracing::trace!(%block_hash, "Failed BMM @ {main_block}: no valid BMM commitment to parent in main ancestry"); + bmm_results.insert(main_block, BmmResult::Failed); + continue; + } + } + self.bmm_results.put(rwtxn, &block_hash, &bmm_results)?; + } + Ok(()) + } + + /// All ancestors MUST be present. + /// Mainchain blocks MUST be present in `main_headers`. + pub fn put_main_bmm_commitment( + &self, + rwtxn: &mut RwTxn, + main_hash: bitcoin::BlockHash, + commitment: Option, + ) -> Result<(), Error> { + let main_header = self.get_main_header(rwtxn, main_hash)?; + if main_header.prev_blockhash != bitcoin::BlockHash::all_zeros() { + let _ = self + .get_main_bmm_commitment(rwtxn, main_header.prev_blockhash)?; + } + self.main_bmm_commitments + .put(rwtxn, &main_hash, &commitment)?; + let Some(commitment) = commitment else { + return Ok(()); + }; + let Some(header) = self.try_get_header(rwtxn, commitment)? 
else { + return Ok(()); + }; + let bmm_result = if header.prev_main_hash != main_header.prev_blockhash + { + BmmResult::Failed + } else if header.prev_side_hash == BlockHash::default() { + BmmResult::Verified + } else { + // Check if there is a valid BMM commitment to the parent in the + // main ancestry + let parent_bmm_results = + self.get_bmm_results(rwtxn, header.prev_side_hash)?; + let main_ancestry_contains_valid_bmm_commitment_to_parent = + parent_bmm_results + .into_iter() + .map(Ok) + .transpose_into_fallible() + .any(|(bmm_block, bmm_result)| { + let parent_verified = bmm_result == BmmResult::Verified + && self.is_main_descendant( + rwtxn, bmm_block, main_hash, + )?; + Result::::Ok(parent_verified) + })?; + if main_ancestry_contains_valid_bmm_commitment_to_parent { + BmmResult::Verified + } else { + BmmResult::Failed + } + }; + let mut bmm_results = self.get_bmm_results(rwtxn, commitment)?; + bmm_results.insert(main_hash, bmm_result); + self.bmm_results.put(rwtxn, &commitment, &bmm_results)?; Ok(()) } @@ -593,6 +845,24 @@ impl Archive { .put(rwtxn, &block_hash, &height)?; self.main_headers.put(rwtxn, &block_hash, header)?; self.total_work.put(rwtxn, &block_hash, &total_work)?; + // Add to successors for predecessor + { + let mut pred_successors = + self.get_main_successors(rwtxn, header.prev_blockhash)?; + pred_successors.insert(block_hash); + self.main_successors.put( + rwtxn, + &header.prev_blockhash, + &pred_successors, + )?; + } + // Store successors + { + let successors = self + .try_get_main_successors(rwtxn, block_hash)? + .unwrap_or_default(); + self.main_successors.put(rwtxn, &block_hash, &successors)?; + } // populate exponential ancestors let mut exponential_ancestors = Vec::::new(); if height >= 2 { @@ -809,6 +1079,7 @@ impl Archive { } /// Compares two potential tips and returns the better tip, if there is one. + /// Headers for each tip MUST exist. /// It is possible that neither tip is better, eg. if the mainchain lineage /// is not shared and the tip with greater total work had lower height before /// the common mainchain ancestor. @@ -826,119 +1097,226 @@ impl Archive { pub fn better_tip( &self, rotxn: &RoTxn, - block_hash0: BlockHash, - block_hash1: BlockHash, - ) -> Result, Error> { + tip0: Tip, + tip1: Tip, + ) -> Result, Error> { + if tip0 == tip1 { + return Ok(None); + } + let block_hash0 = tip0.block_hash; + let block_hash1 = tip1.block_hash; let height0 = self.get_height(rotxn, block_hash0)?; let height1 = self.get_height(rotxn, block_hash1)?; match (height0, height1) { (0, 0) => return Ok(None), - (0, _) => return Ok(Some(block_hash1)), - (_, 0) => return Ok(Some(block_hash0)), + (0, _) => return Ok(Some(tip1)), + (_, 0) => return Ok(Some(tip0)), (_, _) => (), } - let header0 = self.get_header(rotxn, block_hash0)?; - let header1 = self.get_header(rotxn, block_hash1)?; - if self.shared_mainchain_lineage( - rotxn, - header0.prev_main_hash, - header1.prev_main_hash, - )? 
{ - match height0.cmp(&height1) { - Ordering::Less => Ok(Some(block_hash1)), - Ordering::Greater => Ok(Some(block_hash0)), - Ordering::Equal => { - let work0 = - self.get_total_work(rotxn, header0.prev_main_hash)?; - let work1 = - self.get_total_work(rotxn, header1.prev_main_hash)?; - match work0.cmp(&work1) { - Ordering::Less => Ok(Some(block_hash1)), - Ordering::Greater => Ok(Some(block_hash0)), - Ordering::Equal => Ok(None), - } + let work0 = self.get_total_work(rotxn, tip0.main_block_hash)?; + let work1 = self.get_total_work(rotxn, tip1.main_block_hash)?; + match (work0.cmp(&work1), height0.cmp(&height1)) { + (Ordering::Less | Ordering::Equal, Ordering::Less) => { + // No ancestor of tip0 can have greater height, + // so tip1 is better. + Ok(Some(tip1)) + } + (Ordering::Equal | Ordering::Greater, Ordering::Greater) => { + // No ancestor of tip1 can have greater height, + // so tip0 is better. + Ok(Some(tip0)) + } + (Ordering::Less, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work + if self.shared_mainchain_lineage( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )? { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) } } - } else { - let work0 = self.get_total_work(rotxn, header0.prev_main_hash)?; - let work1 = self.get_total_work(rotxn, header1.prev_main_hash)?; - match (height0.cmp(&height1), work0.cmp(&work1)) { - (Ordering::Less, Ordering::Equal) => Ok(Some(block_hash1)), - (Ordering::Greater, Ordering::Equal) => Ok(Some(block_hash0)), - (Ordering::Less | Ordering::Equal, Ordering::Less) => { - Ok(Some(block_hash1)) + (Ordering::Greater, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work + if !self.shared_mainchain_lineage( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )? { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) } - (Ordering::Greater | Ordering::Equal, Ordering::Greater) => { - Ok(Some(block_hash0)) + } + (Ordering::Less, Ordering::Greater) => { + // Need to check if tip0 ancestor before common + // mainchain ancestor had greater or equal height + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let tip0_ancestor_height = self + .ancestors(rotxn, block_hash0) + .find_map(|tip0_ancestor| { + if tip0_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = self.get_header(rotxn, tip0_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + self.get_height(rotxn, tip0_ancestor).map(Some) + })? 
+ .unwrap(); + if tip0_ancestor_height >= height1 { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) } - (Ordering::Less, Ordering::Greater) - | (Ordering::Greater, Ordering::Less) - | (Ordering::Equal, Ordering::Equal) => { - let common_mainchain_ancestor = self - .last_common_main_ancestor( + } + (Ordering::Greater, Ordering::Less) => { + // Need to check if tip1 ancestor before common + // mainchain ancestor had greater or equal height + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let tip1_ancestor_height = self + .ancestors(rotxn, block_hash1) + .find_map(|tip1_ancestor| { + if tip1_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = self.get_header(rotxn, tip1_ancestor)?; + if !self.is_main_descendant( rotxn, - header0.prev_main_hash, - header1.prev_main_hash, - )?; - let common_mainchain_ancestor_height = - self.get_main_height(rotxn, common_mainchain_ancestor)?; - let height_before_common_mainchain_ancestor0 = self - .ancestors(rotxn, block_hash0) - .find_map(|block_hash| { - if block_hash == BlockHash::default() { - return Ok(Some(0)); - }; - let header = self.get_header(rotxn, block_hash)?; - let main_height = self.get_main_height( + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + self.get_height(rotxn, tip1_ancestor).map(Some) + })? + .unwrap(); + if tip1_ancestor_height < height0 { + Ok(Some(tip0)) + } else { + Ok(Some(tip1)) + } + } + (Ordering::Equal, Ordering::Equal) => { + // If tip0 is the same as tip1, return tip0 + if block_hash0 == block_hash1 { + return Ok(Some(tip0)); + } + // Need to compare tip0 ancestor and tip1 ancestor + // before common mainchain ancestor + let main_ancestor = self.last_common_main_ancestor( + rotxn, + tip0.main_block_hash, + tip1.main_block_hash, + )?; + let main_ancestor_height = + self.get_main_height(rotxn, main_ancestor)?; + let (tip0_ancestor_height, tip0_ancestor_work) = self + .ancestors(rotxn, block_hash0) + .find_map(|tip0_ancestor| { + if tip0_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = self.get_header(rotxn, tip0_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = self.get_height(rotxn, tip0_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = self.get_main_height( rotxn, header.prev_main_hash, )?; - if main_height > common_mainchain_ancestor_height { - return Ok(None); - }; - let height = self.get_height(rotxn, block_hash)?; - Ok(Some(height)) - })? - .unwrap(); - let height_before_common_mainchain_ancestor1 = self - .ancestors(rotxn, block_hash1) - .find_map(|block_hash| { - if block_hash == BlockHash::default() { - return Ok(Some(0)); - }; - let header = self.get_header(rotxn, block_hash)?; - let main_height = self.get_main_height( + let height = prev_height + 1; + self.get_nth_main_ancestor( rotxn, - header.prev_main_hash, - )?; - if main_height > common_mainchain_ancestor_height { - return Ok(None); - }; - let height = self.get_height(rotxn, block_hash)?; - Ok(Some(height)) - })? 
- .unwrap(); - match ( - work0.cmp(&work1), - height_before_common_mainchain_ancestor0 - .cmp(&height_before_common_mainchain_ancestor1), - ) { - (Ordering::Less, Ordering::Less | Ordering::Equal) => { - Ok(Some(block_hash1)) + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = self.get_total_work(rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + let (tip1_ancestor_height, tip1_ancestor_work) = self + .ancestors(rotxn, block_hash1) + .find_map(|tip1_ancestor| { + if tip1_ancestor == BlockHash::default() { + return Ok(Some((0, None))); } - (Ordering::Less, Ordering::Greater) => Ok(None), - ( - Ordering::Greater, - Ordering::Greater | Ordering::Equal, - ) => Ok(Some(block_hash0)), - (Ordering::Greater, Ordering::Less) => Ok(None), - (Ordering::Equal, Ordering::Less) => { - Ok(Some(block_hash1)) + let header = self.get_header(rotxn, tip1_ancestor)?; + if !self.is_main_descendant( + rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); } - (Ordering::Equal, Ordering::Greater) => { - Ok(Some(block_hash0)) + if header.prev_main_hash == main_ancestor { + return Ok(None); } - (Ordering::Equal, Ordering::Equal) => Ok(None), + let height = self.get_height(rotxn, tip1_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = self.get_main_height( + rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + self.get_nth_main_ancestor( + rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = self.get_total_work(rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + match ( + tip0_ancestor_work.cmp(&tip1_ancestor_work), + tip0_ancestor_height.cmp(&tip1_ancestor_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Equal) + | (_, Ordering::Greater) => { + // tip1 is not better + Ok(Some(tip0)) + } + (Ordering::Greater, Ordering::Equal) + | (_, Ordering::Less) => { + // tip1 is better + Ok(Some(tip1)) } } } diff --git a/lib/miner.rs b/lib/miner.rs index 27dbc2f..e68fe5a 100644 --- a/lib/miner.rs +++ b/lib/miner.rs @@ -91,21 +91,28 @@ impl Miner { pub async fn confirm_bmm( &mut self, - ) -> Result, Error> { + ) -> Result, Error> { const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15); if let Some((header, body)) = self.block.clone() { - let block_hash = header.hash().into(); + let block_hash = header.hash(); tracing::trace!(%block_hash, "verifying bmm..."); - self.drivechain + let (bmm_verified, main_hash) = self + .drivechain .verify_bmm_next_block( header.prev_main_hash, - block_hash, + block_hash.into(), VERIFY_BMM_POLL_INTERVAL, ) .await?; - tracing::trace!(%block_hash, "verified bmm"); - self.block = None; - return Ok(Some((header, body))); + if bmm_verified { + tracing::trace!(%block_hash, "verified bmm"); + self.block = None; + return Ok(Some((main_hash, header, body))); + } else { + tracing::trace!(%block_hash, "bmm verification failed"); + self.block = None; + return Ok(None); + } } Ok(None) } diff --git a/lib/net/mod.rs b/lib/net/mod.rs index 7972919..c0cb801 100644 --- a/lib/net/mod.rs +++ b/lib/net/mod.rs @@ -27,6 +27,7 @@ use peer::{ }; pub use peer::{ ConnectionError as PeerConnectionError, Info as PeerConnectionInfo, + InternalMessage as PeerConnectionMessage, PeerStateId, Request as PeerRequest, Response as PeerResponse, }; @@ -48,6 +49,8 @@ pub enum Error { Heed(#[from] heed::Error), #[error("quinn error")] Io(#[from] std::io::Error), + #[error("peer connection not found for {0}")] + 
MissingPeerConnection(SocketAddr), #[error("peer connection")] PeerConnection(#[from] PeerConnectionError), #[error("quinn rustls error")] @@ -255,6 +258,26 @@ impl Net { Ok(()) } + // Push an internal message to the specified peer + pub fn push_internal_message( + &self, + message: PeerConnectionMessage, + addr: SocketAddr, + ) -> Result<(), Error> { + let active_peers_read = self.active_peers.read(); + let Some(peer_connection_handle) = active_peers_read.get(&addr) else { + return Err(Error::MissingPeerConnection(addr)); + }; + if let Err(send_err) = peer_connection_handle + .internal_message_tx + .unbounded_send(message) + { + let message = send_err.into_inner(); + tracing::error!("Failed to push internal message to peer connection {addr}: {message:?}") + } + Ok(()) + } + // Push a request to the specified peers pub fn push_request( &self, @@ -268,8 +291,8 @@ impl Net { continue; }; if let Err(_send_err) = peer_connection_handle - .forward_request_tx - .unbounded_send(request.clone()) + .internal_message_tx + .unbounded_send(request.clone().into()) { tracing::warn!( "Failed to push request to peer at {addr}: {request:?}" @@ -289,11 +312,12 @@ impl Net { .iter() .filter(|(addr, _)| !exclude.contains(addr)) .for_each(|(addr, peer_connection_handle)| { + let request = PeerRequest::PushTransaction { + transaction: tx.clone(), + }; if let Err(_send_err) = peer_connection_handle - .forward_request_tx - .unbounded_send(PeerRequest::PushTransaction { - transaction: tx.clone(), - }) + .internal_message_tx + .unbounded_send(request.into()) { let txid = tx.transaction.txid(); tracing::warn!("Failed to push tx {txid} to peer at {addr}") diff --git a/lib/net/peer.rs b/lib/net/peer.rs index 4e086ff..880203f 100644 --- a/lib/net/peer.rs +++ b/lib/net/peer.rs @@ -1,6 +1,11 @@ -use std::{collections::HashSet, net::SocketAddr}; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + fmt::Debug, + net::SocketAddr, +}; -use bip300301::bitcoin::{self, hashes::Hash as _}; +use bip300301::bitcoin::{self, hashes::Hash as _, Work}; use borsh::BorshSerialize; use fallible_iterator::FallibleIterator; use futures::{channel::mpsc, stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -17,18 +22,26 @@ use tokio_stream::wrappers::IntervalStream; use crate::{ archive::{self, Archive}, state::{self, State}, - types::{hash, AuthorizedTransaction, BlockHash, Body, Hash, Header, Txid}, + types::{ + hash, AuthorizedTransaction, BlockHash, BmmResult, Body, Hash, Header, + Tip, Txid, + }, }; #[derive(Debug, Error)] pub enum BanReason { - #[error("BMM verification failed for block {0}")] - BmmVerificationFailed(BlockHash), - #[error("Incorrect total work for block {block_hash}: {total_work:?}")] - IncorrectTotalWork { - block_hash: BlockHash, - total_work: Option, - }, + #[error( + "BMM verification failed for block hash {} at {}", + .0.block_hash, + .0.main_block_hash + )] + BmmVerificationFailed(Tip), + #[error( + "Incorrect total work for block {} at {}: {total_work:?}", + tip.block_hash, + tip.main_block_hash + )] + IncorrectTotalWork { tip: Tip, total_work: Option }, } #[must_use] @@ -46,14 +59,16 @@ pub enum ConnectionError { HeartbeatTimeout, #[error("heed error")] Heed(#[from] heed::Error), + #[error("missing peer state for id {0}")] + MissingPeerState(PeerStateId), #[error("peer should be banned; {0}")] PeerBan(#[from] BanReason), #[error("read to end error")] ReadToEnd(#[from] quinn::ReadToEndError), #[error("send datagram error")] SendDatagram(#[from] quinn::SendDatagramError), - #[error("send forward 
request error")] - SendForwardRequest, + #[error("send internal message error")] + SendInternalMessage, #[error("send info error")] SendInfo, #[error("state error")] @@ -68,16 +83,13 @@ impl From> for ConnectionError { } } -impl From> for ConnectionError { - fn from(_: mpsc::TrySendError) -> Self { - Self::SendForwardRequest +impl From> for ConnectionError { + fn from(_: mpsc::TrySendError) -> Self { + Self::SendInternalMessage } } -fn borsh_serialize_work( - work: &bitcoin::Work, - writer: &mut W, -) -> borsh::io::Result<()> +fn borsh_serialize_work(work: &Work, writer: &mut W) -> borsh::io::Result<()> where W: borsh::io::Write, { @@ -85,25 +97,42 @@ where } fn borsh_serialize_option_work( - work: &Option, + work: &Option, writer: &mut W, ) -> borsh::io::Result<()> where W: borsh::io::Write, { #[derive(BorshSerialize)] - struct BorshWrapper( - #[borsh(serialize_with = "borsh_serialize_work")] bitcoin::Work, - ); + struct BorshWrapper(#[borsh(serialize_with = "borsh_serialize_work")] Work); borsh::BorshSerialize::serialize(&work.map(BorshWrapper), writer) } -#[derive(BorshSerialize, Clone, Debug, Default, Deserialize, Serialize)] +#[derive( + BorshSerialize, Clone, Copy, Debug, Default, Deserialize, Serialize, +)] pub struct PeerState { block_height: u32, - tip: BlockHash, + tip: Tip, #[borsh(serialize_with = "borsh_serialize_option_work")] - total_work: Option, + total_work: Option, +} + +/// Unique identifier for a peer state +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +#[repr(transparent)] +pub struct PeerStateId(Hash); + +impl From<&PeerState> for PeerStateId { + fn from(peer_state: &PeerState) -> Self { + Self(hash(peer_state)) + } +} + +impl std::fmt::Display for PeerStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } } #[derive(Debug, Serialize, Deserialize)] @@ -129,6 +158,19 @@ pub enum Request { Heartbeat(PeerState), GetBlock { block_hash: BlockHash, + /// Mainchain descendant tip that we are requesting the block to reach. + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + descendant_tip: Option, + /// Ancestor block. If no bodies are missing between `descendant_tip` + /// and `ancestor`, then `descendant_tip` is ready to apply. 
+ /// Only relevant for the requester, so serialization is skipped + ancestor: Option, + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + peer_state_id: Option, }, /// Request headers up to [`end`] GetHeaders { @@ -138,26 +180,38 @@ pub enum Request { end: BlockHash, /// Height is only relevant for the requester, /// so serialization is skipped + #[borsh(skip)] #[serde(skip)] height: Option, + /// Only relevant for the requester, so serialization is skipped + #[borsh(skip)] + #[serde(skip)] + peer_state_id: Option, }, PushTransaction { transaction: AuthorizedTransaction, }, } +/// Info to send to the net task / node #[must_use] #[derive(Debug)] pub enum Info { Error(ConnectionError), - /// Need BMM verification for the specified block - NeedBmmVerification(BlockHash), - /// Need Mainchain ancestors for the specified block hash - NeedMainchainAncestors(BlockHash), + /// Need BMM verification for the specified tip + NeedBmmVerification { + main_hash: bitcoin::BlockHash, + peer_state_id: PeerStateId, + }, + /// Need Mainchain ancestors for the specified tip + NeedMainchainAncestors { + main_hash: bitcoin::BlockHash, + peer_state_id: PeerStateId, + }, /// New tip ready (body and header exist in archive, BMM verified) - NewTipReady(BlockHash), + NewTipReady(Tip), NewTransaction(AuthorizedTransaction), - Response(Response, Request), + Response(Box<(Response, Request)>), } impl From for Info { @@ -178,6 +232,31 @@ where } } +/// Message received from the connection task / net task / node +#[derive(Clone, Debug)] +pub enum InternalMessage { + /// Indicates if a BMM verification request completed. + /// Does not indicate that BMM was verified successfully. + BmmVerification { + res: Result<(), bip300301::BlockNotFoundError>, + peer_state_id: PeerStateId, + }, + /// Forward a request + ForwardRequest(Request), + /// Indicates that mainchain ancestors are now available + MainchainAncestors(PeerStateId), + /// Indicates that the requested headers are now available + Headers(PeerStateId), + /// Indicates that all requested missing block bodies are now available + BodiesAvailable(PeerStateId), +} + +impl From for InternalMessage { + fn from(request: Request) -> Self { + Self::ForwardRequest(request) + } +} + #[derive(Clone)] pub struct Connection(pub(super) quinn::Connection); @@ -265,11 +344,10 @@ struct ConnectionTask { connection: Connection, ctxt: ConnectionContext, info_tx: mpsc::UnboundedSender, - peer_state: Option, - /// Push a request to forward to the peer - forward_request_tx: mpsc::UnboundedSender, - /// Receive requests to forward to the peer - forward_request_rx: mpsc::UnboundedReceiver, + /// Push an internal message from connection task / net task / node + internal_message_tx: mpsc::UnboundedSender, + /// Receive an internal message from connection task / net task / node + internal_message_rx: mpsc::UnboundedReceiver, } impl ConnectionTask { @@ -303,125 +381,491 @@ impl ConnectionTask { .map_err(ConnectionError::from) } - /// If a new tip is announced with greater height than the current tip: - /// * If the header does not exist, request it - /// * Verify height of the new tip. 
- /// * If the previous mainchain header does not exist, request it - /// * Verify PoW - /// * Verify BMM - /// * If ancestor bodies do not exist, request them - /// * Attempt to apply the new tip - async fn handle_heartbeat( + /// * Request any missing mainchain headers + /// * Check claimed work + /// * Request BMM commitments if necessary + /// * Check that BMM commitment matches peer tip + /// * Check if peer tip is better, requesting headers if necessary + /// * If peer tip is better: + /// * request headers if missing + /// * verify BMM + /// * request missing bodies + /// * notify net task / node that new tip is ready + async fn handle_peer_state( ctxt: &ConnectionContext, info_tx: &mpsc::UnboundedSender, - forward_request_tx: &mpsc::UnboundedSender, + internal_message_tx: &mpsc::UnboundedSender, peer_state: &PeerState, ) -> Result<(), ConnectionError> { + if peer_state.tip.main_block_hash == bitcoin::BlockHash::all_zeros() { + // Nothing to do in this case + return Ok(()); + } + let Some(peer_total_work) = peer_state.total_work else { + // Peer should send total work if they send a main tip + let ban_reason = BanReason::IncorrectTotalWork { + tip: peer_state.tip, + total_work: None, + }; + return Err(ConnectionError::PeerBan(ban_reason)); + }; let (tip, tip_height, total_work) = { let rotxn = ctxt.env.read_txn()?; let tip = ctxt.state.get_tip(&rotxn)?; let tip_height = ctxt.state.get_height(&rotxn)?; - let total_work = match ctxt.archive.try_get_header(&rotxn, tip)? { - None => None, - Some(header) - if header.prev_main_hash - == bitcoin::BlockHash::all_zeros() => - { - None - } - Some(header) => Some( - ctxt.archive - .get_total_work(&rotxn, header.prev_main_hash)?, - ), + let (bmm_verification, total_work) = if tip == BlockHash::default() + { + (bitcoin::BlockHash::all_zeros(), None) + } else { + let bmm_verification = + ctxt.archive.get_best_main_verification(&rotxn, tip)?; + let work = + ctxt.archive.get_total_work(&rotxn, bmm_verification)?; + (bmm_verification, Some(work)) + }; + let tip = Tip { + block_hash: tip, + main_block_hash: bmm_verification, }; (tip, tip_height, total_work) }; - let peer_height = peer_state.block_height; - if peer_height > tip_height - || (peer_height == tip_height && peer_state.total_work > total_work) + // Check claimed work and request mainchain headers and BMM commitments + // if necessary { - let header = { + let rotxn = ctxt.env.read_txn()?; + match ctxt + .archive + .try_get_main_header(&rotxn, peer_state.tip.main_block_hash)? + { + None => { + let info = Info::NeedMainchainAncestors { + main_hash: peer_state.tip.main_block_hash, + peer_state_id: peer_state.into(), + }; + info_tx.unbounded_send(info)?; + return Ok(()); + } + Some(_main_header) => { + let computed_total_work = ctxt.archive.get_total_work( + &rotxn, + peer_state.tip.main_block_hash, + )?; + if peer_total_work != computed_total_work { + let ban_reason = BanReason::IncorrectTotalWork { + tip: peer_state.tip, + total_work: Some(peer_total_work), + }; + return Err(ConnectionError::PeerBan(ban_reason)); + } + if peer_state.tip.block_hash == BlockHash::default() { + // Nothing else to do in this case + return Ok(()); + } + let Some(bmm_commitment) = + ctxt.archive.try_get_main_bmm_commitment( + &rotxn, + peer_state.tip.main_block_hash, + )? 
+ else { + let info = Info::NeedBmmVerification { + main_hash: peer_state.tip.main_block_hash, + peer_state_id: peer_state.into(), + }; + info_tx.unbounded_send(info)?; + return Ok(()); + }; + if bmm_commitment != Some(peer_state.tip.block_hash) { + let ban_reason = + BanReason::BmmVerificationFailed(peer_state.tip); + return Err(ConnectionError::PeerBan(ban_reason)); + } + } + } + } + // Check if the peer tip is better, requesting headers if necessary + match ( + total_work.cmp(&Some(peer_total_work)), + tip_height.cmp(&peer_state.block_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Less) => { + // No tip ancestor can have greater height, + // so peer tip is better. + // Request headers if necessary let rotxn = ctxt.env.read_txn()?; - ctxt.archive.try_get_header(&rotxn, peer_state.tip)? - }; - let Some(header) = header else { - // Request headers - let request = Request::GetHeaders { - // TODO: provide alternative start points - start: HashSet::new(), - end: peer_state.tip, - height: Some(peer_state.block_height), - }; - forward_request_tx.unbounded_send(request)?; + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + } + (Ordering::Equal | Ordering::Greater, Ordering::Greater) => { + // No peer tip ancestor can have greater height, + // so tip is better. + // Nothing to do in this case return Ok(()); - }; - // Check mainchain headers - let prev_main_header = { + } + (Ordering::Less, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work let rotxn = ctxt.env.read_txn()?; - ctxt.archive - .try_get_main_header(&rotxn, header.prev_main_hash)? - }; - let Some(_prev_main_header) = prev_main_header else { - let info = Info::NeedMainchainAncestors(header.hash()); - info_tx.unbounded_send(info)?; - return Ok(()); - }; - // Check PoW - let prev_main_total_work = { + if ctxt.archive.shared_mainchain_lineage( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )? { + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + } + (Ordering::Greater, Ordering::Equal) => { + // Within the same mainchain lineage, prefer lower work + // Otherwise, prefer tip with greater work let rotxn = ctxt.env.read_txn()?; - ctxt.archive.get_total_work(&rotxn, header.prev_main_hash)? - }; - if Some(prev_main_total_work) != peer_state.total_work { - let ban_reason = BanReason::IncorrectTotalWork { - block_hash: peer_state.tip, - total_work: peer_state.total_work, - }; - return Err(ConnectionError::PeerBan(ban_reason)); + if !ctxt.archive.shared_mainchain_lineage( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )? 
{ + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } } - // Verify BMM - { + (Ordering::Less, Ordering::Greater) => { + // Need to check if tip ancestor before common + // mainchain ancestor had greater or equal height let rotxn = ctxt.env.read_txn()?; - match ctxt + let main_ancestor = ctxt.archive.last_common_main_ancestor( + &rotxn, + tip.main_block_hash, + peer_state.tip.main_block_hash, + )?; + let tip_ancestor_height = ctxt .archive - .try_get_bmm_verification(&rotxn, peer_state.tip)? + .ancestors(&rotxn, tip.block_hash) + .find_map(|tip_ancestor| { + if tip_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = + ctxt.archive.get_header(&rotxn, tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + ctxt.archive.get_height(&rotxn, tip_ancestor).map(Some) + })? + .unwrap(); + if tip_ancestor_height >= peer_state.block_height { + // Nothing to do in this case + return Ok(()); + } + // Request headers if necessary + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() { - Some(true) => (), - Some(false) => { - let ban_reason = - BanReason::BmmVerificationFailed(peer_state.tip); - return Err(ConnectionError::PeerBan(ban_reason)); - } - None => { - let info = Info::NeedBmmVerification(peer_state.tip); - info_tx.unbounded_send(info)?; - return Ok(()); - } + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); } - }; - let missing_bodies: Vec = { + } + (Ordering::Greater, Ordering::Less) => { + // Need to check if peer's tip ancestor before common + // mainchain ancestor had greater or equal height let rotxn = ctxt.env.read_txn()?; - let common_ancestor = ctxt.archive.last_common_ancestor( + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? 
+ .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + let main_ancestor = ctxt.archive.last_common_main_ancestor( &rotxn, - tip, - peer_state.tip, + tip.main_block_hash, + peer_state.tip.main_block_hash, )?; - ctxt.archive.get_missing_bodies( + let peer_tip_ancestor_height = ctxt + .archive + .ancestors(&rotxn, peer_state.tip.block_hash) + .find_map(|peer_tip_ancestor| { + if peer_tip_ancestor == BlockHash::default() { + return Ok(Some(0)); + } + let header = ctxt + .archive + .get_header(&rotxn, peer_tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + ctxt.archive + .get_height(&rotxn, peer_tip_ancestor) + .map(Some) + })? + .unwrap(); + if peer_tip_ancestor_height < tip_height { + // Nothing to do in this case + return Ok(()); + } + } + (Ordering::Equal, Ordering::Equal) => { + // If the peer tip is the same as the tip, nothing to do + if peer_state.tip.block_hash == tip.block_hash { + return Ok(()); + } + // Need to compare tip ancestor and peer's tip ancestor + // before common mainchain ancestor + let rotxn = ctxt.env.read_txn()?; + if ctxt + .archive + .try_get_header(&rotxn, peer_state.tip.block_hash)? + .is_none() + { + let start = HashSet::from_iter( + ctxt.archive + .get_block_locator(&rotxn, tip.block_hash)?, + ); + let request = Request::GetHeaders { + start, + end: peer_state.tip.block_hash, + height: Some(peer_state.block_height), + peer_state_id: Some(peer_state.into()), + }; + internal_message_tx.unbounded_send(request.into())?; + return Ok(()); + } + let main_ancestor = ctxt.archive.last_common_main_ancestor( &rotxn, - peer_state.tip, - common_ancestor, - )? - }; - if missing_bodies.is_empty() { - let info = Info::NewTipReady(peer_state.tip); - info_tx.unbounded_send(info)?; - } else { - // Request missing bodies - missing_bodies.into_iter().try_for_each(|block_hash| { - let request = Request::GetBlock { block_hash }; - forward_request_tx.unbounded_send(request) - })?; + tip.main_block_hash, + peer_state.tip.main_block_hash, + )?; + let main_ancestor_height = + ctxt.archive.get_main_height(&rotxn, main_ancestor)?; + let (tip_ancestor_height, tip_ancestor_work) = ctxt + .archive + .ancestors(&rotxn, tip.block_hash) + .find_map(|tip_ancestor| { + if tip_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = + ctxt.archive.get_header(&rotxn, tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = + ctxt.archive.get_height(&rotxn, tip_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = ctxt.archive.get_main_height( + &rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + ctxt.archive.get_nth_main_ancestor( + &rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = + ctxt.archive.get_total_work(&rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? 
+ .unwrap(); + let (peer_tip_ancestor_height, peer_tip_ancestor_work) = ctxt + .archive + .ancestors(&rotxn, peer_state.tip.block_hash) + .find_map(|peer_tip_ancestor| { + if peer_tip_ancestor == BlockHash::default() { + return Ok(Some((0, None))); + } + let header = ctxt + .archive + .get_header(&rotxn, peer_tip_ancestor)?; + if !ctxt.archive.is_main_descendant( + &rotxn, + header.prev_main_hash, + main_ancestor, + )? { + return Ok(None); + } + if header.prev_main_hash == main_ancestor { + return Ok(None); + } + let height = ctxt + .archive + .get_height(&rotxn, peer_tip_ancestor)?; + // Find mainchain block hash to get total work + let main_block = { + let prev_height = ctxt.archive.get_main_height( + &rotxn, + header.prev_main_hash, + )?; + let height = prev_height + 1; + ctxt.archive.get_nth_main_ancestor( + &rotxn, + main_ancestor, + main_ancestor_height - height, + )? + }; + let work = + ctxt.archive.get_total_work(&rotxn, main_block)?; + Ok(Some((height, Some(work)))) + })? + .unwrap(); + match ( + tip_ancestor_work.cmp(&peer_tip_ancestor_work), + tip_ancestor_height.cmp(&peer_tip_ancestor_height), + ) { + (Ordering::Less | Ordering::Equal, Ordering::Equal) + | (_, Ordering::Greater) => { + // Peer tip is not better, nothing to do + return Ok(()); + } + (Ordering::Greater, Ordering::Equal) + | (_, Ordering::Less) => { + // Peer tip is better + } + } } } + // Check BMM now that headers are available + { + let rotxn = ctxt.env.read_txn()?; + let Some(BmmResult::Verified) = ctxt.archive.try_get_bmm_result( + &rotxn, + peer_state.tip.block_hash, + peer_state.tip.main_block_hash, + )? + else { + let ban_reason = + BanReason::BmmVerificationFailed(peer_state.tip); + return Err(ConnectionError::PeerBan(ban_reason)); + }; + } + // Request missing bodies, or notify that a new tip is ready + let (common_ancestor, missing_bodies): (BlockHash, Vec) = { + let rotxn = ctxt.env.read_txn()?; + let common_ancestor = ctxt.archive.last_common_ancestor( + &rotxn, + tip.block_hash, + peer_state.tip.block_hash, + )?; + let missing_bodies = ctxt.archive.get_missing_bodies( + &rotxn, + peer_state.tip.block_hash, + common_ancestor, + )?; + (common_ancestor, missing_bodies) + }; + if missing_bodies.is_empty() { + let info = Info::NewTipReady(peer_state.tip); + info_tx.unbounded_send(info)?; + } else { + // Request missing bodies + missing_bodies.into_iter().try_for_each(|block_hash| { + let request = Request::GetBlock { + block_hash, + descendant_tip: Some(peer_state.tip), + peer_state_id: Some(peer_state.into()), + ancestor: Some(common_ancestor), + }; + internal_message_tx.unbounded_send(request.into()) + })?; + } Ok(()) } @@ -505,30 +949,40 @@ impl ConnectionTask { async fn handle_request( ctxt: &ConnectionContext, info_tx: &mpsc::UnboundedSender, - forward_request_tx: &mpsc::UnboundedSender, - peer_state: &mut Option, + internal_message_tx: &mpsc::UnboundedSender, + peer_state: &mut Option, + // Map associating peer state hashes to peer state + peer_states: &mut HashMap, response_tx: SendStream, request: Request, ) -> Result<(), ConnectionError> { match request { Request::Heartbeat(new_peer_state) => { - let () = Self::handle_heartbeat( - ctxt, - info_tx, - forward_request_tx, - &new_peer_state, - ) - .await?; - *peer_state = Some(new_peer_state); + let new_peer_state_id = (&new_peer_state).into(); + peer_states.insert(new_peer_state_id, new_peer_state); + if *peer_state != Some(new_peer_state_id) { + let () = Self::handle_peer_state( + ctxt, + info_tx, + internal_message_tx, + &new_peer_state, + ) + 
.await?; + *peer_state = Some(new_peer_state_id); + } Ok(()) } - Request::GetBlock { block_hash } => { - Self::handle_get_block(ctxt, response_tx, block_hash).await - } + Request::GetBlock { + block_hash, + descendant_tip: _, + ancestor: _, + peer_state_id: _, + } => Self::handle_get_block(ctxt, response_tx, block_hash).await, Request::GetHeaders { start, end, height: _, + peer_state_id: _, } => Self::handle_get_headers(ctxt, response_tx, start, end).await, Request::PushTransaction { transaction } => { Self::handle_push_tx(ctxt, info_tx, response_tx, transaction) @@ -537,17 +991,18 @@ impl ConnectionTask { } } - async fn run(mut self) -> Result<(), ConnectionError> { + async fn run(self) -> Result<(), ConnectionError> { enum MailboxItem { - ForwardRequest(Request), + /// Internal messages from the connection task / net task / node + InternalMessage(InternalMessage), /// Signals that a heartbeat message should be sent to the peer Heartbeat, Request((Request, SendStream)), Response(Result, Request), } - let forward_request_stream = self - .forward_request_rx - .map(|request| Ok(MailboxItem::ForwardRequest(request))); + let internal_message_stream = self + .internal_message_rx + .map(|msg| Ok(MailboxItem::InternalMessage(msg))); let heartbeat_stream = IntervalStream::new(interval(Connection::HEARTBEAT_SEND_INTERVAL)) .map(|_| Ok(MailboxItem::Heartbeat)); @@ -572,18 +1027,24 @@ impl ConnectionTask { let response_stream = response_rx.map(|(resp, req)| Ok(MailboxItem::Response(resp, req))); let mut mailbox_stream = stream::select_all([ - forward_request_stream.boxed(), + internal_message_stream.boxed(), heartbeat_stream.boxed(), request_stream.boxed(), response_stream.boxed(), ]); // spawn child tasks on a JoinSet so that they are dropped alongside this task let mut task_set: JoinSet<()> = JoinSet::new(); + // current peer state + let mut peer_state = Option::::None; + // known peer states + let mut peer_states = HashMap::::new(); // Do not repeat requests let mut pending_request_hashes = HashSet::::new(); while let Some(mailbox_item) = mailbox_stream.try_next().await? 
{ match mailbox_item { - MailboxItem::ForwardRequest(request) => { + MailboxItem::InternalMessage( + InternalMessage::ForwardRequest(request), + ) => { let request_hash = hash(&request); if !pending_request_hashes.insert(request_hash) { continue; @@ -601,29 +1062,68 @@ impl ConnectionTask { } }); } + MailboxItem::InternalMessage( + InternalMessage::BmmVerification { res, peer_state_id }, + ) => { + if let Err(block_not_found) = res { + tracing::warn!("{block_not_found}"); + continue; + } + let Some(peer_state) = peer_states.get(&peer_state_id) + else { + return Err(ConnectionError::MissingPeerState( + peer_state_id, + )); + }; + let () = Self::handle_peer_state( + &self.ctxt, + &self.info_tx, + &self.internal_message_tx, + peer_state, + ) + .await?; + } + MailboxItem::InternalMessage( + InternalMessage::MainchainAncestors(peer_state_id) + | InternalMessage::Headers(peer_state_id) + | InternalMessage::BodiesAvailable(peer_state_id), + ) => { + let Some(peer_state) = peer_states.get(&peer_state_id) + else { + return Err(ConnectionError::MissingPeerState( + peer_state_id, + )); + }; + let () = Self::handle_peer_state( + &self.ctxt, + &self.info_tx, + &self.internal_message_tx, + peer_state, + ) + .await?; + } MailboxItem::Heartbeat => { let (tip, tip_height, total_work) = { let rotxn = self.ctxt.env.read_txn()?; let tip = self.ctxt.state.get_tip(&rotxn)?; let tip_height = self.ctxt.state.get_height(&rotxn)?; - let total_work = match self - .ctxt - .archive - .try_get_header(&rotxn, tip)? - { - None => None, - Some(header) - if header.prev_main_hash - == bitcoin::BlockHash::all_zeros() => - { - None - } - Some(header) => { - Some(self.ctxt.archive.get_total_work( - &rotxn, - header.prev_main_hash, - )?) - } + let (bmm_verification, total_work) = + if tip == BlockHash::default() { + (bitcoin::BlockHash::all_zeros(), None) + } else { + let bmm_verification = self + .ctxt + .archive + .get_best_main_verification(&rotxn, tip)?; + let work = self + .ctxt + .archive + .get_total_work(&rotxn, bmm_verification)?; + (bmm_verification, Some(work)) + }; + let tip = Tip { + block_hash: tip, + main_block_hash: bmm_verification, }; (tip, tip_height, total_work) }; @@ -649,8 +1149,9 @@ impl ConnectionTask { let () = Self::handle_request( &self.ctxt, &self.info_tx, - &self.forward_request_tx, - &mut self.peer_state, + &self.internal_message_tx, + &mut peer_state, + &mut peer_states, response_tx, request, ) @@ -659,8 +1160,9 @@ impl ConnectionTask { MailboxItem::Response(resp, req) => { let request_hash = hash(&req); pending_request_hashes.remove(&request_hash); - let info = - resp.map(|resp| Info::Response(resp, req)).into(); + let info = resp + .map(|resp| Info::Response(Box::new((resp, req)))) + .into(); if self.info_tx.unbounded_send(info).is_err() { tracing::error!("Failed to send response info") }; @@ -674,7 +1176,8 @@ impl ConnectionTask { /// Connection killed on drop pub struct ConnectionHandle { task: JoinHandle<()>, - pub forward_request_tx: mpsc::UnboundedSender, + /// Push messages from connection task / net task / node + pub internal_message_tx: mpsc::UnboundedSender, } impl Drop for ConnectionHandle { @@ -688,19 +1191,18 @@ pub fn handle( ctxt: ConnectionContext, connection: Connection, ) -> (ConnectionHandle, mpsc::UnboundedReceiver) { - let (forward_request_tx, forward_request_rx) = mpsc::unbounded(); + let (internal_message_tx, internal_message_rx) = mpsc::unbounded(); let (info_tx, info_rx) = mpsc::unbounded(); let connection_task = { let info_tx = info_tx.clone(); - let forward_request_tx = 
forward_request_tx.clone(); + let internal_message_tx = internal_message_tx.clone(); move || async move { let connection_task = ConnectionTask { connection, ctxt, info_tx, - peer_state: None, - forward_request_tx, - forward_request_rx, + internal_message_tx, + internal_message_rx, }; connection_task.run().await } @@ -716,7 +1218,7 @@ pub fn handle( }); let connection_handle = ConnectionHandle { task, - forward_request_tx, + internal_message_tx, }; (connection_handle, info_rx) } @@ -726,20 +1228,19 @@ pub fn connect( addr: SocketAddr, ctxt: ConnectionContext, ) -> (ConnectionHandle, mpsc::UnboundedReceiver) { - let (forward_request_tx, forward_request_rx) = mpsc::unbounded(); + let (internal_message_tx, internal_message_rx) = mpsc::unbounded(); let (info_tx, info_rx) = mpsc::unbounded(); let connection_task = { let info_tx = info_tx.clone(); - let forward_request_tx = forward_request_tx.clone(); + let internal_message_tx = internal_message_tx.clone(); move || async move { let connection = Connection::new(&endpoint, addr).await?; let connection_task = ConnectionTask { connection, ctxt, info_tx, - peer_state: None, - forward_request_tx, - forward_request_rx, + internal_message_tx, + internal_message_rx, }; connection_task.run().await } @@ -755,7 +1256,7 @@ pub fn connect( }); let connection_handle = ConnectionHandle { task, - forward_request_tx, + internal_message_tx, }; (connection_handle, info_rx) } diff --git a/lib/node/mainchain_task.rs b/lib/node/mainchain_task.rs index 536b987..df4fef5 100644 --- a/lib/node/mainchain_task.rs +++ b/lib/node/mainchain_task.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use bip300301::{ bitcoin::{self, hashes::Hash as _}, + client::{BlockCommitment, SidechainId}, Drivechain, Header as BitcoinHeader, }; use fallible_iterator::FallibleIterator; @@ -18,11 +19,11 @@ use thiserror::Error; use tokio::{ spawn, task::{self, JoinHandle}, - time::Duration, }; use crate::{ archive::{self, Archive}, + node::THIS_SIDECHAIN, types::BlockHash, }; @@ -30,14 +31,33 @@ use crate::{ #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub(super) enum Request { /// Request missing mainchain ancestor headers - AncestorHeaders(BlockHash), + AncestorHeaders(bitcoin::BlockHash), /// Request recursive BMM verification - VerifyBmm(BlockHash), + VerifyBmm(bitcoin::BlockHash), } -/// Response indicating that a request has been fulfilled successfully +/// Response indicating that a request has been fulfilled #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub(super) struct Response(pub Request); +pub(super) enum Response { + AncestorHeaders(bitcoin::BlockHash), + VerifyBmm( + bitcoin::BlockHash, + Result<(), bip300301::BlockNotFoundError>, + ), +} + +impl From for Request { + fn from(resp: Response) -> Self { + match resp { + Response::AncestorHeaders(block_hash) => { + Request::AncestorHeaders(block_hash) + } + Response::VerifyBmm(block_hash, _) => { + Request::VerifyBmm(block_hash) + } + } + } +} #[derive(Debug, Error)] enum Error { @@ -70,120 +90,150 @@ impl MainchainTask { env: &heed::Env, archive: &Archive, drivechain: &bip300301::Drivechain, - mut block_hash: bitcoin::BlockHash, + block_hash: bitcoin::BlockHash, ) -> Result<(), Error> { - tracing::debug!("requesting ancestor headers for {block_hash}"); + if block_hash == bitcoin::BlockHash::all_zeros() { + return Ok(()); + } else { + let rotxn = env.read_txn()?; + if archive.try_get_main_header(&rotxn, block_hash)?.is_some() { + return Ok(()); + } + } + let mut current_block_hash = block_hash; + let mut current_height = None; let mut 
headers: Vec = Vec::new(); + tracing::debug!(%block_hash, "requesting ancestor headers"); loop { - if block_hash == bitcoin::BlockHash::all_zeros() { + if let Some(current_height) = current_height { + tracing::trace!(%block_hash, "requesting ancestor headers: {current_block_hash}({current_height})") + } + let header = drivechain.get_header(current_block_hash).await?; + current_block_hash = header.prev_blockhash; + current_height = header.height.checked_sub(1); + headers.push(header); + if current_block_hash == bitcoin::BlockHash::all_zeros() { break; } else { let rotxn = env.read_txn()?; - if archive.try_get_main_header(&rotxn, block_hash)?.is_some() { + if archive + .try_get_main_header(&rotxn, current_block_hash)? + .is_some() + { break; } } - let header = drivechain.get_header(block_hash).await?; - block_hash = header.prev_blockhash; - headers.push(header); } headers.reverse(); - if headers.is_empty() { + // Writing all headers during IBD can starve archive readers. + tracing::trace!(%block_hash, "storing ancestor headers"); + task::block_in_place(|| { + let mut rwtxn = env.write_txn()?; + headers.into_iter().try_for_each(|header| { + archive.put_main_header(&mut rwtxn, &header) + })?; + rwtxn.commit()?; + tracing::trace!(%block_hash, "stored ancestor headers"); Ok(()) - } else { - // Writing all headers during IBD can starve archive readers. - task::block_in_place(|| { - let mut rwtxn = env.write_txn()?; - headers.into_iter().try_for_each(|header| { - archive.put_main_header(&mut rwtxn, &header) - })?; - rwtxn.commit()?; - Ok(()) - }) - } + }) } - /// Attempt to verify bmm for the specified block, - /// and store the verification result - async fn verify_bmm( + /// Request ancestor BMM commitments from the mainchain node, + /// up to and including the specified block. + /// Mainchain headers for the specified block and all ancestors MUST exist + /// in the archive. + async fn request_bmm_commitments( env: &heed::Env, archive: &Archive, drivechain: &bip300301::Drivechain, - block_hash: BlockHash, - ) -> Result { - const VERIFY_BMM_POLL_INTERVAL: Duration = Duration::from_secs(15); - let header = { + main_hash: bitcoin::BlockHash, + ) -> Result, Error> { + if main_hash == bitcoin::BlockHash::all_zeros() { + return Ok(Ok(())); + } else { let rotxn = env.read_txn()?; - archive.get_header(&rotxn, block_hash)? - }; - let (res, _next_block_hash) = drivechain - .verify_bmm_next_block( - header.prev_main_hash, - block_hash.into(), - VERIFY_BMM_POLL_INTERVAL, - ) - .await?; - let mut rwtxn = env.write_txn()?; - let () = archive.put_bmm_verification(&mut rwtxn, block_hash, res)?; - rwtxn.commit()?; - Ok(res) - } - - /// Attempt to verify bmm recursively up to the specified block, - /// and store the verification results - async fn recursive_verify_bmm( - env: &heed::Env, - archive: &Archive, - drivechain: &bip300301::Drivechain, - block_hash: BlockHash, - ) -> Result<(), Error> { - tracing::debug!( - "requesting recursive BMM verification for {block_hash}" - ); - let blocks_to_verify: Vec = { + if archive + .try_get_main_bmm_commitment(&rotxn, main_hash)? 
+ .is_some() + { + return Ok(Ok(())); + } + } + let mut missing_commitments: Vec<_> = { let rotxn = env.read_txn()?; archive - .ancestors(&rotxn, block_hash) + .main_ancestors(&rotxn, main_hash) .take_while(|block_hash| { - archive - .try_get_bmm_verification(&rotxn, *block_hash) - .map(|bmm_verification| bmm_verification.is_none()) + Ok(*block_hash != bitcoin::BlockHash::all_zeros() + && archive + .try_get_main_bmm_commitment(&rotxn, *block_hash)? + .is_none()) }) .collect()? }; - let mut blocks_to_verify_iter = blocks_to_verify.into_iter().rev(); - while let Some(block_hash) = blocks_to_verify_iter.next() { - if !Self::verify_bmm(env, archive, drivechain, block_hash).await? { - // mark descendent blocks as BMM failed, - // no need to request from mainchain node + missing_commitments.reverse(); + tracing::debug!(%main_hash, "requesting ancestor bmm commitments"); + for missing_commitment in missing_commitments { + tracing::trace!(%main_hash, + "requesting ancestor bmm commitment: {missing_commitment}" + ); + let commitments: Vec = match drivechain + .get_block_commitments(missing_commitment) + .await? + { + Ok(commitments) => commitments + .into_iter() + .filter_map(|(_, commitment)| match commitment { + BlockCommitment::BmmHStar { + commitment, + sidechain_id: SidechainId(THIS_SIDECHAIN), + prev_bytes: _, + } => Some(commitment.into()), + BlockCommitment::BmmHStar { .. } + | BlockCommitment::ScdbUpdateBytes { .. } + | BlockCommitment::WitnessCommitment { .. } + | BlockCommitment::SidechainProposal + | BlockCommitment::SidechainActivationAck { .. } => { + None + } + }) + .collect(), + Err(block_not_found) => return Ok(Err(block_not_found)), + }; + // Should never be more than one commitment + assert!(commitments.len() <= 1); + let commitment = commitments.first().copied(); + tracing::trace!(%main_hash, + "storing ancestor bmm commitment: {missing_commitment}" + ); + { let mut rwtxn = env.write_txn()?; - for block_hash in blocks_to_verify_iter { - let () = archive - .put_bmm_verification(&mut rwtxn, block_hash, false)?; - } + archive.put_main_bmm_commitment( + &mut rwtxn, + missing_commitment, + commitment, + )?; rwtxn.commit()?; - break; } + tracing::trace!(%main_hash, + "stored ancestor bmm commitment: {missing_commitment}" + ); } - Ok(()) + Ok(Ok(())) } async fn run(mut self) -> Result<(), Error> { while let Some((request, response_tx)) = self.request_rx.next().await { match request { - Request::AncestorHeaders(block_hash) => { - let header = { - let rotxn = self.env.read_txn()?; - self.archive.get_header(&rotxn, block_hash)? 
- }; + Request::AncestorHeaders(main_block_hash) => { let () = Self::request_ancestor_headers( &self.env, &self.archive, &self.drivechain, - header.prev_main_hash, + main_block_hash, ) .await?; - let response = Response(request); + let response = Response::AncestorHeaders(main_block_hash); if let Some(response_tx) = response_tx { response_tx .send(response) @@ -195,14 +245,16 @@ impl MainchainTask { } } Request::VerifyBmm(block_hash) => { - let () = Self::recursive_verify_bmm( - &self.env, - &self.archive, - &self.drivechain, + let response = Response::VerifyBmm( block_hash, - ) - .await?; - let response = Response(request); + Self::request_bmm_commitments( + &self.env, + &self.archive, + &self.drivechain, + block_hash, + ) + .await?, + ); if let Some(response_tx) = response_tx { response_tx .send(response) diff --git a/lib/node/mod.rs b/lib/node/mod.rs index c370296..4ea0f08 100644 --- a/lib/node/mod.rs +++ b/lib/node/mod.rs @@ -15,9 +15,9 @@ use crate::{ net::{self, Net}, state::{self, State}, types::{ - Accumulator, Address, AuthorizedTransaction, BlockHash, Body, GetValue, - Header, OutPoint, Output, SpentOutput, Transaction, Txid, - WithdrawalBundle, + Accumulator, Address, AuthorizedTransaction, BlockHash, BmmResult, + Body, GetValue, Header, OutPoint, Output, SpentOutput, Tip, + Transaction, Txid, WithdrawalBundle, }, }; @@ -429,6 +429,7 @@ impl Node { /// or was rejected as the new tip. pub async fn submit_block( &self, + main_block_hash: bitcoin::BlockHash, header: &Header, body: &Body, ) -> Result { @@ -437,35 +438,56 @@ impl Node { if header.prev_side_hash != BlockHash::default() && self.try_get_header(header.prev_side_hash)?.is_none() { - tracing::error!( + tracing::error!(%block_hash, "Rejecting block {block_hash} due to missing ancestor headers", ); return Ok(false); - } else { - let mut rwtxn = self.env.write_txn()?; - let () = self.archive.put_header(&mut rwtxn, header)?; - rwtxn.commit()?; } // Request mainchain headers if they do not exist let _: mainchain_task::Response = self .mainchain_task .request_oneshot(mainchain_task::Request::AncestorHeaders( - header.hash(), + main_block_hash, )) .map_err(|_| Error::SendMainchainTaskRequest)? .await .map_err(|_| Error::ReceiveMainchainTaskResponse)?; // Verify BMM - let _: mainchain_task::Response = self + let mainchain_task::Response::VerifyBmm(_, res) = self .mainchain_task - .request_oneshot(mainchain_task::Request::VerifyBmm(block_hash)) + .request_oneshot(mainchain_task::Request::VerifyBmm( + main_block_hash, + )) .map_err(|_| Error::SendMainchainTaskRequest)? .await - .map_err(|_| Error::ReceiveMainchainTaskResponse)?; + .map_err(|_| Error::ReceiveMainchainTaskResponse)? + else { + panic!("should be impossible") + }; + if let Err(bip300301::BlockNotFoundError(missing_block)) = res { + tracing::error!(%block_hash, + "Rejecting block {block_hash} due to missing mainchain block {missing_block}", + ); + return Ok(false); + } + // Write header + tracing::trace!("Storing header: {block_hash}"); + { + let mut rwtxn = self.env.write_txn()?; + let () = self.archive.put_header(&mut rwtxn, header)?; + rwtxn.commit()?; + } + tracing::trace!("Stored header: {block_hash}"); + // Check BMM { let rotxn = self.env.read_txn()?; - if !self.archive.get_bmm_verification(&rotxn, block_hash)? { - tracing::error!( + if self.archive.get_bmm_result( + &rotxn, + block_hash, + main_block_hash, + )? 
== BmmResult::Failed + { + tracing::error!(%block_hash, "Rejecting block {block_hash} due to failing BMM verification", ); return Ok(false); @@ -486,7 +508,7 @@ impl Node { if !(missing_bodies.is_empty() || missing_bodies == vec![block_hash]) { - tracing::error!( + tracing::error!(%block_hash, "Rejecting block {block_hash} due to missing ancestor bodies", ); return Ok(false); @@ -499,7 +521,11 @@ impl Node { } } // Submit new tip - if !self.net_task.new_tip_ready_confirm(header.hash()).await? { + let new_tip = Tip { + block_hash, + main_block_hash, + }; + if !self.net_task.new_tip_ready_confirm(new_tip).await? { return Ok(false); }; let rotxn = self.env.read_txn()?; diff --git a/lib/node/net_task.rs b/lib/node/net_task.rs index f0424c6..2e124a1 100644 --- a/lib/node/net_task.rs +++ b/lib/node/net_task.rs @@ -26,10 +26,12 @@ use crate::{ archive::{self, Archive}, mempool::{self, MemPool}, net::{ - self, Net, PeerConnectionInfo, PeerInfoRx, PeerRequest, PeerResponse, + self, Net, PeerConnectionInfo, PeerConnectionMessage, PeerInfoRx, + PeerRequest, PeerResponse, PeerStateId, }, state::{self, State}, - types::{BlockHash, Body, Header}, + types::{BlockHash, BmmResult, Body, Header, Tip}, + util::UnitKey, }; #[derive(Debug, Error)] @@ -132,11 +134,10 @@ async fn disconnect_tip_( { let new_tip = state.get_tip(rwtxn)?; let accumulator = archive.get_accumulator(rwtxn, new_tip)?; - let () = state.utreexo_accumulator.put( - rwtxn, - &state::UnitKey, - &accumulator, - )?; + let () = + state + .utreexo_accumulator + .put(rwtxn, &UnitKey, &accumulator)?; } for transaction in tip_body.authorized_transactions().iter().rev() { mempool.put(rwtxn, transaction)?; @@ -156,19 +157,28 @@ async fn reorg_to_tip( drivechain: &bip300301::Drivechain, mempool: &MemPool, state: &State, - new_tip: BlockHash, + new_tip: Tip, ) -> Result { let mut rwtxn = env.write_txn()?; - let tip = state.get_tip(&rwtxn)?; + let tip_hash = state.get_tip(&rwtxn)?; let tip_height = state.get_height(&rwtxn)?; - // check that new tip is better than current tip - if archive.better_tip(&rwtxn, tip, new_tip)? != Some(new_tip) { - return Ok(false); + if tip_hash != BlockHash::default() { + let bmm_verification = + archive.get_best_main_verification(&rwtxn, tip_hash)?; + let tip = Tip { + block_hash: tip_hash, + main_block_hash: bmm_verification, + }; + // check that new tip is better than current tip + if archive.better_tip(&rwtxn, tip, new_tip)? != Some(new_tip) { + return Ok(false); + } } - let common_ancestor = archive.last_common_ancestor(&rwtxn, tip, new_tip)?; + let common_ancestor = + archive.last_common_ancestor(&rwtxn, tip_hash, new_tip.block_hash)?; // Check that all necessary bodies exist before disconnecting tip let blocks_to_apply: Vec<(Header, Body)> = archive - .ancestors(&rwtxn, new_tip) + .ancestors(&rwtxn, new_tip.block_hash) .take_while(|block_hash| Ok(*block_hash != common_ancestor)) .map(|block_hash| { let header = archive.get_header(&rwtxn, block_hash)?; @@ -193,9 +203,9 @@ async fn reorg_to_tip( .await?; } let tip = state.get_tip(&rwtxn)?; - assert_eq!(tip, new_tip); + assert_eq!(tip, new_tip.block_hash); rwtxn.commit()?; - tracing::info!("reorged to tip: {new_tip}"); + tracing::info!("synced to tip: {}", new_tip.block_hash); Ok(true) } @@ -217,18 +227,20 @@ struct NetTaskContext { /// An optional oneshot sender can be used receive the result of attempting /// to reorg to the new tip, on the corresponding oneshot receiver. 
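// The `better_tip` check in `reorg_to_tip` above decides whether a candidate tip should
// replace the current one. A minimal, self-contained sketch of that kind of rule follows;
// it is an illustration only, not the crate's `Archive::better_tip` — `CandidateTip`, the
// `u128` work units, and the exact height tie-break are assumptions made for this example.
use std::cmp::Ordering;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct CandidateTip {
    /// Total mainchain work committed to this tip's BMM verification chain
    total_work: u128,
    /// Sidechain block height
    height: u32,
}

/// Prefer the tip with more committed mainchain work, then the taller one on a tie.
fn better_tip(current: CandidateTip, other: CandidateTip) -> CandidateTip {
    match (
        other.total_work.cmp(&current.total_work),
        other.height.cmp(&current.height),
    ) {
        (Ordering::Greater, _) | (Ordering::Equal, Ordering::Greater) => other,
        _ => current,
    }
}

fn main() {
    let current = CandidateTip { total_work: 100, height: 7 };
    let heavier = CandidateTip { total_work: 120, height: 6 };
    // More committed work wins even at a lower sidechain height.
    assert_eq!(better_tip(current, heavier), heavier);
}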
type NewTipReadyMessage = - (BlockHash, Option, Option>); + (Tip, Option, Option>); struct NetTask { ctxt: NetTaskContext, /// Receive a request to forward to the mainchain task, with the address of - /// the peer connection that caused the request + /// the peer connection that caused the request, and the peer state ID of + /// the request forward_mainchain_task_request_rx: - UnboundedReceiver<(mainchain_task::Request, SocketAddr)>, + UnboundedReceiver<(mainchain_task::Request, SocketAddr, PeerStateId)>, /// Push a request to forward to the mainchain task, with the address of - /// the peer connection that caused the request + /// the peer connection that caused the request, and the peer state ID of + /// the request forward_mainchain_task_request_tx: - UnboundedSender<(mainchain_task::Request, SocketAddr)>, + UnboundedSender<(mainchain_task::Request, SocketAddr, PeerStateId)>, mainchain_task_response_rx: UnboundedReceiver, /// Receive a tip that is ready to reorg to, with the address of the peer /// connection that caused the request, if it originated from a peer. @@ -253,14 +265,7 @@ impl NetTask { // Attempt to switch to a descendant tip once a body has been // stored, if all other ancestor bodies are available. // Each descendant tip maps to the peers that sent that tip. - descendant_tips: &mut HashMap< - BlockHash, - HashMap>, - >, - forward_mainchain_task_request_tx: &UnboundedSender<( - mainchain_task::Request, - SocketAddr, - )>, + descendant_tips: &mut HashMap>>, new_tip_ready_tx: &UnboundedSender, addr: SocketAddr, resp: PeerResponse, @@ -268,7 +273,12 @@ impl NetTask { ) -> Result<(), Error> { match (req, resp) { ( - req @ PeerRequest::GetBlock { block_hash }, + req @ PeerRequest::GetBlock { + block_hash, + descendant_tip: Some(descendant_tip), + ancestor: Some(ancestor), + peer_state_id: Some(peer_state_id), + }, ref resp @ PeerResponse::Block { ref header, ref body, @@ -286,18 +296,62 @@ impl NetTask { ctxt.archive.put_body(&mut rwtxn, block_hash, body)?; rwtxn.commit()?; } + // Notify the peer connection if all requested block bodies are + // now available + { + let rotxn = ctxt.env.read_txn()?; + let missing_bodies = ctxt + .archive + .get_missing_bodies(&rotxn, block_hash, ancestor)?; + if missing_bodies.is_empty() { + let message = PeerConnectionMessage::BodiesAvailable( + peer_state_id, + ); + let () = + ctxt.net.push_internal_message(message, addr)?; + } + } // Check if any new tips can be applied, // and send new tip ready if so { let rotxn = ctxt.env.read_txn()?; - let tip = ctxt.state.get_tip(&rotxn)?; - if header.prev_side_hash == tip { + let tip_hash = ctxt.state.get_tip(&rotxn)?; + // Find the BMM verification that is an ancestor of + // `main_descendant_tip` + let main_block_hash = ctxt + .archive + .get_bmm_results(&rotxn, block_hash)? + .into_iter() + .map(Result::<_, Error>::Ok) + .transpose_into_fallible() + .find_map(|(main_block_hash, bmm_result)| { + match bmm_result { + BmmResult::Failed => Ok(None), + BmmResult::Verified => { + if ctxt.archive.is_main_descendant( + &rotxn, + main_block_hash, + descendant_tip.main_block_hash, + )? { + Ok(Some(main_block_hash)) + } else { + Ok(None) + } + } + } + })? 
+ .unwrap(); + let block_tip = Tip { + block_hash, + main_block_hash, + }; + if header.prev_side_hash == tip_hash { let () = new_tip_ready_tx - .unbounded_send((block_hash, Some(addr), None)) + .unbounded_send((block_tip, Some(addr), None)) .map_err(|_| Error::SendNewTipReady)?; } let Some(descendant_tips) = - descendant_tips.remove(&block_hash) + descendant_tips.remove(&block_tip) else { return Ok(()); }; @@ -305,12 +359,12 @@ impl NetTask { let common_ancestor = ctxt.archive.last_common_ancestor( &rotxn, - descendant_tip, - tip, + descendant_tip.block_hash, + tip_hash, )?; let missing_bodies = ctxt.archive.get_missing_bodies( &rotxn, - descendant_tip, + descendant_tip.block_hash, common_ancestor, )?; if missing_bodies.is_empty() { @@ -326,12 +380,14 @@ impl NetTask { } } } - Ok(()) } ( PeerRequest::GetBlock { block_hash: req_block_hash, + descendant_tip: Some(_), + ancestor: Some(_), + peer_state_id: Some(_), }, PeerResponse::NoBlock { block_hash: resp_block_hash, @@ -342,6 +398,7 @@ impl NetTask { ref start, end, height: Some(height), + peer_state_id: Some(peer_state_id), }, PeerResponse::Headers(headers), ) => { @@ -410,12 +467,9 @@ impl NetTask { } } rwtxn.commit()?; - // Request mainchain headers - let request = - mainchain_task::Request::AncestorHeaders(end_header_hash); - let () = forward_mainchain_task_request_tx - .unbounded_send((request, addr)) - .map_err(|_| Error::ForwardMainchainTaskRequest)?; + // Notify peer connection that headers are available + let message = PeerConnectionMessage::Headers(peer_state_id); + let () = ctxt.net.push_internal_message(message, addr)?; Ok(()) } ( @@ -423,6 +477,7 @@ impl NetTask { start: _, end, height: _, + peer_state_id: _, }, PeerResponse::NoHeader { block_hash }, ) if end == block_hash => Ok(()), @@ -453,18 +508,18 @@ impl NetTask { enum MailboxItem { AcceptConnection(Result<(), Error>), // Forward a mainchain task request, along with the peer that - // caused the request - ForwardMainchainTaskRequest(mainchain_task::Request, SocketAddr), + // caused the request, and the peer state ID of the request + ForwardMainchainTaskRequest( + mainchain_task::Request, + SocketAddr, + PeerStateId, + ), MainchainTaskResponse(mainchain_task::Response), // Apply new tip from peer or self. // An optional oneshot sender can be used receive the result of // attempting to reorg to the new tip, on the corresponding oneshot // receiver. - NewTipReady( - BlockHash, - Option, - Option>, - ), + NewTipReady(Tip, Option, Option>), PeerInfo(Option<(SocketAddr, Option)>), } let accept_connections = stream::try_unfold((), |()| { @@ -479,8 +534,12 @@ impl NetTask { .map(MailboxItem::AcceptConnection); let forward_request_stream = self .forward_mainchain_task_request_rx - .map(|(request, addr)| { - MailboxItem::ForwardMainchainTaskRequest(request, addr) + .map(|(request, addr, peer_state_id)| { + MailboxItem::ForwardMainchainTaskRequest( + request, + addr, + peer_state_id, + ) }); let mainchain_task_response_stream = self .mainchain_task_response_rx @@ -502,20 +561,25 @@ impl NetTask { // stored, if all other ancestor bodies are available. // Each descendant tip maps to the peers that sent that tip. 
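// The maps kept below (`descendant_tips` here, and `mainchain_task_request_sources` a little
// further down) both follow the same bookkeeping pattern: a pending key maps to the set of
// peers that asked for it, so a single completion can be fanned out to every requester exactly
// once. A simplified, self-contained sketch of that pattern, using a hypothetical `PeerId`
// and string keys in place of the real request/tip types:
use std::collections::{HashMap, HashSet};

type PeerId = u32;

#[derive(Default)]
struct PendingRequests {
    sources: HashMap<String, HashSet<PeerId>>,
}

impl PendingRequests {
    /// Record a requester; returns true if the key was not already in flight,
    /// i.e. the underlying request still needs to be issued.
    fn note_request(&mut self, key: &str, peer: PeerId) -> bool {
        let entry = self.sources.entry(key.to_owned()).or_default();
        let newly_pending = entry.is_empty();
        entry.insert(peer);
        newly_pending
    }

    /// On completion, drain and return every peer waiting on this key.
    fn complete(&mut self, key: &str) -> HashSet<PeerId> {
        self.sources.remove(key).unwrap_or_default()
    }
}

fn main() {
    let mut pending = PendingRequests::default();
    assert!(pending.note_request("verify-bmm", 1)); // first requester: issue the request
    assert!(!pending.note_request("verify-bmm", 2)); // duplicate: only record the peer
    assert_eq!(pending.complete("verify-bmm"), HashSet::from([1, 2]));
}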
let mut descendant_tips = - HashMap::>>::new( - ); + HashMap::>>::new(); // Map associating mainchain task requests with the peer(s) that - // caused the request - let mut mainchain_task_request_sources = - HashMap::>::new(); + // caused the request, and the request peer state ID + let mut mainchain_task_request_sources = HashMap::< + mainchain_task::Request, + HashSet<(SocketAddr, PeerStateId)>, + >::new(); while let Some(mailbox_item) = mailbox_stream.next().await { match mailbox_item { MailboxItem::AcceptConnection(res) => res?, - MailboxItem::ForwardMainchainTaskRequest(request, peer) => { + MailboxItem::ForwardMainchainTaskRequest( + request, + peer, + peer_state_id, + ) => { mainchain_task_request_sources .entry(request) .or_default() - .insert(peer); + .insert((peer, peer_state_id)); let () = self .ctxt .mainchain_task @@ -523,133 +587,46 @@ impl NetTask { .map_err(|_| Error::SendMainchainTaskRequest)?; } MailboxItem::MainchainTaskResponse(response) => { - let request = response.0; - match request { - mainchain_task::Request::AncestorHeaders( - block_hash, + let request = response.into(); + match response { + mainchain_task::Response::AncestorHeaders( + _block_hash, ) => { let Some(sources) = mainchain_task_request_sources.remove(&request) else { continue; }; - // request verify BMM - for addr in sources { - let request = - mainchain_task::Request::VerifyBmm( - block_hash, + for (addr, peer_state_id) in sources { + let message = + PeerConnectionMessage::MainchainAncestors( + peer_state_id, ); let () = self - .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) - .map_err(|_| { - Error::ForwardMainchainTaskRequest - })?; + .ctxt + .net + .push_internal_message(message, addr)?; } } - mainchain_task::Request::VerifyBmm(block_hash) => { + mainchain_task::Response::VerifyBmm( + _block_hash, + res, + ) => { let Some(sources) = mainchain_task_request_sources.remove(&request) else { continue; }; - let verify_bmm_result = { - let rotxn = self.ctxt.env.read_txn()?; - self.ctxt - .archive - .get_bmm_verification(&rotxn, block_hash)? - }; - if !verify_bmm_result { - for addr in &sources { - tracing::warn!( - %addr, - %block_hash, - "Invalid response from peer; BMM verification failed" - ); - let () = - self.ctxt.net.remove_active_peer(*addr); - } - } - let missing_bodies: Vec = { - let rotxn = self.ctxt.env.read_txn()?; - let tip = self.ctxt.state.get_tip(&rotxn)?; - let last_common_ancestor = - self.ctxt.archive.last_common_ancestor( - &rotxn, tip, block_hash, - )?; - self.ctxt.archive.get_missing_bodies( - &rotxn, - block_hash, - last_common_ancestor, - )? 
- }; - if missing_bodies.is_empty() { - for addr in sources { - let () = self - .new_tip_ready_tx - .unbounded_send(( - block_hash, - Some(addr), - None, - )) - .map_err(|_| Error::SendNewTipReady)?; - } - } else { - let rotxn = self.ctxt.env.read_txn()?; - // Request missing bodies, update descendent tips - for missing_body in missing_bodies { - descendant_tips - .entry(missing_body) - .or_default() - .entry(block_hash) - .or_default() - .extend(&sources); - // tips descended from the missing body, - // that are alo ancestors of `block_hash` - let lineage_tips: Vec = - descendant_tips[&missing_body] - .keys() - .map(Ok) - .transpose_into_fallible() - .filter(|tip| { - self.ctxt.archive.is_descendant( - &rotxn, **tip, block_hash, - ) - }) - .cloned() - .collect()?; - for lineage_tip in lineage_tips.into_iter() - { - let updated_sources: HashSet< - SocketAddr, - > = descendant_tips[&missing_body] - [&lineage_tip] - .difference(&sources) - .cloned() - .collect(); - if updated_sources.is_empty() { - descendant_tips - .get_mut(&missing_body) - .unwrap() - .remove(&lineage_tip); - } else { - descendant_tips - .get_mut(&missing_body) - .unwrap() - .insert( - lineage_tip, - updated_sources, - ); - } - } - let request = PeerRequest::GetBlock { - block_hash: missing_body, + for (addr, peer_state_id) in sources { + let message = + PeerConnectionMessage::BmmVerification { + res, + peer_state_id, }; - let () = self - .ctxt - .net - .push_request(request, &sources); - } + let () = self + .ctxt + .net + .push_internal_message(message, addr)?; } } } @@ -686,26 +663,30 @@ impl NetTask { tracing::error!(%addr, err = format!("{err:#}"), "Peer connection error"); let () = self.ctxt.net.remove_active_peer(addr); } - PeerConnectionInfo::NeedBmmVerification(block_hash) => { + PeerConnectionInfo::NeedBmmVerification { + main_hash, + peer_state_id, + } => { let request = - mainchain_task::Request::VerifyBmm(block_hash); + mainchain_task::Request::VerifyBmm(main_hash); let () = self .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) + .unbounded_send((request, addr, peer_state_id)) .map_err(|_| { Error::ForwardMainchainTaskRequest })?; } - PeerConnectionInfo::NeedMainchainAncestors( - block_hash, - ) => { + PeerConnectionInfo::NeedMainchainAncestors { + main_hash, + peer_state_id, + } => { let request = mainchain_task::Request::AncestorHeaders( - block_hash, + main_hash, ); let () = self .forward_mainchain_task_request_tx - .unbounded_send((request, addr)) + .unbounded_send((request, addr, peer_state_id)) .map_err(|_| { Error::ForwardMainchainTaskRequest })?; @@ -729,11 +710,11 @@ impl NetTask { .net .push_tx(HashSet::from_iter([addr]), new_tx); } - PeerConnectionInfo::Response(resp, req) => { + PeerConnectionInfo::Response(boxed) => { + let (resp, req) = *boxed; let () = Self::handle_response( &self.ctxt, &mut descendant_tips, - &self.forward_mainchain_task_request_tx, &self.new_tip_ready_tx, addr, resp, @@ -814,7 +795,7 @@ impl NetTaskHandle { /// Push a tip that is ready to reorg to. #[allow(dead_code)] - pub fn new_tip_ready(&self, new_tip: BlockHash) -> Result<(), Error> { + pub fn new_tip_ready(&self, new_tip: Tip) -> Result<(), Error> { self.new_tip_ready_tx .unbounded_send((new_tip, None, None)) .map_err(|_| Error::SendNewTipReady) @@ -826,7 +807,7 @@ impl NetTaskHandle { /// A result of Ok(false) indicates that the tip was not reorged to. 
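// `new_tip_ready_confirm` below sends a message carrying an optional oneshot sender and then
// awaits the outcome on the matching receiver. A stripped-down sketch of that pattern is shown
// here, assuming only the `futures` channel types already used in this crate; `Job`, `worker`,
// and `send_and_confirm` are hypothetical names for illustration.
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};

type Job = (u64, Option<oneshot::Sender<bool>>);

async fn worker(mut rx: mpsc::UnboundedReceiver<Job>) {
    while let Some((value, confirm_tx)) = rx.next().await {
        // Stand-in for "did we actually reorg to the new tip?"
        let accepted = value % 2 == 0;
        if let Some(confirm_tx) = confirm_tx {
            // Only callers that asked for confirmation learn the outcome.
            let _ = confirm_tx.send(accepted);
        }
    }
}

async fn send_and_confirm(tx: &mpsc::UnboundedSender<Job>, value: u64) -> bool {
    let (confirm_tx, confirm_rx) = oneshot::channel();
    tx.unbounded_send((value, Some(confirm_tx)))
        .expect("worker should still be running");
    // A dropped sender (worker gone) is treated as "not accepted".
    confirm_rx.await.unwrap_or(false)
}

fn main() {
    futures::executor::block_on(async {
        let (tx, rx) = mpsc::unbounded::<Job>();
        let client = async {
            assert!(send_and_confirm(&tx, 2).await);
            // Closing the channel lets the worker loop finish.
            drop(tx);
        };
        futures::join!(worker(rx), client);
    });
}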
pub async fn new_tip_ready_confirm( &self, - new_tip: BlockHash, + new_tip: Tip, ) -> Result { let (oneshot_tx, oneshot_rx) = oneshot::channel(); let () = self diff --git a/lib/state.rs b/lib/state.rs index 2c826df..84653c1 100644 --- a/lib/state.rs +++ b/lib/state.rs @@ -9,7 +9,6 @@ use bip300301::{ TwoWayPegData, WithdrawalBundleStatus, }; use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof}; -use serde::{Deserialize, Serialize}; use crate::{ authorization::Authorization, @@ -20,6 +19,7 @@ use crate::{ PointedOutput, SpentOutput, Transaction, Txid, Verify, WithdrawalBundle, }, + util::UnitKey, }; #[derive(Debug, thiserror::Error)] @@ -81,31 +81,6 @@ pub enum Error { WrongPubKeyForAddress, } -/// Unit key. LMDB can't use zero-sized keys, so this encodes to a single byte -#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] -pub struct UnitKey; - -impl<'de> Deserialize<'de> for UnitKey { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - // Deserialize any byte (ignoring it) and return UnitKey - let _ = u8::deserialize(deserializer)?; - Ok(UnitKey) - } -} - -impl Serialize for UnitKey { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - // Always serialize to the same arbitrary byte - serializer.serialize_u8(0x69) - } -} - #[derive(Clone)] pub struct State { /// Current tip @@ -873,7 +848,7 @@ impl State { header: &Header, body: &Body, ) -> Result<(), Error> { - let tip_hash = self.tip.get(rwtxn, &UnitKey)?.unwrap_or_default(); + let tip_hash = self.get_tip(rwtxn)?; if tip_hash != header.prev_side_hash { let err = InvalidHeaderError::PrevSideHash { expected: tip_hash, diff --git a/lib/types/mod.rs b/lib/types/mod.rs index 05fdc25..fc691ad 100644 --- a/lib/types/mod.rs +++ b/lib/types/mod.rs @@ -1,4 +1,4 @@ -use bip300301::bitcoin; +use bip300301::bitcoin::{self, hashes::Hash as _}; use borsh::BorshSerialize; use rustreexo::accumulator::{node_hash::NodeHash, pollard::Pollard}; use serde::{Deserialize, Serialize}; @@ -202,3 +202,37 @@ impl Serialize for Accumulator { as Serialize>::serialize(&bytes, serializer) } } + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub enum BmmResult { + Verified, + Failed, +} + +/// A tip refers to both a sidechain block AND the mainchain block that commits +/// to it. +#[derive( + BorshSerialize, + Clone, + Copy, + Debug, + Deserialize, + Eq, + Hash, + PartialEq, + Serialize, +)] +pub struct Tip { + pub block_hash: BlockHash, + #[borsh(serialize_with = "borsh_serialize_bitcoin_block_hash")] + pub main_block_hash: bitcoin::BlockHash, +} + +impl Default for Tip { + fn default() -> Self { + Self { + block_hash: BlockHash::default(), + main_block_hash: bitcoin::BlockHash::all_zeros(), + } + } +} diff --git a/lib/util.rs b/lib/util.rs index b4255a8..cfadda8 100644 --- a/lib/util.rs +++ b/lib/util.rs @@ -2,6 +2,7 @@ use futures::Stream; use heed::{Database, DefaultComparator, RoTxn, RwTxn}; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; @@ -21,6 +22,31 @@ pub mod watchable { pub use watchable::Watchable; +/// Unit key. 
LMDB can't use zero-sized keys, so this encodes to a single byte
+#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
+pub struct UnitKey;
+
+impl<'de> Deserialize<'de> for UnitKey {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        // Deserialize any byte (ignoring it) and return UnitKey
+        let _ = u8::deserialize(deserializer)?;
+        Ok(UnitKey)
+    }
+}
+
+impl Serialize for UnitKey {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        // Always serialize to the same arbitrary byte
+        serializer.serialize_u8(0x69)
+    }
+}
+
 /// Heed DB augmented with watchable signals
 #[derive(Debug)]
 pub(crate) struct WatchableDb {