Conversation
Looks like it is heading in the right direction. We should run Versi burn-ins after addressing some of the comments, especially the aggregation interval. I am interested in seeing the effects on the tx bridge queue size.
pub fn update(&mut self, peer_id: PeerId, rep: Rep) {
    if matches!(rep, Rep::Malicious(_)) {
We should handle this case in the modify_reputation function and keep the aggregator simple.
What do you mean?
We flush the reputation aggregator outside of the state, so I saw only one way to mark the case where we need to send reputation immediately: set a flag in the aggregator and check it in the outer loop.
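A minimal sketch of that flag mechanism, with placeholder types and names (the real code uses PeerId and UnifiedReputationChange, and the flag check lives in the subsystem's main loop):

```rust
use std::collections::HashMap;

type PeerId = u64; // placeholder for the real PeerId type

#[derive(Default)]
struct ReputationAggregator {
    by_peer: HashMap<PeerId, i32>,
    overflow: bool, // set when a change should be flushed immediately
}

impl ReputationAggregator {
    fn update(&mut self, peer: PeerId, cost: i32, is_malicious: bool) {
        let entry = self.by_peer.entry(peer).or_insert(0);
        *entry = entry.saturating_add(cost);
        if is_malicious {
            // only record the urgency here; the outer loop decides when to flush
            self.overflow = true;
        }
    }

    // checked by the outer loop after every handled message
    fn overflow(&self) -> bool {
        self.overflow
    }
}
```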
mut reputation_delay: &mut Fuse<futures_timer::Delay>,
reputation_interval: &std::time::Duration,
) -> bool {
    if reputation_interval.is_zero() || reputation.overflow() {
It would be better to use select! to handle the message-receive and timer futures in the subsystem main loop.
Is it ok if the subsystem handles the whole process alongside the reputation changes in one loop?
    return true
}

select! {
This looks clunky; you wouldn't need it if you implemented select! in the main loop.
) {
    for (&peer_id, &score) in reputation.by_peer() {
        sender
            .send_message(NetworkBridgeTxMessage::ReportPeer(
We could also consider sending the hashmap directly to the network bridge in a new message type ReportPeers, to avoid the overhead of sending many small messages.
Yep.
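For illustration only, such a batched message could look roughly like this (placeholder types; the PR itself ends up wrapping the payload in a ReportPeerMessage with Single and Batch variants, visible in the network bridge diff further down):

```rust
use std::collections::HashMap;

type PeerId = u64;           // placeholder for the real PeerId type
type ReputationChange = i32; // placeholder for the accumulated score

enum ReportPeerMessage {
    // a single change that must be reported right away
    Single(PeerId, ReputationChange),
    // the aggregated changes, flushed once per interval
    Batch(HashMap<PeerId, ReputationChange>),
}
```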
                peer_id,
                net_protocol::ReputationChange::new(
                    score,
                    "Reputation changed during approval-distribution",
Logging individual reputation change messages with trace! would be the better alternative. In any case, a better description would be "aggregated reputation change".
@@ -1662,20 +1747,27 @@ impl ApprovalDistribution {

async fn run<Context>(self, ctx: Context) {
    let mut state = State::default();
    let mut reputation = ReputationAggregator::new();
    let reputation_interval = std::time::Duration::from_millis(5);
This is very low; likely no aggregation will actually happen, except maybe under load. I was thinking something like 30s to start with. We should do some Versi burn-ins to gauge the effectiveness of the aggregation.
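For reference, a 30-second interval expressed as a constant (name and placement are illustrative):

```rust
use std::time::Duration;

// flush aggregated reputation changes every 30 seconds
const REPUTATION_CHANGE_INTERVAL: Duration = Duration::from_secs(30);
```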
    }
}

async fn handle_incoming<Context>(
    ctx: &mut Context,
    state: &mut State,
    reputation: &mut ReputationAggregator,
I'd have the ReputationAggregator be part of the State to avoid an extra argument to the function.
Maybe we don't even need an extra abstraction and can just hold the hashmap with aggregated reputations in the State?
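The two options being discussed, sketched with placeholder types (field names are hypothetical):

```rust
use std::collections::HashMap;

type PeerId = u64; // placeholder for the real PeerId type

#[derive(Default)]
struct ReputationAggregator {
    by_peer: HashMap<PeerId, i32>,
}

#[derive(Default)]
struct State {
    // option A: keep the aggregator type as a field of the state,
    // so handle_incoming needs no extra argument
    reputation: ReputationAggregator,
    // option B: drop the wrapper and hold the raw per-peer totals directly
    // reputation_by_peer: HashMap<PeerId, i32>,
}
```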
Force-pushed from 81092a7 to 1576e2c
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
// Will run if no futures are immediately ready
default => {
This looks conceptually wrong to me, as the loop would get stuck in ctx.recv() until a message comes from the outside. In this specific case, that happens at least every 6s (active leaves) or sooner due to peer messages.
We should remove the default here and write something like the following, so we poll both futures without blocking on a specific one.
message = ctx.recv().fuse() => {
match message {
......
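A self-contained sketch of that shape, with illustrative names and a plain futures channel standing in for the subsystem context (this is not the actual subsystem code): the incoming-message stream and the flush timer are polled in one select! with no default arm, so neither blocks the other.

```rust
use futures::{channel::mpsc, select, FutureExt, StreamExt};
use futures_timer::Delay;
use std::time::Duration;

async fn main_loop(mut rx: mpsc::Receiver<String>, flush_interval: Duration) {
    let mut reputation_delay = Delay::new(flush_interval).fuse();
    loop {
        select! {
            _ = reputation_delay => {
                // timer fired: flush the aggregated reputation changes here
                println!("flush aggregated reputation changes");
                reputation_delay = Delay::new(flush_interval).fuse();
            },
            message = rx.next() => match message {
                Some(msg) => println!("handle incoming: {msg}"),
                None => break, // channel closed: shut down
            },
        }
    }
}
```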
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
// Will run if no futures are immediately ready
default => {
same here
}

/// Adds a reputation change to the inner state,
/// checks if the change is dangerous, and sends all collected changes in a batch if it is.
It should only send the malicious rep change not the entire set.
Looks reasonable, but then we're going to send hashmaps instead of a single message, so we will need to prepare a hashmap just for a malicious change. Is that ok?
We can keep the current message for the malicious ones and add another one to send the rest in bulk.
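A sketch of that agreed behaviour with placeholder types and names (not the actual PR code): a malicious change is reported immediately through the existing single-report path, while everything else is accumulated for the periodic bulk report.

```rust
use std::collections::HashMap;

type PeerId = u64; // placeholder for the real PeerId type

enum Rep {
    Malicious(i32),
    BenignCost(i32),
    Benefit(i32),
}

impl Rep {
    fn cost_or_benefit(&self) -> i32 {
        match *self {
            Rep::Malicious(c) | Rep::BenignCost(c) => -c.abs(),
            Rep::Benefit(b) => b.abs(),
        }
    }
}

#[derive(Default)]
struct ReputationAggregator {
    by_peer: HashMap<PeerId, i32>,
}

impl ReputationAggregator {
    fn modify(&mut self, peer: PeerId, rep: Rep, mut report_single: impl FnMut(PeerId, i32)) {
        if matches!(rep, Rep::Malicious(_)) {
            // malicious: keep the existing single-report message and send now
            report_single(peer, rep.cost_or_benefit());
            return
        }
        // everything else: aggregate for the periodic bulk report
        let delta = rep.cost_or_benefit();
        self.by_peer
            .entry(peer)
            .and_modify(|v| *v = v.saturating_add(delta))
            .or_insert(delta);
    }
}
```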
let current = match self.by_peer.get(&peer_id) {
    Some(v) => *v,
    None => 0,
};
let new_value = current.saturating_add(rep.cost_or_benefit());
self.by_peer.insert(peer_id, new_value);
Would look much better and simpler with the Entry API. Something like:
let cost = rep.cost_or_benefit();
self.by_peer
    .entry(peer_id)
    .and_modify(|old_rep| *old_rep = old_rep.saturating_add(cost))
    .or_insert(cost);
Force-pushed from f6cbaec to a7f9e88
rep: UnifiedReputationChange,
) {
    if (self.send_immediately_if)(rep) {
        self.single_send(sender, peer_id, rep).await;
Does it make sense, instead of sending a single message here, to just do an add and then self.send?
It was like that originally, but then Andrei asked to change it to the current behavior: #7214 (comment)
}

fn add(&mut self, peer_id: PeerId, rep: UnifiedReputationChange) {
    add_reputation(&mut self.by_peer, peer_id, rep)
I think one thing that will be slightly different with this approach, where we aggregate the reputation changes, will be here: https://github.com/paritytech/substrate/blob/master/client/network/src/peer_store.rs#L161. However, since our flush interval is relatively small (30s), I think we should be fine.
Force-pushed from dda4119 to 6c9b0c1
LGTM with some minor nits.
node/network/bridge/src/tx/mod.rs (outdated)
gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
}

metrics.on_report_event();
network_service.report_peer(peer, rep);
},
NetworkBridgeTxMessage::ReportPeer(ReportPeerMessage::Batch(batch)) => {
    let reports: Vec<(PeerId, ReputationChange)> = batch
Why do we need this intermediate Vec if we can just iterate over the original hash table?
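As a toy illustration of that point (names are made up, not the bridge code): the batch map can be consumed directly, without first collecting it into a Vec of pairs.

```rust
use std::collections::HashMap;

fn report_batch(batch: HashMap<String, i32>, mut report_peer: impl FnMut(&str, i32)) {
    // consume the map directly: no intermediate Vec needed
    for (peer, score) in batch {
        report_peer(&peer, score);
    }
}

fn main() {
    let mut batch = HashMap::new();
    batch.insert("peer-a".to_string(), -100);
    batch.insert("peer-b".to_string(), 25);
    report_batch(batch, |peer, score| println!("report {peer}: {score}"));
}
```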
Nice work @AndreiEres! Let's burn this in on Versi before merging!
}

metrics.on_report_event();
network_service.report_peer(peer, rep);
If this batching helps, then we can likely gain more by pushing the batch even further. network_service by itself also just forwards messages - it might make sense to introduce a batch type there as well, all the way up to the actual worker applying the changes.
In our case it helps mostly by putting less pressure on the network bridge tx channel -> fewer times subsystems block when sending to it. I agree that the batching could be improved even further, up to the network service layer. I'd say that is the next step after merging this. @AndreiEres can you create a ticket for this please?
bot merge
Error: Statuses failed for 99beef0
This reverts commit 6c9b0c1.
Co-authored-by: Vsevolod Stakhov <[email protected]>
Force-pushed from 99beef0 to de9e834
bot merge
Problem
Nodes are sending numerous peer reputation changes, which is overloading or blocking the channels.
Hypothesis
We can address this issue by aggregating reputation changes and sending them to the network bridge in one batch every 30 seconds. However, there are certain changes that still require immediate reporting, so we will send those as individual messages right away.
Results
To test the hypothesis, we changed the sending of reputation changes in group A validators and compared the results with groups B, C, and D.
Group A sends changes in one batch, while the others send them as individual messages. (Thanos stopped working for some time, so there is no data at the end of the charts.) In the left charts we see a huge difference in the number of messages sent: group A sends about 200 times fewer messages.
However, the channel load on the right charts is almost unchanged. The peaks in group A are about 15% smaller.
Related
Fixes #7203