Skip to content

Commit

Permalink
feat: trickle validators on layer 1 (#1197)
Browse files Browse the repository at this point in the history
Description
---
Query base layer for changes (added/removed) validator nodes per epoch


Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify

---------

Co-authored-by: Stan Bondi <[email protected]>
  • Loading branch information
ksrichard and sdbondi authored Nov 12, 2024
1 parent a148dcb commit 9907bc1
Show file tree
Hide file tree
Showing 62 changed files with 1,407 additions and 1,147 deletions.
694 changes: 460 additions & 234 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 26 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,6 @@ tari_p2p = { git = "https://github.com/tari-project/tari.git", branch = "feature
tari_shutdown = { git = "https://github.com/tari-project/tari.git", branch = "feature-dan2" }
tari_storage = { git = "https://github.com/tari-project/tari.git", branch = "feature-dan2" }

#minotari_app_grpc = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_app_utilities = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_console_wallet = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_node = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_node_grpc_client = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_wallet = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#minotari_wallet_grpc_client = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_common = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_common_types = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_hashing = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#
## avoid including default features so each crate can choose which ones to import
#tari_core = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2", default-features = false }
#tari_key_manager = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_metrics = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_mmr = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_p2p = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_shutdown = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }
#tari_storage = { git = "https://github.com/sdbondi/tari.git", branch = "update-feature-dan2" }

tari_crypto = "0.21.0"
tari_utilities = "0.8.0"

Expand Down Expand Up @@ -297,3 +277,29 @@ overflow-checks = true
#tari_libtor = { git = "https://github.com/account/tari.git", branch = "my-branch" }
#tari_hashing = { git = "https://github.com/account/tari.git", branch = "my-branch" }


#[patch."https://github.com/tari-project/tari.git"]
#minotari_app_grpc = { path = "../tari/applications/minotari_app_grpc" }
#minotari_wallet_grpc_client = { path = "../tari/clients/rust/wallet_grpc_client" }
#minotari_node_grpc_client = { path = "../tari/clients/rust/base_node_grpc_client" }
#tari_common = { path = "../tari/common" }
#tari_common_types = { path = "../tari/base_layer/common_types" }
#tari_comms = { path = "../tari/comms/core" }
#tari_comms_rpc_macros = { path = "../tari/comms/rpc_macros" }
#tari_core = { path = "../tari/base_layer/core" }
#tari_key_manager = { path = "../tari/base_layer/key_manager" }
#tari_mmr = { path = "../tari/base_layer/mmr" }
#tari_p2p = { path = "../tari/base_layer/p2p" }
#tari_shutdown = { path = "../tari/infrastructure/shutdown" }
#tari_storage = { path = "../tari/infrastructure/storage" }
#tari_script = { path = "../tari/infrastructure/tari_script" }
#minotari_wallet = { path = "../tari/base_layer/wallet" }
#minotari_console_wallet = { path = "../tari/applications/minotari_console_wallet" }
#tari_service_framework = { path = "../tari/base_layer/service_framework" }
#tari_comms_dht = { path = "../tari/comms/dht" }
#minotari_app_utilities = { path = "../tari/applications/minotari_app_utilities" }
#minotari_node = { path = "../tari/applications/minotari_node" }
#tari_metrics = { path = "../tari/infrastructure/metrics" }
#tari_libtor = { path = "../tari/infrastructure/libtor" }
#tari_hashing = { path = "../tari/hashing" }

1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tari_bor = { workspace = true, default-features = true }
tari_indexer_lib = { workspace = true }
tari_networking = { workspace = true }
tari_validator_node_rpc = { workspace = true }
minotari_app_grpc = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
105 changes: 82 additions & 23 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use std::time::Duration;

use log::*;
use minotari_app_grpc::tari_rpc::ValidatorNodeChangeState;
use tari_base_node_client::{
grpc::GrpcBaseNodeClient,
types::{BaseLayerMetadata, BlockInfo},
Expand All @@ -42,7 +43,7 @@ use tari_core::transactions::{
};
use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
tari_utilities::{hex::Hex, ByteArray, ByteArrayError},
};
use tari_dan_common_types::{optional::Optional, NodeAddressable, VersionedSubstateId};
use tari_dan_storage::{
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct BaseLayerScanner<TAddr> {
last_scanned_height: u64,
last_scanned_tip: Option<FixedHash>,
last_scanned_hash: Option<FixedHash>,
last_scanned_validator_node_mr: Option<FixedHash>,
next_block_hash: Option<FixedHash>,
base_node_client: GrpcBaseNodeClient,
epoch_manager: EpochManagerHandle<TAddr>,
Expand Down Expand Up @@ -141,6 +143,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
last_scanned_tip: None,
last_scanned_height: 0,
last_scanned_hash: None,
last_scanned_validator_node_mr: None,
next_block_hash: None,
base_node_client,
epoch_manager,
Expand Down Expand Up @@ -223,6 +226,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
);
// TODO: we need to figure out where the fork happened, and delete data after the fork.
self.last_scanned_hash = None;
self.last_scanned_validator_node_mr = None;
self.last_scanned_height = 0;
self.sync_blockchain().await?;
},
Expand Down Expand Up @@ -278,6 +282,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Some(end_height) => end_height,
};
let mut scan = tip.tip_hash;
let mut current_last_validator_nodes_mr = self.last_scanned_validator_node_mr;
loop {
let header = self.base_node_client.get_header_by_hash(scan).await?;
if let Some(last_tip) = self.last_scanned_tip {
Expand All @@ -290,9 +295,60 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
// This will be processed down below.
break;
}
current_last_validator_nodes_mr = Some(header.validator_node_mr);
self.epoch_manager.add_block_hash(header.height, scan).await?;
scan = header.prev_hash;
}

// syncing validator node changes
if current_last_validator_nodes_mr != self.last_scanned_validator_node_mr {
info!(target: LOG_TARGET,
"⛓️ Syncing validator nodes (sidechain ID: {:?}) from base node (height range: {}-{})",
self.validator_node_sidechain_id,
start_scan_height,
end_height,
);

let node_changes = self
.base_node_client
.get_validator_node_changes(start_scan_height, end_height, self.validator_node_sidechain_id.as_ref())
.await
.map_err(BaseLayerScannerError::BaseNodeError)?;

for node_change in node_changes {
if node_change.registration.is_none() {
warn!(
target: LOG_TARGET,
"Can't register validator node \"{}\" because it has empty registration!",
node_change.public_key.to_hex(),
);
continue;
}
let registration = ValidatorNodeRegistration::try_from(node_change.registration.clone().unwrap())
.map_err(BaseLayerScannerError::GrpcConversion)?;
match node_change.state() {
ValidatorNodeChangeState::Add => {
self.add_validator_node_registration(
node_change.start_height,
registration,
node_change.minimum_value_promise.into(),
)
.await?;
},
ValidatorNodeChangeState::Remove => {
self.remove_validator_node_registration(
PublicKey::from_canonical_bytes(node_change.public_key.as_slice())
.map_err(BaseLayerScannerError::PublicKeyConversion)?,
registration.sidechain_id().cloned(),
)
.await?;
},
}
}

self.last_scanned_validator_node_mr = current_last_validator_nodes_mr;
}

for current_height in start_scan_height..=end_height {
let utxos = self
.base_node_client
Expand Down Expand Up @@ -329,27 +385,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
};
match sidechain_feature {
SideChainFeature::ValidatorNodeRegistration(reg) => {
info!(
target: LOG_TARGET,
"⛓️ Validator node registration UTXO for {} sidechain {} found at height {}",
reg.public_key(),
reg.sidechain_id().map(|v| v.to_hex()).unwrap_or("None".to_string()),
current_height,
);
if reg.sidechain_id() == self.validator_node_sidechain_id.as_ref() {
self.register_validator_node_registration(
current_height,
reg.clone(),
output.minimum_value_promise,
)
.await?;
} else {
warn!(
target: LOG_TARGET,
"Ignoring validator node registration for sidechain ID {:?}. Expected sidechain ID: {:?}",
reg.sidechain_id().map(|v| v.to_hex()),
self.validator_node_sidechain_id.as_ref().map(|v| v.to_hex()));
}
trace!(target: LOG_TARGET, "New validator node registration scanned: {reg:?}");
},
SideChainFeature::CodeTemplateRegistration(reg) => {
if reg.sidechain_id != self.template_sidechain_id {
Expand Down Expand Up @@ -496,7 +532,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Ok(())
}

async fn register_validator_node_registration(
async fn add_validator_node_registration(
&mut self,
height: u64,
registration: ValidatorNodeRegistration,
Expand All @@ -516,6 +552,25 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
Ok(())
}

async fn remove_validator_node_registration(
&mut self,
public_key: PublicKey,
sidechain_id: Option<PublicKey>,
) -> Result<(), BaseLayerScannerError> {
info!(
target: LOG_TARGET,
"⛓️ Remove validator node registration for {}(side chain ID: {:?})",
public_key,
sidechain_id
);

self.epoch_manager
.remove_validator_node_registration(public_key, sidechain_id)
.await?;

Ok(())
}

async fn register_code_template_registration(
&mut self,
template_name: String,
Expand Down Expand Up @@ -574,6 +629,10 @@ pub enum BaseLayerScannerError {
commitment: Box<Commitment>,
source: StorageError,
},
#[error("Public key conversion error: {0}")]
PublicKeyConversion(ByteArrayError),
#[error("GRPC conversion error: {0}")]
GrpcConversion(String),
}

enum BlockchainProgression {
Expand Down
1 change: 0 additions & 1 deletion applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub async fn spawn_services(
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
},
global_db.clone(),
base_node_client.clone(),
Expand Down
18 changes: 9 additions & 9 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,19 @@ impl ProcessManager {
};
let num_blocks = num_vns + u64::try_from(templates_to_register.len()).unwrap();

// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

if !self.skip_registration {
// Mine some initial funds, guessing 10 blocks extra to allow for coinbase maturity
self.mine(num_blocks + 10).await.context("initial mining failed")?;
self.wait_for_wallet_funds(num_blocks)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}
for templates in templates_to_register {
self.register_template(templates).await?;
for templates in templates_to_register {
self.register_template(templates).await?;
}
}

if num_blocks > 0 {
Expand Down
Loading

0 comments on commit 9907bc1

Please sign in to comment.