Skip to content

Commit

Permalink
feat: read asset definitions from base layer (#3802)
Browse files Browse the repository at this point in the history
Description
---
Load assets from base layer. And periodically check for new assets.

How Has This Been Tested?
---
Manually.
  • Loading branch information
Cifko authored Feb 7, 2022
1 parent bbd0e1e commit 86de08b
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 83 deletions.
6 changes: 6 additions & 0 deletions applications/tari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ service Wallet {
rpc GetCompletedTransactions (GetCompletedTransactionsRequest) returns (stream GetCompletedTransactionsResponse);
// Returns the balance
rpc GetBalance (GetBalanceRequest) returns (GetBalanceResponse);
// Returns unspent amounts
rpc GetUnspentAmounts (Empty) returns (GetUnspentAmountsResponse);
// Request the wallet perform a coinsplit
rpc CoinSplit (CoinSplitRequest) returns (CoinSplitResponse);
// Import Utxo to wallet
Expand Down Expand Up @@ -206,6 +208,10 @@ message GetBalanceResponse {
uint64 pending_outgoing_balance = 3;
}

message GetUnspentAmountsResponse {
repeated uint64 amount = 1;
}

message GetCoinbaseRequest {
uint64 reward = 1;
uint64 fee = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,19 @@ impl WalletClient {
debug!(target: LOG_TARGET, "result {:?}", result);
Ok(result.into_inner())
}

pub async fn get_unspent_amounts(
&mut self,
) -> Result<grpc::GetUnspentAmountsResponse, CollectiblesError> {
let inner = self.inner.as_mut().unwrap();
let request = grpc::Empty {};
let result = inner.get_unspent_amounts(request).await.map_err(|source| {
CollectiblesError::ClientRequestError {
request: "get_unspent_amounts".to_string(),
source,
}
})?;
debug!(target: LOG_TARGET, "result {:?}", result);
Ok(result.into_inner())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ pub(crate) async fn asset_wallets_get_balance(
Ok(total)
}

#[tauri::command]
pub(crate) async fn asset_wallets_get_unspent_amounts(
state: tauri::State<'_, ConcurrentAppState>,
) -> Result<Vec<u64>, Status> {
let mut client = state.create_wallet_client().await;
client.connect().await?;
let result = client.get_unspent_amounts().await?;
Ok(result.amount)
}

#[tauri::command]
pub(crate) async fn asset_wallets_list(
state: tauri::State<'_, ConcurrentAppState>,
Expand Down
1 change: 1 addition & 0 deletions applications/tari_collectibles/src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn main() -> Result<(), Box<dyn Error>> {
commands::asset_wallets::asset_wallets_create,
commands::asset_wallets::asset_wallets_list,
commands::asset_wallets::asset_wallets_get_balance,
commands::asset_wallets::asset_wallets_get_unspent_amounts,
commands::asset_wallets::asset_wallets_get_latest_address,
commands::asset_wallets::asset_wallets_create_address,
commands::asset_wallets::asset_wallets_send_to,
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_collectibles/web-app/src/Create.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class Create extends React.Component {
templateIds.push(721);
}

let outputs = await binding.command_asset_wallets_get_unspent_amounts();

if (outputs.length <= 1) {
throw { message: "You need at least two unspent outputs" };
}
let publicKey = await binding.command_assets_create(
name,
description,
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_collectibles/web-app/src/binding.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ async function command_asset_wallets_get_balance(assetPublicKey) {
return await invoke("asset_wallets_get_balance", { assetPublicKey });
}

async function command_asset_wallets_get_unspent_amounts() {
return await invoke("asset_wallets_get_unspent_amounts", {});
}

const commands = {
command_create_db,
command_assets_create,
Expand All @@ -147,6 +151,7 @@ const commands = {
command_next_asset_public_key,
command_asset_wallets_create,
command_asset_wallets_get_balance,
command_asset_wallets_get_unspent_amounts,
command_asset_wallets_list,
command_asset_wallets_get_latest_address,
command_asset_wallets_create_address,
Expand Down
20 changes: 20 additions & 0 deletions applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tari_app_grpc::{
GetOwnedAssetsResponse,
GetTransactionInfoRequest,
GetTransactionInfoResponse,
GetUnspentAmountsResponse,
GetVersionRequest,
GetVersionResponse,
ImportUtxosRequest,
Expand Down Expand Up @@ -163,6 +164,25 @@ impl wallet_server::Wallet for WalletGrpcServer {
}))
}

async fn get_unspent_amounts(
&self,
_: Request<tari_rpc::Empty>,
) -> Result<Response<GetUnspentAmountsResponse>, Status> {
let mut output_service = self.get_output_manager_service();
let unspent_amounts;
match output_service.get_unspent_outputs().await {
Ok(uo) => unspent_amounts = uo,
Err(e) => return Err(Status::not_found(format!("GetUnspentAmounts error! {}", e))),
}
Ok(Response::new(GetUnspentAmountsResponse {
amount: unspent_amounts
.into_iter()
.map(|o| o.value.as_u64())
.filter(|&a| a > 0)
.collect(),
}))
}

async fn revalidate_all_transactions(
&self,
_request: Request<RevalidateRequest>,
Expand Down
141 changes: 60 additions & 81 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, fs::File, io::BufReader, path::Path, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::future::try_join_all;
use log::*;
use log::info;
use tari_common::{configuration::ValidatorNodeConfig, GlobalConfig};
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
use tari_dan_core::{
models::{AssetDefinition, Committee},
services::{
BaseNodeClient,
ConcreteAssetProcessor,
ConcreteCheckpointManager,
ConcreteCommitteeManager,
Expand All @@ -46,7 +46,7 @@ use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStorageService};
use tari_p2p::{comms_connector::SubscriptionFactory, tari_message::TariMessageType};
use tari_service_framework::ServiceHandles;
use tari_shutdown::ShutdownSignal;
use tokio::task;
use tokio::{task, time};

use crate::{
default_service_specification::DefaultServiceSpecification,
Expand All @@ -58,7 +58,7 @@ use crate::{
ExitCodes,
};

const LOG_TARGET: &str = "tari::dan::dan_node";
const LOG_TARGET: &str = "tari::validator_node::app";

pub struct DanNode {
config: GlobalConfig,
Expand All @@ -84,84 +84,63 @@ impl DanNode {
.as_ref()
.ok_or_else(|| ExitCodes::ConfigError("Missing dan section".to_string()))?;

let asset_definitions = self.read_asset_definitions(&dan_config.asset_config_directory)?;
if asset_definitions.is_empty() {
warn!(
target: LOG_TARGET,
"No assets to process. Add assets by putting definitions in the `assets` folder with a `.asset` \
extension."
);
}

let mut tasks = vec![];
for asset in asset_definitions {
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();

tasks.push(task::spawn(async move {
DanNode::start_asset_worker(
asset,
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}));
}

if tasks.is_empty() {
// If there are no assets to process, work in proxy mode
tasks.push(task::spawn(DanNode::wait_for_exit()));
}
try_join_all(tasks)
.await
.map_err(|err| ExitCodes::UnknownError(format!("Join error occurred. {}", err)))?
.into_iter()
.collect::<Result<_, _>>()?;

Ok(())
}

fn read_asset_definitions(&self, path: &Path) -> Result<Vec<AssetDefinition>, ExitCodes> {
if !path.exists() {
fs::create_dir_all(path).expect("Could not create dir");
}
let paths = fs::read_dir(path).expect("Could not read asset definitions");

let mut result = vec![];
for path in paths {
let path = path.expect("Not a valid file").path();

if !path.is_dir() && path.extension().unwrap_or_default() == "asset" {
let file = File::open(path).expect("could not open file");
let reader = BufReader::new(file);

let def: AssetDefinition = serde_json::from_reader(reader).expect("lol not a valid json");
result.push(def);
}
}
Ok(result)
}

async fn wait_for_exit() -> Result<(), ExitCodes> {
println!("Type `exit` to exit");
let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address);
let mut tasks = HashMap::new();
let mut next_scanned_height = 0u64;
loop {
let mut line = String::new();
let _ = std::io::stdin().read_line(&mut line).expect("Failed to read line");
if line.to_lowercase().trim() == "exit" {
return Err(ExitCodes::UnknownError("User cancelled".to_string()));
} else {
println!("Type `exit` to exit");
let tip = base_node_client.get_tip_info().await.unwrap();
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if dan_config.scan_for_assets {
next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}

let assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.unwrap();
for asset in assets {
if tasks.contains_key(&asset.public_key) {
continue;
}
if let Some(allow_list) = &dan_config.assets_allow_list {
if !allow_list.contains(&asset.public_key.to_hex()) {
continue;
}
}
info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key);
let node_identitiy = node_identity.as_ref().clone();
let mempool = mempool_service.clone();
let handles = handles.clone();
let subscription_factory = subscription_factory.clone();
let shutdown = shutdown.clone();
let dan_config = dan_config.clone();
let db_factory = db_factory.clone();
tasks.insert(
asset.public_key.clone(),
task::spawn(async move {
DanNode::start_asset_worker(
asset.clone(),
node_identitiy,
mempool,
handles,
subscription_factory,
shutdown,
dan_config,
db_factory,
)
.await
}),
);
}
}
time::sleep(Duration::from_secs(120)).await;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;
use tari_dan_core::{
models::{BaseLayerMetadata, BaseLayerOutput},
models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput},
services::BaseNodeClient,
DigitalAssetError,
};
Expand Down Expand Up @@ -100,4 +100,48 @@ impl BaseNodeClient for GrpcBaseNodeClient {
.transpose()?;
Ok(output)
}

async fn get_assets_for_dan_node(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
let inner = match self.inner.as_mut() {
Some(i) => i,
None => {
self.connect().await?;
self.inner.as_mut().unwrap()
},
};
let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 0 };
let mut result = inner.list_asset_registrations(request).await.unwrap().into_inner();
let mut assets: Vec<AssetDefinition> = vec![];
let tip = self.get_tip_info().await?;
while let Some(r) = result.message().await.unwrap() {
if let Ok(asset_public_key) = PublicKey::from_bytes(r.unique_id.as_bytes()) {
if let Some(checkpoint) = self
.get_current_checkpoint(tip.height_of_longest_chain, asset_public_key.clone(), vec![3u8; 32])
.await?
{
if let Some(committee) = checkpoint.get_side_chain_committee() {
if committee.contains(&dan_node_public_key) {
assets.push(AssetDefinition {
public_key: asset_public_key,
template_parameters: r
.features
.unwrap()
.asset
.unwrap()
.template_parameters
.into_iter()
.map(|tp| tp.into())
.collect(),
..Default::default()
});
}
}
}
}
}
Ok(assets)
}
}
7 changes: 7 additions & 0 deletions common/config/presets/validator_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@
committee = ["2ea0df3059caf4411624d6bf5b9c02238d607d2798c586b3e6c2a054da3f205a"] # cannot be of zero size
phase_timeout = 30
template_id = "EditableMetadata"

# If set to false, there will be no scanning at all.
scan_for_assets = true
# How often do we want to scan the base layer for changes.
new_asset_scanning_interval = 10
# If set then only the specific assets will be checked.
# assets_allow_list = ["<pubkey>"]
3 changes: 3 additions & 0 deletions common/src/configuration/validator_node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct ValidatorNodeConfig {
pub base_node_grpc_address: SocketAddr,
#[serde(default = "default_wallet_grpc_address")]
pub wallet_grpc_address: SocketAddr,
pub scan_for_assets: bool,
pub new_asset_scanning_interval: u64,
pub assets_allow_list: Option<Vec<String>>,
}

fn default_asset_config_directory() -> PathBuf {
Expand Down
Loading

0 comments on commit 86de08b

Please sign in to comment.