Skip to content

Commit

Permalink
chore: small cleanup in the replica crate
Browse files Browse the repository at this point in the history
  • Loading branch information
rumenov committed Apr 13, 2023
1 parent b06f5eb commit 25d13d2
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 49 deletions.
4 changes: 2 additions & 2 deletions rs/replica/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ fn main() -> io::Result<()> {
info!(logger, "Constructing IC stack");
let (_, _, _p2p_thread_joiner, _, _xnet_endpoint) =
ic_replica::setup_ic_stack::construct_ic_stack(
logger.clone(),
&logger,
&metrics_registry,
rt_main.handle().clone(),
rt_http.handle().clone(),
rt_xnet.handle().clone(),
Expand All @@ -282,7 +283,6 @@ fn main() -> io::Result<()> {
root_subnet_id,
registry,
crypto,
metrics_registry.clone(),
cup_with_proto,
registry_certified_time_reader,
)?;
Expand Down
89 changes: 44 additions & 45 deletions rs/replica/src/setup_ic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ic_interfaces_p2p::IngressIngestionService;
use ic_interfaces_registry::{LocalStoreCertifiedTimeReader, RegistryClient};
use ic_logger::{info, ReplicaLogger};
use ic_messaging::MessageRoutingImpl;
use ic_metrics::MetricsRegistry;
use ic_p2p::P2PThreadJoiner;
use ic_registry_subnet_type::SubnetType;
use ic_replica_setup_ic_network::{
Expand All @@ -23,12 +24,12 @@ use ic_state_manager::{state_sync::StateSync, StateManagerImpl};
use ic_types::{consensus::catchup::CUPWithOriginalProtobuf, NodeId, SubnetId};
use ic_xnet_endpoint::{XNetEndpoint, XNetEndpointConfig};
use ic_xnet_payload_builder::XNetPayloadBuilderImpl;

use std::sync::Arc;

#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn construct_ic_stack(
replica_logger: ReplicaLogger,
log: &ReplicaLogger,
metrics_registry: &MetricsRegistry,
rt_handle: tokio::runtime::Handle,
rt_handle_http: tokio::runtime::Handle,
rt_handle_xnet: tokio::runtime::Handle,
Expand All @@ -39,16 +40,16 @@ pub fn construct_ic_stack(
subnet_type: SubnetType,
root_subnet_id: SubnetId,
registry: Arc<dyn RegistryClient + Send + Sync>,
// TODO(SCL-213): When Rust traits support it, simplify and pass a single
// trait.
crypto: Arc<CryptoComponent>,
metrics_registry: ic_metrics::MetricsRegistry,
catch_up_package: Option<CUPWithOriginalProtobuf>,
local_store_time_reader: Arc<dyn LocalStoreCertifiedTimeReader>,
) -> std::io::Result<(
// TODO: remove this return value since it is used only in tests
Arc<StateManagerImpl>,
// TODO: remove this return value since it is used only in tests
Arc<dyn QueryHandler<State = ReplicatedState>>,
P2PThreadJoiner,
// TODO: remove this return value since it is used only in tests
IngressIngestionService,
XNetEndpoint,
)> {
Expand Down Expand Up @@ -77,7 +78,7 @@ pub fn construct_ic_stack(
.0
.is_empty();
info!(
&replica_logger,
log,
"Using the {} CUP with height {}",
if signed { "signed" } else { "unsigned" },
cup_from_orc.cup.height()
Expand All @@ -89,7 +90,7 @@ pub fn construct_ic_stack(
None => {
let registry_cup = make_registry_cup();
info!(
&replica_logger,
log,
"Using the CUP with height {} generated from the registry",
registry_cup.cup.height()
);
Expand All @@ -102,20 +103,19 @@ pub fn construct_ic_stack(
subnet_id,
artifact_pool_config,
metrics_registry.clone(),
replica_logger.clone(),
log.clone(),
catch_up_package,
);

// ---------- REPLICATED STATE DEPS FOLLOW ----------
let sev_handshake = Arc::new(Sev::new(node_id, registry.clone()));
let consensus_pool_cache = artifact_pools.consensus_pool.read().unwrap().get_cache();
let verifier = Arc::new(VerifierImpl::new(crypto.clone()));
let state_manager = Arc::new(StateManagerImpl::new(
verifier,
subnet_id,
subnet_type,
replica_logger.clone(),
&metrics_registry,
log.clone(),
metrics_registry,
&config.state_manager,
// In order for the state manager to start, it needs to know the height of the last
// CUP and/or certification. This information part of the persisted consensus pool.
Expand All @@ -131,8 +131,8 @@ pub fn construct_ic_stack(
subnet_config.cycles_account_manager_config,
));
let execution_services = ExecutionServices::setup_execution(
replica_logger.clone(),
&metrics_registry,
log.clone(),
metrics_registry,
subnet_id,
subnet_type,
subnet_config.scheduler_config,
Expand All @@ -153,8 +153,8 @@ pub fn construct_ic_stack(
subnet_id,
Arc::clone(&state_manager) as Arc<_>,
execution_services.ingress_history_writer,
&metrics_registry,
replica_logger.clone(),
metrics_registry,
log.clone(),
)
} else {
MessageRoutingImpl::new(
Expand All @@ -165,70 +165,69 @@ pub fn construct_ic_stack(
config.hypervisor,
Arc::clone(&cycles_account_manager),
subnet_id,
&metrics_registry,
replica_logger.clone(),
Arc::clone(&registry) as Arc<_>,
metrics_registry,
log.clone(),
registry.clone(),
config.malicious_behaviour.malicious_flags.clone(),
)
};
let message_router = Arc::new(message_router);
let xnet_config =
XNetEndpointConfig::from(Arc::clone(&registry) as Arc<_>, node_id, &replica_logger);
let xnet_config = XNetEndpointConfig::from(Arc::clone(&registry) as Arc<_>, node_id, log);
let xnet_endpoint = XNetEndpoint::new(
rt_handle_xnet,
Arc::clone(&certified_stream_store),
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&registry),
registry.clone(),
xnet_config,
&metrics_registry,
replica_logger.clone(),
metrics_registry,
log.clone(),
);
// Use default runtime to spawn xnet client threads.
let xnet_payload_builder = Arc::new(XNetPayloadBuilderImpl::new(
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&certified_stream_store) as Arc<_>,
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&registry) as Arc<_>,
registry.clone(),
rt_handle.clone(),
node_id,
subnet_id,
&metrics_registry,
replica_logger.clone(),
metrics_registry,
log.clone(),
));
// ---------- BITCOIN INTEGRATION DEPS FOLLOW ----------
let BitcoinAdapterClients {
btc_testnet_client,
btc_mainnet_client,
} = setup_bitcoin_adapter_clients(
replica_logger.clone(),
&metrics_registry,
log.clone(),
metrics_registry,
rt_handle.clone(),
config.adapters_config.clone(),
);
let self_validating_payload_builder = Arc::new(BitcoinPayloadBuilder::new(
state_manager.clone(),
&metrics_registry,
metrics_registry,
btc_mainnet_client,
btc_testnet_client,
subnet_id,
Arc::clone(&registry),
replica_logger.clone(),
registry.clone(),
log.clone(),
));
// ---------- CONSENSUS AND P2P DEPS FOLLOW ----------
// ---------- HTTPS OUTCALLS DEPS FOLLOW ----------
let canister_http_adapter_client = setup_canister_http_client(
rt_handle.clone(),
&metrics_registry,
metrics_registry,
config.adapters_config,
execution_services.anonymous_query_handler,
replica_logger.clone(),
log.clone(),
subnet_type,
);

let state_sync = StateSync::new(state_manager.clone(), replica_logger.clone());

// ---------- CONSENSUS AND P2P DEPS FOLLOW ----------
let state_sync = StateSync::new(state_manager.clone(), log.clone());
let sev_handshake = Arc::new(Sev::new(node_id, registry.clone()));
let (ingress_ingestion_service, p2p_runner) = create_networking_stack(
&metrics_registry,
replica_logger.clone(),
metrics_registry,
log.clone(),
rt_handle,
config.transport,
config.consensus,
Expand All @@ -241,14 +240,14 @@ pub fn construct_ic_stack(
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&state_manager) as Arc<_>,
P2PStateSyncClient::Client(state_sync),
xnet_payload_builder as Arc<_>,
self_validating_payload_builder as Arc<_>,
message_router as Arc<_>,
xnet_payload_builder,
self_validating_payload_builder,
message_router,
// TODO(SCL-213)
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&crypto) as Arc<_>,
Arc::clone(&registry) as Arc<_>,
registry.clone(),
execution_services.ingress_history_reader,
artifact_pools,
cycles_account_manager,
Expand All @@ -259,7 +258,7 @@ pub fn construct_ic_stack(
// ---------- PUBLIC ENDPOINT DEPS FOLLOW ----------
ic_http_endpoints_public::start_server(
rt_handle_http,
&metrics_registry,
metrics_registry,
config.http_handler.clone(),
execution_services.ingress_filter,
ingress_ingestion_service.clone(),
Expand All @@ -270,7 +269,7 @@ pub fn construct_ic_stack(
Arc::clone(&crypto) as Arc<_>,
subnet_id,
root_subnet_id,
replica_logger,
log.clone(),
consensus_pool_cache,
subnet_type,
config.malicious_behaviour.malicious_flags,
Expand Down
4 changes: 2 additions & 2 deletions rs/replica_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ where
let temp_node = node_id;
let (state_manager, query_handler, _p2p_thread_joiner, ingress_ingestion_service, _) =
ic_replica::setup_ic_stack::construct_ic_stack(
logger,
&logger,
&metrics_registry,
tokio::runtime::Handle::current(),
tokio::runtime::Handle::current(),
tokio::runtime::Handle::current(),
Expand All @@ -408,7 +409,6 @@ where
subnet_id,
registry.clone(),
crypto,
metrics_registry,
None,
fake_local_store_certified_time_reader,
)
Expand Down

0 comments on commit 25d13d2

Please sign in to comment.