Skip to content

Commit

Permalink
fix(validator-node): fix consensus stall after genesis
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Feb 18, 2022
1 parent 50fe421 commit e289bcd
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ use tari_dan_core::{
TariDanPayloadProvider,
},
};
use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteStateDbBackendAdapter, SqliteStorageService};
use tari_dan_storage_sqlite::{
SqliteChainBackendAdapter,
SqliteDbFactory,
SqliteStateDbBackendAdapter,
SqliteStorageService,
};

use crate::{
grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
Expand All @@ -55,6 +60,7 @@ impl ServiceSpecification for DefaultServiceSpecification {
type AssetProcessor = ConcreteAssetProcessor;
type AssetProxy = ConcreteAssetProxy<Self>;
type BaseNodeClient = GrpcBaseNodeClient;
type ChainDbBackendAdapter = SqliteChainBackendAdapter;
type ChainStorageService = SqliteStorageService;
type CheckpointManager = ConcreteCheckpointManager<Self::WalletClient>;
type CommitteeManager = ConcreteCommitteeManager;
Expand Down
8 changes: 8 additions & 0 deletions dan_layer/core/src/models/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::fmt::{Display, Formatter};

use crate::models::ViewId;

// TODO: Encapsulate
Expand All @@ -38,3 +40,9 @@ impl View {
self.view_id
}
}

impl Display for View {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "View(Id: {}, leader: {})", self.view_id.as_u64(), self.is_leader)
}
}
1 change: 1 addition & 0 deletions dan_layer/core/src/services/asset_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl<TServiceSpecification: ServiceSpecification<Addr = PublicKey>> ConcreteAsse
method: String,
args: Vec<u8>,
) -> Result<Option<Vec<u8>>, DigitalAssetError> {
debug!(target: LOG_TARGET, "Forwarding '{}' instruction to {}", member, method);
let mut client = self.validator_node_client_factory.create_client(member);
let resp = client
.invoke_method(asset_public_key, template_id, method, args)
Expand Down
11 changes: 9 additions & 2 deletions dan_layer/core/src/services/service_specification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
SigningService,
ValidatorNodeClientFactory,
},
storage::{state::StateDbBackendAdapter, ChainStorageService, DbFactory},
storage::{chain::ChainDbBackendAdapter, state::StateDbBackendAdapter, ChainStorageService, DbFactory},
};

/// A trait to describe a specific configuration of services. This type allows other services to
Expand All @@ -48,10 +48,17 @@ pub trait ServiceSpecification: Clone {
type AssetProcessor: AssetProcessor + Clone + Sync + Send + 'static;
type AssetProxy: AssetProxy + Clone + Sync + Send + 'static;
type BaseNodeClient: BaseNodeClient + Clone + Sync + Send + 'static;
type ChainDbBackendAdapter: ChainDbBackendAdapter;
type ChainStorageService: ChainStorageService<Self::Payload>;
type CheckpointManager: CheckpointManager<Self::Addr>;
type CommitteeManager: CommitteeManager<Self::Addr>;
type DbFactory: DbFactory<StateDbBackendAdapter = Self::StateDbBackendAdapter> + Clone + Sync + Send + 'static;
type DbFactory: DbFactory<
StateDbBackendAdapter = Self::StateDbBackendAdapter,
ChainDbBackendAdapter = Self::ChainDbBackendAdapter,
> + Clone
+ Sync
+ Send
+ 'static;
type EventsPublisher: EventsPublisher<ConsensusWorkerDomainEvent>;
type InboundConnectionService: InboundConnectionService<Addr = Self::Addr, Payload = Self::Payload>
+ 'static
Expand Down
62 changes: 33 additions & 29 deletions dan_layer/core/src/workers/consensus_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
models::{domain_events::ConsensusWorkerDomainEvent, AssetDefinition, ConsensusWorkerState, View, ViewId},
services::{CheckpointManager, CommitteeManager, EventsPublisher, PayloadProvider, ServiceSpecification},
storage::{
chain::ChainDbUnitOfWork,
chain::{ChainDb, ChainDbUnitOfWork},
state::{StateDbUnitOfWork, StateDbUnitOfWorkImpl, StateDbUnitOfWorkReader},
DbFactory,
},
Expand Down Expand Up @@ -116,14 +116,25 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
shutdown: ShutdownSignal,
max_views_to_process: Option<u64>,
) -> Result<(), DigitalAssetError> {
let chain_db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
self.current_view_id = chain_db
.get_tip_node()?
.map(|n| ViewId(n.height() as u64))
.unwrap_or_else(|| ViewId(0));
info!(
target: LOG_TARGET,
"Consensus worker started for asset '{}'. Tip: {}", self.asset_definition.public_key, self.current_view_id
);
let starting_view = self.current_view_id;
loop {
if let Some(max) = max_views_to_process {
if max <= self.current_view_id.0 - starting_view.0 {
break;
}
}
let next_event = self.next_state_event(&shutdown).await?;
let next_event = self.next_state_event(&chain_db, &shutdown).await?;
if next_event.must_shutdown() {
info!(
target: LOG_TARGET,
Expand All @@ -133,7 +144,10 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
break;
}
let (from, to) = self.transition(next_event)?;
debug!(target: LOG_TARGET, "Transitioning from {:?} to {:?}", from, to);
debug!(
target: LOG_TARGET,
"Transitioning from {:?} to {:?} ({})", from, to, self.current_view_id
);

self.events_publisher
.publish(ConsensusWorkerDomainEvent::StateChanged { from, to });
Expand All @@ -144,12 +158,13 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp

async fn next_state_event(
&mut self,
chain_db: &ChainDb<TSpecification::ChainDbBackendAdapter>,
shutdown: &ShutdownSignal,
) -> Result<ConsensusWorkerStateEvent, DigitalAssetError> {
use ConsensusWorkerState::*;
match &mut self.state {
Starting => {
states::Starting::default()
states::Starting::<TSpecification>::new()
.next_event(
&mut self.base_node_client,
&self.asset_definition,
Expand All @@ -160,7 +175,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
.await
},
Synchronizing => {
states::Synchronizing::new()
states::Synchronizing::<TSpecification>::new()
.next_event(
&mut self.base_node_client,
&self.asset_definition,
Expand All @@ -171,18 +186,17 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
.await
},
Prepare => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state_tx = self
.db_factory
.get_state_db(&self.asset_definition.public_key)?
.ok_or(DigitalAssetError::MissingDatabase)?
.new_unit_of_work(self.current_view_id.as_u64());

let mut prepare =
states::Prepare::new(self.node_address.clone(), self.asset_definition.public_key.clone());
let mut prepare = states::Prepare::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
);
let res = prepare
.next_event(
&self.get_current_view()?,
Expand All @@ -206,11 +220,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
Ok(res)
},
PreCommit => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::PreCommitState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::PreCommitState::<TSpecification>::new(
self.node_address.clone(),
self.committee_manager.current_committee()?.clone(),
self.asset_definition.public_key.clone(),
Expand All @@ -230,11 +241,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
},

Commit => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::CommitState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::CommitState::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
self.committee_manager.current_committee()?.clone(),
Expand All @@ -255,11 +263,8 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
Ok(res)
},
Decide => {
let db = self
.db_factory
.get_or_create_chain_db(&self.asset_definition.public_key)?;
let mut unit_of_work = db.new_unit_of_work();
let mut state = states::DecideState::new(
let mut unit_of_work = chain_db.new_unit_of_work();
let mut state = states::DecideState::<TSpecification>::new(
self.node_address.clone(),
self.asset_definition.public_key.clone(),
self.committee_manager.current_committee()?.clone(),
Expand Down Expand Up @@ -300,7 +305,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
self.payload_provider.get_payload_queue().await,
);
self.state_db_unit_of_work = None;
let mut state = states::NextViewState::default();
let mut state = states::NextViewState::<TSpecification>::new();
state
.next_event(
&self.get_current_view()?,
Expand Down Expand Up @@ -335,8 +340,7 @@ impl<TSpecification: ServiceSpecification<Addr = PublicKey>> ConsensusWorker<TSp
(_, NotPartOfCommittee) => Idle,
(Idle, TimedOut) => Starting,
(_, TimedOut) => {
warn!(target: LOG_TARGET, "State timed out");
self.current_view_id = self.current_view_id.saturating_sub(1.into());
warn!(target: LOG_TARGET, "State timed out for {}", self.current_view_id);
NextView
},
(NextView, NewView { new_view }) => {
Expand Down
Loading

0 comments on commit e289bcd

Please sign in to comment.