From 11797c7316695938d2abfd221d6b398afdb97520 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 19:04:14 +0100 Subject: [PATCH] Compress the PoV block before sending it over the network (#2288) * Compress the PoV block before sending it over the network This PR changes the way we send PoV blocks over the network. We now compress the PoV block before it is sent over the network. This should reduce the size significantly for PoVs which contain the runtime WASM, for example. * Preallocate 1KB * Try something.. * Switch to zstd and some renamings * Make compression/decompression fail in browsers * Use some sane maximum value * Update roadmap/implementers-guide/src/types/network.md Co-authored-by: Andronik Ordian * Fix and add test * add Co-authored-by: Andronik Ordian Co-authored-by: Robert Habermeier --- Cargo.lock | 2 + .../collator-protocol/src/collator_side.rs | 20 +++-- .../collator-protocol/src/validator_side.rs | 28 ++++-- node/network/pov-distribution/src/lib.rs | 65 ++++++++++---- node/network/pov-distribution/src/tests.rs | 24 +++--- node/network/protocol/Cargo.toml | 4 + node/network/protocol/src/lib.rs | 85 +++++++++++++++++-- .../implementers-guide/src/types/network.md | 9 +- scripts/gitlab/check_web_wasm.sh | 9 +- 9 files changed, 190 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d2f21938ab3..f1597fd67eb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5268,6 +5268,8 @@ dependencies = [ "polkadot-primitives", "sc-network", "strum 0.20.0", + "thiserror", + "zstd", ] [[package]] diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index ef061062ae1d..d33a98637374 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -501,11 +501,19 @@ async fn send_collation( receipt: CandidateReceipt, pov: PoV, ) { - let wire_message = 
protocol_v1::CollatorProtocolMessage::Collation( - request_id, - receipt, - pov, - ); + let pov = match protocol_v1::CompressedPoV::compress(&pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`", + ); + return + } + }; + + let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( @@ -1280,7 +1288,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { assert_eq!(req_id, request_id); assert_eq!(receipt, candidate); - assert_eq!(pov, pov_block); + assert_eq!(pov.decompress().unwrap(), pov_block); } ); } diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index 23fde5c75c96..50203e4c71a7 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -353,7 +353,7 @@ async fn received_collation( origin: PeerId, request_id: RequestId, receipt: CandidateReceipt, - pov: PoV, + pov: protocol_v1::CompressedPoV, ) where Context: SubsystemContext @@ -368,6 +368,21 @@ where if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); if let Some(collator_id) = state.known_collators.get(&origin) { + let pov = match pov.decompress() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + %request_id, + ?error, + "Failed to extract PoV", + ); + return; + } + }; + + let _span = jaeger::pov_span(&pov, "received-collation"); + tracing::debug!( target: LOG_TARGET, %request_id, @@ -529,9 +544,8 @@ where modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { - let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) + let _span = 
state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) .map(|s| s.child("received-collation")); - let _span2 = jaeger::pov_span(&pov, "received-collation"); received_collation(ctx, state, origin, request_id, receipt, pov).await; } } @@ -1295,9 +1309,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_a.clone(), - PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![]), - }, + }).unwrap(), ) ) ) @@ -1333,9 +1347,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_b.clone(), - PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![1, 2, 3]), - }, + }).unwrap(), ) ) ) diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index 5ee1f14063e2..4fe81ee0e1d2 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -106,7 +106,7 @@ struct State { } struct BlockBasedState { - known: HashMap>, + known: HashMap, protocol_v1::CompressedPoV)>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec) ) } -fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) - -> protocol_v1::ValidationProtocol -{ +fn send_pov_message( + relay_parent: Hash, + pov_hash: Hash, + pov: &protocol_v1::CompressedPoV, +) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( - protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) + protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone()) ) } @@ -267,7 +269,7 @@ async fn distribute_to_awaiting( metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, - pov: &PoV, + pov: &protocol_v1::CompressedPoV, ) { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. 
// @@ -284,7 +286,7 @@ async fn distribute_to_awaiting( if peers_to_send.is_empty() { return; } - let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); + let payload = send_pov_message(relay_parent, pov_hash, pov); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, @@ -379,7 +381,7 @@ async fn handle_fetch( None => return, }; - if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { + if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); return; } @@ -468,7 +470,17 @@ async fn handle_distribute( } } - relay_parent_state.known.insert(descriptor.pov_hash, pov.clone()); + let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`." + ); + return + } + }; distribute_to_awaiting( &mut state.peer_state, @@ -476,8 +488,10 @@ async fn handle_distribute( &state.metrics, relay_parent, descriptor.pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov)); } /// Report a reputation change for a peer. @@ -527,8 +541,9 @@ async fn handle_awaiting( for pov_hash in pov_hashes { // For all requested PoV hashes, if we have it, we complete the request immediately. // Otherwise, we note that the peer is awaiting the PoV. - if let Some(pov) = relay_parent_state.known.get(&pov_hash) { - let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); + if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) { + let payload = send_pov_message(relay_parent, pov_hash, pov); + ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await; @@ -544,23 +559,35 @@ async fn handle_awaiting( /// Handle an incoming PoV from our peer. 
Reports them if unexpected, rewards them if not. /// /// Completes any requests awaiting that PoV. -#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))] async fn handle_incoming_pov( state: &mut State, ctx: &mut impl SubsystemContext, peer: PeerId, relay_parent: Hash, pov_hash: Hash, - pov: PoV, + encoded_pov: protocol_v1::CompressedPoV, ) { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { - None => { + None => { report_peer(ctx, peer, COST_UNEXPECTED_POV).await; return; }, Some(r) => r, }; + let pov = match encoded_pov.decompress() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Could not extract PoV", + ); + return; + } + }; + let pov = { // Do validity checks and complete all senders awaiting this PoV. let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) { @@ -607,8 +634,10 @@ async fn handle_incoming_pov( &state.metrics, relay_parent, pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(pov_hash, (pov, encoded_pov)); } /// Handles a newly connected validator in the context of some relay leaf. 
diff --git a/node/network/pov-distribution/src/tests.rs b/node/network/pov-distribution/src/tests.rs index 285f479e76ba..afe83b6399eb 100644 --- a/node/network/pov-distribution/src/tests.rs +++ b/node/network/pov-distribution/src/tests.rs @@ -396,7 +396,11 @@ fn ask_validators_for_povs() { PoVDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( test_state.validator_peer_id[2].clone(), - protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()), + protocol_v1::PoVDistributionMessage::SendPoV( + current, + pov_hash, + protocol_v1::CompressedPoV::compress(&pov_block).unwrap(), + ), ) ) ).await; @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ) @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, bad_pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()), ).focus().unwrap(), ).await; @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, 
pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_b, pov_hash, pov.clone()), + send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { assert_eq!(peers, vec![peer_b.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ); @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index f06f2ccd4e06..f40b063a8e41 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -12,3 +12,7 @@ polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } +thiserror = "1.0.23" + +[target.'cfg(not(target_os = "unknown"))'.dependencies] +zstd = "0.5.0" diff --git a/node/network/protocol/src/lib.rs 
b/node/network/protocol/src/lib.rs index 937c17981a19..e7dcf18d354d 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -16,7 +16,7 @@ //! Network protocol types for parachains. -#![deny(unused_crate_dependencies, unused_results)] +#![deny(unused_crate_dependencies)] #![warn(missing_docs)] use polkadot_primitives::v1::{Hash, BlockNumber}; @@ -286,8 +286,8 @@ pub mod v1 { }; use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; - use std::convert::TryFrom; use super::RequestId; + use std::convert::TryFrom; /// Network messages used by the availability distribution subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -323,9 +323,9 @@ pub mod v1 { #[codec(index = "0")] Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) + /// (relay_parent, pov_hash, compressed_pov) #[codec(index = "1")] - SendPoV(Hash, Hash, PoV), + SendPoV(Hash, Hash, CompressedPoV), } /// Network messages used by the statement distribution subsystem. @@ -336,6 +336,67 @@ pub mod v1 { Statement(Hash, SignedFullStatement) } + #[derive(Debug, Clone, Copy, PartialEq, thiserror::Error)] + #[allow(missing_docs)] + pub enum CompressedPoVError { + #[error("Failed to compress a PoV")] + Compress, + #[error("Failed to decompress a PoV")] + Decompress, + #[error("Failed to decode the uncompressed PoV")] + Decode, + #[error("Architecture is not supported")] + NotSupported, + } + + /// SCALE and Zstd encoded [`PoV`]. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub struct CompressedPoV(Vec); + + impl CompressedPoV { + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(not(target_os = "unknown"))] + pub fn compress(pov: &PoV) -> Result { + zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) + } + + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. 
+ #[cfg(target_os = "unknown")] + pub fn compress(_: &PoV) -> Result { + Err(CompressedPoVError::NotSupported) + } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(not(target_os = "unknown"))] + pub fn decompress(&self) -> Result { + use std::io::Read; + const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024; + + struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); + impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { + fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { + self.1 = self.1.saturating_add(into.len()); + if self.1 > MAX_POV_BLOCK_SIZE { + return Err("pov block too big".into()) + } + self.0.read_exact(into).map_err(Into::into) + } + fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { + Ok(None) + } + } + + let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; + PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode) + } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(target_os = "unknown")] + pub fn decompress(&self) -> Result { + Err(CompressedPoVError::NotSupported) + } + } + /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] pub enum CollatorProtocolMessage { @@ -351,7 +412,7 @@ pub mod v1 { RequestCollation(RequestId, Hash, ParaId), /// A requested collation. #[codec(index = "3")] - Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } /// All network messages on the validation peer-set. 
@@ -389,3 +450,17 @@ pub mod v1 { impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); } + +#[cfg(test)] +mod tests { + use polkadot_primitives::v1::PoV; + use super::v1::{CompressedPoV, CompressedPoVError}; + + #[test] + fn decompress_huge_pov_block_fails() { + let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() }; + + let compressed = CompressedPoV::compress(&pov).unwrap(); + assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err()); + } +} diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index 79eab39002c6..eb7bbe7070e9 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -19,6 +19,9 @@ enum ObservedRole { Full, Light, } + +/// SCALE and zstd encoded `PoV`. +struct CompressedPoV(Vec); ``` ## V1 Network Subsystem Message Types @@ -75,8 +78,8 @@ enum PoVDistributionV1Message { /// specific relay-parent hash. Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) - SendPoV(Hash, Hash, PoV), + /// (relay_parent, pov_hash, compressed_pov) + SendPoV(Hash, Hash, CompressedPoV), } ``` @@ -101,7 +104,7 @@ enum CollatorProtocolV1Message { /// Request the advertised collation at that relay-parent. RequestCollation(RequestId, Hash, ParaId), /// A requested collation. 
- Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } ``` diff --git a/scripts/gitlab/check_web_wasm.sh b/scripts/gitlab/check_web_wasm.sh index 9d9006e908a7..0b9494713875 100755 --- a/scripts/gitlab/check_web_wasm.sh +++ b/scripts/gitlab/check_web_wasm.sh @@ -1,13 +1,8 @@ #!/usr/bin/env bash +set -e + #shellcheck source=lib.sh source "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )/lib.sh" -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/polkadot/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/kusama/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path erasure-coding/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path parachain/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path primitives/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path rpc/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path statement-table/Cargo.toml time cargo build --locked --target=wasm32-unknown-unknown --manifest-path cli/Cargo.toml --no-default-features --features browser