Skip to content

Commit

Permalink
feat: get assetdefinition from base layer
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Feb 7, 2022
1 parent 5955668 commit 109f4a8
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 83 deletions.
141 changes: 60 additions & 81 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::future::try_join_all;
use log::*;
use log::info;
use tari_common::{configuration::ValidatorNodeConfig, GlobalConfig};
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_core::{
models::{AssetDefinition, Committee},
services::{
BaseNodeClient,
ConcreteAssetProcessor,
ConcreteCheckpointManager,
ConcreteCommitteeManager,
Expand All @@ -46,7 +46,7 @@ use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tokio::task;
use tokio::{task, time};

use crate::{
default_service_specification::DefaultServiceSpecification,
Expand All @@ -58,7 +58,7 @@ use crate::{
ExitCodes,
};

const LOG_TARGET: &str = "tari::dan::dan_node";
const LOG_TARGET: &str = "tari::validator_node::app";

pub struct DanNode {
config: GlobalConfig,
Expand All @@ -84,84 +84,63 @@ impl DanNode {
.as_ref()
.ok_or_else(|| ExitCodes::ConfigError("Missing dan section".to_string()))?;

let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?;
if asset_definitions.is_empty() {
warn!(
target: LOG_TARGET,
"No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \
extension."
);
}

let mut tasks = vec![];
for asset in asset_definitions {
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();

tasks.push(task::spawn(async move {
DanNode::start_asset_worker(
asset,
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}));
}

if tasks.is_empty() {
// If there are no assets to process, work in proxy mode
tasks.push(task::spawn(DanNode::wait_for_exit()));
}
try_join_all(tasks)
.await
.map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))?
.into_iter()
.collect::<Result<_, _>>()?;

Ok(())
}

fn read_asset_definitions(&self, path: &Path) -> Result<Vec<AssetDefinition>, ExitCodes> {
if !path.exists() {
fs::create_dir_all(path).expect("Could not create dir");
}
let paths = fs::read_dir(path).expect("Could not read asset definitions");

let mut result = vec![];
for path in paths {
let path = path.expect("Not a valid file").path();

if !path.is_dir() && path.extension().unwrap_or_default() == "asset" {
let file = File::open(path).expect("could not open file");
let reader = BufReader::new(file);

let def: AssetDefinition = serde_json::from_reader(reader).expect("lol not a valid json");
result.push(def);
}
}
Ok(result)
}

async fn wait_for_exit() -> Result<(), ExitCodes> {
println!("Type `exit` to exit");
let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address);
let mut tasks = HashMap::new();
let mut next_scanned_height = 0u64;
loop {
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line");
if line.to_lowercase().trim() == "exit" {
return Err(ExitCodes::UnknownError("User cancelled".to_string()));
} else {
println!("Type `exit` to exit");
let tip = base_node_client.get_tip_info().await.unwrap();
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if dan_config.scan_for_assets {
next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}

let assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.unwrap();
for asset in assets {
if tasks.contains_key(&asset.public_key) {
continue;
}
if let Some(allow_list) = &dan_config.assets_allow_list {
if !allow_list.contains(&asset.public_key.to_hex()) {
continue;
}
}
info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key);
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();
tasks.insert(
asset.public_key.clone(),
task::spawn(async move {
DanNode::start_asset_worker(
asset.clone(),
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}),
);
}
}
time::sleep(Duration::from_secs(120)).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{BaseLayerMetadata, BaseLayerOutput},
models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput},
services::BaseNodeClient,
DigitalAssetError,
};
Expand Down Expand Up @@ -100,4 +100,48 @@ impl BaseNodeClient for GrpcBaseNodeClient {
.transpose()?;
Ok(output)
}

async fn get_assets_for_dan_node(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 };
let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner();
let mut assets: Vec<AssetDefinition> = vec![];
let tip = self.get_tip_info().await?;
while let Some(r) = result.message().await.unwrap() {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) {
if let Some(checkpoint) = self
.get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32])
.await?
{
if let Some(committee) = checkpoint.get_side_chain_committee() {
if committee.contains(&dan_node_public_key) {
assets.push(AssetDefinition {
public_key: asset_public_key,
template_parameters: r
.features
.unwrap()
.asset
.unwrap()
.template_parameters
.into_iter()
.map(|tp| tp.into())
.collect(),
..Default::default()
});
}
}
}
}
}
Ok(assets)
}
}
7 changes: 7 additions & 0 deletions common/config/presets/validator_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@
committee = ["2ea0df3059caf4411624d6bf5b9c02238d607d2798c586b3e6c2a054da3f205a"] # cannot be of zero size
phase_timeout = 30
template_id = "EditableMetadata"

# If set to false, there will be no scanning at all.
scan_for_assets = true
# How often do we want to scan the base layer for changes.
new_asset_scanning_interval = 10
# If set then only the specific assets will be checked.
# assets_allow_list = ["<pubkey>"]
3 changes: 3 additions & 0 deletions common/src/configuration/validator_node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct ValidatorNodeConfig {
pub base_node_grpc_address: SocketAddr,
#[serde(default = "default_wallet_grpc_address")]
pub wallet_grpc_address: SocketAddr,
pub scan_for_assets: bool,
pub new_asset_scanning_interval: u64,
pub assets_allow_list: Option<Vec<String>>,
}

fn default_asset_config_directory() -> PathBuf {
Expand Down
7 changes: 6 additions & 1 deletion dan_layer/core/src/services/base_node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tari_common_types::types::PublicKey;

use crate::{
digital_assets_error::DigitalAssetError,
models::{BaseLayerMetadata, BaseLayerOutput},
models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput},
};

#[async_trait]
Expand All @@ -38,4 +38,9 @@ pub trait BaseNodeClient {
asset_public_key: PublicKey,
checkpoint_unique_id: Vec<u8>,
) -> Result<Option<BaseLayerOutput>, DigitalAssetError>;

async fn get_assets_for_dan_node(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<AssetDefinition>, DigitalAssetError>;
}
7 changes: 7 additions & 0 deletions dan_layer/core/src/services/mocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ impl BaseNodeClient for MockBaseNodeClient {
) -> Result<Option<BaseLayerOutput>, DigitalAssetError> {
todo!();
}

async fn get_assets_for_dan_node(
&mut self,
_dan_node_public_key: PublicKey,
) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
todo!();
}
}

pub fn mock_base_node_client() -> MockBaseNodeClient {
Expand Down

0 comments on commit 109f4a8

Please sign in to comment.