diff --git a/Cargo.lock b/Cargo.lock index 024d3641ed66..1c474323d91e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5067,7 +5067,7 @@ name = "polkadot-node-core-approval-voting" version = "0.1.0" dependencies = [ "bitvec", - "futures 0.3.8", + "futures 0.3.12", "parity-scale-codec", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -5276,6 +5276,8 @@ dependencies = [ "polkadot-primitives", "sc-network", "strum 0.20.0", + "thiserror", + "zstd", ] [[package]] diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index ef061062ae1d..d33a98637374 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -501,11 +501,19 @@ async fn send_collation( receipt: CandidateReceipt, pov: PoV, ) { - let wire_message = protocol_v1::CollatorProtocolMessage::Collation( - request_id, - receipt, - pov, - ); + let pov = match protocol_v1::CompressedPoV::compress(&pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`", + ); + return + } + }; + + let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( @@ -1280,7 +1288,7 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { assert_eq!(req_id, request_id); assert_eq!(receipt, candidate); - assert_eq!(pov, pov_block); + assert_eq!(pov.decompress().unwrap(), pov_block); } ); } diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index 23fde5c75c96..50203e4c71a7 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -353,7 +353,7 @@ async fn received_collation( origin: PeerId, request_id: RequestId, receipt: CandidateReceipt, - pov: PoV, + pov: protocol_v1::CompressedPoV, ) where Context: SubsystemContext @@ -368,6 +368,21 @@ where if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); if let Some(collator_id) = state.known_collators.get(&origin) { + let pov = match pov.decompress() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + %request_id, + ?error, + "Failed to extract PoV", + ); + return; + } + }; + + let _span = jaeger::pov_span(&pov, "received-collation"); + tracing::debug!( target: LOG_TARGET, %request_id, @@ -529,9 +544,8 @@ where modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { - let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) + let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) .map(|s| s.child("received-collation")); - let _span2 = jaeger::pov_span(&pov, "received-collation"); received_collation(ctx, state, origin, request_id, receipt, pov).await; } } @@ -1295,9 +1309,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_a.clone(), - PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![]), - }, + }).unwrap(), ) ) ) @@ -1333,9 +1347,9 @@ mod tests { protocol_v1::CollatorProtocolMessage::Collation( request_id, candidate_b.clone(), - PoV { + protocol_v1::CompressedPoV::compress(&PoV { block_data: BlockData(vec![1, 2, 3]), - }, + }).unwrap(), ) ) ) diff --git a/node/network/pov-distribution/src/lib.rs b/node/network/pov-distribution/src/lib.rs index 5ee1f14063e2..4fe81ee0e1d2 100644 --- a/node/network/pov-distribution/src/lib.rs +++ b/node/network/pov-distribution/src/lib.rs @@ -106,7 +106,7 @@ struct State { } struct BlockBasedState { - known: HashMap>, + known: HashMap, protocol_v1::CompressedPoV)>, /// All the PoVs we are or were fetching, coupled with channels expecting the data. /// @@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec) ) } -fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) - -> protocol_v1::ValidationProtocol -{ +fn send_pov_message( + relay_parent: Hash, + pov_hash: Hash, + pov: &protocol_v1::CompressedPoV, +) -> protocol_v1::ValidationProtocol { protocol_v1::ValidationProtocol::PoVDistribution( - protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) + protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone()) ) } @@ -267,7 +269,7 @@ async fn distribute_to_awaiting( metrics: &Metrics, relay_parent: Hash, pov_hash: Hash, - pov: &PoV, + pov: &protocol_v1::CompressedPoV, ) { // Send to all peers who are awaiting the PoV and have that relay-parent in their view. // @@ -284,7 +286,7 @@ async fn distribute_to_awaiting( if peers_to_send.is_empty() { return; } - let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); + let payload = send_pov_message(relay_parent, pov_hash, pov); ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, @@ -379,7 +381,7 @@ async fn handle_fetch( None => return, }; - if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) { + if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) { let _ = response_sender.send(pov.clone()); return; } @@ -468,7 +470,17 @@ async fn handle_distribute( } } - relay_parent_state.known.insert(descriptor.pov_hash, pov.clone()); + let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Failed to create `CompressedPov`." + ); + return + } + }; distribute_to_awaiting( &mut state.peer_state, @@ -476,8 +488,10 @@ async fn handle_distribute( &state.metrics, relay_parent, descriptor.pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov)); } /// Report a reputation change for a peer. @@ -527,8 +541,9 @@ async fn handle_awaiting( for pov_hash in pov_hashes { // For all requested PoV hashes, if we have it, we complete the request immediately. // Otherwise, we note that the peer is awaiting the PoV. - if let Some(pov) = relay_parent_state.known.get(&pov_hash) { - let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); + if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) { + let payload = send_pov_message(relay_parent, pov_hash, pov); + ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) )).await; @@ -544,23 +559,35 @@ async fn handle_awaiting( /// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not. /// /// Completes any requests awaiting that PoV. -#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))] async fn handle_incoming_pov( state: &mut State, ctx: &mut impl SubsystemContext, peer: PeerId, relay_parent: Hash, pov_hash: Hash, - pov: PoV, + encoded_pov: protocol_v1::CompressedPoV, ) { let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) { - None => { + None => { report_peer(ctx, peer, COST_UNEXPECTED_POV).await; return; }, Some(r) => r, }; + let pov = match encoded_pov.decompress() { + Ok(pov) => pov, + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = ?error, + "Could not extract PoV", + ); + return; + } + }; + let pov = { // Do validity checks and complete all senders awaiting this PoV. let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) { @@ -607,8 +634,10 @@ async fn handle_incoming_pov( &state.metrics, relay_parent, pov_hash, - &*pov, - ).await + &encoded_pov, + ).await; + + relay_parent_state.known.insert(pov_hash, (pov, encoded_pov)); } /// Handles a newly connected validator in the context of some relay leaf. diff --git a/node/network/pov-distribution/src/tests.rs b/node/network/pov-distribution/src/tests.rs index 285f479e76ba..afe83b6399eb 100644 --- a/node/network/pov-distribution/src/tests.rs +++ b/node/network/pov-distribution/src/tests.rs @@ -396,7 +396,11 @@ fn ask_validators_for_povs() { PoVDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage( test_state.validator_peer_id[2].clone(), - protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()), + protocol_v1::PoVDistributionMessage::SendPoV( + current, + pov_hash, + protocol_v1::CompressedPoV::compress(&pov_block).unwrap(), + ), ) ) ).await; @@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() { assert_eq!(peers, vec![peer_a.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ) @@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, bad_pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()), ).focus().unwrap(), ).await; @@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_b, pov_hash, pov.clone()), + send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; @@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { assert_eq!(peers, vec![peer_b.clone()]); assert_eq!( message, - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ); } ); @@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - send_pov_message(hash_a, pov_hash, pov.clone()), + send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()), ).focus().unwrap(), ).await; diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index f06f2ccd4e06..f40b063a8e41 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -12,3 +12,7 @@ polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } +thiserror = "1.0.23" + +[target.'cfg(not(target_os = "unknown"))'.dependencies] +zstd = "0.5.0" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 937c17981a19..e7dcf18d354d 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -16,7 +16,7 @@ //! Network protocol types for parachains. -#![deny(unused_crate_dependencies, unused_results)] +#![deny(unused_crate_dependencies)] #![warn(missing_docs)] use polkadot_primitives::v1::{Hash, BlockNumber}; @@ -286,8 +286,8 @@ pub mod v1 { }; use polkadot_node_primitives::SignedFullStatement; use parity_scale_codec::{Encode, Decode}; - use std::convert::TryFrom; use super::RequestId; + use std::convert::TryFrom; /// Network messages used by the availability distribution subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -323,9 +323,9 @@ pub mod v1 { #[codec(index = "0")] Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) + /// (relay_parent, pov_hash, compressed_pov) #[codec(index = "1")] - SendPoV(Hash, Hash, PoV), + SendPoV(Hash, Hash, CompressedPoV), } /// Network messages used by the statement distribution subsystem. @@ -336,6 +336,67 @@ pub mod v1 { Statement(Hash, SignedFullStatement) } + #[derive(Debug, Clone, Copy, PartialEq, thiserror::Error)] + #[allow(missing_docs)] + pub enum CompressedPoVError { + #[error("Failed to compress a PoV")] + Compress, + #[error("Failed to decompress a PoV")] + Decompress, + #[error("Failed to decode the uncompressed PoV")] + Decode, + #[error("Architecture is not supported")] + NotSupported, + } + + /// SCALE and Zstd encoded [`PoV`]. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub struct CompressedPoV(Vec); + + impl CompressedPoV { + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(not(target_os = "unknown"))] + pub fn compress(pov: &PoV) -> Result { + zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self) + } + + /// Compress the given [`PoV`] and returns a [`CompressedPoV`]. + #[cfg(target_os = "unknown")] + pub fn compress(_: &PoV) -> Result { + Err(CompressedPoVError::NotSupported) + } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(not(target_os = "unknown"))] + pub fn decompress(&self) -> Result { + use std::io::Read; + const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024; + + struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder, usize); + impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> { + fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> { + self.1 = self.1.saturating_add(into.len()); + if self.1 > MAX_POV_BLOCK_SIZE { + return Err("pov block too big".into()) + } + self.0.read_exact(into).map_err(Into::into) + } + fn remaining_len(&mut self) -> Result, parity_scale_codec::Error> { + Ok(None) + } + } + + let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?; + PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode) + } + + /// Decompress `self` and returns the [`PoV`] on success. + #[cfg(target_os = "unknown")] + pub fn decompress(&self) -> Result { + Err(CompressedPoVError::NotSupported) + } + } + /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq)] pub enum CollatorProtocolMessage { @@ -351,7 +412,7 @@ pub mod v1 { RequestCollation(RequestId, Hash, ParaId), /// A requested collation. #[codec(index = "3")] - Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } /// All network messages on the validation peer-set. @@ -389,3 +450,17 @@ pub mod v1 { impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); } + +#[cfg(test)] +mod tests { + use polkadot_primitives::v1::PoV; + use super::v1::{CompressedPoV, CompressedPoVError}; + + #[test] + fn decompress_huge_pov_block_fails() { + let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() }; + + let compressed = CompressedPoV::compress(&pov).unwrap(); + assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err()); + } +} diff --git a/roadmap/implementers-guide/src/types/network.md b/roadmap/implementers-guide/src/types/network.md index 79eab39002c6..eb7bbe7070e9 100644 --- a/roadmap/implementers-guide/src/types/network.md +++ b/roadmap/implementers-guide/src/types/network.md @@ -19,6 +19,9 @@ enum ObservedRole { Full, Light, } + +/// SCALE and zstd encoded `PoV`. +struct CompressedPoV(Vec); ``` ## V1 Network Subsystem Message Types @@ -75,8 +78,8 @@ enum PoVDistributionV1Message { /// specific relay-parent hash. Awaiting(Hash, Vec), /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) - SendPoV(Hash, Hash, PoV), + /// (relay_parent, pov_hash, compressed_pov) + SendPoV(Hash, Hash, CompressedPoV), } ``` @@ -101,7 +104,7 @@ enum CollatorProtocolV1Message { /// Request the advertised collation at that relay-parent. RequestCollation(RequestId, Hash, ParaId), /// A requested collation. - Collation(RequestId, CandidateReceipt, PoV), + Collation(RequestId, CandidateReceipt, CompressedPoV), } ``` diff --git a/scripts/gitlab/check_web_wasm.sh b/scripts/gitlab/check_web_wasm.sh index 9d9006e908a7..0b9494713875 100755 --- a/scripts/gitlab/check_web_wasm.sh +++ b/scripts/gitlab/check_web_wasm.sh @@ -1,13 +1,8 @@ #!/usr/bin/env bash +set -e + #shellcheck source=lib.sh source "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )/lib.sh" -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/polkadot/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/kusama/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path erasure-coding/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path parachain/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path primitives/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path rpc/Cargo.toml -time cargo build --locked --target=wasm32-unknown-unknown --manifest-path statement-table/Cargo.toml time cargo build --locked --target=wasm32-unknown-unknown --manifest-path cli/Cargo.toml --no-default-features --features browser