From 175ceca1f29a3a189b7f685c6466547dac335e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 18 Jan 2021 23:29:15 +0100 Subject: [PATCH 1/9] Compress the PoV block before sending it over the network This pr changes the way we send PoV blocks over the network. We now compress the PoV block before it is send over the network. This should reduce the size significant for PoVs which contain the runtime WASM for example. --- Cargo.lock | 15 +++-- .../collator-protocol/src/collator_side.rs | 20 ++++-- .../collator-protocol/src/validator_side.rs | 28 ++++++-- node/network/pov-distribution/src/lib.rs | 65 ++++++++++++++----- node/network/pov-distribution/src/tests.rs | 24 ++++--- node/network/protocol/Cargo.toml | 2 + node/network/protocol/src/lib.rs | 45 +++++++++++-- .../implementers-guide/src/types/network.md | 9 ++- 8 files changed, 152 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 024d3641ed66..d9cbfd7d4804 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1463,11 +1463,11 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" [[package]] name = "flate2" -version = "1.0.16" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" +checksum = "7411863d55df97a419aa64cb4d2f167103ea9d767e2c54a1868b7ac3f6b47129" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "crc32fast", "libc", "libz-sys", @@ -3272,12 +3272,11 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.0.25" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb5e43362e38e2bca2fd5f5134c4d4564a23a5c28e9b95411652021a8675ebe" +checksum = "602113192b08db8f38796c4e85c39e960c145965140e918018bcde1952429655" dependencies = [ "cc", - "libc", "pkg-config", "vcpkg", ] @@ -5067,7 +5066,7 @@ name = "polkadot-node-core-approval-voting" version = "0.1.0" dependencies = [ "bitvec", - "futures 0.3.8", + "futures 0.3.12", "parity-scale-codec", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -5270,12 +5269,14 @@ dependencies = [ name = "polkadot-node-network-protocol" version = "0.1.0" dependencies = [ + "flate2", "parity-scale-codec", "polkadot-node-jaeger", "polkadot-node-primitives", "polkadot-primitives", "sc-network", "strum 0.20.0", + "thiserror", ] [[package]] diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index ef061062ae1d..2ea8f3e4e655 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -501,11 +501,19 @@ async fn send_collation( receipt: CandidateReceipt, pov: PoV, ) { - let wire_message = protocol_v1::CollatorProtocolMessage::Collation( - request_id, - receipt, - pov, - ); + let pov = match protocol_v1::CompressedPoV::from_pov(&pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`", + ); + return + } + }; + + let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( @@ -1280,7 +1288,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { assert_eq!(req_id, request_id); assert_eq!(receipt, candidate); - assert_eq!(pov, pov_block); + assert_eq!(pov.as_pov().unwrap(), pov_block); } ); } diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index 23fde5c75c96..aae30519456c 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -353,7 +353,7 @@ async fn received_collation( origin: PeerId, request_id: RequestId, receipt: CandidateReceipt, - pov: PoV, + pov: protocol_v1::CompressedPoV, ) where Context: SubsystemContext @@ -368,6 +368,21 @@ where if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); if let Some(collator_id) = state.known_collators.get(&origin) { + let pov = match pov.as_pov() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + %request_id, + ?error, + "Failed to extract PoV", + ); + return; + } + }; + + let _span = jaeger::pov_span(&pov, "received-collation"); + tracing::debug!( target: LOG_TARGET, %request_id, @@ -529,9 +544,8 @@ where modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { - let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) + let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) .map(|s| s.child("received-collation")); - let _span2 = jaeger::pov_span(&pov, "received-collation"); received_collation(ctx, state, origin, request_id, receipt, pov).await; } } @@ -1295,9 +1309,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_a.clone(), - PoV { + protocol_v1::CompressedPoV::from_pov(&PoV { block_data: BlockData(vec![]), - }, + }).unwrap(), ) ) ) @@ -1333,9 +1347,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_b.clone(), - PoV { + protocol_v1::CompressedPoV::from_pov(&PoV { block_data: BlockData(vec![1, 2, 3]), - }, + }).unwrap(), ) ) ) diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index 5ee1f14063e2..c7223190b6e6 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -106,7 +106,7 @@ struct State { } struct BlockBasedState { - known: HashMap>, + known: HashMap, protocol_v1::CompressedPoV)>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec) ) } -fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) - -> protocol_v1::ValidationProtocol -{ +fn send_pov_message( + relay_parent: Hash, + pov_hash: Hash, + pov: &protocol_v1::CompressedPoV, +) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( - protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) + protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone()) ) } @@ -267,7 +269,7 @@ async fn distribute_to_awaiting( metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, - pov: &PoV, + pov: &protocol_v1::CompressedPoV, ) { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. // @@ -284,7 +286,7 @@ async fn distribute_to_awaiting( if peers_to_send.is_empty() { return; } - let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); + let payload = send_pov_message(relay_parent, pov_hash, pov); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, @@ -379,7 +381,7 @@ async fn handle_fetch( None => return, }; - if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { + if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); return; } @@ -468,7 +470,17 @@ async fn handle_distribute( } } - relay_parent_state.known.insert(descriptor.pov_hash, pov.clone()); + let encoded_pov = match protocol_v1::CompressedPoV::from_pov(&*pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`." + ); + return + } + }; distribute_to_awaiting( &mut state.peer_state, @@ -476,8 +488,10 @@ async fn handle_distribute( &state.metrics, relay_parent, descriptor.pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov)); } /// Report a reputation change for a peer. @@ -527,8 +541,9 @@ async fn handle_awaiting( for pov_hash in pov_hashes { // For all requested PoV hashes, if we have it, we complete the request immediately. // Otherwise, we note that the peer is awaiting the PoV. - if let Some(pov) = relay_parent_state.known.get(&pov_hash) { - let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); + if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) { + let payload = send_pov_message(relay_parent, pov_hash, pov); + ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await; @@ -544,23 +559,35 @@ async fn handle_awaiting( /// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not. /// /// Completes any requests awaiting that PoV. -#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))] async fn handle_incoming_pov( state: &mut State, ctx: &mut impl SubsystemContext, peer: PeerId, relay_parent: Hash, pov_hash: Hash, - pov: PoV, + encoded_pov: protocol_v1::CompressedPoV, ) { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { - None => { + None => { report_peer(ctx, peer, COST_UNEXPECTED_POV).await; return; }, Some(r) => r, }; + let pov = match encoded_pov.as_pov() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Could not extract PoV", + ); + return; + } + }; + let pov = { // Do validity checks and complete all senders awaiting this PoV. let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) { @@ -607,8 +634,10 @@ async fn handle_incoming_pov( &state.metrics, relay_parent, pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(pov_hash, (pov, encoded_pov)); } /// Handles a newly connected validator in the context of some relay leaf. diff --git a/node/network/pov-distribution/src/tests.rs b/node/network/pov-distribution/src/tests.rs index 285f479e76ba..89033a0782dc 100644 --- a/node/network/pov-distribution/src/tests.rs +++ b/node/network/pov-distribution/src/tests.rs @@ -396,7 +396,11 @@ fn ask_validators_for_povs() { PoVDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( test_state.validator_peer_id[2].clone(), - protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()), + protocol_v1::PoVDistributionMessage::SendPoV( + current, + pov_hash, + protocol_v1::CompressedPoV::from_pov(&pov_block).unwrap(), + ), ) ) ).await; @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ); } ) @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, bad_pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&bad_pov).unwrap()), ).focus().unwrap(), ).await; @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_b, pov_hash, pov.clone()), + send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { assert_eq!(peers, vec![peer_b.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ); } ); @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), ).focus().unwrap(), ).await; diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index f06f2ccd4e06..3441e28a86b1 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -12,3 +12,5 @@ polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } +thiserror = "1.0.23" +flate2 = "1.0.19" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 937c17981a19..60741ff323b7 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -16,7 +16,7 @@ //! Network protocol types for parachains. -#![deny(unused_crate_dependencies, unused_results)] +#![deny(unused_crate_dependencies)] #![warn(missing_docs)] use polkadot_primitives::v1::{Hash, BlockNumber}; @@ -286,8 +286,9 @@ pub mod v1 { }; use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; - use std::convert::TryFrom; use super::RequestId; + use std::{convert::TryFrom, io::Write}; + use flate2::{Compression, write::{GzEncoder, GzDecoder}}; /// Network messages used by the availability distribution subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -323,9 +324,9 @@ pub mod v1 { #[codec(index = "0")] Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) + /// (relay_parent, pov_hash, compressed_pov) #[codec(index = "1")] - SendPoV(Hash, Hash, PoV), + SendPoV(Hash, Hash, CompressedPoV), } /// Network messages used by the statement distribution subsystem. @@ -336,6 +337,40 @@ pub mod v1 { Statement(Hash, SignedFullStatement) } + #[derive(Debug, Clone, Copy, PartialEq, thiserror::Error)] + #[allow(missing_docs)] + pub enum CompressedPoVError { + #[error("Failed to compress a PoV")] + Compress, + #[error("Failed to uncompress a PoV")] + Uncompress, + #[error("Failed to decode the uncompressed PoV")] + Decode, + } + + /// SCALE and GZip encoded [`PoV`]. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub struct CompressedPoV(Vec); + + impl CompressedPoV { + /// Create from the given [`PoV`]. + pub fn from_pov(pov: &PoV) -> Result { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + pov.encode_to(&mut encoder); + encoder.finish().map(Self).map_err(|_| CompressedPoVError::Compress) + } + + /// Uncompress, decode and return the [`PoV`]. + pub fn as_pov(&self) -> Result { + let mut writer = Vec::new(); + let mut decoder = GzDecoder::new(&mut writer); + decoder.write_all(&self.0).map_err(|_| CompressedPoVError::Uncompress)?; + decoder.finish().map_err(|_| CompressedPoVError::Uncompress)?; + + PoV::decode(&mut &writer[..]).map_err(|_| CompressedPoVError::Decode) + } + } + /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] pub enum CollatorProtocolMessage { @@ -351,7 +386,7 @@ pub mod v1 { RequestCollation(RequestId, Hash, ParaId), /// A requested collation. #[codec(index = "3")] - Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } /// All network messages on the validation peer-set. diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index 79eab39002c6..bbb002f71ca4 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -19,6 +19,9 @@ enum ObservedRole { Full, Light, } + +/// SCALE and GZip encoded `PoV`. +struct CompressedPoV(Vec); ``` ## V1 Network Subsystem Message Types @@ -75,8 +78,8 @@ enum PoVDistributionV1Message { /// specific relay-parent hash. Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) - SendPoV(Hash, Hash, PoV), + /// (relay_parent, pov_hash, compressed_pov) + SendPoV(Hash, Hash, CompressedPoV), } ``` @@ -101,7 +104,7 @@ enum CollatorProtocolV1Message { /// Request the advertised collation at that relay-parent. RequestCollation(RequestId, Hash, ParaId), /// A requested collation. - Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } ``` From 247ae6b0fa0e76fcdfde37d92132b0020b57f0e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 19 Jan 2021 13:23:18 +0100 Subject: [PATCH 2/9] Preallocate 1KB --- node/network/protocol/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 60741ff323b7..aed99bf4f926 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -355,14 +355,14 @@ pub mod v1 { impl CompressedPoV { /// Create from the given [`PoV`]. pub fn from_pov(pov: &PoV) -> Result { - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + let mut encoder = GzEncoder::new(Vec::with_capacity(1024), Compression::default()); pov.encode_to(&mut encoder); encoder.finish().map(Self).map_err(|_| CompressedPoVError::Compress) } /// Uncompress, decode and return the [`PoV`]. pub fn as_pov(&self) -> Result { - let mut writer = Vec::new(); + let mut writer = Vec::with_capacity(1024); let mut decoder = GzDecoder::new(&mut writer); decoder.write_all(&self.0).map_err(|_| CompressedPoVError::Uncompress)?; decoder.finish().map_err(|_| CompressedPoVError::Uncompress)?; From f3eb3b59d4223041f225a2304afbf334b6da4697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 19 Jan 2021 14:17:28 +0100 Subject: [PATCH 3/9] Try something.. --- Cargo.lock | 11 ++++++----- node/network/protocol/Cargo.toml | 2 +- scripts/gitlab/check_web_wasm.sh | 9 ++------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9cbfd7d4804..9b7d227800c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1463,11 +1463,11 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" [[package]] name = "flate2" -version = "1.0.19" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7411863d55df97a419aa64cb4d2f167103ea9d767e2c54a1868b7ac3f6b47129" +checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "crc32fast", "libc", "libz-sys", @@ -3272,11 +3272,12 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.2" +version = "1.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "602113192b08db8f38796c4e85c39e960c145965140e918018bcde1952429655" +checksum = "2eb5e43362e38e2bca2fd5f5134c4d4564a23a5c28e9b95411652021a8675ebe" dependencies = [ "cc", + "libc", "pkg-config", "vcpkg", ] diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 3441e28a86b1..66064850a769 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -13,4 +13,4 @@ parity-scale-codec = { version = "1.3.6", default-features = false, features = [ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } thiserror = "1.0.23" -flate2 = "1.0.19" +flate2 = "1.0.16" diff --git a/scripts/gitlab/check_web_wasm.sh b/scripts/gitlab/check_web_wasm.sh index 9d9006e908a7..0b9494713875 100755 --- a/scripts/gitlab/check_web_wasm.sh +++ b/scripts/gitlab/check_web_wasm.sh @@ -1,13 +1,8 @@ #!/usr/bin/env bash +set -e + #shellcheck source=lib.sh source "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )/lib.sh" -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/polkadot/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/kusama/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path erasure-coding/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path parachain/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path primitives/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path rpc/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path statement-table/Cargo.toml time cargo build --locked --target=wasm32-unknown-unknown --manifest-path cli/Cargo.toml --no-default-features --features browser From 3c5dd799cb7883b48f7ecb646e90a147652e1691 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 15:16:40 +0100 Subject: [PATCH 4/9] Switch to zstd and some renamings --- Cargo.lock | 2 +- .../collator-protocol/src/collator_side.rs | 4 +-- .../collator-protocol/src/validator_side.rs | 6 ++-- node/network/pov-distribution/src/lib.rs | 4 +-- node/network/pov-distribution/src/tests.rs | 20 +++++------ node/network/protocol/Cargo.toml | 2 +- node/network/protocol/src/lib.rs | 35 ++++++++++--------- 7 files changed, 38 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b7d227800c7..1c474323d91e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5270,7 +5270,6 @@ dependencies = [ name = "polkadot-node-network-protocol" version = "0.1.0" dependencies = [ - "flate2", "parity-scale-codec", "polkadot-node-jaeger", "polkadot-node-primitives", @@ -5278,6 +5277,7 @@ dependencies = [ "sc-network", "strum 0.20.0", "thiserror", + "zstd", ] [[package]] diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index 2ea8f3e4e655..d33a98637374 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -501,7 +501,7 @@ async fn send_collation( receipt: CandidateReceipt, pov: PoV, ) { - let pov = match protocol_v1::CompressedPoV::from_pov(&pov) { + let pov = match protocol_v1::CompressedPoV::compress(&pov) { Ok(pov) => pov, Err(error) => { tracing::debug!( @@ -1288,7 +1288,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { assert_eq!(req_id, request_id); assert_eq!(receipt, candidate); - assert_eq!(pov.as_pov().unwrap(), pov_block); + assert_eq!(pov.decompress().unwrap(), pov_block); } ); } diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index aae30519456c..50203e4c71a7 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -368,7 +368,7 @@ where if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); if let Some(collator_id) = state.known_collators.get(&origin) { - let pov = match pov.as_pov() { + let pov = match pov.decompress() { Ok(pov) => pov, Err(error) => { tracing::debug!( @@ -1309,7 +1309,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_a.clone(), - protocol_v1::CompressedPoV::from_pov(&PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![]), }).unwrap(), ) @@ -1347,7 +1347,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_b.clone(), - protocol_v1::CompressedPoV::from_pov(&PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![1, 2, 3]), }).unwrap(), ) diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index c7223190b6e6..4fe81ee0e1d2 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -470,7 +470,7 @@ async fn handle_distribute( } } - let encoded_pov = match protocol_v1::CompressedPoV::from_pov(&*pov) { + let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) { Ok(pov) => pov, Err(error) => { tracing::debug!( @@ -576,7 +576,7 @@ async fn handle_incoming_pov( Some(r) => r, }; - let pov = match encoded_pov.as_pov() { + let pov = match encoded_pov.decompress() { Ok(pov) => pov, Err(error) => { tracing::debug!( diff --git a/node/network/pov-distribution/src/tests.rs b/node/network/pov-distribution/src/tests.rs index 89033a0782dc..afe83b6399eb 100644 --- a/node/network/pov-distribution/src/tests.rs +++ b/node/network/pov-distribution/src/tests.rs @@ -399,7 +399,7 @@ fn ask_validators_for_povs() { protocol_v1::PoVDistributionMessage::SendPoV( current, pov_hash, - protocol_v1::CompressedPoV::from_pov(&pov_block).unwrap(), + protocol_v1::CompressedPoV::compress(&pov_block).unwrap(), ), ) ) @@ -635,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ) @@ -947,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -956,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1037,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&bad_pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()), ).focus().unwrap(), ).await; @@ -1102,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1165,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1454,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1478,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { assert_eq!(peers, vec![peer_b.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ); @@ -1538,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::from_pov(&pov).unwrap()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 66064850a769..08674e75a10d 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -13,4 +13,4 @@ parity-scale-codec = { version = "1.3.6", default-features = false, features = [ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } thiserror = "1.0.23" -flate2 = "1.0.16" +zstd = "0.5.0" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index aed99bf4f926..3936905c78c6 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -287,8 +287,7 @@ pub mod v1 { use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; use super::RequestId; - use std::{convert::TryFrom, io::Write}; - use flate2::{Compression, write::{GzEncoder, GzDecoder}}; + use std::{convert::TryFrom, io::Read}; /// Network messages used by the availability distribution subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -342,8 +341,8 @@ pub mod v1 { pub enum CompressedPoVError { #[error("Failed to compress a PoV")] Compress, - #[error("Failed to uncompress a PoV")] - Uncompress, + #[error("Failed to decompress a PoV")] + Decompress, #[error("Failed to decode the uncompressed PoV")] Decode, } @@ -353,21 +352,25 @@ pub mod v1 { pub struct CompressedPoV(Vec); impl CompressedPoV { - /// Create from the given [`PoV`]. - pub fn from_pov(pov: &PoV) -> Result { - let mut encoder = GzEncoder::new(Vec::with_capacity(1024), Compression::default()); - pov.encode_to(&mut encoder); - encoder.finish().map(Self).map_err(|_| CompressedPoVError::Compress) + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + pub fn compress(pov: &PoV) -> Result { + zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) } - /// Uncompress, decode and return the [`PoV`]. - pub fn as_pov(&self) -> Result { - let mut writer = Vec::with_capacity(1024); - let mut decoder = GzDecoder::new(&mut writer); - decoder.write_all(&self.0).map_err(|_| CompressedPoVError::Uncompress)?; - decoder.finish().map_err(|_| CompressedPoVError::Uncompress)?; + /// Decompress `self` and returns the [`PoV`] on success. + pub fn decompress(&self) -> Result { + struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder); + impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { + fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { + self.0.read_exact(into).map_err(Into::into) + } + fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { + Ok(None) + } + } - PoV::decode(&mut &writer[..]).map_err(|_| CompressedPoVError::Decode) + let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; + PoV::decode(&mut InputDecoder(&mut decoder)).map_err(|_| CompressedPoVError::Decode) } } From 579ed3d4332c4b64d4e455698c6bfebfa60ed3c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 15:37:14 +0100 Subject: [PATCH 5/9] Make compression/decompression fail in browsers --- node/network/protocol/Cargo.toml | 2 ++ node/network/protocol/src/lib.rs | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 08674e75a10d..f40b063a8e41 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -13,4 +13,6 @@ parity-scale-codec = { version = "1.3.6", default-features = false, features = [ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } thiserror = "1.0.23" + +[target.'cfg(not(target_os = "unknown"))'.dependencies] zstd = "0.5.0" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 3936905c78c6..d40005ce4168 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -287,7 +287,7 @@ pub mod v1 { use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; use super::RequestId; - use std::{convert::TryFrom, io::Read}; + use std::convert::TryFrom; /// Network messages used by the availability distribution subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -345,6 +345,8 @@ pub mod v1 { Decompress, #[error("Failed to decode the uncompressed PoV")] Decode, + #[error("Architecture is not supported")] + NotSupported, } /// SCALE and GZip encoded [`PoV`]. @@ -353,12 +355,21 @@ pub mod v1 { impl CompressedPoV { /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(not(target_os = "unknown"))] pub fn compress(pov: &PoV) -> Result { zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) } + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(target_os = "unknown")] + pub fn compress(_: &PoV) -> Result { + Err(CompressedPoVError::NotSupported) + } + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(not(target_os = "unknown"))] pub fn decompress(&self) -> Result { + use std::io::Read; struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder); impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { @@ -372,6 +383,12 @@ pub mod v1 { let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; PoV::decode(&mut InputDecoder(&mut decoder)).map_err(|_| CompressedPoVError::Decode) } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(target_os = "unknown")] + pub fn decompress(&self) -> Result { + Err(CompressedPoVError::NotSupported) + } } /// Network messages used by the collator protocol subsystem From 9bbd97e3bd33e01ac59d240a3b3814da5b1cf210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 15:45:34 +0100 Subject: [PATCH 6/9] Use some sane maximum value --- node/network/protocol/src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index d40005ce4168..452780e813f4 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -370,9 +370,14 @@ pub mod v1 { #[cfg(not(target_os = "unknown"))] pub fn decompress(&self) -> Result { use std::io::Read; - struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder); + const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024; + + struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { + if self.1.saturating_add(into.len()) > MAX_POV_BLOCK_SIZE { + return Err("pov block too big".into()) + } self.0.read_exact(into).map_err(Into::into) } fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { @@ -381,7 +386,7 @@ pub mod v1 { } let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; - PoV::decode(&mut InputDecoder(&mut decoder)).map_err(|_| CompressedPoVError::Decode) + PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode) } /// Decompress `self` and returns the [`PoV`] on success. From 9d35a3295463e3b5953d9721cfab275c5ab34e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 16:08:26 +0100 Subject: [PATCH 7/9] Update roadmap/implementers-guide/src/types/network.md Co-authored-by: Andronik Ordian --- roadmap/implementers-guide/src/types/network.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index bbb002f71ca4..eb7bbe7070e9 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -20,7 +20,7 @@ enum ObservedRole { Light, } -/// SCALE and GZip encoded `PoV`. +/// SCALE and zstd encoded `PoV`. struct CompressedPoV(Vec); ``` From 3f9a2fce7b317ea75cf9a07a0f219ce3c49eaabd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 21 Jan 2021 16:50:15 +0100 Subject: [PATCH 8/9] Fix and add test --- node/network/protocol/src/lib.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 452780e813f4..e7dcf18d354d 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -349,7 +349,7 @@ pub mod v1 { NotSupported, } - /// SCALE and GZip encoded [`PoV`]. + /// SCALE and Zstd encoded [`PoV`]. #[derive(Debug, Clone, Encode, Decode, PartialEq)] pub struct CompressedPoV(Vec); @@ -375,7 +375,8 @@ pub mod v1 { struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { - if self.1.saturating_add(into.len()) > MAX_POV_BLOCK_SIZE { + self.1 = self.1.saturating_add(into.len()); + if self.1 > MAX_POV_BLOCK_SIZE { return Err("pov block too big".into()) } self.0.read_exact(into).map_err(Into::into) @@ -449,3 +450,17 @@ pub mod v1 { impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); } + +#[cfg(test)] +mod tests { + use polkadot_primitives::v1::PoV; + use super::v1::{CompressedPoV, CompressedPoVError}; + + #[test] + fn decompress_huge_pov_block_fails() { + let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() }; + + let compressed = CompressedPoV::compress(&pov).unwrap(); + assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err()); + } +} From 4547aefcec21edb27b602e70186e1f6940c08cfa Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 21 Jan 2021 12:19:15 -0500 Subject: [PATCH 9/9] add --- node/network/protocol/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 452780e813f4..c8af80b1e723 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -375,7 +375,8 @@ pub mod v1 { struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { - if self.1.saturating_add(into.len()) > MAX_POV_BLOCK_SIZE { + self.1 = self.1.saturating_add(into.len()); + if self.1 > MAX_POV_BLOCK_SIZE { return Err("pov block too big".into()) } self.0.read_exact(into).map_err(Into::into)