Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Compress the PoV block before sending it over the network (#2288)
Browse files Browse the repository at this point in the history
* Compress the PoV block before sending it over the network

This pr changes the way we send PoV blocks over the network. We now
compress the PoV block before it is send over the network. This should
reduce the size significant for PoVs which contain the runtime WASM for
example.

* Preallocate 1KB

* Try something..

* Switch to zstd and some renamings

* Make compression/decompression fail in browsers

* Use some sane maximum value

* Update roadmap/implementers-guide/src/types/network.md

Co-authored-by: Andronik Ordian <[email protected]>

* Fix and add test

* add

Co-authored-by: Andronik Ordian <[email protected]>
Co-authored-by: Robert Habermeier <[email protected]>
  • Loading branch information
3 people authored Jan 21, 2021
1 parent 8387af2 commit 11797c7
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 56 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions node/network/collator-protocol/src/collator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,19 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
pov,
);
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
return
}
};

let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
Expand Down Expand Up @@ -1280,7 +1288,7 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
}
Expand Down
28 changes: 21 additions & 7 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async fn received_collation<Context>(
origin: PeerId,
request_id: RequestId,
receipt: CandidateReceipt,
pov: PoV,
pov: protocol_v1::CompressedPoV,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
Expand All @@ -368,6 +368,21 @@ where
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let pov = match pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
%request_id,
?error,
"Failed to extract PoV",
);
return;
}
};

let _span = jaeger::pov_span(&pov, "received-collation");

tracing::debug!(
target: LOG_TARGET,
%request_id,
Expand Down Expand Up @@ -529,9 +544,8 @@ where
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
let _span2 = jaeger::pov_span(&pov, "received-collation");
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
}
Expand Down Expand Up @@ -1295,9 +1309,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_a.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![]),
},
}).unwrap(),
)
)
)
Expand Down Expand Up @@ -1333,9 +1347,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_b.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![1, 2, 3]),
},
}).unwrap(),
)
)
)
Expand Down
65 changes: 47 additions & 18 deletions node/network/pov-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct State {
}

struct BlockBasedState {
known: HashMap<Hash, Arc<PoV>>,
known: HashMap<Hash, (Arc<PoV>, protocol_v1::CompressedPoV)>,

/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
Expand All @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>)
)
}

fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
-> protocol_v1::ValidationProtocol
{
fn send_pov_message(
relay_parent: Hash,
pov_hash: Hash,
pov: &protocol_v1::CompressedPoV,
) -> protocol_v1::ValidationProtocol {
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone())
)
}

Expand Down Expand Up @@ -267,7 +269,7 @@ async fn distribute_to_awaiting(
metrics: &Metrics,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
pov: &protocol_v1::CompressedPoV,
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
Expand All @@ -284,7 +286,7 @@ async fn distribute_to_awaiting(

if peers_to_send.is_empty() { return; }

let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
Expand Down Expand Up @@ -379,7 +381,7 @@ async fn handle_fetch(
None => return,
};

if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return;
}
Expand Down Expand Up @@ -468,16 +470,28 @@ async fn handle_distribute(
}
}

relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`."
);
return
}
};

distribute_to_awaiting(
&mut state.peer_state,
ctx,
&state.metrics,
relay_parent,
descriptor.pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov));
}

/// Report a reputation change for a peer.
Expand Down Expand Up @@ -527,8 +541,9 @@ async fn handle_awaiting(
for pov_hash in pov_hashes {
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, pov);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
Expand All @@ -544,23 +559,35 @@ async fn handle_awaiting(
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
encoded_pov: protocol_v1::CompressedPoV,
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
Some(r) => r,
};

let pov = match encoded_pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Could not extract PoV",
);
return;
}
};

let pov = {
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
Expand Down Expand Up @@ -607,8 +634,10 @@ async fn handle_incoming_pov(
&state.metrics,
relay_parent,
pov_hash,
&*pov,
).await
&encoded_pov,
).await;

relay_parent_state.known.insert(pov_hash, (pov, encoded_pov));
}

/// Handles a newly connected validator in the context of some relay leaf.
Expand Down
24 changes: 14 additions & 10 deletions node/network/pov-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,11 @@ fn ask_validators_for_povs() {
PoVDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
test_state.validator_peer_id[2].clone(),
protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
protocol_v1::PoVDistributionMessage::SendPoV(
current,
pov_hash,
protocol_v1::CompressedPoV::compress(&pov_block).unwrap(),
),
)
)
).await;
Expand Down Expand Up @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
)
Expand Down Expand Up @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down Expand Up @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand All @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
);
Expand Down Expand Up @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;

Expand Down
4 changes: 4 additions & 0 deletions node/network/protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
thiserror = "1.0.23"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = "0.5.0"
Loading

0 comments on commit 11797c7

Please sign in to comment.