From e42085a5f8b14135fdae87953bea848668be0f6b Mon Sep 17 00:00:00 2001 From: Brian Pearce Date: Tue, 12 Jul 2022 11:34:06 +0200 Subject: [PATCH] feat(vn): recognize abandoned state (#4272) Description --- This is a subtask for quarantining a contract. It lets the VN's monitoring the contract to flag it as abandoned (locally only) when checkpoints have been missed. Motivation and Context --- VN's need to be more aware of the status of contracts this is an initital step before quarantine happens. How Has This Been Tested? --- - [x] Tested manually - [ ] ~Test written~ To much work to get contract state from the vn for a single spec --- .../src/contract_worker_manager.rs | 108 ++++++++++++++++-- .../core/src/storage/global/global_db.rs | 7 +- .../global/global_db_backend_adapter.rs | 7 +- dan_layer/core/src/storage/mocks/global_db.rs | 2 +- .../sqlite_global_db_backend_adapter.rs | 4 +- 5 files changed, 110 insertions(+), 18 deletions(-) diff --git a/applications/tari_validator_node/src/contract_worker_manager.rs b/applications/tari_validator_node/src/contract_worker_manager.rs index a224fd2ea6..686cfd0c15 100644 --- a/applications/tari_validator_node/src/contract_worker_manager.rs +++ b/applications/tari_validator_node/src/contract_worker_manager.rs @@ -31,7 +31,7 @@ use log::*; use tari_common_types::types::{FixedHash, FixedHashSizeError}; use tari_comms::{types::CommsPublicKey, NodeIdentity}; use tari_comms_dht::Dht; -use tari_core::transactions::transaction_components::ContractConstitution; +use tari_core::transactions::transaction_components::{ContractConstitution, OutputType}; use tari_crypto::tari_utilities::{hex::Hex, message_format::MessageFormat, ByteArray}; use tari_dan_core::{ models::{AssetDefinition, BaseLayerMetadata, Committee}, @@ -137,7 +137,10 @@ impl ContractWorkerManager { pub async fn start(mut self) -> Result<(), WorkerManagerError> { self.load_initial_state()?; - info!("constitution_auto_accept is {}", self.config.constitution_auto_accept); + info!( + target: LOG_TARGET, + "constitution_auto_accept is {}", self.config.constitution_auto_accept + ); if !self.config.scan_for_assets { info!( @@ -156,12 +159,14 @@ impl ContractWorkerManager { let tip = self.base_node_client.get_tip_info().await?; let new_contracts = self.scan_for_new_contracts(&tip).await?; - self.set_last_scanned_block(&tip)?; if self.config.constitution_auto_accept { self.accept_contracts(new_contracts).await?; } + self.validate_contract_activity(&tip).await?; + + self.set_last_scanned_block(&tip)?; tokio::select! { _ = time::sleep(Duration::from_secs(self.config.constitution_management_polling_interval_in_seconds)) => {}, _ = &mut self.shutdown => break @@ -171,13 +176,47 @@ impl ContractWorkerManager { Ok(()) } + async fn validate_contract_activity(&mut self, tip: &BaseLayerMetadata) -> Result<(), WorkerManagerError> { + let active_contracts = self.global_db.get_contracts_with_state(ContractState::Active)?; + + for contract in active_contracts { + let contract_id = FixedHash::try_from(contract.contract_id)?; + info!("Validating contract={} activity", contract_id.to_hex()); + + if let Some(checkpoint) = self.scan_for_last_checkpoint(tip, &contract_id).await? { + let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| { + WorkerManagerError::DataCorruption { + details: error.to_string(), + } + })?; + + if tip.height_of_longest_chain > + checkpoint.mined_height + constitution.checkpoint_params.abandoned_interval + { + self.global_db + .update_contract_state(contract_id, ContractState::Abandoned)?; + + info!( + target: LOG_TARGET, + "Contract={} has missed checkpoints and has been marked Abandoned", + contract_id.to_hex() + ); + } + } + } + + Ok(()) + } + async fn start_active_contracts(&mut self) -> Result<(), WorkerManagerError> { - let active_contracts = self.global_db.get_active_contracts()?; + // Abandoned contracts can be revived by the VNC so they should continue to monitor them + let mut active_contracts = self.global_db.get_contracts_with_state(ContractState::Active)?; + active_contracts.append(&mut self.global_db.get_contracts_with_state(ContractState::Abandoned)?); for contract in active_contracts { let contract_id = FixedHash::try_from(contract.contract_id)?; - println!("Starting contract {}", contract_id); + info!(target: LOG_TARGET, "Starting contract {}", contract_id.to_hex()); let constitution = ContractConstitution::from_binary(&*contract.constitution).map_err(|error| { WorkerManagerError::DataCorruption { @@ -238,6 +277,40 @@ impl ContractWorkerManager { Ok(()) } + async fn scan_for_last_checkpoint( + &mut self, + tip: &BaseLayerMetadata, + contract_id: &FixedHash, + ) -> Result, WorkerManagerError> { + info!( + target: LOG_TARGET, + "Scanning base layer (tip: {}) for last checkpoint of contract={}", + tip.height_of_longest_chain, + contract_id + ); + + let outputs = self + .base_node_client + .get_current_contract_outputs( + tip.height_of_longest_chain, + *contract_id, + OutputType::ContractCheckpoint, + ) + .await?; + + let mut outputs = outputs + .iter() + .map({ + |output| Checkpoint { + mined_height: output.mined_height, + } + }) + .collect::>(); + outputs.sort_by(|l, r| l.mined_height.partial_cmp(&r.mined_height).unwrap()); + + Ok(outputs.pop()) + } + async fn scan_for_new_contracts( &mut self, tip: &BaseLayerMetadata, @@ -288,8 +361,13 @@ impl ContractWorkerManager { }; match self.global_db.save_contract(contract.into(), ContractState::Expired) { - Ok(_) => info!("Saving expired contract data id={}", contract_id.to_hex()), + Ok(_) => info!( + target: LOG_TARGET, + "Saving expired contract data id={}", + contract_id.to_hex() + ), Err(error) => error!( + target: LOG_TARGET, "Couldn't save expired contract data id={} received error={}", contract_id.to_hex(), error.to_string() @@ -309,8 +387,13 @@ impl ContractWorkerManager { .global_db .save_contract(contract.clone().into(), ContractState::Pending) { - Ok(_) => info!("Saving contract data id={}", contract.contract_id.to_hex()), + Ok(_) => info!( + target: LOG_TARGET, + "Saving contract data id={}", + contract.contract_id.to_hex() + ), Err(error) => error!( + target: LOG_TARGET, "Couldn't save contract data id={} received error={}", contract.contract_id.to_hex(), error.to_string() @@ -447,8 +530,8 @@ impl ContractWorkerManager { .publish_acceptance(&self.identity, &contract.contract_id) .await?; info!( - "Contract {} acceptance submitted with id={}", - contract.contract_id, tx_id + target: LOG_TARGET, + "Contract {} acceptance submitted with id={}", contract.contract_id, tx_id ); Ok(()) } @@ -475,13 +558,18 @@ pub enum WorkerManagerError { #[error("Storage error: {0}")] StorageError(#[from] StorageError), #[error("DigitalAsset error: {0}")] - DigitalAssetErrror(#[from] DigitalAssetError), + DigitalAssetError(#[from] DigitalAssetError), // TODO: remove dead_code #[allow(dead_code)] #[error("Data corruption: {details}")] DataCorruption { details: String }, } +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +struct Checkpoint { + pub mined_height: u64, +} + #[derive(Debug, Clone)] struct ActiveContract { pub constitution: ContractConstitution, diff --git a/dan_layer/core/src/storage/global/global_db.rs b/dan_layer/core/src/storage/global/global_db.rs index 14116aa245..d630c107f4 100644 --- a/dan_layer/core/src/storage/global/global_db.rs +++ b/dan_layer/core/src/storage/global/global_db.rs @@ -67,9 +67,12 @@ impl GlobalDb Result, StorageError> { + pub fn get_contracts_with_state( + &self, + state: ContractState, + ) -> Result, StorageError> { self.adapter - .get_active_contracts() + .get_contracts_with_state(state) .map_err(TGlobalDbBackendAdapter::Error::into) } } diff --git a/dan_layer/core/src/storage/global/global_db_backend_adapter.rs b/dan_layer/core/src/storage/global/global_db_backend_adapter.rs index 6ae4e2c3cc..1993e4ec15 100644 --- a/dan_layer/core/src/storage/global/global_db_backend_adapter.rs +++ b/dan_layer/core/src/storage/global/global_db_backend_adapter.rs @@ -43,7 +43,7 @@ pub trait GlobalDbBackendAdapter: Send + Sync + Clone { ) -> Result>, Self::Error>; fn save_contract(&self, contract: Self::NewModel, state: ContractState) -> Result<(), Self::Error>; fn update_contract_state(&self, contract_id: FixedHash, state: ContractState) -> Result<(), Self::Error>; - fn get_active_contracts(&self) -> Result, Self::Error>; + fn get_contracts_with_state(&self, state: ContractState) -> Result, Self::Error>; } #[derive(Debug, Clone, Copy)] @@ -69,8 +69,9 @@ pub enum ContractState { Expired = 2, QuorumMet = 3, Active = 4, - Quarantined = 5, - Shutdown = 6, + Abandoned = 5, + Quarantined = 6, + Shutdown = 7, } impl ContractState { diff --git a/dan_layer/core/src/storage/mocks/global_db.rs b/dan_layer/core/src/storage/mocks/global_db.rs index e9038f267c..eff7a7702e 100644 --- a/dan_layer/core/src/storage/mocks/global_db.rs +++ b/dan_layer/core/src/storage/mocks/global_db.rs @@ -68,7 +68,7 @@ impl GlobalDbBackendAdapter for MockGlobalDbBackupAdapter { todo!() } - fn get_active_contracts(&self) -> Result, Self::Error> { + fn get_contracts_with_state(&self, _state: ContractState) -> Result, Self::Error> { todo!() } } diff --git a/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs b/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs index 4bc3d7e219..41a970f224 100644 --- a/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs @@ -180,12 +180,12 @@ impl GlobalDbBackendAdapter for SqliteGlobalDbBackendAdapter { Ok(()) } - fn get_active_contracts(&self) -> Result, Self::Error> { + fn get_contracts_with_state(&self, state: ContractState) -> Result, Self::Error> { use crate::global::schema::{contracts, contracts::dsl}; let tx = self.create_transaction()?; dsl::contracts - .filter(contracts::state.eq(i32::from(ContractState::Active.as_byte()))) + .filter(contracts::state.eq(i32::from(state.as_byte()))) .load::(tx.connection()) .map_err(|source| SqliteStorageError::DieselError { source,