diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index bc737d6883..1ab749cefc 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -20,10 +20,9 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; -use futures::future::try_join_all; -use log::*; +use log::info; use tari_common::{configuration::ValidatorNodeConfig, GlobalConfig}; use tari_comms::{types::CommsPublicKey, NodeIdentity}; use tari_comms_dht::Dht; @@ -31,6 +30,7 @@ use tari_crypto::tari_utilities::hex::Hex; use tari_dan_core::{ models::{AssetDefinition, Committee}, services::{ + BaseNodeClient, ConcreteAssetProcessor, ConcreteCheckpointManager, ConcreteCommitteeManager, @@ -46,7 +46,7 @@ use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService}; use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType}; use tari_service_framework::ServiceHandles; use tari_shutdown::ShutdownSignal; -use tokio::task; +use tokio::{task, time}; use crate::{ default_service_specification::DefaultServiceSpecification, @@ -58,7 +58,7 @@ use crate::{ ExitCodes, }; -const LOG_TARGET: &str = "tari::dan::dan_node"; +const LOG_TARGET: &str = "tari::validator_node::app"; pub struct DanNode { config: GlobalConfig, @@ -84,84 +84,63 @@ impl DanNode { .as_ref() .ok_or_else(|| ExitCodes::ConfigError("Missing dan section".to_string()))?; - let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?; - if asset_definitions.is_empty() { - warn!( - target: LOG_TARGET, - "No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \ - extension." - ); - } - - let mut tasks = vec![]; - for asset in asset_definitions { - let node_identitiy = node_identity.as_ref().clone(); - let mempool = mempool_service.clone(); - let handles = handles.clone(); - let subscription_factory = subscription_factory.clone(); - let shutdown = shutdown.clone(); - let dan_config = dan_config.clone(); - let db_factory = db_factory.clone(); - - tasks.push(task::spawn(async move { - DanNode::start_asset_worker( - asset, - node_identitiy, - mempool, - handles, - subscription_factory, - shutdown, - dan_config, - db_factory, - ) - .await - })); - } - - if tasks.is_empty() { - // If there are no assets to process, work in proxy mode - tasks.push(task::spawn(DanNode::wait_for_exit())); - } - try_join_all(tasks) - .await - .map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))? - .into_iter() - .collect::>()?; - - Ok(()) - } - - fn read_asset_definitions(&self, path: &Path) -> Result, ExitCodes> { - if !path.exists() { - fs::create_dir_all(path).expect("Could not create dir"); - } - let paths = fs::read_dir(path).expect("Could not read asset definitions"); - - let mut result = vec![]; - for path in paths { - let path = path.expect("Not a valid file").path(); - - if !path.is_dir() && path.extension().unwrap_or_default() == "asset" { - let file = File::open(path).expect("could not open file"); - let reader = BufReader::new(file); - - let def: AssetDefinition = serde_json::from_reader(reader).expect("lol not a valid json"); - result.push(def); - } - } - Ok(result) - } - - async fn wait_for_exit() -> Result<(), ExitCodes> { - println!("Type `exit` to exit"); + let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address); + let mut tasks = HashMap::new(); + let mut next_scanned_height = 0u64; loop { - let mut line = String::new(); - let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line"); - if line.to_lowercase().trim() == "exit" { - return Err(ExitCodes::UnknownError("User cancelled".to_string())); - } else { - println!("Type `exit` to exit"); + let tip = base_node_client.get_tip_info().await.unwrap(); + if tip.height_of_longest_chain >= next_scanned_height { + info!( + target: LOG_TARGET, + "Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain + ); + if dan_config.scan_for_assets { + next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval; + info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height); + } else { + next_scanned_height = u64::MAX; // Never run again. + } + + let assets = base_node_client + .get_assets_for_dan_node(node_identity.public_key().clone()) + .await + .unwrap(); + for asset in assets { + if tasks.contains_key(&asset.public_key) { + continue; + } + if let Some(allow_list) = &dan_config.assets_allow_list { + if !allow_list.contains(&asset.public_key.to_hex()) { + continue; + } + } + info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key); + let node_identitiy = node_identity.as_ref().clone(); + let mempool = mempool_service.clone(); + let handles = handles.clone(); + let subscription_factory = subscription_factory.clone(); + let shutdown = shutdown.clone(); + let dan_config = dan_config.clone(); + let db_factory = db_factory.clone(); + tasks.insert( + asset.public_key.clone(), + task::spawn(async move { + DanNode::start_asset_worker( + asset.clone(), + node_identitiy, + mempool, + handles, + subscription_factory, + shutdown, + dan_config, + db_factory, + ) + .await + }), + ); + } } + time::sleep(Duration::from_secs(120)).await; } } diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs index 8654288a24..236627f353 100644 --- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs +++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs @@ -27,7 +27,7 @@ use tari_app_grpc::tari_rpc as grpc; use tari_common_types::types::PublicKey; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::{BaseLayerMetadata, BaseLayerOutput}, + models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput}, services::BaseNodeClient, DigitalAssetError, }; @@ -100,4 +100,48 @@ impl BaseNodeClient for GrpcBaseNodeClient { .transpose()?; Ok(output) } + + async fn get_assets_for_dan_node( + &mut self, + dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError> { + let inner = match self.inner.as_mut() { + Some(i) => i, + None => { + self.connect().await?; + self.inner.as_mut().unwrap() + }, + }; + let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 }; + let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner(); + let mut assets: Vec = vec![]; + let tip = self.get_tip_info().await?; + while let Some(r) = result.message().await.unwrap() { + if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) { + if let Some(checkpoint) = self + .get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32]) + .await? + { + if let Some(committee) = checkpoint.get_side_chain_committee() { + if committee.contains(&dan_node_public_key) { + assets.push(AssetDefinition { + public_key: asset_public_key, + template_parameters: r + .features + .unwrap() + .asset + .unwrap() + .template_parameters + .into_iter() + .map(|tp| tp.into()) + .collect(), + ..Default::default() + }); + } + } + } + } + } + Ok(assets) + } } diff --git a/common/config/presets/validator_node.toml b/common/config/presets/validator_node.toml index f6af4517a9..735cd0def8 100644 --- a/common/config/presets/validator_node.toml +++ b/common/config/presets/validator_node.toml @@ -9,3 +9,10 @@ committee = ["2ea0df3059caf4411624d6bf5b9c02238d607d2798c586b3e6c2a054da3f205a"] # cannot be of zero size phase_timeout = 30 template_id = "EditableMetadata" + +# If set to false, there will be no scanning at all. +scan_for_assets = true +# How often do we want to scan the base layer for changes. +new_asset_scanning_interval = 10 +# If set then only the specific assets will be checked. +# assets_allow_list = [""] diff --git a/common/src/configuration/validator_node_config.rs b/common/src/configuration/validator_node_config.rs index 243b42aa16..79c9eba215 100644 --- a/common/src/configuration/validator_node_config.rs +++ b/common/src/configuration/validator_node_config.rs @@ -41,6 +41,9 @@ pub struct ValidatorNodeConfig { pub base_node_grpc_address: SocketAddr, #[serde(default = "default_wallet_grpc_address")] pub wallet_grpc_address: SocketAddr, + pub scan_for_assets: bool, + pub new_asset_scanning_interval: u64, + pub assets_allow_list: Option>, } fn default_asset_config_directory() -> PathBuf { diff --git a/dan_layer/core/src/services/base_node_client.rs b/dan_layer/core/src/services/base_node_client.rs index 99cacf2a4b..ae77144a8d 100644 --- a/dan_layer/core/src/services/base_node_client.rs +++ b/dan_layer/core/src/services/base_node_client.rs @@ -25,7 +25,7 @@ use tari_common_types::types::PublicKey; use crate::{ digital_assets_error::DigitalAssetError, - models::{BaseLayerMetadata, BaseLayerOutput}, + models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput}, }; #[async_trait] @@ -38,4 +38,9 @@ pub trait BaseNodeClient { asset_public_key: PublicKey, checkpoint_unique_id: Vec, ) -> Result, DigitalAssetError>; + + async fn get_assets_for_dan_node( + &mut self, + dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError>; } diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index 67e520f52e..b5989823bd 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -197,6 +197,13 @@ impl BaseNodeClient for MockBaseNodeClient { ) -> Result, DigitalAssetError> { todo!(); } + + async fn get_assets_for_dan_node( + &mut self, + _dan_node_public_key: PublicKey, + ) -> Result, DigitalAssetError> { + todo!(); + } } pub fn mock_base_node_client() -> MockBaseNodeClient {