Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Compress the PoV block before sending it over the network #2288

Merged
10 commits merged into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,19 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
pov,
);
let pov = match protocol_v1::CompressedPoV::from_pov(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
return
}
};

let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
Expand Down Expand Up @@ -1280,7 +1288,7 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
assert_eq!(pov.as_pov().unwrap(), pov_block);
}
);
}
Expand Down
28 changes: 21 additions & 7 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async fn received_collation<Context>(
origin: PeerId,
request_id: RequestId,
receipt: CandidateReceipt,
pov: PoV,
pov: protocol_v1::CompressedPoV,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
Expand All @@ -368,6 +368,21 @@ where
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let pov = match pov.as_pov() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
%request_id,
?error,
"Failed to extract PoV",
);
return;
}
};

let _span = jaeger::pov_span(&pov, "received-collation");

tracing::debug!(
target: LOG_TARGET,
%request_id,
Expand Down Expand Up @@ -529,9 +544,8 @@ where
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
let _span2 = jaeger::pov_span(&pov, "received-collation");
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
}
Expand Down Expand Up @@ -1295,9 +1309,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_a.clone(),
PoV {
protocol_v1::CompressedPoV::from_pov(&PoV {
block_data: BlockData(vec![]),
},
}).unwrap(),
)
)
)
Expand Down Expand Up @@ -1333,9 +1347,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_b.clone(),
PoV {
protocol_v1::CompressedPoV::from_pov(&PoV {
block_data: BlockData(vec![1, 2, 3]),
},
}).unwrap(),
)
)
)
Expand Down
65 changes: 47 additions & 18 deletions node/network/pov-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct State {
}

struct BlockBasedState {
known: HashMap<Hash, Arc<PoV>>,
known: HashMap<Hash, (Arc<PoV>, protocol_v1::CompressedPoV)>,

/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
Expand All @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>)
)
}

fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
-> protocol_v1::ValidationProtocol
{
fn send_pov_message(
relay_parent: Hash,
pov_hash: Hash,
pov: &protocol_v1::CompressedPoV,
) -> protocol_v1::ValidationProtocol {
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone())
)
}

Expand Down Expand Up @@ -267,7 +269,7 @@ async fn distribute_to_awaiting(
metrics: &Metrics,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
pov: &protocol_v1::CompressedPoV,
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
Expand All @@ -284,7 +286,7 @@ async fn distribute_to_awaiting(

if peers_to_send.is_empty() { return; }

let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
Expand Down Expand Up @@ -379,7 +381,7 @@ async fn handle_fetch(
None => return,
};

if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return;
}
Expand Down Expand Up @@ -468,16 +470,28 @@ async fn handle_distribute(
}
}

relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
let encoded_pov = match protocol_v1::CompressedPoV::from_pov(&*pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`."
);
return
}
};

distribute_to_awaiting(
&mut state.peer_state,
ctx,
&state.metrics,
relay_parent,
descriptor.pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov));
}

/// Report a reputation change for a peer.
Expand Down Expand Up @@ -527,8 +541,9 @@ async fn handle_awaiting(
for pov_hash in pov_hashes {
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
Expand All @@ -544,23 +559,35 @@ async fn handle_awaiting(
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
encoded_pov: protocol_v1::CompressedPoV,
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
Some(r) => r,
};

let pov = match encoded_pov.as_pov() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Could not extract PoV",
);
return;
}
};

let pov = {
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
Expand Down Expand Up @@ -607,8 +634,10 @@ async fn handle_incoming_pov(
&state.metrics,
relay_parent,
pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(pov_hash, (pov, encoded_pov));
}

/// Handles a newly connected validator in the context of some relay leaf.
Expand Down
24 changes: 14 additions & 10 deletions node/network/pov-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,11 @@ fn ask_validators_for_povs() {
PoVDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
test_state.validator_peer_id[2].clone(),
protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
protocol_v1::PoVDistributionMessage::SendPoV(
current,
pov_hash,
protocol_v1::CompressedPoV::from_pov(&pov_block).unwrap(),
),
)
)
).await;
Expand Down Expand Up @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
);
}
)
Expand Down Expand Up @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&bad_pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
);
}
);
Expand Down Expand Up @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down
Loading