Prune slashing protection DB (#2194)
## Proposed Changes

Prune the slashing protection database so that it doesn't exhibit unbounded growth. Prune by dropping attestations and blocks from more than 512 epochs ago, relying on the guards that prevent signing messages with slots or epochs less than the minimum recorded in the DB.
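As a rough illustration of why dropping old entries stays safe, here is a minimal in-memory sketch (not the SQLite-backed implementation in this commit). The `SignedAttestation` struct and both function names are hypothetical, and the guard is simplified to "reject any target epoch below the lowest one still on record".

```rust
/// A signed attestation reduced to the two epochs that matter here.
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SignedAttestation {
    pub source_epoch: u64,
    pub target_epoch: u64,
}

/// Drop every attestation whose target is below `new_min_target`, but always
/// keep the attestation(s) with the highest target so the history is never
/// emptied by an overly large `new_min_target`.
pub fn prune_attestations(history: &mut Vec<SignedAttestation>, new_min_target: u64) {
    let max_target = match history.iter().map(|a| a.target_epoch).max() {
        Some(max) => max,
        None => return, // nothing recorded, nothing to prune
    };
    history.retain(|a| a.target_epoch >= new_min_target || a.target_epoch >= max_target);
}

/// Simplified guard: a new message is refused if its target epoch is lower
/// than the minimum target still recorded. An empty history refuses nothing.
pub fn is_below_recorded_minimum(history: &[SignedAttestation], target_epoch: u64) -> bool {
    match history.iter().map(|a| a.target_epoch).min() {
        Some(min_target) => target_epoch < min_target,
        None => false,
    }
}
```

After pruning, the lowest surviving target becomes the new floor, so anything older than the pruned range is still refused even though its individual records are gone.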

The pruning process is potentially time-consuming, so it's scheduled to run only once every 512 epochs, starting two-thirds of the way through a slot (at the same time as aggregate production). This gives it at least 4 seconds to run without impacting other signing, which I think should be sufficient. I've seen it run for several minutes (yikes!) on our Pyrmont nodes, but I suspect that (1) this will only occur on the first run, when the database is still huge, and (2) no other production users will be impacted, because they don't have enough validators per node.
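A minimal sketch of the "once every 512 epochs" check, assuming epochs are plain `u64` values rather than the `Epoch` type used in the codebase. `PRUNE_PERIOD_EPOCHS`, `should_prune` and `new_min_target_epoch` are hypothetical names chosen for this example.

```rust
/// Number of epochs between pruning runs, and the amount of history kept.
const PRUNE_PERIOD_EPOCHS: u64 = 512;

/// Returns `true` when `current_epoch` lies in a later 512-epoch period than
/// the epoch of the last successful prune.
pub fn should_prune(current_epoch: u64, last_prune_epoch: u64) -> bool {
    current_epoch / PRUNE_PERIOD_EPOCHS > last_prune_epoch / PRUNE_PERIOD_EPOCHS
}

/// Oldest target epoch to keep; saturates at zero near genesis.
pub fn new_min_target_epoch(current_epoch: u64) -> u64 {
    current_epoch.saturating_sub(PRUNE_PERIOD_EPOCHS)
}
```

Because the check compares 512-epoch periods rather than exact epochs, a call that misses the boundary epoch is still picked up by the next call in the same period. With the last-prune epoch initialised to 0, as in this commit, a restart at any epoch of 512 or later also passes the check.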

Pruning also happens at start-up, as I figured this is a fairly infrequent event, and if a user is experiencing problems with the VC related to pruning, it's nice to be able to trigger it with a quick restart. Users are also conditioned to not mind missing a few attestations during a restart.

We need to include a note in the release notes that users may see the message `timed out waiting for connection` the first time they prune a huge database, but that this is totally fine and to be expected (the VC will miss those attestations in the meantime).
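The readable message depends on formatting the pool error with `{}` (`Display`) instead of `{:?}` (`Debug`). The sketch below shows the difference using `PoolError`, a hypothetical stand-in type; it is not the real `r2d2::Error`, and its `Display` text is an assumption made to mirror the message quoted above.

```rust
use std::fmt;

/// Hypothetical stand-in for a connection pool error.
#[derive(Debug)]
pub struct PoolError(pub Option<String>);

impl fmt::Display for PoolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Human-readable text, optionally followed by an underlying cause.
        write!(f, "timed out waiting for connection")?;
        if let Some(cause) = &self.0 {
            write!(f, ": {}", cause)?;
        }
        Ok(())
    }
}

/// Formats with `Debug`, exposing the struct layout.
pub fn debug_message(error: &PoolError) -> String {
    format!("{:?}", error)
}

/// Formats with `Display`, producing the user-facing message.
pub fn display_message(error: &PoolError) -> String {
    format!("{}", error)
}
```

With `Debug`, the log line shows the wrapper's internal structure; with `Display`, it shows the sentence users can match against the release notes.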

I'm also open to making this opt-in for now, although the sooner we get users doing it, the less painful it will be: prune early, prune often!
michaelsproul committed Feb 24, 2021
1 parent 2f077b1 commit afd4786
Showing 6 changed files with 177 additions and 22 deletions.
3 changes: 2 additions & 1 deletion validator_client/slashing_protection/src/lib.rs
@@ -104,7 +104,8 @@ impl From<SQLError> for NotSafe {

 impl From<r2d2::Error> for NotSafe {
     fn from(error: r2d2::Error) -> Self {
-        NotSafe::SQLPoolError(format!("{:?}", error))
+        // Use `Display` impl to print "timed out waiting for connection"
+        NotSafe::SQLPoolError(format!("{}", error))
     }
 }

82 changes: 63 additions & 19 deletions validator_client/slashing_protection/src/slashing_database.rs
@@ -662,15 +662,16 @@ impl SlashingDatabase {
             )?;
         }

-        // Prune attestations less than the min source and target from this interchange file.
-        // See the rationale for blocks above.
-        if let Some((new_min_source, new_min_target)) = record
+        // Prune attestations less than the min target from this interchange file.
+        // See the rationale for blocks above, and the doc comment for `prune_signed_attestations`
+        // for why we don't need to separately prune for the min source.
+        if let Some(new_min_target) = record
             .signed_attestations
             .iter()
-            .map(|attestation| (attestation.source_epoch, attestation.target_epoch))
+            .map(|attestation| attestation.target_epoch)
             .min()
         {
-            self.prune_signed_attestations(&record.pubkey, new_min_source, new_min_target, txn)?;
+            self.prune_signed_attestations(&record.pubkey, new_min_target, txn)?;
         }

         let summary = self.validator_summary(&record.pubkey, txn)?;
@@ -754,7 +755,7 @@ impl SlashingDatabase {
     }

     /// Remove all blocks for `public_key` with slots less than `new_min_slot`.
-    pub fn prune_signed_blocks(
+    fn prune_signed_blocks(
         &self,
         public_key: &PublicKey,
         new_min_slot: Slot,
@@ -764,39 +765,82 @@ impl SlashingDatabase {

         txn.execute(
             "DELETE FROM signed_blocks
-             WHERE validator_id = ?1 AND slot < ?2",
+             WHERE
+                validator_id = ?1 AND
+                slot < ?2 AND
+                slot < (SELECT MAX(slot)
+                        FROM signed_blocks
+                        WHERE validator_id = ?1)",
             params![validator_id, new_min_slot],
         )?;

         Ok(())
     }

-    /// Remove all attestations for `public_key` with
-    /// `(source, target) < (new_min_source, new_min_target)`.
-    pub fn prune_signed_attestations(
+    /// Prune the signed blocks table for the given public keys.
+    pub fn prune_all_signed_blocks<'a>(
+        &self,
+        mut public_keys: impl Iterator<Item = &'a PublicKey>,
+        new_min_slot: Slot,
+    ) -> Result<(), NotSafe> {
+        let mut conn = self.conn_pool.get()?;
+        let txn = conn.transaction()?;
+        public_keys.try_for_each(|pubkey| self.prune_signed_blocks(pubkey, new_min_slot, &txn))?;
+        txn.commit()?;
+        Ok(())
+    }
+
+    /// Remove all attestations for `public_key` with `target < new_min_target`.
+    ///
+    /// Pruning every attestation with target less than `new_min_target` also has the effect of
+    /// making the new minimum source the source of the attestation with `target == new_min_target`
+    /// (if any exists). This is exactly what's required for pruning after importing an interchange
+    /// file, whereby we want to update the new minimum source to the min source from the
+    /// interchange.
+    ///
+    /// If the `new_min_target` was plucked out of thin air and doesn't necessarily correspond to
+    /// an extant attestation then this function is still safe. It will never delete *all* the
+    /// attestations in the database.
+    fn prune_signed_attestations(
         &self,
         public_key: &PublicKey,
-        new_min_source: Epoch,
         new_min_target: Epoch,
         txn: &Transaction,
     ) -> Result<(), NotSafe> {
         let validator_id = self.get_validator_id_in_txn(txn, public_key)?;

-        // Delete attestations with source *and* target less than the minimums.
-        // Assuming `(new_min_source, new_min_target)` was successfully
-        // inserted into the database, then any other attestation in the database
-        // can't have just its source or just its target less than the new minimum.
-        // I.e. the following holds:
-        // a.source < new_min_source <--> a.target < new_min_target
+        // The following holds:
+        // a.target < new_min_target --> a.source <= new_min_source
+        //
+        // The `MAX(target_epoch)` acts as a guard to prevent accidentally clearing the DB.
         txn.execute(
             "DELETE FROM signed_attestations
-             WHERE validator_id = ?1 AND source_epoch < ?2 AND target_epoch < ?3",
-            params![validator_id, new_min_source, new_min_target],
+             WHERE
+                validator_id = ?1 AND
+                target_epoch < ?2 AND
+                target_epoch < (SELECT MAX(target_epoch)
+                                FROM signed_attestations
+                                WHERE validator_id = ?1)",
+            params![validator_id, new_min_target],
         )?;

         Ok(())
     }

+    /// Prune the signed attestations table for the given validator keys.
+    pub fn prune_all_signed_attestations<'a>(
+        &self,
+        mut public_keys: impl Iterator<Item = &'a PublicKey>,
+        new_min_target: Epoch,
+    ) -> Result<(), NotSafe> {
+        let mut conn = self.conn_pool.get()?;
+        let txn = conn.transaction()?;
+        public_keys
+            .try_for_each(|pubkey| self.prune_signed_attestations(pubkey, new_min_target, &txn))?;
+        txn.commit()?;
+        Ok(())
+    }
+
     pub fn num_validator_rows(&self) -> Result<u32, NotSafe> {
         let mut conn = self.conn_pool.get()?;
         let txn = conn.transaction()?;
31 changes: 31 additions & 0 deletions validator_client/src/attestation_service.rs
@@ -222,6 +222,11 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
             );
         });

+        // Schedule pruning of the slashing protection database once all unaggregated
+        // attestations have (hopefully) been signed, i.e. at the same time as aggregate
+        // production.
+        self.spawn_slashing_protection_pruning_task(slot, aggregate_production_instant);
+
         Ok(())
     }

@@ -566,6 +571,32 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {

         Ok(())
     }
+
+    /// Spawn a blocking task to run the slashing protection pruning process.
+    ///
+    /// Start the task at `pruning_instant` to avoid interference with other tasks.
+    fn spawn_slashing_protection_pruning_task(&self, slot: Slot, pruning_instant: Instant) {
+        let attestation_service = self.clone();
+        let executor = self.inner.context.executor.clone();
+        let current_epoch = slot.epoch(E::slots_per_epoch());
+
+        // Wait for `pruning_instant` in a regular task, and then switch to a blocking one.
+        self.inner.context.executor.spawn(
+            async move {
+                sleep_until(pruning_instant).await;
+
+                executor.spawn_blocking(
+                    move || {
+                        attestation_service
+                            .validator_store
+                            .prune_slashing_protection_db(current_epoch, false)
+                    },
+                    "slashing_protection_pruning",
+                )
+            },
+            "slashing_protection_pre_pruning",
+        );
+    }
 }

 #[cfg(test)]
4 changes: 4 additions & 0 deletions validator_client/src/http_metrics/metrics.rs
@@ -65,6 +65,10 @@ lazy_static::lazy_static! {
         "Duration to perform attestation service tasks",
         &["task"]
     );
+    pub static ref SLASHING_PROTECTION_PRUNE_TIMES: Result<Histogram> = try_create_histogram(
+        "vc_slashing_protection_prune_times_seconds",
+        "Time required to prune the slashing protection DB",
+    );
     pub static ref BLOCK_SERVICE_TIMES: Result<HistogramVec> = try_create_histogram_vec(
         "vc_beacon_block_service_task_times_seconds",
         "Duration to perform beacon block service tasks",
7 changes: 7 additions & 0 deletions validator_client/src/lib.rs
@@ -277,6 +277,13 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
             "voting_validators" => validator_store.num_voting_validators()
         );

+        // Perform pruning of the slashing protection database on start-up. In case the database is
+        // oversized from having not been pruned (by a prior version) we don't want to prune
+        // concurrently, as it will hog the lock and cause the attestation service to spew CRITs.
+        if let Some(slot) = slot_clock.now() {
+            validator_store.prune_slashing_protection_db(slot.epoch(T::slots_per_epoch()), true);
+        }
+
         let duties_service = DutiesServiceBuilder::new()
             .slot_clock(slot_clock.clone())
             .validator_store(validator_store.clone())
72 changes: 70 additions & 2 deletions validator_client/src/validator_store.rs
@@ -2,9 +2,9 @@ use crate::{
     fork_service::ForkService, http_metrics::metrics, initialized_validators::InitializedValidators,
 };
 use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString};
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use slashing_protection::{NotSafe, Safe, SlashingDatabase};
-use slog::{crit, error, warn, Logger};
+use slog::{crit, error, info, warn, Logger};
 use slot_clock::SlotClock;
 use std::path::Path;
 use std::sync::Arc;
@@ -15,6 +15,11 @@ use types::{
 };
 use validator_dir::ValidatorDir;

+/// Number of epochs of slashing protection history to keep.
+///
+/// This acts as a maximum safe-guard against clock drift.
+const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512;
+
 struct LocalValidator {
     validator_dir: ValidatorDir,
     voting_keypair: Keypair,
@@ -44,6 +49,7 @@ impl PartialEq for LocalValidator {
 pub struct ValidatorStore<T, E: EthSpec> {
     validators: Arc<RwLock<InitializedValidators>>,
     slashing_protection: SlashingDatabase,
+    slashing_protection_last_prune: Arc<Mutex<Epoch>>,
     genesis_validators_root: Hash256,
     spec: Arc<ChainSpec>,
     log: Logger,
@@ -63,6 +69,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
         Self {
             validators: Arc::new(RwLock::new(validators)),
             slashing_protection,
+            slashing_protection_last_prune: Arc::new(Mutex::new(Epoch::new(0))),
             genesis_validators_root,
             spec: Arc::new(spec),
             log,
@@ -359,4 +366,65 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
             &self.spec,
         ))
     }
+
+    /// Prune the slashing protection database so that it remains performant.
+    ///
+    /// This function will only do actual pruning periodically, so it should usually be
+    /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning
+    /// runs.
+    pub fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) {
+        // Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for
+        // missing the epoch that aligns exactly.
+        let mut last_prune = self.slashing_protection_last_prune.lock();
+        if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS
+            <= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS
+        {
+            return;
+        }
+
+        if first_run {
+            info!(
+                self.log,
+                "Pruning slashing protection DB";
+                "epoch" => current_epoch,
+                "msg" => "pruning may take several minutes the first time it runs"
+            );
+        } else {
+            info!(self.log, "Pruning slashing protection DB"; "epoch" => current_epoch);
+        }
+
+        let _timer = metrics::start_timer(&metrics::SLASHING_PROTECTION_PRUNE_TIMES);
+
+        let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS);
+        let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch());
+
+        let validators = self.validators.read();
+        if let Err(e) = self
+            .slashing_protection
+            .prune_all_signed_attestations(validators.iter_voting_pubkeys(), new_min_target_epoch)
+        {
+            error!(
+                self.log,
+                "Error during pruning of signed attestations";
+                "error" => ?e,
+            );
+            return;
+        }
+
+        if let Err(e) = self
+            .slashing_protection
+            .prune_all_signed_blocks(validators.iter_voting_pubkeys(), new_min_slot)
+        {
+            error!(
+                self.log,
+                "Error during pruning of signed blocks";
+                "error" => ?e,
+            );
+            return;
+        }
+
+        *last_prune = current_epoch;
+
+        info!(self.log, "Completed pruning of slashing protection DB");
+    }
 }
