Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Move CompressedPoV to primitives.
Browse files Browse the repository at this point in the history
  • Loading branch information
eskimor committed Mar 10, 2021
1 parent 8f97e51 commit bb34beb
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 81 deletions.
81 changes: 0 additions & 81 deletions node/network/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,73 +357,6 @@ pub mod v1 {
Approvals(Vec<IndirectSignedApprovalVote>),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
#[allow(missing_docs)]
pub enum CompressedPoVError {
#[error("Failed to compress a PoV")]
Compress,
#[error("Failed to decompress a PoV")]
Decompress,
#[error("Failed to decode the uncompressed PoV")]
Decode,
#[error("Architecture is not supported")]
NotSupported,
}

/// SCALE and Zstd encoded [`PoV`].
#[derive(Clone, Encode, Decode, PartialEq, Eq)]
pub struct CompressedPoV(Vec<u8>);

impl CompressedPoV {
/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(not(target_os = "unknown"))]
pub fn compress(pov: &PoV) -> Result<Self, CompressedPoVError> {
zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self)
}

/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(target_os = "unknown")]
pub fn compress(_: &PoV) -> Result<Self, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}

/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(not(target_os = "unknown"))]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
use std::io::Read;
const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024;

struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder<T>, usize);
impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> {
fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> {
self.1 = self.1.saturating_add(into.len());
if self.1 > MAX_POV_BLOCK_SIZE {
return Err("pov block too big".into())
}
self.0.read_exact(into).map_err(Into::into)
}
fn remaining_len(&mut self) -> Result<Option<usize>, parity_scale_codec::Error> {
Ok(None)
}
}

let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?;
PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode)
}

/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(target_os = "unknown")]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}
}

impl std::fmt::Debug for CompressedPoV {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CompressedPoV({} bytes)", self.0.len())
}
}

/// Network messages used by the collator protocol subsystem
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
pub enum CollatorProtocolMessage {
Expand Down Expand Up @@ -481,17 +414,3 @@ pub mod v1 {

impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage);
}

#[cfg(test)]
mod tests {
use polkadot_primitives::v1::PoV;
use super::v1::{CompressedPoV, CompressedPoVError};

#[test]
fn decompress_huge_pov_block_fails() {
let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() };

let compressed = CompressedPoV::compress(&pov).unwrap();
assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err());
}
}
3 changes: 3 additions & 0 deletions primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ bitvec = { version = "0.20.1", default-features = false, features = ["alloc"] }
frame-system = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
hex-literal = "0.3.1"
parity-util-mem = { version = "0.9.0", default-features = false, optional = true }
thiserror = "1.0.23"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = "0.5.0"

[dev-dependencies]
sp-serializer = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
77 changes: 77 additions & 0 deletions primitives/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,73 @@ impl PoV {
}
}

/// SCALE and Zstd encoded [`PoV`].
#[derive(Clone, Encode, Decode, PartialEq, Eq)]
pub struct CompressedPoV(Vec<u8>);

#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
#[allow(missing_docs)]
pub enum CompressedPoVError {
#[error("Failed to compress a PoV")]
Compress,
#[error("Failed to decompress a PoV")]
Decompress,
#[error("Failed to decode the uncompressed PoV")]
Decode,
#[error("Architecture is not supported")]
NotSupported,
}

impl CompressedPoV {
/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(not(target_os = "unknown"))]
pub fn compress(pov: &PoV) -> Result<Self, CompressedPoVError> {
zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self)
}

/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(target_os = "unknown")]
pub fn compress(_: &PoV) -> Result<Self, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}

/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(not(target_os = "unknown"))]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
use std::io::Read;
const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024;

struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder<T>, usize);
impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> {
fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> {
self.1 = self.1.saturating_add(into.len());
if self.1 > MAX_POV_BLOCK_SIZE {
return Err("pov block too big".into())
}
self.0.read_exact(into).map_err(Into::into)
}
fn remaining_len(&mut self) -> Result<Option<usize>, parity_scale_codec::Error> {
Ok(None)
}
}

let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?;
PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode)
}

/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(target_os = "unknown")]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}
}

impl std::fmt::Debug for CompressedPoV {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CompressedPoV({} bytes)", self.0.len())
}
}

/// A bitfield concerning availability of backed candidates.
#[derive(PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)]
pub struct AvailabilityBitfield(pub BitVec<bitvec::order::Lsb0, u8>);
Expand Down Expand Up @@ -982,6 +1049,7 @@ pub struct AbridgedHrmpChannel {
#[cfg(test)]
mod tests {
use super::*;
use super::{CompressedPoV, CompressedPoVError, PoV};

#[test]
fn group_rotation_info_calculations() {
Expand All @@ -1008,4 +1076,13 @@ mod tests {
&Hash::repeat_byte(3),
);
}


#[test]
fn decompress_huge_pov_block_fails() {
let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() };

let compressed = CompressedPoV::compress(&pov).unwrap();
assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err());
}
}

0 comments on commit bb34beb

Please sign in to comment.