> + Member + Codec,
+ GOH: RetrieveAuthoritiesFromOrchestrator<
+ Block,
+ (PHash, PersistedValidationData),
+ OrchestratorAuraWorkerAuxData,
+ >
+ + 'static
+ + Sync
+ + Send,
+{
+ // This is an arbitrary value which is likely guaranteed to exceed any reasonable
+ // limit, as it would correspond to 10 non-included blocks.
+ //
+ // Since we only search for parent blocks which have already been imported,
+ // we can guarantee that all imported blocks respect the unincluded segment
+ // rules specified by the parachain's runtime and thus will never be too deep.
+ const PARENT_SEARCH_DEPTH: usize = 10;
+
+ log::info!("LOOKAHEAD COLLATOR RUNNING...");
+
+ async move {
+ cumulus_client_collator::initialize_collator_subsystems(
+ &mut params.overseer_handle,
+ params.collator_key,
+ params.para_id,
+ )
+ .await;
+
+ let mut import_notifications = match params.relay_client.import_notification_stream().await
+ {
+ Ok(s) => s,
+ Err(err) => {
+ tracing::error!(
+ target: crate::LOG_TARGET,
+ ?err,
+ "Failed to initialize consensus: no relay chain import notification stream"
+ );
+
+ return;
+ }
+ };
+
+ let mut collator = {
+ let params = collator_util::Params {
+ create_inherent_data_providers: params.create_inherent_data_providers.clone(),
+ block_import: params.block_import,
+ relay_client: params.relay_client.clone(),
+ keystore: params.keystore.clone(),
+ para_id: params.para_id,
+ proposer: params.proposer,
+ collator_service: params.collator_service,
+ };
+
+ collator_util::Collator::::new(params)
+ };
+
+ // If we move forward without marking the value as unchanged,
+ // the channel will assume that the value has already changed
+ // in a different step than the one we want (inside containerChainSpawner)
+ // and will not kill the lookahead collator properly if it was already running.
+ if let Some(end_lookahead_receiver) = &mut params.end_lookahead_receiver {
+ end_lookahead_receiver.mark_unchanged();
+ }
+
+ while let Some(relay_parent_header) = import_notifications.next().await {
+ if let Some(end_lookahead_receiver) = &mut params.end_lookahead_receiver {
+ // If the value of the channel has changed, it means that
+ // containerChainSpawner has informed that we need to tear down
+ // this consensus task, meaning that we don't need to spawn the
+ // lookahead collator twice.
+ if let Ok(true) = end_lookahead_receiver.has_changed() {
+ log::info!("Lookahead collator was already running! Exiting...");
+ return;
+ }
+ }
+
+ let relay_parent = relay_parent_header.hash();
+
+ if !is_para_scheduled(relay_parent, params.para_id, &mut params.overseer_handle).await {
+ tracing::trace!(
+ target: crate::LOG_TARGET,
+ ?relay_parent,
+ ?params.para_id,
+ "Para is not scheduled on any core, skipping import notification",
+ );
+
+ continue;
+ }
+
+ let max_pov_size = match params
+ .relay_client
+ .persisted_validation_data(
+ relay_parent,
+ params.para_id,
+ OccupiedCoreAssumption::Included,
+ )
+ .await
+ {
+ Ok(None) => continue,
+ Ok(Some(pvd)) => pvd.max_pov_size,
+ Err(err) => {
+ tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
+ continue;
+ }
+ };
+
+ let parent_search_params = ParentSearchParams {
+ relay_parent,
+ para_id: params.para_id,
+ ancestry_lookback: max_ancestry_lookback(relay_parent, ¶ms.relay_client).await,
+ max_depth: PARENT_SEARCH_DEPTH,
+ ignore_alternative_branches: true,
+ };
+
+ let potential_parents =
+ cumulus_client_consensus_common::find_potential_parents::(
+ parent_search_params,
+ &*params.para_backend,
+ ¶ms.relay_client,
+ )
+ .await;
+
+ let mut potential_parents = match potential_parents {
+ Err(e) => {
+ tracing::error!(
+ target: crate::LOG_TARGET,
+ ?relay_parent,
+ err = ?e,
+ "Could not fetch potential parents to build upon"
+ );
+
+ continue;
+ }
+ Ok(x) => x,
+ };
+
+ let included_block = match potential_parents.iter().find(|x| x.depth == 0) {
+ None => continue, // also serves as an `is_empty` check.
+ Some(b) => b.hash,
+ };
+
+ let para_client = &*params.para_client;
+ let keystore = ¶ms.keystore;
+ let can_build_upon = |slot_now, block_hash, aux_data| {
+ can_build_upon::<_, _, P>(
+ slot_now,
+ aux_data,
+ block_hash,
+ included_block,
+ params.force_authoring,
+ para_client,
+ &keystore,
+ )
+ };
+
+ // Sort by depth, ascending, to choose the longest chain.
+ //
+ // If the longest chain has space, build upon that. Otherwise, don't
+ // build at all.
+ potential_parents.sort_by_key(|a| a.depth);
+ let initial_parent = match potential_parents.pop() {
+ None => continue,
+ Some(p) => p,
+ };
+
+ // Build in a loop until not allowed. Note that the authorities can change
+ // at any block, so we need to re-claim our slot every time.
+ let mut parent_hash = initial_parent.hash;
+ let mut parent_header = initial_parent.header;
+ let overseer_handle = &mut params.overseer_handle;
+
+ // This needs to change to support elastic scaling, but for continuously
+ // scheduled chains this ensures that the backlog will grow steadily.
+ for n_built in 0..2 {
+ let validation_data = PersistedValidationData {
+ parent_head: parent_header.encode().into(),
+ relay_parent_number: *relay_parent_header.number(),
+ relay_parent_storage_root: *relay_parent_header.state_root(),
+ max_pov_size,
+ };
+
+ // Retrieve authorities that are able to produce the block
+ let aux_data = match params
+ .get_orchestrator_aux_data
+ .retrieve_authorities_from_orchestrator(
+ parent_hash,
+ (relay_parent_header.hash(), validation_data.clone()),
+ )
+ .await
+ {
+ Err(e) => {
+ tracing::error!(target: crate::LOG_TARGET, ?e);
+ break;
+ }
+ Ok(h) => h,
+ };
+
+ let inherent_providers = match params
+ .create_inherent_data_providers
+ .create_inherent_data_providers(
+ parent_hash,
+ (relay_parent_header.hash(), validation_data.clone()),
+ )
+ .await
+ {
+ Err(e) => {
+ tracing::error!(target: crate::LOG_TARGET, ?e);
+ break;
+ }
+ Ok(h) => h,
+ };
+
+ let mut slot_claim = match can_build_upon(
+ inherent_providers.slot(),
+ parent_header.clone(),
+ aux_data,
+ )
+ .await
+ {
+ Ok(None) => break,
+ Err(e) => {
+ tracing::error!(target: crate::LOG_TARGET, ?e);
+ break;
+ }
+ Ok(Some(c)) => c,
+ };
+
+ tracing::debug!(
+ target: crate::LOG_TARGET,
+ ?relay_parent,
+ unincluded_segment_len = initial_parent.depth + n_built,
+ "Slot claimed. Building"
+ );
+
+ // Build and announce collations recursively until
+ // `can_build_upon` fails or building a collation fails.
+ let (parachain_inherent_data, other_inherent_data) = match collator
+ .create_inherent_data(relay_parent, &validation_data, parent_hash, None)
+ .await
+ {
+ Err(err) => {
+ tracing::error!(target: crate::LOG_TARGET, ?err);
+ break;
+ }
+ Ok(x) => x,
+ };
+
+ let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash)
+ {
+ None => {
+ tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
+ break;
+ }
+ Some(v) => v,
+ };
+
+ match collator
+ .collate(
+ &parent_header,
+ &mut slot_claim,
+ None,
+ (parachain_inherent_data, other_inherent_data),
+ params.authoring_duration,
+ // Set the block limit to 50% of the maximum PoV size.
+ //
+ // TODO: If we got benchmarking that includes the proof size,
+ // we should be able to use the maximum pov size.
+ (validation_data.max_pov_size / 2) as usize,
+ )
+ .await
+ {
+ Ok(Some((collation, block_data, new_block_hash))) => {
+ // Here we are assuming that the import logic protects against equivocations
+ // and provides sybil-resistance, as it should.
+ collator
+ .collator_service()
+ .announce_block(new_block_hash, None);
+
+ // Send a submit-collation message to the collation generation subsystem,
+ // which then distributes this to validators.
+ //
+ // Here we are assuming that the leaf is imported, as we've gotten an
+ // import notification.
+ overseer_handle
+ .send_msg(
+ CollationGenerationMessage::SubmitCollation(
+ SubmitCollationParams {
+ relay_parent,
+ collation,
+ parent_head: parent_header.encode().into(),
+ validation_code_hash,
+ result_sender: None,
+ },
+ ),
+ "SubmitCollation",
+ )
+ .await;
+
+ parent_hash = new_block_hash;
+ parent_header = block_data.into_header();
+ }
+ Ok(None) => {
+ tracing::debug!(target: crate::LOG_TARGET, "Lookahead collator: No block proposal");
+ }
+ Err(err) => {
+ tracing::error!(target: crate::LOG_TARGET, ?err);
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+// Checks if we own the slot at the given block and whether there
+// is space in the unincluded segment.
+async fn can_build_upon(
+ slot: Slot,
+ aux_data: OrchestratorAuraWorkerAuxData,
+ parent_header: Block::Header,
+ included_block: Block::Hash,
+ force_authoring: bool,
+ client: &Client,
+ keystore: &KeystorePtr,
+) -> Result