diff --git a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs index f6e126b382..17e29c034b 100644 --- a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs +++ b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs @@ -26,6 +26,7 @@ use tari_common_types::types::PublicKey; use tari_comms::NodeIdentity; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ + models::Instruction, services::{AssetProcessor, AssetProxy, ServiceSpecification}, storage::DbFactory, }; @@ -144,9 +145,10 @@ impl rpc::validator_node_ .map_err(|e| Status::internal(format!("Could not create state db: {}", e)))? { let mut state_db_reader = state.reader(); + let instruction = Instruction::new(template_id, request.method, request.args); let response_bytes = self .asset_processor - .invoke_read_method(template_id, request.method, &request.args, &mut state_db_reader) + .invoke_read_method(&instruction, &mut state_db_reader) .map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?; Ok(Response::new(rpc::InvokeReadMethodResponse { result: response_bytes.unwrap_or_default(), diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 9a7d4947bf..bf73986a86 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -20,7 +20,7 @@ // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR // OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH // DAMAGE. -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use log::*; use tari_common_types::types::PublicKey; @@ -30,7 +30,7 @@ use tari_comms::{ }; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::{Instruction, TemplateId, TreeNodeHash}, + models::{Instruction, TreeNodeHash}, services::{AssetProcessor, MempoolService}, storage::{state::StateDbUnitOfWorkReader, DbFactory}, }; @@ -93,14 +93,18 @@ where .ok_or_else(|| RpcStatus::not_found("This node does not process this asset".to_string()))?; let mut unit_of_work = state.reader(); + + let instruction = Instruction::new( + request + .template_id + .try_into() + .map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, + request.method, + request.args, + ); let response_bytes = self .asset_processor - .invoke_read_method( - TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, - request.method, - &request.args, - &mut unit_of_work, - ) + .invoke_read_method(&instruction, &mut unit_of_work) .map_err(|e| RpcStatus::general(format!("Could not invoke read method: {}", e)))?; Ok(Response::new(proto::InvokeReadMethodResponse { @@ -115,7 +119,10 @@ where dbg!(&request); let request = request.into_message(); let instruction = Instruction::new( - TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, + request + .template_id + .try_into() + .map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, request.method.clone(), request.args.clone(), /* TokenId(request.token_id.clone()), diff --git a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs index ba610ac9ce..febc84ee69 100644 --- a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs @@ -291,7 +291,10 @@ impl TariCommsInboundReceiverHandle { } #[async_trait] -impl InboundConnectionService for TariCommsInboundReceiverHandle { +impl InboundConnectionService for TariCommsInboundReceiverHandle { + type Addr = CommsPublicKey; + type Payload = TariDanPayload; + async fn wait_for_message( &self, message_type: HotStuffMessageType, diff --git a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs index f90935f3b1..51ec816132 100644 --- a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs @@ -61,7 +61,10 @@ impl TariCommsOutboundService { } #[async_trait] -impl OutboundService for TariCommsOutboundService { +impl OutboundService for TariCommsOutboundService { + type Addr = CommsPublicKey; + type Payload = TariDanPayload; + async fn send( &mut self, from: CommsPublicKey, diff --git a/dan_layer/core/src/digital_assets_error.rs b/dan_layer/core/src/digital_assets_error.rs index 65fcf6d072..e697c1a823 100644 --- a/dan_layer/core/src/digital_assets_error.rs +++ b/dan_layer/core/src/digital_assets_error.rs @@ -76,6 +76,14 @@ pub enum DigitalAssetError { StateSyncError(#[from] StateSyncError), #[error("Validator node client error: {0}")] ValidatorNodeClientError(#[from] ValidatorNodeClientError), + #[error("Peer did not send a quorum certificate in prepare phase")] + PreparePhaseNoQuorumCertificate, + #[error("Quorum certificate does not extend node")] + PreparePhaseCertificateDoesNotExtendNode, + #[error("Node not safe")] + PreparePhaseNodeNotSafe, + #[error("Unsupported template method {name}")] + TemplateUnsupportedMethod { name: String }, } impl From for DigitalAssetError { diff --git a/dan_layer/core/src/fixed_hash.rs b/dan_layer/core/src/fixed_hash.rs index ea642e2c3f..400bb5d6c2 100644 --- a/dan_layer/core/src/fixed_hash.rs +++ b/dan_layer/core/src/fixed_hash.rs @@ -31,7 +31,7 @@ const ZERO_HASH: [u8; FixedHash::byte_size()] = [0u8; FixedHash::byte_size()]; #[error("Invalid size")] pub struct FixedHashSizeError; -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)] pub struct FixedHash([u8; FixedHash::byte_size()]); impl FixedHash { diff --git a/dan_layer/core/src/models/instruction_set.rs b/dan_layer/core/src/models/instruction_set.rs index 1e9d6a7cd0..0ef6a50b22 100644 --- a/dan_layer/core/src/models/instruction_set.rs +++ b/dan_layer/core/src/models/instruction_set.rs @@ -20,15 +20,24 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{hash::Hash, iter::FromIterator}; +use std::{convert::TryFrom, hash::Hash, iter::FromIterator}; use tari_crypto::common::Blake256; use tari_mmr::MerkleMountainRange; -use crate::models::{ConsensusHash, Instruction}; +use crate::{ + fixed_hash::FixedHash, + models::{ConsensusHash, Instruction}, +}; #[derive(PartialEq, Clone, Debug, Hash)] -pub struct InstructionSetHash(Vec); +pub struct InstructionSetHash(FixedHash); + +impl InstructionSetHash { + pub fn zero() -> InstructionSetHash { + Self(FixedHash::zero()) + } +} impl InstructionSetHash { pub fn as_bytes(&self) -> &[u8] { @@ -36,6 +45,12 @@ impl InstructionSetHash { } } +impl From for InstructionSetHash { + fn from(hash: FixedHash) -> Self { + Self(hash) + } +} + // TODO: Implement hash properly #[allow(clippy::derive_hash_xor_eq)] #[derive(Clone, Debug)] @@ -52,7 +67,7 @@ impl InstructionSet { pub fn from_vec(instructions: Vec) -> Self { let mut result = Self { instructions, - hash: InstructionSetHash(vec![]), + hash: InstructionSetHash::zero(), }; result.hash = result.calculate_hash(); result @@ -65,7 +80,7 @@ impl InstructionSet { mmr.push(instruction.calculate_hash().to_vec()).unwrap(); } - InstructionSetHash(mmr.get_merkle_root().unwrap()) + FixedHash::try_from(mmr.get_merkle_root().unwrap()).unwrap().into() } pub fn instructions(&self) -> &[Instruction] { @@ -91,3 +106,19 @@ impl ConsensusHash for InstructionSet { self.hash.as_bytes() } } + +impl IntoIterator for InstructionSet { + type IntoIter = as IntoIterator>::IntoIter; + type Item = Instruction; + + fn into_iter(self) -> Self::IntoIter { + self.instructions.into_iter() + } +} + +impl Extend for InstructionSet { + fn extend>(&mut self, iter: T) { + self.instructions.extend(iter); + self.hash = self.calculate_hash(); + } +} diff --git a/dan_layer/core/src/models/view_id.rs b/dan_layer/core/src/models/view_id.rs index bc60c079ed..01ad1f1ea1 100644 --- a/dan_layer/core/src/models/view_id.rs +++ b/dan_layer/core/src/models/view_id.rs @@ -34,6 +34,10 @@ impl ViewId { (self.0 % committee_size as u64) as usize } + pub fn is_genesis(&self) -> bool { + self.0 == 0 + } + pub fn next(&self) -> ViewId { ViewId(self.0 + 1) } diff --git a/dan_layer/core/src/services/asset_processor.rs b/dan_layer/core/src/services/asset_processor.rs index 8d15ebd3ca..7015e28c97 100644 --- a/dan_layer/core/src/services/asset_processor.rs +++ b/dan_layer/core/src/services/asset_processor.rs @@ -26,20 +26,13 @@ use tari_core::transactions::transaction_components::TemplateParameter; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Instruction, TemplateId}, + models::{Instruction, InstructionSet, TemplateId}, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, template_command::ExecutionResult, templates::{tip002_template, tip004_template, tip721_template}, }; pub trait AssetProcessor: Sync + Send + 'static { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError>; - // purposefully made sync, because instructions should be run in order, and complete before the // next one starts. There may be a better way to enforce this though... fn execute_instruction( @@ -50,10 +43,8 @@ pub trait AssetProcessor: Sync + Send + 'static { fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: &[u8], - state_db: &mut TUnitOfWorkReader, + instruction: &Instruction, + state_db: &TUnitOfWorkReader, ) -> Result>, DigitalAssetError>; } @@ -63,108 +54,75 @@ pub struct ConcreteAssetProcessor { } impl AssetProcessor for ConcreteAssetProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - self.template_factory - .init(template_parameter, asset_definition, state_db) - } - fn execute_instruction( &self, instruction: &Instruction, - db: &mut TUnitOfWork, + state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { - self.execute( - instruction.template_id(), - instruction.method().to_owned(), - instruction.args().into(), - // InstructionCaller { - // owner_token_id: instruction.from_owner().to_owned(), - // }, - db, - ) + self.template_factory.invoke_write_method(instruction, state_db) } fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, + instruction: &Instruction, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { + self.template_factory.invoke_read_method(instruction, state_db) + } +} + +#[derive(Default, Clone)] +pub struct TemplateFactory {} + +impl TemplateFactory { + pub fn initial_instructions(&self, template_param: &TemplateParameter) -> InstructionSet { + use TemplateId::*; + // TODO: We may want to use the TemplateId type, so that we know it is known/valid + let template_id = template_param.template_id.try_into().unwrap(); match template_id { - TemplateId::Tip002 => tip002_template::invoke_read_method(method, args, state_db), - TemplateId::Tip004 => tip004_template::invoke_read_method(method, args, state_db), - TemplateId::Tip721 => tip721_template::invoke_read_method(method, args, state_db), - _ => { + Tip002 => tip002_template::initial_instructions(template_param), + Tip003 => todo!(), + Tip004 => tip004_template::initial_instructions(template_param), + Tip721 => tip721_template::initial_instructions(template_param), + EditableMetadata => { todo!() }, } } -} -impl ConcreteAssetProcessor { - pub fn execute( + pub fn invoke_read_method( &self, - template_id: TemplateId, - method: String, - args: Vec, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - match template_id { - TemplateId::Tip002 => { - tip002_template::invoke_method(method, &args, state_db)?; - }, - TemplateId::Tip004 => { - tip004_template::invoke_method(method, &args, state_db)?; - }, - TemplateId::Tip721 => { - tip721_template::invoke_method(method, &args, state_db)?; - }, - _ => { + instruction: &Instruction, + state_db: &TUnitOfWork, + ) -> Result>, DigitalAssetError> { + use TemplateId::*; + match instruction.template_id() { + Tip002 => tip002_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + Tip003 => todo!(), + Tip004 => tip004_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + Tip721 => tip721_template::invoke_read_method(instruction.method(), instruction.args(), state_db), + EditableMetadata => { todo!() }, } - // let instruction = self.template_factory.create_command(template_id, method, args)?; - // let unit_of_work = state_db.new_unit_of_work(); - // let result = instruction.try_execute(db)?; - // unit_of_work.commit()?; - // self.instruction_log.store(hash, result); - // Ok(()) - Ok(()) } -} -#[derive(Default, Clone)] -pub struct TemplateFactory {} - -impl TemplateFactory { - pub fn init( + pub fn invoke_write_method( &self, - template: &TemplateParameter, - asset_definition: &AssetDefinition, + instruction: &Instruction, state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { - match template.template_id.try_into()? { - TemplateId::Tip002 => tip002_template::init(template, asset_definition, state_db)?, - _ => unimplemented!(), + use TemplateId::*; + match instruction.template_id() { + Tip002 => tip002_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + Tip003 => todo!(), + Tip004 => tip004_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + Tip721 => tip721_template::invoke_write_method(instruction.method(), instruction.args(), state_db), + EditableMetadata => { + todo!() + }, } - Ok(()) } - - // pub fn create_command( - // &self, - // _template: TemplateId, - // _method: String, - // _args: VecDeque>, - // // caller: InstructionCaller, - // ) -> Result<(), DigitalAssetError> { - // todo!() - // } } pub trait InstructionLog { diff --git a/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs b/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs index cb8456ca76..99461f5be8 100644 --- a/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs +++ b/dan_layer/core/src/services/infrastructure_services/inbound_connection_service.rs @@ -29,16 +29,19 @@ use crate::{ }; #[async_trait] -pub trait InboundConnectionService { +pub trait InboundConnectionService { + type Addr: NodeAddressable; + type Payload: Payload; + async fn wait_for_message( &self, message_type: HotStuffMessageType, for_view: ViewId, - ) -> Result<(TAddr, HotStuffMessage), DigitalAssetError>; + ) -> Result<(Self::Addr, HotStuffMessage), DigitalAssetError>; async fn wait_for_qc( &self, message_type: HotStuffMessageType, for_view: ViewId, - ) -> Result<(TAddr, HotStuffMessage), DigitalAssetError>; + ) -> Result<(Self::Addr, HotStuffMessage), DigitalAssetError>; } diff --git a/dan_layer/core/src/services/infrastructure_services/outbound_service.rs b/dan_layer/core/src/services/infrastructure_services/outbound_service.rs index c00bf494cf..3702ab246a 100644 --- a/dan_layer/core/src/services/infrastructure_services/outbound_service.rs +++ b/dan_layer/core/src/services/infrastructure_services/outbound_service.rs @@ -29,18 +29,21 @@ use crate::{ }; #[async_trait] -pub trait OutboundService { +pub trait OutboundService { + type Addr: NodeAddressable + Send; + type Payload: Payload; + async fn send( &mut self, - from: TAddr, - to: TAddr, - message: HotStuffMessage, + from: Self::Addr, + to: Self::Addr, + message: HotStuffMessage, ) -> Result<(), DigitalAssetError>; async fn broadcast( &mut self, - from: TAddr, - committee: &[TAddr], - message: HotStuffMessage, + from: Self::Addr, + committee: &[Self::Addr], + message: HotStuffMessage, ) -> Result<(), DigitalAssetError>; } diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index 36db7254da..d18914c4e0 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -28,7 +28,6 @@ use std::{ use async_trait::async_trait; use tari_common_types::types::PublicKey; -use tari_core::transactions::transaction_components::TemplateParameter; use super::CommitteeManager; use crate::{ @@ -44,7 +43,6 @@ use crate::{ Payload, Signature, StateRoot, - TemplateId, TreeNodeHash, }, services::{ @@ -114,7 +112,7 @@ impl PayloadProvider for MockStaticPayloadProvider< Ok(self.static_payload.clone()) } - fn create_genesis_payload(&self) -> TPayload { + fn create_genesis_payload(&self, _: &AssetDefinition) -> TPayload { self.static_payload.clone() } @@ -257,15 +255,6 @@ pub struct MockPayloadProcessor {} #[async_trait] impl PayloadProcessor for MockPayloadProcessor { - fn init_template( - &self, - _template_parameter: &TemplateParameter, - _asset_definition: &AssetDefinition, - _state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - todo!() - } - async fn process_payload( &self, _payload: &TPayload, @@ -279,15 +268,6 @@ impl PayloadProcessor for MockPayloadProcessor { pub struct MockAssetProcessor; impl AssetProcessor for MockAssetProcessor { - fn init_template( - &self, - _template_parameter: &TemplateParameter, - _asset_definition: &AssetDefinition, - _state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - todo!() - } - fn execute_instruction( &self, _instruction: &Instruction, @@ -298,10 +278,8 @@ impl AssetProcessor for MockAssetProcessor { fn invoke_read_method( &self, - _template_id: TemplateId, - _method: String, - _args: &[u8], - _state_db: &mut TUnifOfWork, + _instruction: &Instruction, + _state_db: &TUnifOfWork, ) -> Result>, DigitalAssetError> { todo!() } diff --git a/dan_layer/core/src/services/payload_processor.rs b/dan_layer/core/src/services/payload_processor.rs index c2c79daeda..177c50357a 100644 --- a/dan_layer/core/src/services/payload_processor.rs +++ b/dan_layer/core/src/services/payload_processor.rs @@ -21,23 +21,16 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use async_trait::async_trait; -use tari_core::transactions::transaction_components::TemplateParameter; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Payload, StateRoot, TariDanPayload}, + models::{Payload, StateRoot, TariDanPayload}, services::AssetProcessor, storage::state::StateDbUnitOfWork, }; #[async_trait] pub trait PayloadProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError>; async fn process_payload( &self, payload: &TPayload, @@ -61,16 +54,6 @@ impl TariDanPayloadProcessor { impl PayloadProcessor for TariDanPayloadProcessor { - fn init_template( - &self, - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, - ) -> Result<(), DigitalAssetError> { - self.asset_processor - .init_template(template_parameter, asset_definition, state_db) - } - async fn process_payload( &self, payload: &TariDanPayload, diff --git a/dan_layer/core/src/services/payload_provider.rs b/dan_layer/core/src/services/payload_provider.rs index 734e5cdcee..a2b34b84b8 100644 --- a/dan_layer/core/src/services/payload_provider.rs +++ b/dan_layer/core/src/services/payload_provider.rs @@ -24,14 +24,14 @@ use async_trait::async_trait; use crate::{ digital_assets_error::DigitalAssetError, - models::{InstructionSet, Payload, TariDanPayload, TreeNodeHash}, - services::MempoolService, + models::{AssetDefinition, InstructionSet, Payload, TariDanPayload, TreeNodeHash}, + services::{asset_processor::TemplateFactory, MempoolService}, }; #[async_trait] pub trait PayloadProvider { async fn create_payload(&self) -> Result; - fn create_genesis_payload(&self) -> TPayload; + fn create_genesis_payload(&self, asset_definition: &AssetDefinition) -> TPayload; async fn get_payload_queue(&self) -> usize; async fn reserve_payload( &mut self, @@ -41,13 +41,17 @@ pub trait PayloadProvider { async fn remove_payload(&mut self, reservation_key: &TreeNodeHash) -> Result<(), DigitalAssetError>; } -pub struct TariDanPayloadProvider { +pub struct TariDanPayloadProvider { mempool: TMempoolService, + template_factory: TemplateFactory, } impl TariDanPayloadProvider { pub fn new(mempool: TMempoolService) -> Self { - Self { mempool } + Self { + mempool, + template_factory: TemplateFactory {}, + } } } @@ -60,8 +64,13 @@ impl PayloadProvider for TariDa Ok(TariDanPayload::new(instruction_set, None)) } - fn create_genesis_payload(&self) -> TariDanPayload { - TariDanPayload::new(InstructionSet::empty(), None) + fn create_genesis_payload(&self, asset_definition: &AssetDefinition) -> TariDanPayload { + let mut instruction_set = InstructionSet::empty(); + for params in asset_definition.template_parameters.iter() { + let instructions = self.template_factory.initial_instructions(params); + instruction_set.extend(instructions); + } + TariDanPayload::new(instruction_set, None) } async fn get_payload_queue(&self) -> usize { diff --git a/dan_layer/core/src/services/service_specification.rs b/dan_layer/core/src/services/service_specification.rs index 20c057ed0b..ec7a0da79a 100644 --- a/dan_layer/core/src/services/service_specification.rs +++ b/dan_layer/core/src/services/service_specification.rs @@ -53,9 +53,12 @@ pub trait ServiceSpecification: Clone { type CommitteeManager: CommitteeManager; type DbFactory: DbFactory + Clone + Sync + Send + 'static; type EventsPublisher: EventsPublisher; - type InboundConnectionService: InboundConnectionService + 'static + Send + Sync; + type InboundConnectionService: InboundConnectionService + + 'static + + Send + + Sync; type MempoolService: MempoolService + Clone + Sync + Send + 'static; - type OutboundService: OutboundService; + type OutboundService: OutboundService; type Payload: Payload; type PayloadProcessor: PayloadProcessor; type PayloadProvider: PayloadProvider; diff --git a/dan_layer/core/src/storage/mocks/mod.rs b/dan_layer/core/src/storage/mocks/mod.rs index d8c3d6bf00..fd5aa0700e 100644 --- a/dan_layer/core/src/storage/mocks/mod.rs +++ b/dan_layer/core/src/storage/mocks/mod.rs @@ -85,7 +85,7 @@ impl DbFactory for MockDbFactory { .unwrap() .get(asset_public_key) .cloned() - .map(StateDb::new)) + .map(|db| StateDb::new(asset_public_key.clone(), db))) } fn get_or_create_state_db( @@ -99,7 +99,7 @@ impl DbFactory for MockDbFactory { .entry(asset_public_key.clone()) .or_default() .clone(); - Ok(StateDb::new(entry)) + Ok(StateDb::new(asset_public_key.clone(), entry)) } } diff --git a/dan_layer/core/src/storage/state/state_db.rs b/dan_layer/core/src/storage/state/state_db.rs index a6452eff81..91bedac1a3 100644 --- a/dan_layer/core/src/storage/state/state_db.rs +++ b/dan_layer/core/src/storage/state/state_db.rs @@ -20,6 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use tari_common_types::types::PublicKey; + use crate::storage::state::{ state_db_unit_of_work::{StateDbUnitOfWorkImpl, StateDbUnitOfWorkReader, UnitOfWorkContext}, StateDbBackendAdapter, @@ -27,20 +29,30 @@ use crate::storage::state::{ pub struct StateDb { backend_adapter: TStateDbBackendAdapter, + asset_public_key: PublicKey, } impl StateDb { - pub fn new(backend_adapter: TStateDbBackendAdapter) -> Self { - Self { backend_adapter } + pub fn new(asset_public_key: PublicKey, backend_adapter: TStateDbBackendAdapter) -> Self { + Self { + backend_adapter, + asset_public_key, + } } pub fn new_unit_of_work(&self, height: u64) -> StateDbUnitOfWorkImpl { - StateDbUnitOfWorkImpl::new(UnitOfWorkContext::new(height), self.backend_adapter.clone()) + StateDbUnitOfWorkImpl::new( + UnitOfWorkContext::new(height, self.asset_public_key.clone()), + self.backend_adapter.clone(), + ) } pub fn reader(&self) -> impl StateDbUnitOfWorkReader { // TODO: A reader doesnt need the current context, should perhaps make a read-only implementation that the // writable implementation also uses - StateDbUnitOfWorkImpl::new(UnitOfWorkContext::new(0), self.backend_adapter.clone()) + StateDbUnitOfWorkImpl::new( + UnitOfWorkContext::new(0, self.asset_public_key.clone()), + self.backend_adapter.clone(), + ) } } diff --git a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs index b973386be6..6a5c760229 100644 --- a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs @@ -28,7 +28,7 @@ use std::{ use digest::Digest; use log::*; -use tari_common_types::types::HashDigest; +use tari_common_types::types::{HashDigest, PublicKey}; use tari_crypto::common::Blake256; use tari_mmr::{MemBackendVec, MerkleMountainRange}; use tari_utilities::hex::Hex; @@ -52,8 +52,9 @@ pub trait StateDbUnitOfWork: StateDbUnitOfWorkReader { } pub trait StateDbUnitOfWorkReader: Clone + Send + Sync { - fn get_value(&mut self, schema: &str, key: &[u8]) -> Result>, StorageError>; - fn get_u64(&mut self, schema: &str, key: &[u8]) -> Result, StorageError>; + fn context(&self) -> &UnitOfWorkContext; + fn get_value(&self, schema: &str, key: &[u8]) -> Result>, StorageError>; + fn get_u64(&self, schema: &str, key: &[u8]) -> Result, StorageError>; fn find_keys_by_value(&self, schema: &str, value: &[u8]) -> Result>, StorageError>; fn calculate_root(&self) -> Result; fn get_all_state(&self) -> Result, StorageError>; @@ -62,12 +63,24 @@ pub trait StateDbUnitOfWorkReader: Clone + Send + Sync { #[derive(Debug, Clone)] pub struct UnitOfWorkContext { - pub height: u64, + asset_public_key: PublicKey, + height: u64, } impl UnitOfWorkContext { - pub fn new(height: u64) -> Self { - Self { height } + pub fn new(height: u64, asset_public_key: PublicKey) -> Self { + Self { + height, + asset_public_key, + } + } + + pub fn height(&self) -> u64 { + self.height + } + + pub fn asset_public_key(&self) -> &PublicKey { + &self.asset_public_key } } @@ -167,35 +180,20 @@ impl StateDbUnitOfWork for StateDbUnitOf } impl StateDbUnitOfWorkReader for StateDbUnitOfWorkImpl { - fn get_value(&mut self, schema: &str, key: &[u8]) -> Result>, StorageError> { - let mut inner = self.inner.write().unwrap(); - for v in &inner.updates { - let inner_v = v.get(); - if inner_v.schema == schema && inner_v.key == key { - return Ok(Some(inner_v.value.clone())); - } - } + fn context(&self) -> &UnitOfWorkContext { + &self.context + } + + fn get_value(&self, schema: &str, key: &[u8]) -> Result>, StorageError> { + let inner = self.inner.read().unwrap(); // Hit the DB. - let value = inner + inner .backend_adapter .get(schema, key) - .map_err(TBackendAdapter::Error::into)?; - if let Some(value) = value { - inner.updates.push(UnitOfWorkTracker::new( - DbKeyValue { - schema: schema.to_string(), - key: Vec::from(key), - value: value.clone(), - }, - false, - )); - Ok(Some(value)) - } else { - Ok(None) - } + .map_err(TBackendAdapter::Error::into) } - fn get_u64(&mut self, schema: &str, key: &[u8]) -> Result, StorageError> { + fn get_u64(&self, schema: &str, key: &[u8]) -> Result, StorageError> { let data = self.get_value(schema, key)?; match data { Some(data) => { diff --git a/dan_layer/core/src/templates/tip002_template.rs b/dan_layer/core/src/templates/tip002_template.rs index abe073b841..2107402f7f 100644 --- a/dan_layer/core/src/templates/tip002_template.rs +++ b/dan_layer/core/src/templates/tip002_template.rs @@ -26,56 +26,60 @@ use tari_crypto::tari_utilities::{hex::Hex, ByteArray}; use tari_dan_common_types::proto::tips::tip002; use crate::{ - models::AssetDefinition, + models::{Instruction, InstructionSet, TemplateId}, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; -pub fn init( - template_parameter: &TemplateParameter, - asset_definition: &AssetDefinition, - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - let params = tip002::InitRequest::decode(&*template_parameter.template_data).map_err(|e| { - DigitalAssetError::ProtoBufDecodeError { - source: e, - message_type: "tip002::InitRequest".to_string(), - } - })?; - dbg!(¶ms); - state_db.set_value( - "owners".to_string(), - asset_definition.public_key.to_vec(), - Vec::from(params.total_supply.to_le_bytes()), - )?; - Ok(()) +pub fn initial_instructions(template_param: &TemplateParameter) -> InstructionSet { + InstructionSet::from_vec(vec![Instruction::new( + TemplateId::Tip002, + "init".to_string(), + template_param.template_data.clone(), + )]) } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "balanceof" => balance_of(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } -pub fn invoke_method( - method: String, +pub fn invoke_write_method( + method: &str, args: &[u8], state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { + "init" => init(args, state_db), "transfer" => transfer(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } +fn init(args: &[u8], state_db: &mut TUnitOfWork) -> Result<(), DigitalAssetError> { + let params = tip002::InitRequest::decode(args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { + source: e, + message_type: "tip002::InitRequest".to_string(), + })?; + dbg!(¶ms); + state_db.set_value( + "owners".to_string(), + state_db.context().asset_public_key().to_vec(), + // TODO: Encode full owner data + Vec::from(params.total_supply.to_le_bytes()), + )?; + Ok(()) +} + fn balance_of( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { let request = tip002::BalanceOfRequest::decode(&*args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { source: e, diff --git a/dan_layer/core/src/templates/tip004_template.rs b/dan_layer/core/src/templates/tip004_template.rs index f59473b0d2..88b75325cb 100644 --- a/dan_layer/core/src/templates/tip004_template.rs +++ b/dan_layer/core/src/templates/tip004_template.rs @@ -23,36 +23,42 @@ use digest::Digest; use log::*; use prost::Message; +use tari_core::transactions::transaction_components::TemplateParameter; use tari_crypto::{common::Blake256, tari_utilities::hex::Hex}; use tari_dan_common_types::proto::tips::tip004; use crate::{ + models::InstructionSet, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; const LOG_TARGET: &str = "tari::dan_layer::core::templates::tip004_template"; -pub fn invoke_method( - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - match method.to_lowercase().replace("_", "").as_str() { - "mint" => mint(args, state_db), - _ => todo!(), - } +pub fn initial_instructions(_: &TemplateParameter) -> InstructionSet { + InstructionSet::empty() } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "balanceof" => balance_of(args, state_db), "tokenofownerbyindex" => token_of_owner_by_index(args, state_db), - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), + } +} + +pub fn invoke_write_method( + method: &str, + args: &[u8], + state_db: &mut TUnitOfWork, +) -> Result<(), DigitalAssetError> { + match method.to_lowercase().replace("_", "").as_str() { + "mint" => mint(args, state_db), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } @@ -96,7 +102,7 @@ fn hash_of(s: &str) -> Vec { fn balance_of( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { // TODO: move this to the invoke_read_method method let request = tip004::BalanceOfRequest::decode(&*args).map_err(|e| DigitalAssetError::ProtoBufDecodeError { @@ -116,7 +122,7 @@ fn balance_of( fn token_of_owner_by_index( args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { // TODO: move this to the invoke_read_method method let request = diff --git a/dan_layer/core/src/templates/tip721_template.rs b/dan_layer/core/src/templates/tip721_template.rs index 9fd13a216f..f954b352cb 100644 --- a/dan_layer/core/src/templates/tip721_template.rs +++ b/dan_layer/core/src/templates/tip721_template.rs @@ -22,31 +22,26 @@ use log::*; use prost::Message; +use tari_core::transactions::transaction_components::TemplateParameter; use tari_crypto::tari_utilities::{hex::Hex, ByteArray}; use tari_dan_common_types::proto::tips::tip721; use crate::{ + models::InstructionSet, storage::state::{StateDbUnitOfWork, StateDbUnitOfWorkReader}, DigitalAssetError, }; const LOG_TARGET: &str = "tari::dan_layer::core::templates::tip721_template"; -pub fn invoke_method( - method: String, - args: &[u8], - state_db: &mut TUnitOfWork, -) -> Result<(), DigitalAssetError> { - match method.to_lowercase().replace("_", "").as_str() { - "transferfrom" => transfer_from(args, state_db), - _ => todo!(), - } +pub fn initial_instructions(_: &TemplateParameter) -> InstructionSet { + InstructionSet::empty() } pub fn invoke_read_method( - method: String, + method: &str, args: &[u8], - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result>, DigitalAssetError> { match method.to_lowercase().replace("_", "").as_str() { "ownerof" => { @@ -60,13 +55,24 @@ pub fn invoke_read_method( }; Ok(Some(response.encode_to_vec())) }, - _ => todo!(), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), + } +} + +pub fn invoke_write_method( + method: &str, + args: &[u8], + state_db: &mut TUnitOfWork, +) -> Result<(), DigitalAssetError> { + match method.to_lowercase().replace("_", "").as_str() { + "transferfrom" => transfer_from(args, state_db), + name => Err(DigitalAssetError::TemplateUnsupportedMethod { name: name.to_string() }), } } fn owner_of( token_id: Vec, - state_db: &mut TUnitOfWork, + state_db: &TUnitOfWork, ) -> Result, DigitalAssetError> { state_db .get_value("owners", &token_id)? diff --git a/dan_layer/core/src/workers/consensus_worker.rs b/dan_layer/core/src/workers/consensus_worker.rs index f10939c49d..05087d1289 100644 --- a/dan_layer/core/src/workers/consensus_worker.rs +++ b/dan_layer/core/src/workers/consensus_worker.rs @@ -155,9 +155,6 @@ impl> ConsensusWorker> ConsensusWorker> ConsensusWorker -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, - TSigningService: SigningService, +pub struct CommitState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, + ta: PhantomData, + p_p: PhantomData, p_s: PhantomData, - received_new_view_messages: HashMap>, + received_new_view_messages: HashMap>, } -impl - CommitState +impl + CommitState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, - TSigningService: SigningService, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, + TSigningService: SigningService, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey, committee: Committee) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + asset_public_key: PublicKey, + committee: Committee, + ) -> Self { Self { node_id, asset_public_key, @@ -129,8 +127,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { if !message.matches(HotStuffMessageType::PreCommit, current_view.view_id) { @@ -211,10 +209,10 @@ where async fn process_replica_message( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, unit_of_work: &mut TUnitOfWork, @@ -259,7 +257,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/decide_state.rs b/dan_layer/core/src/workers/states/decide_state.rs index eaca8e9e54..d3ba64ea83 100644 --- a/dan_layer/core/src/workers/states/decide_state.rs +++ b/dan_layer/core/src/workers/states/decide_state.rs @@ -29,9 +29,9 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, - models::{Committee, HotStuffMessage, HotStuffMessageType, Payload, QuorumCertificate, View, ViewId}, + models::{Committee, HotStuffMessage, HotStuffMessageType, QuorumCertificate, View, ViewId}, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, PayloadProvider, }, storage::chain::ChainDbUnitOfWork, @@ -41,32 +41,30 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::decide"; // TODO: This is very similar to pre-commit, and commit state -pub struct DecideState -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, +pub struct DecideState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, - received_new_view_messages: HashMap>, + ta: PhantomData, + p_p: PhantomData, + received_new_view_messages: HashMap>, } -impl - DecideState +impl DecideState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey, committee: Committee) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + asset_public_key: PublicKey, + committee: Committee, + ) -> Self { Self { node_id, asset_public_key, @@ -79,7 +77,10 @@ where } } - pub async fn next_event>( + pub async fn next_event< + TUnitOfWork: ChainDbUnitOfWork, + TPayloadProvider: PayloadProvider, + >( &mut self, timeout: Duration, current_view: &View, @@ -127,8 +128,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { if !message.matches(HotStuffMessageType::Commit, current_view.view_id) { @@ -205,12 +206,15 @@ where Some(qc) } - async fn process_replica_message>( + async fn process_replica_message< + TUnitOfWork: ChainDbUnitOfWork, + TPayloadProvider: PayloadProvider, + >( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, unit_of_work: &mut TUnitOfWork, payload_provider: &mut TPayloadProvider, ) -> Result, DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/next_view.rs b/dan_layer/core/src/workers/states/next_view.rs index 8233a36a0e..a0b0c03399 100644 --- a/dan_layer/core/src/workers/states/next_view.rs +++ b/dan_layer/core/src/workers/states/next_view.rs @@ -25,8 +25,8 @@ use tari_shutdown::ShutdownSignal; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, Committee, HotStuffMessage, Payload, View}, - services::infrastructure_services::{NodeAddressable, OutboundService}, + models::{AssetDefinition, Committee, HotStuffMessage, HotStuffTreeNode, QuorumCertificate, View}, + services::{infrastructure_services::OutboundService, PayloadProvider}, storage::DbFactory, workers::states::ConsensusWorkerStateEvent, }; @@ -37,29 +37,43 @@ const LOG_TARGET: &str = "tari::dan::workers::states::next_view"; pub struct NextViewState {} impl NextViewState { - pub async fn next_event< - TPayload: Payload, - TOutboundService: OutboundService, - TAddr: NodeAddressable + Clone + Send, - TDbFactory: DbFactory, - >( + pub async fn next_event( &mut self, current_view: &View, db_factory: &TDbFactory, broadcast: &mut TOutboundService, - committee: &Committee, - node_id: TAddr, + committee: &Committee, + node_id: TOutboundService::Addr, asset_definition: &AssetDefinition, + payload_provider: &TPayloadProvider, _shutdown: &ShutdownSignal, - ) -> Result { - let db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; - let prepare_qc = db.find_highest_prepared_qc()?; - let message = HotStuffMessage::new_view(prepare_qc, current_view.view_id, asset_definition.public_key.clone()); - let next_view = current_view.view_id.next(); - let leader = committee.leader_for_view(next_view); - broadcast.send(node_id, leader.clone(), message).await?; - info!(target: LOG_TARGET, "End of view: {}", current_view.view_id.0); - debug!(target: LOG_TARGET, "--------------------------------"); + ) -> Result + where + TOutboundService: OutboundService, + TDbFactory: DbFactory, + TPayloadProvider: PayloadProvider, + { + let chain_db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; + let next_view = if chain_db.is_empty()? { + info!(target: LOG_TARGET, "Database is empty. Proposing genesis block"); + let node = HotStuffTreeNode::genesis(payload_provider.create_genesis_payload(asset_definition)); + let genesis_qc = QuorumCertificate::genesis(*node.hash()); + let genesis_view_no = genesis_qc.view_number(); + let leader = committee.leader_for_view(genesis_view_no); + let message = HotStuffMessage::new_view(genesis_qc, 0.into(), asset_definition.public_key.clone()); + broadcast.send(node_id, leader.clone(), message).await?; + genesis_view_no + } else { + let prepare_qc = chain_db.find_highest_prepared_qc()?; + let message = + HotStuffMessage::new_view(prepare_qc, current_view.view_id, asset_definition.public_key.clone()); + let next_view = current_view.view_id.next(); + let leader = committee.leader_for_view(next_view); + broadcast.send(node_id, leader.clone(), message).await?; + info!(target: LOG_TARGET, "End of view: {}", current_view.view_id.0); + debug!(target: LOG_TARGET, "--------------------------------"); + next_view + }; Ok(ConsensusWorkerStateEvent::NewView { new_view: next_view }) } } diff --git a/dan_layer/core/src/workers/states/pre_commit_state.rs b/dan_layer/core/src/workers/states/pre_commit_state.rs index d84785a22c..833b35e38b 100644 --- a/dan_layer/core/src/workers/states/pre_commit_state.rs +++ b/dan_layer/core/src/workers/states/pre_commit_state.rs @@ -28,9 +28,9 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, - models::{Committee, HotStuffMessage, HotStuffMessageType, Payload, QuorumCertificate, TreeNodeHash, View, ViewId}, + models::{Committee, HotStuffMessage, HotStuffMessageType, QuorumCertificate, TreeNodeHash, View, ViewId}, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, SigningService, }, storage::chain::ChainDbUnitOfWork, @@ -39,35 +39,33 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::precommit"; -pub struct PreCommitState -where - TInboundConnectionService: InboundConnectionService, - TAddr: NodeAddressable, - TPayload: Payload, - TOutboundService: OutboundService, - TSigningService: SigningService, +pub struct PreCommitState +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, - committee: Committee, + committee: Committee, phantom_inbound: PhantomData, phantom_outbound: PhantomData, - ta: PhantomData, - p_p: PhantomData, + ta: PhantomData, + p_p: PhantomData, p_s: PhantomData, - received_new_view_messages: HashMap>, + received_new_view_messages: HashMap>, } -impl - PreCommitState +impl + PreCommitState where - TInboundConnectionService: InboundConnectionService, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TPayload: Payload, - TSigningService: SigningService, + TInboundConnectionService: + InboundConnectionService, + TOutboundService: OutboundService, + TSigningService: SigningService, { - pub fn new(node_id: TAddr, committee: Committee, asset_public_key: PublicKey) -> Self { + pub fn new( + node_id: TOutboundService::Addr, + committee: Committee, + asset_public_key: PublicKey, + ) -> Self { Self { node_id, asset_public_key, @@ -127,8 +125,8 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, + message: HotStuffMessage, + sender: &TOutboundService::Addr, outbound: &mut TOutboundService, ) -> Result, DigitalAssetError> { debug!( @@ -177,7 +175,7 @@ where async fn broadcast( &self, outbound: &mut TOutboundService, - committee: &Committee, + committee: &Committee, prepare_qc: QuorumCertificate, view_number: ViewId, ) -> Result<(), DigitalAssetError> { @@ -215,10 +213,10 @@ where async fn process_replica_message( &mut self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, unit_of_work: &mut TUnitOfWork, @@ -266,7 +264,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/prepare.rs b/dan_layer/core/src/workers/states/prepare.rs index f72ba0f401..d486fbfb15 100644 --- a/dan_layer/core/src/workers/states/prepare.rs +++ b/dan_layer/core/src/workers/states/prepare.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{collections::HashMap, marker::PhantomData, time::Instant}; +use std::{collections::HashMap, marker::PhantomData}; use log::*; use tari_common_types::types::PublicKey; @@ -29,18 +29,18 @@ use tokio::time::{sleep, Duration}; use crate::{ digital_assets_error::DigitalAssetError, models::{ + AssetDefinition, Committee, HotStuffMessage, HotStuffMessageType, HotStuffTreeNode, - Payload, QuorumCertificate, TreeNodeHash, View, ViewId, }, services::{ - infrastructure_services::{InboundConnectionService, NodeAddressable, OutboundService}, + infrastructure_services::{InboundConnectionService, OutboundService}, PayloadProcessor, PayloadProvider, SigningService, @@ -51,24 +51,10 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::workers::states::prepare"; -pub struct Prepare< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, -> where - TInboundConnectionService: InboundConnectionService + Send, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TSigningService: SigningService, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, +pub struct Prepare +where TOutboundService: OutboundService { - node_id: TAddr, + node_id: TOutboundService::Addr, asset_public_key: PublicKey, // bft_service: Box, // TODO remove this hack @@ -77,37 +63,20 @@ pub struct Prepare< phantom_outbound: PhantomData, phantom_signing: PhantomData, phantom_processor: PhantomData, - received_new_view_messages: HashMap>, + received_new_view_messages: HashMap>, } -impl< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, - > - Prepare< - TInboundConnectionService, - TOutboundService, - TAddr, - TSigningService, - TPayloadProvider, - TPayload, - TPayloadProcessor, - > +impl + Prepare where - TInboundConnectionService: InboundConnectionService + Send, - TOutboundService: OutboundService, - TAddr: NodeAddressable, - TSigningService: SigningService, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, + TOutboundService: OutboundService, + TInboundConnectionService: + InboundConnectionService + Send, + TSigningService: SigningService, + TPayloadProvider: PayloadProvider, + TPayloadProcessor: PayloadProcessor, { - pub fn new(node_id: TAddr, asset_public_key: PublicKey) -> Self { + pub fn new(node_id: TOutboundService::Addr, asset_public_key: PublicKey) -> Self { Self { node_id, asset_public_key, @@ -122,7 +91,7 @@ where #[allow(clippy::too_many_arguments)] pub async fn next_event< - TChainStorageService: ChainStorageService, + TChainStorageService: ChainStorageService, TUnitOfWork: ChainDbUnitOfWork, TStateDbUnitOfWork: StateDbUnitOfWork, TDbFactory: DbFactory, @@ -130,7 +99,8 @@ where &mut self, current_view: &View, timeout: Duration, - committee: &Committee, + asset_definition: &AssetDefinition, + committee: &Committee, inbound_services: &TInboundConnectionService, outbound_service: &mut TOutboundService, payload_provider: &mut TPayloadProvider, @@ -143,17 +113,27 @@ where ) -> Result { self.received_new_view_messages.clear(); - let started = Instant::now(); let mut chain_tx = chain_tx; let next_event_result; + let timeout = sleep(timeout); + futures::pin_mut!(timeout); loop { tokio::select! { r = inbound_services.wait_for_message(HotStuffMessageType::NewView, current_view.view_id() - 1.into()) => { let (from, message) = r?; debug!(target: LOG_TARGET, "Received leader message"); if current_view.is_leader() { - if let Some(result) = self.process_leader_message(current_view, message.clone(), - &from, committee, payload_provider, payload_processor, outbound_service, db_factory).await?{ + if let Some(result) = self.process_leader_message( + current_view, + message.clone(), + &from, + asset_definition, + committee, + payload_provider, + payload_processor, + outbound_service, + db_factory, + ).await? { next_event_result = result; break; } @@ -170,7 +150,7 @@ where } }, - _ = sleep(timeout.saturating_sub(Instant::now() - started)) => { + _ = &mut timeout => { next_event_result = ConsensusWorkerStateEvent::TimedOut; break; } @@ -185,9 +165,10 @@ where async fn process_leader_message( &mut self, current_view: &View, - message: HotStuffMessage, - sender: &TAddr, - committee: &Committee, + message: HotStuffMessage, + sender: &TOutboundService::Addr, + asset_definition: &AssetDefinition, + committee: &Committee, payload_provider: &TPayloadProvider, payload_processor: &mut TPayloadProcessor, outbound: &mut TOutboundService, @@ -223,9 +204,10 @@ where let proposal = self .create_proposal( *high_qc.node_hash(), + asset_definition, payload_provider, payload_processor, - current_view.view_id.as_u64() as u32, + current_view.view_id, temp_state_tx, ) .await?; @@ -245,14 +227,14 @@ where async fn process_replica_message< TUnitOfWork: ChainDbUnitOfWork, - TChainStorageService: ChainStorageService, + TChainStorageService: ChainStorageService, TStateDbUnitOfWork: StateDbUnitOfWork, >( &self, - message: &HotStuffMessage, + message: &HotStuffMessage, current_view: &View, - from: &TAddr, - view_leader: &TAddr, + from: &TOutboundService::Addr, + view_leader: &TOutboundService::Addr, outbound: &mut TOutboundService, signing_service: &TSigningService, payload_processor: &mut TPayloadProcessor, @@ -275,45 +257,46 @@ where return Ok(None); } let node = message.node().unwrap(); - if let Some(justify) = message.justify() { - if self.does_extend(node, justify.node_hash()) { - if !self.is_safe_node(node, justify, chain_tx)? { - unimplemented!("Node is not safe") - } + let justify = message + .justify() + .ok_or(DigitalAssetError::PreparePhaseNoQuorumCertificate)?; - let res = payload_processor - .process_payload(node.payload(), state_tx.clone()) - .await?; - if &res == node.state_root() { - chain_storage_service - .add_node::(node, chain_tx.clone()) - .await?; + if !self.does_extend(node, justify.node_hash()) { + return Err(DigitalAssetError::PreparePhaseCertificateDoesNotExtendNode); + } - payload_provider.reserve_payload(node.payload(), node.hash()).await?; - self.send_vote_to_leader( - *node.hash(), - outbound, - view_leader, - current_view.view_id, - signing_service, - ) - .await?; - Ok(Some(ConsensusWorkerStateEvent::Prepared)) - } else { - warn!( - target: LOG_TARGET, - "Calculated state root did not match the state root provided by the leader: Expected: {:?} \ - Leader provided:{:?}", - res, - node.state_root() - ); - Ok(None) - } - } else { - unimplemented!("Did not extend from qc.justify.node") - } + if !self.is_safe_node(node, justify, chain_tx)? { + return Err(DigitalAssetError::PreparePhaseNodeNotSafe); + } + + let res = payload_processor + .process_payload(node.payload(), state_tx.clone()) + .await?; + + if res == *node.state_root() { + chain_storage_service + .add_node::(node, chain_tx.clone()) + .await?; + + payload_provider.reserve_payload(node.payload(), node.hash()).await?; + self.send_vote_to_leader( + *node.hash(), + outbound, + view_leader, + current_view.view_id, + signing_service, + ) + .await?; + Ok(Some(ConsensusWorkerStateEvent::Prepared)) } else { - unimplemented!("unexpected Null justify ") + warn!( + target: LOG_TARGET, + "Calculated state root did not match the state root provided by the leader: Expected: {:?} Leader \ + provided:{:?}", + res, + node.state_root() + ); + Ok(None) } } @@ -338,26 +321,37 @@ where async fn create_proposal( &self, parent: TreeNodeHash, + asset_definition: &AssetDefinition, payload_provider: &TPayloadProvider, payload_processor: &mut TPayloadProcessor, - height: u32, + view_id: ViewId, state_db: TStateDbUnitOfWork, - ) -> Result, DigitalAssetError> { + ) -> Result, DigitalAssetError> { debug!(target: LOG_TARGET, "Creating new proposal"); // TODO: Artificial delay here to set the block time - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(1)).await; + // + let payload = if view_id.is_genesis() { + payload_provider.create_genesis_payload(asset_definition) + } else { + payload_provider.create_payload().await? + }; - let payload = payload_provider.create_payload().await?; let state_root = payload_processor.process_payload(&payload, state_db).await?; - Ok(HotStuffTreeNode::from_parent(parent, payload, state_root, height)) + Ok(HotStuffTreeNode::from_parent( + parent, + payload, + state_root, + view_id.as_u64() as u32, + )) } async fn broadcast_proposal( &self, outbound: &mut TOutboundService, - committee: &Committee, - proposal: HotStuffTreeNode, + committee: &Committee, + proposal: HotStuffTreeNode, high_qc: QuorumCertificate, view_number: ViewId, ) -> Result<(), DigitalAssetError> { @@ -367,13 +361,13 @@ where .await } - fn does_extend(&self, node: &HotStuffTreeNode, from: &TreeNodeHash) -> bool { + fn does_extend(&self, node: &HotStuffTreeNode, from: &TreeNodeHash) -> bool { from == node.parent() } fn is_safe_node( &self, - node: &HotStuffTreeNode, + node: &HotStuffTreeNode, quorum_certificate: &QuorumCertificate, chain_tx: &mut TUnitOfWork, ) -> Result { @@ -385,7 +379,7 @@ where &self, node: TreeNodeHash, outbound: &mut TOutboundService, - view_leader: &TAddr, + view_leader: &TOutboundService::Addr, view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { diff --git a/dan_layer/core/src/workers/states/starting.rs b/dan_layer/core/src/workers/states/starting.rs index 101e4925c0..3eac2626d4 100644 --- a/dan_layer/core/src/workers/states/starting.rs +++ b/dan_layer/core/src/workers/states/starting.rs @@ -27,15 +27,9 @@ use tari_utilities::hex::Hex; use crate::{ digital_assets_error::DigitalAssetError, - models::{AssetDefinition, HotStuffTreeNode, Payload, QuorumCertificate}, - services::{ - infrastructure_services::NodeAddressable, - BaseNodeClient, - CommitteeManager, - PayloadProcessor, - PayloadProvider, - }, - storage::{chain::ChainDbUnitOfWork, state::StateDbUnitOfWork, ChainStorageService, DbFactory}, + models::AssetDefinition, + services::{infrastructure_services::NodeAddressable, BaseNodeClient, CommitteeManager}, + storage::DbFactory, workers::states::ConsensusWorkerStateEvent, }; @@ -59,20 +53,13 @@ where TBaseNodeClient: BaseNodeClient pub async fn next_event< TAddr: NodeAddressable, TCommitteeManager: CommitteeManager, - TPayload: Payload, - TPayloadProvider: PayloadProvider, - TPayloadProcessor: PayloadProcessor, TDbFactory: DbFactory, - TChainStorageService: ChainStorageService, >( &self, base_node_client: &mut TBaseNodeClient, asset_definition: &AssetDefinition, committee_manager: &mut TCommitteeManager, db_factory: &TDbFactory, - payload_provider: &TPayloadProvider, - payload_processor: &TPayloadProcessor, - chain_storage_service: &TChainStorageService, node_id: &TAddr, ) -> Result { info!( @@ -113,46 +100,7 @@ where TBaseNodeClient: BaseNodeClient ); // read and create the genesis block info!(target: LOG_TARGET, "Creating DB"); - let chain_db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; - if chain_db.is_empty()? { - info!(target: LOG_TARGET, "DB is empty, initializing"); - let mut tx = chain_db.new_unit_of_work(); - - let state_db = db_factory.get_or_create_state_db(&asset_definition.public_key)?; - let mut state_tx = state_db.new_unit_of_work(0); - - info!(target: LOG_TARGET, "Loading initial state"); - let initial_state = asset_definition.initial_state(); - for schema in &initial_state.schemas { - debug!(target: LOG_TARGET, "Setting initial state for {}", schema.name); - for key_value in &schema.items { - debug!( - target: LOG_TARGET, - "Setting {:?} = {:?}", key_value.key, key_value.value - ); - state_tx.set_value(schema.name.clone(), key_value.key.clone(), key_value.value.clone())?; - } - } - dbg!(&asset_definition); - for template in &asset_definition.template_parameters { - debug!( - target: LOG_TARGET, - "Setting template parameters for: {}", template.template_id - ); - payload_processor.init_template(template, asset_definition, &mut state_tx)?; - } - info!(target: LOG_TARGET, "Saving genesis node"); - let node = HotStuffTreeNode::genesis(payload_provider.create_genesis_payload()); - let genesis_qc = QuorumCertificate::genesis(*node.hash()); - chain_storage_service.add_node(&node, tx.clone()).await?; - tx.commit_node(node.hash())?; - debug!(target: LOG_TARGET, "Setting locked QC"); - tx.set_locked_qc(&genesis_qc)?; - debug!(target: LOG_TARGET, "Committing state"); - state_tx.commit()?; - debug!(target: LOG_TARGET, "Committing node"); - tx.commit()?; - } + let _ = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; Ok(ConsensusWorkerStateEvent::Initialized) } diff --git a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs index 2d2aa029ea..2828b77a9a 100644 --- a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs +++ b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs @@ -124,7 +124,10 @@ impl DbFactory for SqliteDbFactory { ) -> Result>, StorageError> { let database_url = self.database_url_for(asset_public_key); match self.try_connect(&database_url)? { - Some(_) => Ok(Some(StateDb::new(SqliteStateDbBackendAdapter::new(database_url)))), + Some(_) => Ok(Some(StateDb::new( + asset_public_key.clone(), + SqliteStateDbBackendAdapter::new(database_url), + ))), None => Ok(None), } } @@ -147,6 +150,9 @@ impl DbFactory for SqliteDbFactory { })?; embed_migrations!("./migrations"); embedded_migrations::run(&connection).map_err(SqliteStorageError::from)?; - Ok(StateDb::new(SqliteStateDbBackendAdapter::new(database_url))) + Ok(StateDb::new( + asset_public_key.clone(), + SqliteStateDbBackendAdapter::new(database_url), + )) } }