Skip to content

Commit

Permalink
Merge branch 'rumenov/arfpfl' into 'master'
Browse files Browse the repository at this point in the history
chore: Use the StateReader interface in the canister http component in consensus. Move out the consensus pool caches from the ArtifactPools struct.

retrieve the consensus pool cache on demand when needed 

See merge request dfinity-lab/public/ic!11810
  • Loading branch information
rumenov committed Apr 12, 2023
2 parents 888257f + 06830b5 commit 4ba15d2
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 41 deletions.
10 changes: 5 additions & 5 deletions rs/consensus/src/canister_http/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ic_interfaces::{
artifact_pool::PriorityFnAndFilterProducer, canister_http::CanisterHttpPool,
consensus_pool::ConsensusPoolCache,
};
use ic_interfaces_state_manager::StateManager;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{warn, ReplicaLogger};
use ic_replicated_state::ReplicatedState;
use ic_types::{
Expand All @@ -19,20 +19,20 @@ use std::{collections::BTreeSet, sync::Arc};
/// The canonical implementation of [`PriorityFnAndFilterProducer`]
pub struct CanisterHttpGossipImpl {
consensus_cache: Arc<dyn ConsensusPoolCache>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
log: ReplicaLogger,
}

impl CanisterHttpGossipImpl {
/// Construcet a new CanisterHttpGossipImpl instance
pub fn new(
consensus_cache: Arc<dyn ConsensusPoolCache>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
log: ReplicaLogger,
) -> Self {
CanisterHttpGossipImpl {
consensus_cache,
state_manager,
state_reader,
log,
}
}
Expand All @@ -49,7 +49,7 @@ impl<Pool: CanisterHttpPool> PriorityFnAndFilterProducer<CanisterHttpArtifact, P
let registry_version =
registry_version_at_height(self.consensus_cache.as_ref(), finalized_height).unwrap();
let known_request_ids: BTreeSet<_> = self
.state_manager
.state_reader
.get_latest_state()
.get_ref()
.metadata
Expand Down
12 changes: 6 additions & 6 deletions rs/consensus/src/canister_http/payload_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ic_interfaces::{
consensus_pool::ConsensusPoolCache,
};
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateManager;
use ic_interfaces_state_manager::StateReader;
use ic_logger::{warn, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_registry_client_helpers::subnet::SubnetRegistry;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct CanisterHttpPayloadBuilderImpl {
pool: Arc<RwLock<dyn CanisterHttpPool>>,
cache: Arc<dyn ConsensusPoolCache>,
crypto: Arc<dyn ConsensusCrypto>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
membership: Arc<Membership>,
subnet_id: SubnetId,
registry: Arc<dyn RegistryClient>,
Expand All @@ -76,7 +76,7 @@ impl CanisterHttpPayloadBuilderImpl {
pool: Arc<RwLock<dyn CanisterHttpPool>>,
cache: Arc<dyn ConsensusPoolCache>,
crypto: Arc<dyn ConsensusCrypto>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
membership: Arc<Membership>,
subnet_id: SubnetId,
registry: Arc<dyn RegistryClient>,
Expand All @@ -87,7 +87,7 @@ impl CanisterHttpPayloadBuilderImpl {
pool,
cache,
crypto,
state_manager,
state_reader,
membership,
subnet_id,
registry,
Expand Down Expand Up @@ -223,7 +223,7 @@ impl CanisterHttpPayloadBuilder for CanisterHttpPayloadBuilderImpl {
// time out response. Instead, we scan the state metadata for timed
// out requests and generate time out responses based on that
if let Ok(state) = self
.state_manager
.state_reader
.get_state_at(validation_context.certified_height)
{
// Iterate over all outstanding canister http requests
Expand Down Expand Up @@ -442,7 +442,7 @@ impl CanisterHttpPayloadBuilder for CanisterHttpPayloadBuilderImpl {

// Validate the timed out calls
let state = &self
.state_manager
.state_reader
.get_state_at(validation_context.certified_height)
.map_err(|_| {
CanisterHttpPayloadValidationError::Transient(
Expand Down
2 changes: 1 addition & 1 deletion rs/consensus/src/canister_http/payload_builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ fn timeout_priority() {
Height::new(0),
Arc::new(init_state),
)));
payload_builder.state_manager = state_manager;
payload_builder.state_reader = state_manager;
}

let validation_context = ValidationContext {
Expand Down
14 changes: 7 additions & 7 deletions rs/consensus/src/canister_http/pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ic_interfaces::{
};
use ic_interfaces_https_outcalls_adapter_client::*;
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateManager;
use ic_interfaces_state_manager::StateReader;
use ic_logger::*;
use ic_metrics::MetricsRegistry;
use ic_registry_client_helpers::subnet::SubnetRegistry;
Expand All @@ -37,7 +37,7 @@ use std::{
/// - Validate shares in the unvalidated pool that were received from gossip
pub struct CanisterHttpPoolManagerImpl {
registry_client: Arc<dyn RegistryClient>,
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
http_adapter_shim: Arc<Mutex<CanisterHttpAdapterClient>>,
consensus_pool_cache: Arc<dyn ConsensusPoolCache>,
crypto: Arc<dyn ConsensusCrypto>,
Expand All @@ -51,7 +51,7 @@ pub struct CanisterHttpPoolManagerImpl {
impl CanisterHttpPoolManagerImpl {
/// Create a new [`CanisterHttpPoolManagerImpl`]
pub fn new(
state_manager: Arc<dyn StateManager<State = ReplicatedState>>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
http_adapter_shim: Arc<Mutex<CanisterHttpAdapterClient>>,
crypto: Arc<dyn ConsensusCrypto>,
membership: Arc<Membership>,
Expand All @@ -62,7 +62,7 @@ impl CanisterHttpPoolManagerImpl {
log: ReplicaLogger,
) -> Self {
Self {
state_manager,
state_reader,
http_adapter_shim,
crypto,
replica_config,
Expand All @@ -87,7 +87,7 @@ impl CanisterHttpPoolManagerImpl {
.start_timer();

let active_callback_ids: BTreeSet<_> = self
.state_manager
.state_reader
.get_latest_state()
.get_ref()
.metadata
Expand Down Expand Up @@ -155,7 +155,7 @@ impl CanisterHttpPoolManagerImpl {
.start_timer();

let http_requests = self
.state_manager
.state_reader
.get_latest_state()
.get_ref()
.metadata
Expand Down Expand Up @@ -519,7 +519,7 @@ pub mod test {
vec![CanisterHttpChangeAction::AddToValidated(share, content)],
);
let pool_manager = CanisterHttpPoolManagerImpl::new(
state_manager,
state_manager as Arc<_>,
shim,
crypto,
membership,
Expand Down
36 changes: 15 additions & 21 deletions rs/replica/setup_ic_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use ic_icos_sev_interfaces::ValidateAttestedStream;
use ic_ingress_manager::IngressManager;
use ic_interfaces::{
artifact_manager::{ArtifactClient, ArtifactManager, ArtifactProcessor},
consensus_pool::ConsensusPoolCache,
crypto::IngressSigVerifier,
execution_environment::IngressHistoryReader,
messaging::{MessageRouting, XNetPayloadBuilder},
Expand Down Expand Up @@ -87,8 +86,7 @@ pub enum P2PStateSyncClient {
/// The collection of all artifact pools.
pub struct ArtifactPools {
ingress_pool: Arc<RwLock<IngressPoolImpl>>,
consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,
pub consensus_pool_cache: Arc<dyn ConsensusPoolCache>,
pub consensus_pool: Arc<RwLock<ConsensusPoolImpl>>,
certification_pool: Arc<RwLock<CertificationPoolImpl>>,
dkg_pool: Arc<RwLock<DkgPoolImpl>>,
ecdsa_pool: Arc<RwLock<EcdsaPoolImpl>>,
Expand Down Expand Up @@ -136,11 +134,9 @@ pub fn create_networking_stack(
registry_poll_delay_duration_ms: u64,
) -> (IngressIngestionService, P2PThreadJoiner) {
let advert_subscriber = AdvertBroadcaster::new(log.clone(), metrics_registry);
let consensus_pool_cache = artifact_pools.consensus_pool_cache.clone();
let ingress_pool = artifact_pools.ingress_pool.clone();
let oldest_registry_version_in_use = artifact_pools
.consensus_pool_cache
.get_oldest_registry_version_in_use();
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();
let oldest_registry_version_in_use = consensus_pool_cache.get_oldest_registry_version_in_use();
// Now we setup the Artifact Pools and the manager.
let artifact_manager = setup_artifact_manager(
node_id,
Expand Down Expand Up @@ -242,6 +238,7 @@ fn setup_artifact_manager(
) -> std::io::Result<Arc<dyn ArtifactManager>> {
// Initialize the time source.
let time_source = Arc::new(SysTimeSource::new());
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();

let mut backends: HashMap<ArtifactTag, Box<dyn manager::ArtifactManagerBackend>> =
HashMap::new();
Expand Down Expand Up @@ -296,13 +293,13 @@ fn setup_artifact_manager(

let replica_config = ReplicaConfig { node_id, subnet_id };
let membership = Arc::new(Membership::new(
artifact_pools.consensus_pool_cache.clone(),
consensus_pool_cache.clone(),
Arc::clone(&registry_client),
subnet_id,
));

let ingress_manager = Arc::new(IngressManager::new(
artifact_pools.consensus_pool_cache.clone(),
consensus_pool_cache.clone(),
ingress_history_reader,
artifact_pools.ingress_pool.clone(),
Arc::clone(&registry_client),
Expand All @@ -317,9 +314,9 @@ fn setup_artifact_manager(

let canister_http_payload_builder = Arc::new(CanisterHttpPayloadBuilderImpl::new(
artifact_pools.canister_http_pool.clone(),
artifact_pools.consensus_pool_cache.clone(),
consensus_pool_cache.clone(),
consensus_crypto.clone(),
state_manager.clone(),
state_reader.clone(),
membership.clone(),
subnet_id,
registry_client.clone(),
Expand Down Expand Up @@ -403,7 +400,7 @@ fn setup_artifact_manager(
Arc::clone(&membership) as Arc<_>,
Arc::clone(&certifier_crypto),
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&artifact_pools.consensus_pool_cache) as Arc<_>,
Arc::clone(&consensus_pool_cache) as Arc<_>,
metrics_registry.clone(),
replica_logger.clone(),
),
Expand All @@ -426,7 +423,7 @@ fn setup_artifact_manager(
dkg::DkgImpl::new(
node_id,
Arc::clone(&consensus_crypto),
Arc::clone(&artifact_pools.consensus_pool_cache),
Arc::clone(&consensus_pool_cache),
dkg_key_manager,
metrics_registry.clone(),
replica_logger.clone(),
Expand All @@ -443,7 +440,7 @@ fn setup_artifact_manager(

{
let advert_broadcaster = advert_broadcaster.clone();
let finalized = artifact_pools.consensus_pool_cache.finalized_block();
let finalized = consensus_pool_cache.finalized_block();
let ecdsa_config =
registry_client.get_ecdsa_config(subnet_id, registry_client.get_latest_version());
info!(
Expand Down Expand Up @@ -490,19 +487,19 @@ fn setup_artifact_manager(
move |req| advert_broadcaster.send(req.into()),
(
CanisterHttpPoolManagerImpl::new(
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&state_reader),
Arc::new(Mutex::new(canister_http_adapter_client)),
Arc::clone(&consensus_crypto),
Arc::clone(&membership),
Arc::clone(&artifact_pools.consensus_pool_cache),
Arc::clone(&consensus_pool_cache),
ReplicaConfig { subnet_id, node_id },
Arc::clone(&registry_client),
metrics_registry.clone(),
replica_logger.clone(),
),
CanisterHttpGossipImpl::new(
Arc::clone(&artifact_pools.consensus_pool_cache),
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&consensus_pool_cache),
Arc::clone(&state_reader),
replica_logger,
),
),
Expand All @@ -518,7 +515,6 @@ fn setup_artifact_manager(
}

/// The function initializes the artifact pools.
#[allow(clippy::type_complexity)]
pub fn init_artifact_pools(
subnet_id: SubnetId,
config: ArtifactPoolConfig,
Expand Down Expand Up @@ -550,7 +546,6 @@ pub fn init_artifact_pools(
registry.clone(),
log.clone(),
)));
let consensus_pool_cache = consensus_pool.read().unwrap().get_cache();
let certification_pool = Arc::new(RwLock::new(CertificationPoolImpl::new(
config,
log,
Expand All @@ -561,7 +556,6 @@ pub fn init_artifact_pools(
ArtifactPools {
ingress_pool,
consensus_pool,
consensus_pool_cache,
certification_pool,
dkg_pool,
ecdsa_pool,
Expand Down
2 changes: 1 addition & 1 deletion rs/replica/src/setup_ic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn construct_ic_stack(

// ---------- REPLICATED STATE DEPS FOLLOW ----------
let sev_handshake = Arc::new(Sev::new(node_id, registry.clone()));
let consensus_pool_cache = artifact_pools.consensus_pool_cache.clone();
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();
let verifier = Arc::new(VerifierImpl::new(crypto.clone()));
let state_manager = Arc::new(StateManagerImpl::new(
verifier,
Expand Down

0 comments on commit 4ba15d2

Please sign in to comment.