Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Compress the PoV block before sending it over the network #2288

Merged
10 commits merged into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,19 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
pov,
);
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
return
}
};

let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
Expand Down Expand Up @@ -1280,7 +1288,7 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
}
Expand Down
28 changes: 21 additions & 7 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async fn received_collation<Context>(
origin: PeerId,
request_id: RequestId,
receipt: CandidateReceipt,
pov: PoV,
pov: protocol_v1::CompressedPoV,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
Expand All @@ -368,6 +368,21 @@ where
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let pov = match pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
%request_id,
?error,
"Failed to extract PoV",
);
return;
}
};

let _span = jaeger::pov_span(&pov, "received-collation");

tracing::debug!(
target: LOG_TARGET,
%request_id,
Expand Down Expand Up @@ -529,9 +544,8 @@ where
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
let _span2 = jaeger::pov_span(&pov, "received-collation");
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
}
Expand Down Expand Up @@ -1295,9 +1309,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_a.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![]),
},
}).unwrap(),
)
)
)
Expand Down Expand Up @@ -1333,9 +1347,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_b.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![1, 2, 3]),
},
}).unwrap(),
)
)
)
Expand Down
65 changes: 47 additions & 18 deletions node/network/pov-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct State {
}

struct BlockBasedState {
known: HashMap<Hash, Arc<PoV>>,
known: HashMap<Hash, (Arc<PoV>, protocol_v1::CompressedPoV)>,

/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
Expand All @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>)
)
}

fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
-> protocol_v1::ValidationProtocol
{
fn send_pov_message(
relay_parent: Hash,
pov_hash: Hash,
pov: &protocol_v1::CompressedPoV,
) -> protocol_v1::ValidationProtocol {
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone())
)
}

Expand Down Expand Up @@ -267,7 +269,7 @@ async fn distribute_to_awaiting(
metrics: &Metrics,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
pov: &protocol_v1::CompressedPoV,
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
Expand All @@ -284,7 +286,7 @@ async fn distribute_to_awaiting(

if peers_to_send.is_empty() { return; }

let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
Expand Down Expand Up @@ -379,7 +381,7 @@ async fn handle_fetch(
None => return,
};

if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return;
}
Expand Down Expand Up @@ -468,16 +470,28 @@ async fn handle_distribute(
}
}

relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`."
);
return
}
};

distribute_to_awaiting(
&mut state.peer_state,
ctx,
&state.metrics,
relay_parent,
descriptor.pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov));
}

/// Report a reputation change for a peer.
Expand Down Expand Up @@ -527,8 +541,9 @@ async fn handle_awaiting(
for pov_hash in pov_hashes {
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
Expand All @@ -544,23 +559,35 @@ async fn handle_awaiting(
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
encoded_pov: protocol_v1::CompressedPoV,
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
Some(r) => r,
};

let pov = match encoded_pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Could not extract PoV",
);
return;
}
};

let pov = {
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
Expand Down Expand Up @@ -607,8 +634,10 @@ async fn handle_incoming_pov(
&state.metrics,
relay_parent,
pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(pov_hash, (pov, encoded_pov));
}

/// Handles a newly connected validator in the context of some relay leaf.
Expand Down
24 changes: 14 additions & 10 deletions node/network/pov-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,11 @@ fn ask_validators_for_povs() {
PoVDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
test_state.validator_peer_id[2].clone(),
protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
protocol_v1::PoVDistributionMessage::SendPoV(
current,
pov_hash,
protocol_v1::CompressedPoV::compress(&pov_block).unwrap(),
),
)
)
).await;
Expand Down Expand Up @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
)
Expand Down Expand Up @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
);
Expand Down Expand Up @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down
4 changes: 4 additions & 0 deletions node/network/protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
thiserror = "1.0.23"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = "0.5.0"
ordian marked this conversation as resolved.
Show resolved Hide resolved
Loading