From d9387c9709dc2d45ed5b8d707840f72ae670d27d Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Fri, 16 Jun 2023 16:58:18 +0300 Subject: [PATCH] approval-distribution: process messages while waiting after approval-voting In the current implementation, every time we process an assignment or an approval that needs checking in approval-voting, we wait until approval-voting answers the message. Given that approval-voting executes signature checks that take significant time (between 400us and 1ms) per message, that is where most of the time in approval-distribution is spent, see https://github.com/paritytech/polkadot/issues/6608#issuecomment-1590942235 for the numbers. So, modify approval-distribution so that it picks another message from the queue while approval-voting is busy doing its work. This will have a few benefits: 1. Better pipelining of the messages: approval-voting will always have work to do and it won't have to wait for approval-distribution to send it a message. Additionally, some of the work of approval-distribution will be executed in parallel with work in approval-voting instead of serially. 2. By allowing approval-distribution to process messages from its queue while approval-voting confirms that a message is valid, we give approval-distribution the ability to build a better view of what messages other peers already know, so it won't decide to gossip messages to some of its peers once we confirm that message as being correct. 3. It opens the door for other optimizations in the approval-voting subsystem, which would still be the bottleneck. Note! I still expect the amount of work the combination of these two subsystems can do to be bounded by the number of signature checks it has to do, so we would have to stack this with other optimizations we have in the queue. 
- https://github.com/paritytech/polkadot/issues/6608 - https://github.com/paritytech/polkadot/issues/6831 [] Evaluate impact in versi [] Clean up code and make CI happy to make the PR mergeable. Signed-off-by: Alexandru Gheorghe --- node/network/approval-distribution/src/lib.rs | 554 +++++++++++++----- 1 file changed, 417 insertions(+), 137 deletions(-) diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 79aa090a140f..bffb2c6606c1 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -20,7 +20,12 @@ #![warn(missing_docs)] -use futures::{channel::oneshot, select, FutureExt as _}; +use futures::{ + channel::oneshot::{self, Canceled}, + select, + stream::FuturesUnordered, + Future, FutureExt as _, StreamExt, +}; use polkadot_node_jaeger as jaeger; use polkadot_node_network_protocol::{ self as net_protocol, @@ -45,7 +50,8 @@ use polkadot_primitives::{ use rand::{CryptoRng, Rng, SeedableRng}; use std::{ collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque}, - time::Duration, + pin::Pin, + time::{Duration, Instant}, }; use self::metrics::Metrics; @@ -194,6 +200,35 @@ struct State { /// Aggregated reputation change reputation: ReputationAggregator, + /// Approval votes check + answers_from_approval_voting: FuturesUnordered< + Pin> + Send>>, + >, +} + +#[derive(Debug, Clone)] +enum ApprovalVotingResponse { + ApprovalCheck(ApprovalVotingMetadata), + AssignmentCheck(AssignmentMetadata), +} + +#[derive(Debug, Clone)] +struct AssignmentMetadata { + result: AssignmentCheckResult, + assignment: IndirectAssignmentCert, + message_subject: MessageSubject, + message_kind: MessageKind, + peer_id: PeerId, + claimed_candidate_index: CandidateIndex, +} + +#[derive(Debug, Clone)] +struct ApprovalVotingMetadata { + vote: IndirectSignedApprovalVote, + message_subject: MessageSubject, + message_kind: MessageKind, + peer_id: PeerId, + result: ApprovalCheckResult, } 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -268,10 +303,16 @@ struct BlockEntry { parent_hash: Hash, /// Our knowledge of messages. knowledge: Knowledge, + /// Messages sent to approval voting that we are waiting to be verfied. + waiting_to_be_verified: Knowledge, /// A votes entry for each candidate indexed by [`CandidateIndex`]. candidates: Vec, /// The session index of this block. session: SessionIndex, + /// Store messages that arrive from different peer while the signature was checked in approval voting + maybe_needs_checking: HashMap>, + /// Store messages that arrive from different peer while the signature was checked in approval voting + maybe_needs_checking_assignments: HashMap>, } #[derive(Debug)] @@ -426,6 +467,9 @@ impl State { knowledge: Knowledge::default(), candidates, session: meta.session, + maybe_needs_checking: Default::default(), + waiting_to_be_verified: Default::default(), + maybe_needs_checking_assignments: Default::default(), }); self.topologies.inc_session_refs(meta.session); @@ -837,6 +881,26 @@ impl State { return } + // The approval is in process of being verified, nothing to do here, we don't want to check it multiple times + // just mark that the peer knew about it, so we don't send it to him again + if entry.waiting_to_be_verified.contains(&message_subject, message_kind) { + gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "approval is pending verification"); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.received.insert(message_subject.clone(), message_kind); + } + + // Store this for the situation where the signature turns out it was invalid, so check it from a different peer. 
+ entry + .maybe_needs_checking_assignments + .entry(message_subject) + .or_default() + .push(( + peer_id, + PendingMessage::Assignment(assignment, claimed_candidate_index), + )); + return + } + let (tx, rx) = oneshot::channel(); ctx.send_message(ApprovalVotingMessage::CheckAndImportAssignment( @@ -845,87 +909,24 @@ impl State { tx, )) .await; - - let timer = metrics.time_awaiting_approval_voting(); - let result = match rx.await { - Ok(result) => result, - Err(_) => { - gum::debug!(target: LOG_TARGET, "The approval voting subsystem is down"); - return - }, - }; - drop(timer); - - gum::trace!( - target: LOG_TARGET, - ?source, - ?message_subject, - ?result, - "Checked assignment", - ); - match result { - AssignmentCheckResult::Accepted => { - modify_reputation( - &mut self.reputation, - ctx.sender(), - peer_id, - BENEFIT_VALID_MESSAGE_FIRST, - ) - .await; - entry.knowledge.known_messages.insert(message_subject.clone(), message_kind); - if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(message_subject.clone(), message_kind); - } - }, - AssignmentCheckResult::AcceptedDuplicate => { - // "duplicate" assignments aren't necessarily equal. - // There is more than one way each validator can be assigned to each core. - // cf. 
https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699 - if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(message_subject.clone(), message_kind); - } - gum::debug!( - target: LOG_TARGET, - hash = ?block_hash, - ?peer_id, - "Got an `AcceptedDuplicate` assignment", - ); - return - }, - AssignmentCheckResult::TooFarInFuture => { - gum::debug!( - target: LOG_TARGET, - hash = ?block_hash, - ?peer_id, - "Got an assignment too far in the future", - ); - modify_reputation( - &mut self.reputation, - ctx.sender(), - peer_id, - COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE, - ) - .await; - return - }, - AssignmentCheckResult::Bad(error) => { - gum::info!( - target: LOG_TARGET, - hash = ?block_hash, - ?peer_id, - %error, - "Got a bad assignment from peer", - ); - modify_reputation( - &mut self.reputation, - ctx.sender(), - peer_id, - COST_INVALID_MESSAGE, - ) - .await; - return - }, + entry.waiting_to_be_verified.insert(message_subject.clone(), message_kind); + + let message_subj_clone = message_subject.clone(); + let await_future = async move { + let result = rx.await?; + Ok(ApprovalVotingResponse::AssignmentCheck(AssignmentMetadata { + message_subject: message_subj_clone, + message_kind, + peer_id, + result, + assignment, + claimed_candidate_index, + })) } + .boxed(); + self.answers_from_approval_voting.push(await_future); + + return } else { if !entry.knowledge.insert(message_subject.clone(), message_kind) { // if we already imported an assignment, there is no need to distribute it again @@ -947,8 +948,164 @@ impl State { // Invariant: to our knowledge, none of the peers except for the `source` know about the assignment. 
metrics.on_assignment_imported(); - let topology = self.topologies.get_topology(entry.session); + self.circulate_assignment( + ctx, + source, + assignment, + claimed_candidate_index, + message_subject, + message_kind, + rng, + ) + .await; + } + + async fn received_assignment_answer( + &mut self, + ctx: &mut Context, + asignment_metadata: AssignmentMetadata, + ) -> Option { + let message_subject = &asignment_metadata.message_subject; + let message_kind = asignment_metadata.message_kind; + let peer_id = asignment_metadata.peer_id; + let block_hash = asignment_metadata.assignment.block_hash; + let entry = match self.blocks.get_mut(&block_hash) { + Some(entry) => entry, + // TODO: should not happen + None => return None, + }; + gum::trace!( + target: LOG_TARGET, + ?peer_id, + ?message_subject, + ?asignment_metadata.result, + "Checked assignment", + ); + + match asignment_metadata.result { + AssignmentCheckResult::Accepted => { + modify_reputation( + &mut self.reputation, + ctx.sender(), + peer_id, + BENEFIT_VALID_MESSAGE_FIRST, + ) + .await; + entry.knowledge.known_messages.insert(message_subject.clone(), message_kind); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.received.insert(message_subject.clone(), message_kind); + } + entry.maybe_needs_checking_assignments.remove(message_subject); + Some(asignment_metadata) + }, + AssignmentCheckResult::AcceptedDuplicate => { + // "duplicate" assignments aren't necessarily equal. + // There is more than one way each validator can be assigned to each core. + // cf. 
https://github.com/paritytech/polkadot/pull/2160#discussion_r557628699 + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.received.insert(message_subject.clone(), message_kind); + } + gum::debug!( + target: LOG_TARGET, + hash = ?block_hash, + ?peer_id, + "Got an `AcceptedDuplicate` assignment", + ); + return None + }, + AssignmentCheckResult::TooFarInFuture => { + gum::debug!( + target: LOG_TARGET, + hash = ?block_hash, + ?peer_id, + "Got an assignment too far in the future", + ); + modify_reputation( + &mut self.reputation, + ctx.sender(), + peer_id, + COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE, + ) + .await; + return None + }, + AssignmentCheckResult::Bad(error) => { + gum::info!( + target: LOG_TARGET, + hash = ?block_hash, + ?peer_id, + %error, + "Got a bad assignment from peer", + ); + modify_reputation( + &mut self.reputation, + ctx.sender(), + peer_id, + COST_INVALID_MESSAGE, + ) + .await; + // Check if we received some other approval while we were checking this one. 
If yes, then try to see if that turn out to be valid + if let Some((peer_id, assignment)) = entry + .maybe_needs_checking_assignments + .get_mut(message_subject) + .and_then(|val| val.pop()) + { + match assignment { + PendingMessage::Assignment(assignment, claimed_candidate_index) => { + let (tx, rx) = oneshot::channel(); + + ctx.send_message(ApprovalVotingMessage::CheckAndImportAssignment( + assignment.clone(), + claimed_candidate_index, + tx, + )) + .await; + + let assignment_clone = assignment.clone(); + let message_subj_clone = message_subject.clone(); + let await_future = async move { + let result = rx.await?; + Ok(ApprovalVotingResponse::AssignmentCheck(AssignmentMetadata { + assignment: assignment_clone, + message_subject: message_subj_clone, + message_kind, + peer_id, + result, + claimed_candidate_index, + })) + } + .boxed(); + self.answers_from_approval_voting.push(await_future); + }, + _ => {}, + } + } + return None + }, + } + } + + async fn circulate_assignment( + &mut self, + ctx: &mut Context, + source: MessageSource, + assignment: IndirectAssignmentCert, + claimed_candidate_index: CandidateIndex, + message_subject: MessageSubject, + message_kind: MessageKind, + rng: &mut (impl CryptoRng + Rng), + ) { let local = source == MessageSource::Local; + let block_hash = assignment.block_hash; + let validator_index = assignment.validator; + let entry = match self.blocks.get_mut(&block_hash) { + Some(entry) => entry, + None => { + // TODO: should not happen + return + }, + }; + let topology = self.topologies.get_topology(entry.session); let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| { t.local_grid_neighbors().required_routing_by_index(validator_index, local) @@ -1164,60 +1321,47 @@ impl State { return } - let (tx, rx) = oneshot::channel(); + // The approval is in process of being verified, nothing to do here, we don't want to check it multiple times + // just mark that the peer knew about it, so we don't send it to him again + if 
entry.waiting_to_be_verified.contains(&message_subject, message_kind) { + gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "approval is pending verification"); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.received.insert(message_subject.clone(), message_kind); + } + + // Store this for the situation where the signature turns out it was invalid, so check it from a different peer. + entry + .maybe_needs_checking + .entry(message_subject) + .or_default() + .push((peer_id, PendingMessage::Approval(vote))); + return + } + entry.waiting_to_be_verified.insert(message_subject.clone(), message_kind); + let (tx, rx) = oneshot::channel(); ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval(vote.clone(), tx)) .await; - let timer = metrics.time_awaiting_approval_voting(); - let result = match rx.await { - Ok(result) => result, - Err(_) => { - gum::debug!(target: LOG_TARGET, "The approval voting subsystem is down"); - return - }, - }; - drop(timer); - - gum::trace!( - target: LOG_TARGET, - ?peer_id, - ?message_subject, - ?result, - "Checked approval", - ); - match result { - ApprovalCheckResult::Accepted => { - modify_reputation( - &mut self.reputation, - ctx.sender(), - peer_id, - BENEFIT_VALID_MESSAGE_FIRST, - ) - .await; - - entry.knowledge.insert(message_subject.clone(), message_kind); - if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { - peer_knowledge.received.insert(message_subject.clone(), message_kind); - } - }, - ApprovalCheckResult::Bad(error) => { - modify_reputation( - &mut self.reputation, - ctx.sender(), - peer_id, - COST_INVALID_MESSAGE, - ) - .await; - gum::info!( - target: LOG_TARGET, - ?peer_id, - %error, - "Got a bad approval from peer", - ); - return - }, + let vote_clone = vote.clone(); + let message_subj_clone = message_subject.clone(); + let await_future = async move { + let result = rx.await?; + Ok(ApprovalVotingResponse::ApprovalCheck(ApprovalVotingMetadata { + vote: vote_clone, 
+ message_subject: message_subj_clone, + message_kind, + peer_id, + result, + })) } + .boxed(); + self.answers_from_approval_voting.push(await_future); + return + // let result = self.answers_from_approval_voting.next().await; + // if self.received_approval_voting_answer(ctx, result).await.is_none() { + // return + // } } else { if !entry.knowledge.insert(message_subject.clone(), message_kind) { // if we already imported an approval, there is no need to distribute it again @@ -1239,6 +1383,121 @@ impl State { // Invariant: to our knowledge, none of the peers except for the `source` know about the approval. metrics.on_approval_imported(); + // Dispatch a ApprovalDistributionV1Message::Approval(vote) + // to all peers required by the topology, with the exception of the source peer. + self.circulate_approval(ctx, source, message_subject, message_kind, vote).await; + } + + async fn received_approval_voting_answer( + &mut self, + ctx: &mut Context, + result: ApprovalVotingMetadata, + ) -> Option { + let message_subject = &result.message_subject; + let message_kind = result.message_kind; + let peer_id = result.peer_id; + + gum::trace!( + target: LOG_TARGET, + ?peer_id, + ?message_subject, + ?result, + "Checked approval", + ); + + let entry = match self.blocks.get_mut(&result.vote.block_hash) { + Some(entry) if entry.candidates.get(result.vote.candidate_index as usize).is_some() => + entry, + _ => return None, + }; + + match result.result { + ApprovalCheckResult::Accepted => { + modify_reputation( + &mut self.reputation, + ctx.sender(), + peer_id, + BENEFIT_VALID_MESSAGE_FIRST, + ) + .await; + + entry.knowledge.insert(message_subject.clone(), message_kind); + if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) { + peer_knowledge.received.insert(message_subject.clone(), message_kind); + } + + entry.maybe_needs_checking.remove(message_subject); + Some(result) + }, + ApprovalCheckResult::Bad(error) => { + modify_reputation( + &mut self.reputation, + ctx.sender(), 
+ peer_id, + COST_INVALID_MESSAGE, + ) + .await; + gum::info!( + target: LOG_TARGET, + ?peer_id, + %error, + "Got a bad approval from peer", + ); + // Check if we received some other approval while we were checking this one. If yes, then try to see if that turn out to be valid + if let Some((peer_id, vote)) = + entry.maybe_needs_checking.get_mut(message_subject).and_then(|val| val.pop()) + { + match vote { + PendingMessage::Approval(vote) => { + let (tx, rx) = oneshot::channel(); + + ctx.send_message(ApprovalVotingMessage::CheckAndImportApproval( + vote.clone(), + tx, + )) + .await; + let vote_clone = vote.clone(); + let message_subj_clone = message_subject.clone(); + let await_future = async move { + let result = rx.await?; + Ok(ApprovalVotingResponse::ApprovalCheck(ApprovalVotingMetadata { + vote: vote_clone, + message_subject: message_subj_clone, + message_kind, + peer_id, + result, + })) + } + .boxed(); + self.answers_from_approval_voting.push(await_future); + }, + _ => {}, + } + } + None + }, + } + } + + async fn circulate_approval( + &mut self, + ctx: &mut Context, + source: MessageSource, + message_subject: MessageSubject, + message_kind: MessageKind, + vote: IndirectSignedApprovalVote, + ) { + let block_hash = vote.block_hash; + let validator_index = vote.validator; + let candidate_index = vote.candidate_index; + + let entry = match self.blocks.get_mut(&block_hash) { + Some(entry) if entry.candidates.get(candidate_index as usize).is_some() => entry, + _ => return, + }; + + // Dispatch a ApprovalDistributionV1Message::Approval(vote) + // to all peers required by the topology, with the exception of the source peer. 
let required_routing = match entry.candidates.get_mut(candidate_index as usize) { Some(candidate_entry) => { // set the approval state for validator_index to Approved @@ -1297,9 +1556,6 @@ impl State { }, }; - // Dispatch a ApprovalDistributionV1Message::Approval(vote) - // to all peers required by the topology, with the exception of the source peer. - let topology = self.topologies.get_topology(entry.session); let source_peer = source.peer_id(); @@ -1814,6 +2070,7 @@ impl ApprovalDistribution { return }, }; + match message { FromOrchestra::Communication { msg } => Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, @@ -1830,13 +2087,36 @@ impl ApprovalDistribution { state.spans.insert(head, approval_distribution_span); } }, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { - gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); + FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => { + gum::debug!(target: LOG_TARGET, number = %number, "finalized signal {:?}", hash); state.handle_block_finalized(&mut ctx, &self.metrics, number).await; }, FromOrchestra::Signal(OverseerSignal::Conclude) => return, } }, + result = state.answers_from_approval_voting.select_next_some() => { + match result { + Ok(ApprovalVotingResponse::ApprovalCheck(result)) => { + if let Some(result) = state.received_approval_voting_answer(&mut ctx, result).await { + state.circulate_approval(&mut ctx, MessageSource::Peer(result.peer_id), + result.message_subject.clone(), result.message_kind, result.vote.clone()).await; + } else { + println!("WTF just happened") + } + }, + Ok(ApprovalVotingResponse::AssignmentCheck(result)) => { + if let Some(result) = state.received_assignment_answer(&mut ctx, result).await { + state.circulate_assignment( + &mut ctx, MessageSource::Peer(result.peer_id), result.assignment, + result.claimed_candidate_index, result.message_subject, result.message_kind, rng).await; + } + + }, + Err(_) => { + 
gum::debug!(target: LOG_TARGET, "The approval voting subsystem is down"); + }, + } + } } } }