Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Delay reputation updates (#7214)
Browse files Browse the repository at this point in the history
* Add futures-timer

* Make cost_or_benefit public

* Update ReportPeer message format

* Add delay to reputation updates (dirtywork)

* Update ReputationAggregator

* Update tests

* Fix flucky tests

* Move reputation to state

* Use the main loop for handling reputation sendings

* Update

* Move reputation to utils

* Update reputation sending

* Fix arguments order

* Update state

* Remove new from state

* Add constant

* Add failing test for delay

* Change mocking approach

* Fix type errors

* Fix comments

* Add message handling to select

* Fix bitfields-distribution tests

* Add docs to reputation aggregator

* Replace .into_base_rep

* Use one REPUTATION_CHANGE_INTERVAL by default

* Add reputation change to statement-distribution

* Update polkadot-availability-bitfield-distribution

* Update futures selecting in subsystems

* Update reputation adding

* Send malicious changes right away without adding to state

* Add reputation to StatementDistributionSubsystem

* Handle reputation in statement distribution

* Add delay test for polkadot-statement-distribution

* Fix collator-protocol tests before applying reputation delay

* Remove into_base_rep

* Add reputation to State

* Fix failed tests

* Add reputation delay

* Update tests

* Add batched network message for peer reporting

* Update approval-distribution tests

* Update bitfield-distribution tests

* Update statement-distribution tests

* Update collator-protocol tests

* Remove levels in matching

* Address clippy errors

* Fix overseer test

* Add a metric for original count of rep changes

* Update Reputation

* Revert "Add a metric for original count of rep changes"

This reverts commit 6c9b0c1.

* Update node/subsystem-util/src/reputation.rs

Co-authored-by: Vsevolod Stakhov <[email protected]>

* Remove redundant vec

---------

Co-authored-by: Vsevolod Stakhov <[email protected]>
  • Loading branch information
AndreiEres and vstakhov authored Jun 15, 2023
1 parent 0e614b0 commit 2d840ff
Show file tree
Hide file tree
Showing 27 changed files with 2,223 additions and 797 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion node/network/approval-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
rand = "0.8"

futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }

[dev-dependencies]
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }

polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" }

Expand Down
192 changes: 147 additions & 45 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

#![warn(missing_docs)]

use futures::{channel::oneshot, FutureExt as _};
use futures::{channel::oneshot, select, FutureExt as _};
use polkadot_node_jaeger as jaeger;
use polkadot_node_network_protocol::{
self as net_protocol,
Expand All @@ -38,11 +38,15 @@ use polkadot_node_subsystem::{
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};
use polkadot_primitives::{
BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature,
};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
time::Duration,
};

use self::metrics::Metrics;

Expand Down Expand Up @@ -187,6 +191,9 @@ struct State {

/// Current approval checking finality lag.
approval_checking_lag: BlockNumber,

/// Aggregated reputation change
reputation: ReputationAggregator,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -755,7 +762,13 @@ impl State {
"Unexpected assignment",
);
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
Expand All @@ -780,7 +793,13 @@ impl State {
?message_subject,
"Duplicate assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
Expand All @@ -792,13 +811,25 @@ impl State {
?message_subject,
"Assignment from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}

// if the assignment is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
peer_knowledge.received.insert(message_subject, message_kind);
Expand Down Expand Up @@ -834,7 +865,13 @@ impl State {
);
match result {
AssignmentCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;
entry.knowledge.known_messages.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
Expand Down Expand Up @@ -862,8 +899,13 @@ impl State {
?peer_id,
"Got an assignment too far in the future",
);
modify_reputation(ctx.sender(), peer_id, COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE)
.await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
)
.await;
return
},
AssignmentCheckResult::Bad(error) => {
Expand All @@ -874,7 +916,13 @@ impl State {
%error,
"Got a bad assignment from peer",
);
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
return
},
}
Expand Down Expand Up @@ -1024,7 +1072,13 @@ impl State {
_ => {
if let Some(peer_id) = source.peer_id() {
if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
}
}
return
Expand All @@ -1043,7 +1097,13 @@ impl State {
?message_subject,
"Unknown approval assignment",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
return
}

Expand All @@ -1060,7 +1120,13 @@ impl State {
"Duplicate approval",
);

modify_reputation(ctx.sender(), peer_id, COST_DUPLICATE_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
return
}
Expand All @@ -1072,14 +1138,26 @@ impl State {
?message_subject,
"Approval from a peer is out of view",
);
modify_reputation(ctx.sender(), peer_id, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_UNEXPECTED_MESSAGE,
)
.await;
},
}

// if the approval is known to be valid, reward the peer
if entry.knowledge.contains(&message_subject, message_kind) {
gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known approval");
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE,
)
.await;
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
Expand Down Expand Up @@ -1110,15 +1188,27 @@ impl State {
);
match result {
ApprovalCheckResult::Accepted => {
modify_reputation(ctx.sender(), peer_id, BENEFIT_VALID_MESSAGE_FIRST).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
BENEFIT_VALID_MESSAGE_FIRST,
)
.await;

entry.knowledge.insert(message_subject.clone(), message_kind);
if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
peer_knowledge.received.insert(message_subject.clone(), message_kind);
}
},
ApprovalCheckResult::Bad(error) => {
modify_reputation(ctx.sender(), peer_id, COST_INVALID_MESSAGE).await;
modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_INVALID_MESSAGE,
)
.await;
gum::info!(
target: LOG_TARGET,
?peer_id,
Expand Down Expand Up @@ -1669,6 +1759,7 @@ async fn adjust_required_routing_and_propagate<Context, BlockFilter, RoutingModi

/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation(
reputation: &mut ReputationAggregator,
sender: &mut impl overseer::ApprovalDistributionSenderTrait,
peer_id: PeerId,
rep: Rep,
Expand All @@ -1679,8 +1770,7 @@ async fn modify_reputation(
?peer_id,
"Reputation change for peer",
);

sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer_id, rep)).await;
reputation.modify(sender, peer_id, rep).await;
}

#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
Expand All @@ -1696,45 +1786,57 @@ impl ApprovalDistribution {
// According to the docs of `rand`, this is a ChaCha12 RNG in practice
// and will always be chosen for strong performance and security properties.
let mut rng = rand::rngs::StdRng::from_entropy();
self.run_inner(ctx, &mut state, &mut rng).await
self.run_inner(ctx, &mut state, REPUTATION_CHANGE_INTERVAL, &mut rng).await
}

/// Used for testing.
async fn run_inner<Context>(
self,
mut ctx: Context,
state: &mut State,
reputation_interval: Duration,
rng: &mut (impl CryptoRng + Rng),
) {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();

loop {
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
Err(e) => {
gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return
},
};
match message {
FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
state.handle_block_finalized(&mut ctx, &self.metrics, number).await;
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return,
}
}
}
Expand Down
Loading

0 comments on commit 2d840ff

Please sign in to comment.