diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs
index 4c6e77a9d07..b66baaadd02 100644
--- a/validator_client/src/duties_service/sync.rs
+++ b/validator_client/src/duties_service/sync.rs
@@ -9,11 +9,27 @@ use types::{
     ChainSpec, Epoch, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId,
 };
 
+/// Number of epochs in advance to compute selection proofs.
+pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
+
+/// Top-level data-structure containing sync duty information.
+///
+/// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained
+/// locking is used to provide maximum concurrency for the different services reading and writing.
+///
+/// Deadlocks are prevented by:
+///
+/// 1. Hierarchical locking. It is impossible to lock an inner lock (e.g. `validators`) without
+///    first locking its parent.
+/// 2. One-at-a-time locking. For the innermost locks on the aggregator duties, all of the functions
+///    in this file take care to only lock one validator at a time. We never hold a lock while
+///    trying to obtain another one (hence no lock ordering issues).
 pub struct SyncDutiesMap {
     /// Map from sync committee period to duties for members of that sync committee.
     committees: RwLock<HashMap<u64, CommitteeDuties>>,
 }
 
+/// Duties for a single sync committee period.
 #[derive(Default)]
 pub struct CommitteeDuties {
     /// Map from validator index to validator duties.
@@ -25,19 +41,30 @@ pub struct CommitteeDuties {
     validators: RwLock<HashMap<u64, Option<ValidatorDuties>>>,
 }
 
+/// Duties for a single validator.
 pub struct ValidatorDuties {
     /// The sync duty: including validator sync committee indices & pubkey.
     duty: SyncDuty,
+    /// The aggregator duties: cached selection proofs for upcoming epochs.
+    aggregation_duties: AggregatorDuties,
+}
+
+/// Aggregator duties for a single validator.
+pub struct AggregatorDuties {
+    /// The epoch up to which aggregation proofs have already been computed (inclusive).
+    pre_compute_epoch: RwLock<Option<Epoch>>,
     /// Map from slot & subnet ID to proof that this validator is an aggregator.
     ///
     /// The slot is the slot at which the signed contribution and proof should be broadcast,
     /// which is 1 less than the slot for which the `duty` was computed.
-    aggregation_proofs: RwLock<HashMap<(Slot, SyncSubnetId), SyncSelectionProof>>,
+    proofs: RwLock<HashMap<(Slot, SyncSubnetId), SyncSelectionProof>>,
 }
 
-/// Duties for a single slot.
+/// Duties for multiple validators, for a single slot.
+///
+/// This type is returned to the sync service.
 pub struct SlotDuties {
-    /// List of duties for all sync committee members at this slot
+    /// List of duties for all sync committee members at this slot.
     ///
     /// Note: this is intentionally NOT split by subnet so that we only sign
     /// one `SyncCommitteeMessage` per validator (recall a validator may be part of multiple
@@ -56,6 +83,7 @@ impl Default for SyncDutiesMap {
 }
 
 impl SyncDutiesMap {
+    /// Check if duties are already known for all of the given validators for `committee_period`.
     pub fn all_duties_known(&self, committee_period: u64, validator_indices: &[u64]) -> bool {
         self.committees
             .read()
@@ -68,6 +96,56 @@ impl SyncDutiesMap {
             })
     }
 
+    /// Prepare for pre-computation of selection proofs for `committee_period`.
+    ///
+    /// Return the epoch up to which proofs should be pre-computed, as well as a vec of
+    /// `(previous_pre_compute_epoch, sync_duty)` pairs for all validators which need to have proofs
+    /// computed. See `fill_in_aggregation_proofs` for the actual calculation.
+    pub fn prepare_for_aggregator_pre_compute(
+        &self,
+        committee_period: u64,
+        current_epoch: Epoch,
+        spec: &ChainSpec,
+    ) -> (Epoch, Vec<(Epoch, SyncDuty)>) {
+        let default_start_epoch =
+            std::cmp::max(current_epoch, first_epoch_of_period(committee_period, spec));
+        let pre_compute_epoch = std::cmp::min(
+            current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS,
+            last_epoch_of_period(committee_period, spec),
+        );
+
+        let pre_compute_duties = self.committees.read().get(&committee_period).map_or_else(
+            Vec::new,
+            |committee_duties| {
+                let validator_duties = committee_duties.validators.read();
+                validator_duties
+                    .values()
+                    .filter_map(|maybe_duty| {
+                        let duty = maybe_duty.as_ref()?;
+                        let old_pre_compute_epoch = duty
+                            .aggregation_duties
+                            .pre_compute_epoch
+                            .write()
+                            .replace(pre_compute_epoch);
+
+                        match old_pre_compute_epoch {
+                            // No proofs pre-computed previously, compute all from the start of
+                            // the period or the current epoch (whichever is later).
+                            None => Some((default_start_epoch, duty.duty.clone())),
+                            // Proofs computed up to `prev`, start from the subsequent epoch.
+                            Some(prev) if prev < pre_compute_epoch => {
+                                Some((prev + 1, duty.duty.clone()))
+                            }
+                            // Proofs already known, no need to compute.
+                            _ => None,
+                        }
+                    })
+                    .collect()
+            },
+        );
+        (pre_compute_epoch, pre_compute_duties)
+    }
+
     pub fn get_or_create_committee_duties<'a, 'b>(
         &'a self,
         committee_period: u64,
@@ -87,6 +165,9 @@ impl SyncDutiesMap {
         )
     }
 
+    /// Get duties for all validators for the given `wall_clock_slot`.
+    ///
+    /// This is the entry-point for the sync committee service.
     pub fn get_duties_for_slot<E: EthSpec>(
         &self,
         wall_clock_slot: Slot,
@@ -121,7 +202,7 @@
             .for_each(|(validator_duty, subnet_ids)| {
                 duties.push(validator_duty.duty.clone());
 
-                let proofs = validator_duty.aggregation_proofs.read();
+                let proofs = validator_duty.aggregation_duties.proofs.read();
 
                 for subnet_id in subnet_ids {
                     if let Some(proof) = proofs.get(&(wall_clock_slot, subnet_id)) {
@@ -139,6 +220,13 @@
             aggregators,
         })
     }
+
+    /// Prune duties for past sync committee periods from the map.
+    pub fn prune(&self, current_sync_committee_period: u64) {
+        self.committees
+            .write()
+            .retain(|period, _| *period >= current_sync_committee_period)
+    }
 }
 
 impl CommitteeDuties {
@@ -156,7 +244,10 @@ impl ValidatorDuties {
     fn new(duty: SyncDuty) -> Self {
         Self {
             duty,
-            aggregation_proofs: RwLock::new(HashMap::new()),
+            aggregation_duties: AggregatorDuties {
+                pre_compute_epoch: RwLock::new(None),
+                proofs: RwLock::new(HashMap::new()),
+            },
         }
     }
 }
@@ -166,6 +257,14 @@ fn epoch_offset(spec: &ChainSpec) -> u64 {
     spec.epochs_per_sync_committee_period.as_u64() / 2
 }
 
+fn first_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
+    spec.epochs_per_sync_committee_period * sync_committee_period
+}
+
+fn last_epoch_of_period(sync_committee_period: u64, spec: &ChainSpec) -> Epoch {
+    first_epoch_of_period(sync_committee_period + 1, spec) - 1
+}
+
 pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
     duties_service: &Arc<DutiesService<T, E>>,
 ) -> Result<(), Error> {
@@ -191,7 +290,7 @@
     let local_pubkeys = duties_service.local_pubkeys();
     let local_indices = duties_service.local_indices(&local_pubkeys);
 
-    // If duties aren't known for the current period, poll for them
+    // If duties aren't known for the current period, poll for them.
     if !sync_duties.all_duties_known(current_sync_committee_period, &local_indices) {
         poll_sync_committee_duties_for_period(
             duties_service,
@@ -199,6 +298,29 @@
             current_sync_committee_period,
         )
         .await?;
+
+        // Prune previous duties (we avoid doing this too often as it locks the whole map).
+        sync_duties.prune(current_sync_committee_period);
+    }
+
+    // Pre-compute aggregator selection proofs for the current period.
+    let (current_pre_compute_epoch, new_pre_compute_duties) = sync_duties
+        .prepare_for_aggregator_pre_compute(current_sync_committee_period, current_epoch, spec);
+
+    if !new_pre_compute_duties.is_empty() {
+        let sub_duties_service = duties_service.clone();
+        duties_service.context.executor.spawn_blocking(
+            move || {
+                fill_in_aggregation_proofs(
+                    sub_duties_service,
+                    &new_pre_compute_duties,
+                    current_sync_committee_period,
+                    current_epoch,
+                    current_pre_compute_epoch,
+                )
+            },
+            "duties_service_sync_selection_proofs",
+        );
     }
 
     // If we're past the point in the current period where we should determine duties for the next
@@ -212,6 +334,34 @@
             next_sync_committee_period,
         )
         .await?;
+
+        // Prune (this is the main code path for updating duties, so we should almost always hit
+        // this prune).
+        sync_duties.prune(current_sync_committee_period);
+    }
+
+    // Pre-compute aggregator selection proofs for the next period.
+    if (current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS).sync_committee_period(spec)?
+        == next_sync_committee_period
+    {
+        let (pre_compute_epoch, new_pre_compute_duties) = sync_duties
+            .prepare_for_aggregator_pre_compute(next_sync_committee_period, current_epoch, spec);
+
+        if !new_pre_compute_duties.is_empty() {
+            let sub_duties_service = duties_service.clone();
+            duties_service.context.executor.spawn_blocking(
+                move || {
+                    fill_in_aggregation_proofs(
+                        sub_duties_service,
+                        &new_pre_compute_duties,
+                        next_sync_committee_period,
+                        current_epoch,
+                        pre_compute_epoch,
+                    )
+                },
+                "duties_service_sync_selection_proofs",
+            );
+        }
     }
 
     Ok(())
@@ -263,76 +413,68 @@ pub async fn poll_sync_committee_duties_for_period<T: SlotClock + 'static, E: EthSpec>(
-                        "this could be due to a really long re-org, or a bug"
-                    );
-                }
-                updated_due_to_reorg
-            });
-
-            if updated {
-                info!(
+    let mut validator_writer = committee_duties.validators.write();
+    for duty in duties {
+        let validator_duties = validator_writer
+            .get_mut(&duty.validator_index)
+            .ok_or(Error::SyncDutiesNotFound(duty.validator_index))?;
+
+        let updated = validator_duties.as_ref().map_or(true, |existing_duties| {
+            let updated_due_to_reorg = existing_duties.duty.validator_sync_committee_indices
+                != duty.validator_sync_committee_indices;
+            if updated_due_to_reorg {
+                warn!(
                     log,
-                    "Validator in sync committee";
-                    "validator_index" => duty.validator_index,
-                    "sync_committee_period" => sync_committee_period,
+                    "Sync committee duties changed";
+                    "message" => "this could be due to a really long re-org, or a bug"
                 );
-
-                updated_validators.push(duty.clone());
-                *validator_duties = Some(ValidatorDuties::new(duty));
             }
+            updated_due_to_reorg
+        });
+
+        if updated {
+            info!(
+                log,
+                "Validator in sync committee";
+                "validator_index" => duty.validator_index,
+                "sync_committee_period" => sync_committee_period,
+            );
+
+            *validator_duties = Some(ValidatorDuties::new(duty));
         }
     }
 
-    // Spawn background task to fill in aggregator selection proofs.
-    let sub_duties_service = duties_service.clone();
-    duties_service.context.executor.spawn_blocking(
-        move || {
-            fill_in_aggregation_proofs(
-                sub_duties_service,
-                &updated_validators,
-                sync_committee_period,
-            )
-        },
-        "duties_service_sync_selection_proofs",
-    );
-
     Ok(())
 }
 
 pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
     duties_service: Arc<DutiesService<T, E>>,
-    validators: &[SyncDuty],
+    pre_compute_duties: &[(Epoch, SyncDuty)],
     sync_committee_period: u64,
+    current_epoch: Epoch,
+    pre_compute_epoch: Epoch,
 ) {
-    let spec = &duties_service.spec;
     let log = duties_service.context.log();
 
-    // Generate selection proofs for each validator at each slot, one epoch at a time
-    let start_epoch = spec.epochs_per_sync_committee_period * sync_committee_period;
-    let end_epoch = start_epoch + spec.epochs_per_sync_committee_period;
+    debug!(
+        log,
+        "Calculating sync selection proofs";
+        "period" => sync_committee_period,
+        "current_epoch" => current_epoch,
+        "pre_compute_epoch" => pre_compute_epoch
+    );
 
-    for epoch in (start_epoch.as_u64()..end_epoch.as_u64()).map(Epoch::new) {
+    // Generate selection proofs for each validator at each slot, one epoch at a time.
+    for epoch in (current_epoch.as_u64()..=pre_compute_epoch.as_u64()).map(Epoch::new) {
         // Generate proofs.
-        let validator_proofs: Vec<(u64, Vec<_>)> = validators
+        let validator_proofs: Vec<(u64, Vec<_>)> = pre_compute_duties
             .iter()
-            .filter_map(|duty| {
+            .filter_map(|(validator_start_epoch, duty)| {
+                // Proofs are already known at this epoch for this validator.
+                if epoch < *validator_start_epoch {
+                    return None;
+                }
+
                 let subnet_ids = duty
                     .subnet_ids::<E>()
                     .map_err(|e| {
@@ -378,7 +520,7 @@ pub fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
                     .ok()?;
 
                 if is_aggregator {
-                    debug!(
+                    info!(
                         log,
                         "Validator is sync aggregator";
                         "validator_index" => duty.validator_index,
@@ -396,23 +538,41 @@
             })
             .collect();
 
-        // Add to global storage (we add regularly in case the proofs are required).
-        let committee_duties = duties_service.sync_duties.get_or_create_committee_duties(
-            sync_committee_period,
-            validator_proofs.iter().map(|(index, _)| index),
-        );
-        let validators_reader = committee_duties.validators.read();
+        // Add to global storage (we add regularly so the proofs can be used ASAP).
+        let sync_map = duties_service.sync_duties.committees.read();
+        let committee_duties = if let Some(duties) = sync_map.get(&sync_committee_period) {
+            duties
+        } else {
+            debug!(
+                log,
+                "Missing sync duties";
+                "period" => sync_committee_period,
+            );
+            continue;
+        };
+        let validators = committee_duties.validators.read();
+        let num_validators_updated = validator_proofs.len();
 
         for (validator_index, proofs) in validator_proofs {
-            if let Some(Some(duty)) = validators_reader.get(&validator_index) {
-                duty.aggregation_proofs.write().extend(proofs);
+            if let Some(Some(duty)) = validators.get(&validator_index) {
+                duty.aggregation_duties.proofs.write().extend(proofs);
             } else {
                 debug!(
                     log,
                     "Missing sync duty to update";
                     "validator_index" => validator_index,
+                    "period" => sync_committee_period,
                );
             }
         }
+
+        if num_validators_updated > 0 {
+            debug!(
+                log,
+                "Finished computing sync selection proofs";
+                "epoch" => epoch,
+                "updated_validators" => num_validators_updated,
+            );
+        }
    }
 }
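
Note on the locking design: the `SyncDutiesMap` doc comment added in this diff relies on two rules, lock the parent before the child, and hold at most one innermost lock at a time. A minimal sketch of that pattern follows, with illustrative stand-in types rather than the real Lighthouse ones (it assumes `parking_lot::RwLock`, which the non-`Result` `.read()`/`.write()` calls in the diff suggest):

```rust
// Illustrative sketch of the hierarchical locking described in the
// `SyncDutiesMap` doc comment. `Outer`/`Inner` are hypothetical stand-ins
// for `SyncDutiesMap`/`CommitteeDuties`, not the real types.
use parking_lot::RwLock;
use std::collections::HashMap;

struct Inner {
    validators: RwLock<HashMap<u64, u64>>,
}

struct Outer {
    committees: RwLock<HashMap<u64, Inner>>,
}

impl Outer {
    fn read_validator(&self, period: u64, index: u64) -> Option<u64> {
        // Rule 1: always lock the parent before the child.
        let committees = self.committees.read();
        let inner = committees.get(&period)?;
        // Rule 2: the inner guard is dropped at the end of this statement,
        // so we never wait on one inner lock while holding another.
        inner.validators.read().get(&index).copied()
    }
}

fn main() {
    let outer = Outer {
        committees: RwLock::new(HashMap::from([(
            0,
            Inner {
                validators: RwLock::new(HashMap::from([(7, 42)])),
            },
        )])),
    };
    assert_eq!(outer.read_validator(0, 7), Some(42));
}
```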
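The window computed by `prepare_for_aggregator_pre_compute` is clamped on both sides: it never starts before the committee period begins and never extends past the period's last epoch, since a selection proof only makes sense for duties within its own period. A worked example of the same arithmetic with plain `u64` epochs, assuming mainnet's 256-epoch sync committee period (the real code uses the `Epoch` type and reads the period length from `ChainSpec`):

```rust
// Worked example of the pre-compute window arithmetic. EPOCHS_PER_PERIOD is
// an assumption (mainnet value); pre_compute_window is a hypothetical helper
// mirroring the clamping in `prepare_for_aggregator_pre_compute`.
const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
const EPOCHS_PER_PERIOD: u64 = 256;

fn first_epoch_of_period(period: u64) -> u64 {
    EPOCHS_PER_PERIOD * period
}

fn last_epoch_of_period(period: u64) -> u64 {
    first_epoch_of_period(period + 1) - 1
}

fn pre_compute_window(period: u64, current_epoch: u64) -> (u64, u64) {
    // Never start before the period begins, never end after it ends.
    let start = std::cmp::max(current_epoch, first_epoch_of_period(period));
    let end = std::cmp::min(
        current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS,
        last_epoch_of_period(period),
    );
    (start, end)
}

fn main() {
    // Mid-period (period 1 spans epochs 256..=511): compute the current
    // epoch plus two epochs ahead.
    assert_eq!(pre_compute_window(1, 300), (300, 302));
    // Near the period's end the window is clamped at epoch 511, so period 1
    // duties are never pre-computed into period 2.
    assert_eq!(pre_compute_window(1, 510), (510, 511));
    // Conversely, at epoch 510 the lookahead already reaches period 2, and
    // the window is clamped to start at that period's first epoch.
    assert_eq!(pre_compute_window(2, 510), (512, 512));
    // One epoch earlier the window for period 2 would be empty (start > end),
    // which is why `poll_sync_committee_duties` only pre-computes for the
    // next period once `current_epoch + AGGREGATION_PRE_COMPUTE_EPOCHS`
    // falls inside it.
    let (start, end) = pre_compute_window(2, 509);
    assert!(start > end);
}
```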
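The `match` on `old_pre_compute_epoch` is what keeps repeated polling cheap: `Option::replace` swaps the new watermark in and hands the old one back under a single write lock, so each epoch's proofs are claimed exactly once and later polls only cover newly reachable epochs. A standalone sketch of that resume logic (hypothetical helper name, plain `u64` epochs, `parking_lot::RwLock` assumed):

```rust
// Sketch of the resume logic around `pre_compute_epoch`. `replace` reads and
// bumps the watermark in one write-lock, so two pollers cannot both claim
// the same epoch range.
use parking_lot::RwLock;
use std::ops::RangeInclusive;

fn epochs_to_compute(
    watermark: &RwLock<Option<u64>>,
    default_start: u64,
    new_target: u64,
) -> Option<RangeInclusive<u64>> {
    let old = watermark.write().replace(new_target);
    match old {
        // Nothing computed yet: do the whole window.
        None => Some(default_start..=new_target),
        // Partially computed: resume from the epoch after the old watermark.
        Some(prev) if prev < new_target => Some(prev + 1..=new_target),
        // Already up to date.
        _ => None,
    }
}

fn main() {
    let watermark = RwLock::new(None);
    assert_eq!(epochs_to_compute(&watermark, 300, 302), Some(300..=302));
    // A later poll at the same target is a no-op...
    assert_eq!(epochs_to_compute(&watermark, 300, 302), None);
    // ...and advancing the target only computes the newly visible epochs.
    assert_eq!(epochs_to_compute(&watermark, 301, 303), Some(303..=303));
}
```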