Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Dispute spam protection (#4134)
Browse files Browse the repository at this point in the history
* Mostly notes.

* Better error messages.

* Introduce Fatal/NonFatal + drop back channel participation

- Fatal/NonFatal - in order to make it easier to use utility functions.
- We drop the back channel in dispute participation as it won't be
needed any more.

* Better error messages.

* Utility function for receiving `CandidateEvent`s.

* Ordering module typechecks.

* cargo fmt

* Prepare spam slots module.

* Implement SpamSlots mechanism.

* Implement queues.

* cargo fmt

* Participation.

* Participation taking shape.

* Finish participation.

* cargo fmt

* Cleanup.

* WIP: Cleanup + Integration.

* Make `RollingSessionWindow` initialized by default.

* Make approval voting typecheck.

* Get rid of lazy_static & fix approval voting tests

* Move `SessionWindowSize` to node primitives.

* Implement dispute coordinator initialization.

* cargo fmt

* Make queues return error instead of boolean.

* Initialized: WIP

* Introduce chain api for getting finalized block.

* Fix ordering to only prune candidates on finalized events.

* Pruning of old sessions in spam slots.

* New import logic.

* Make everything typecheck.

* Fix warnings.

* Get rid of obsolete dispute-participation.

* Fixes.

* Add back accidentelly deleted Cargo.lock

* Deliver disputes in an ordered fashion.

* Add module docs for errors

* Use type synonym.

* hidden docs.

* Fix overseer tests.

* Ordering provider taking `CandidateReceipt`.

... To be kicked on one next commit.

* Fix ordering to use relay_parent

as included block is not unique per candidate.

* Add comment in ordering.rs.

* Take care of duplicate entries in queues.

* Better spam slots.

* Review remarks + docs.

* Fix db tests.

* Participation tests.

* Also scrape votes on first leaf for good measure.

* Make tests typecheck.

* Spelling.

* Only participate in actual disputes, not on every import.

* Don't account backing votes to spam slots.

* Fix more tests.

* Don't participate if we don't have keys.

* Fix tests, typos and warnings.

* Fix merge error.

* Spelling fixes.

* Add missing docs.

* Queue tests.

* More tests.

* Add metrics + don't short circuit import.

* Basic test for ordering provider.

* Import fix.

* Remove dead link.

* One more dead link.

Co-authored-by: Lldenaurois <[email protected]>
  • Loading branch information
eskimor and Lldenaurois authored Nov 19, 2021
1 parent 9f059fb commit d488955
Show file tree
Hide file tree
Showing 45 changed files with 4,073 additions and 2,595 deletions.
19 changes: 2 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ members = [
"node/core/chain-api",
"node/core/chain-selection",
"node/core/dispute-coordinator",
"node/core/dispute-participation",
"node/core/parachains-inherent",
"node/core/provisioner",
"node/core/pvf",
Expand Down
36 changes: 21 additions & 15 deletions node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct ImportedBlockInfo {
}

struct ImportedBlockInfoEnv<'a> {
session_window: &'a RollingSessionWindow,
session_window: &'a Option<RollingSessionWindow>,
assignment_criteria: &'a (dyn AssignmentCriteria + Send + Sync),
keystore: &'a LocalKeystore,
}
Expand Down Expand Up @@ -133,7 +133,11 @@ async fn imported_block_info(
Err(_) => return Ok(None),
};

if env.session_window.earliest_session().map_or(true, |e| session_index < e) {
if env
.session_window
.as_ref()
.map_or(true, |s| session_index < s.earliest_session())
{
tracing::debug!(
target: LOG_TARGET,
"Block {} is from ancient session {}. Skipping",
Expand Down Expand Up @@ -180,7 +184,8 @@ async fn imported_block_info(
}
};

let session_info = match env.session_window.session_info(session_index) {
let session_info = match env.session_window.as_ref().and_then(|s| s.session_info(session_index))
{
Some(s) => s,
None => {
tracing::debug!(
Expand Down Expand Up @@ -324,7 +329,7 @@ pub(crate) async fn handle_new_head(
}
};

match state.session_window.cache_session_info_for_head(ctx, head).await {
match state.cache_session_info_for_head(ctx, head).await {
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -335,7 +340,7 @@ pub(crate) async fn handle_new_head(

return Ok(Vec::new())
},
Ok(a @ SessionWindowUpdate::Advanced { .. }) => {
Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => {
tracing::info!(
target: LOG_TARGET,
update = ?a,
Expand Down Expand Up @@ -431,8 +436,9 @@ pub(crate) async fn handle_new_head(

let session_info = state
.session_window
.session_info(session_index)
.expect("imported_block_info requires session to be available; qed");
.as_ref()
.and_then(|s| s.session_info(session_index))
.expect("imported_block_info requires session info to be available; qed");

let (block_tick, no_show_duration) = {
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
Expand Down Expand Up @@ -608,7 +614,7 @@ pub(crate) mod tests {

fn blank_state() -> State {
State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
session_window: None,
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
Expand All @@ -618,11 +624,11 @@ pub(crate) mod tests {

fn single_session_state(index: SessionIndex, info: SessionInfo) -> State {
State {
session_window: RollingSessionWindow::with_session_info(
session_window: Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
index,
vec![info],
),
)),
..blank_state()
}
}
Expand Down Expand Up @@ -740,7 +746,7 @@ pub(crate) mod tests {
let header = header.clone();
Box::pin(async move {
let env = ImportedBlockInfoEnv {
session_window: &session_window,
session_window: &Some(session_window),
assignment_criteria: &MockAssignmentCriteria,
keystore: &LocalKeystore::in_memory(),
};
Expand Down Expand Up @@ -849,7 +855,7 @@ pub(crate) mod tests {
let header = header.clone();
Box::pin(async move {
let env = ImportedBlockInfoEnv {
session_window: &session_window,
session_window: &Some(session_window),
assignment_criteria: &MockAssignmentCriteria,
keystore: &LocalKeystore::in_memory(),
};
Expand Down Expand Up @@ -942,7 +948,7 @@ pub(crate) mod tests {
.collect::<Vec<_>>();

let test_fut = {
let session_window = RollingSessionWindow::new(APPROVAL_SESSIONS);
let session_window = None;

let header = header.clone();
Box::pin(async move {
Expand Down Expand Up @@ -1037,11 +1043,11 @@ pub(crate) mod tests {
.map(|(r, c, g)| (r.hash(), r.clone(), *c, *g))
.collect::<Vec<_>>();

let session_window = RollingSessionWindow::with_session_info(
let session_window = Some(RollingSessionWindow::with_session_info(
APPROVAL_SESSIONS,
session,
vec![session_info],
);
));

let header = header.clone();
Box::pin(async move {
Expand Down
35 changes: 30 additions & 5 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
rolling_session_window::RollingSessionWindow,
rolling_session_window::{
new_session_window_size, RollingSessionWindow, SessionWindowSize, SessionWindowUpdate,
SessionsUnavailable,
},
TimeoutExt,
};
use polkadot_primitives::v1::{
Expand Down Expand Up @@ -92,7 +95,8 @@ use crate::{
#[cfg(test)]
mod tests;

const APPROVAL_SESSIONS: SessionIndex = 6;
pub const APPROVAL_SESSIONS: SessionWindowSize = new_session_window_size!(6);

const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const APPROVAL_CACHE_SIZE: usize = 1024;
const TICK_TOO_FAR_IN_FUTURE: Tick = 20; // 10 seconds.
Expand Down Expand Up @@ -568,7 +572,7 @@ impl CurrentlyCheckingSet {
}

struct State {
session_window: RollingSessionWindow,
session_window: Option<RollingSessionWindow>,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
clock: Box<dyn Clock + Send + Sync>,
Expand All @@ -577,9 +581,30 @@ struct State {

impl State {
fn session_info(&self, i: SessionIndex) -> Option<&SessionInfo> {
self.session_window.session_info(i)
self.session_window.as_ref().and_then(|w| w.session_info(i))
}

/// Bring `session_window` up to date.
pub async fn cache_session_info_for_head(
&mut self,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
head: Hash,
) -> Result<Option<SessionWindowUpdate>, SessionsUnavailable> {
let session_window = self.session_window.take();
match session_window {
None => {
self.session_window =
Some(RollingSessionWindow::new(ctx, APPROVAL_SESSIONS, head).await?);
Ok(None)
},
Some(mut session_window) => {
let r =
session_window.cache_session_info_for_head(ctx, head).await.map(Option::Some);
self.session_window = Some(session_window);
r
},
}
}
// Compute the required tranches for approval for this block and candidate combo.
// Fails if there is no approval entry for the block under the candidate or no candidate entry
// under the block, or if the session is out of bounds.
Expand Down Expand Up @@ -671,7 +696,7 @@ where
B: Backend,
{
let mut state = State {
session_window: RollingSessionWindow::new(APPROVAL_SESSIONS),
session_window: None,
keystore: subsystem.keystore,
slot_duration_millis: subsystem.slot_duration_millis,
clock,
Expand Down
24 changes: 24 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct MetricsInner {
votes: prometheus::CounterVec<prometheus::U64>,
/// Conclusion across all disputes.
concluded: prometheus::CounterVec<prometheus::U64>,
/// Number of participations that have been queued.
queued_participations: prometheus::CounterVec<prometheus::U64>,
}

/// Candidate validation metrics.
Expand Down Expand Up @@ -61,6 +63,18 @@ impl Metrics {
metrics.concluded.with_label_values(&["invalid"]).inc();
}
}

pub(crate) fn on_queued_priority_participation(&self) {
if let Some(metrics) = &self.0 {
metrics.queued_participations.with_label_values(&["priority"]).inc();
}
}

pub(crate) fn on_queued_best_effort_participation(&self) {
if let Some(metrics) = &self.0 {
metrics.queued_participations.with_label_values(&["best-effort"]).inc();
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -93,6 +107,16 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
queued_participations: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_dispute_participations",
"Total number of queued participations, grouped by priority and best-effort. (Not every queueing will necessarily lead to an actual participation because of duplicates.)",
),
&["priority"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
7 changes: 5 additions & 2 deletions node/core/dispute-coordinator/src/real/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use polkadot_primitives::v1::{CandidateHash, SessionIndex};

use std::collections::HashMap;

use super::db::v1::{CandidateVotes, RecentDisputes};
use super::{
db::v1::{CandidateVotes, RecentDisputes},
error::FatalResult,
};

#[derive(Debug)]
pub enum BackendWriteOp {
Expand All @@ -53,7 +56,7 @@ pub trait Backend {

/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>;
}
Expand Down
19 changes: 9 additions & 10 deletions node/core/dispute-coordinator/src/real/db/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ use std::sync::Arc;
use kvdb::{DBTransaction, KeyValueDB};
use parity_scale_codec::{Decode, Encode};

use crate::{
real::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
DISPUTE_WINDOW,
},
DisputeStatus,
use crate::real::{
backend::{Backend, BackendWriteOp, OverlayedBackend},
error::{Fatal, FatalResult},
status::DisputeStatus,
DISPUTE_WINDOW,
};

const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes";
Expand Down Expand Up @@ -72,7 +71,7 @@ impl Backend for DbBackend {

/// Atomically writes the list of operations, with later operations taking precedence over
/// prior.
fn write<I>(&mut self, ops: I) -> SubsystemResult<()>
fn write<I>(&mut self, ops: I) -> FatalResult<()>
where
I: IntoIterator<Item = BackendWriteOp>,
{
Expand All @@ -98,7 +97,7 @@ impl Backend for DbBackend {
}
}

self.inner.write(tx).map_err(Into::into)
self.inner.write(tx).map_err(Fatal::DbWriteFailed)
}
}

Expand Down Expand Up @@ -214,7 +213,7 @@ pub(crate) fn note_current_session(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
current_session: SessionIndex,
) -> SubsystemResult<()> {
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW);
let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW.get());
match overlay_db.load_earliest_session()? {
None => {
// First launch - write new-earliest.
Expand Down Expand Up @@ -421,7 +420,7 @@ mod tests {

let prev_earliest_session = 0;
let new_earliest_session = 5;
let current_session = 5 + DISPUTE_WINDOW;
let current_session = 5 + DISPUTE_WINDOW.get();

let very_old = 3;
let slightly_old = 4;
Expand Down
Loading

0 comments on commit d488955

Please sign in to comment.