[feature] hyperledger-iroha#2127: Add sanity check to ensure that all data decoded by `parity_scale_codec` is consumed (hyperledger-iroha#2378)

Signed-off-by: Daniil Polyakov <[email protected]>
Signed-off-by: BAStos525 <[email protected]>
Arjentix authored and BAStos525 committed Jul 8, 2022
1 parent 6305013 commit d782656
Showing 10 changed files with 139 additions and 38 deletions.
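The sanity check being added hinges on the difference between SCALE's lenient `decode`, which stops once a value has been read, and the strict `decode_all`, which additionally requires that no input bytes are left over. A minimal sketch of that difference (not part of the commit), assuming the parity-scale-codec 2.x API pinned in the manifests below:

```rust
// Minimal sketch of the behaviour the sanity check targets,
// assuming the parity-scale-codec 2.x API used in this repository.
use parity_scale_codec::{Decode, DecodeAll, Encode};

fn main() {
    let mut bytes = 42_u32.encode();
    bytes.push(0xFF); // trailing garbage that a plain `decode` silently ignores

    // Lenient: succeeds and never notices the extra byte.
    assert_eq!(u32::decode(&mut bytes.as_slice()).unwrap(), 42);

    // Strict: fails because not all bytes were consumed.
    assert!(u32::decode_all(bytes.as_slice()).is_err());
}
```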
6 changes: 4 additions & 2 deletions cli/src/stream.rs
@@ -6,6 +6,7 @@ use core::result::Result;
use std::time::Duration;

use futures::{SinkExt, StreamExt};
use iroha_logger::prelude::*;
use iroha_version::prelude::*;

#[cfg(test)]
@@ -113,8 +114,9 @@ pub trait Stream<R: DecodeVersioned>:
return Err(Error::NonBinaryMessage);
}

Ok(R::decode_versioned(
subscription_request_message.as_bytes(),
Ok(try_decode_all_or_just_decode!(
R as "Message",
subscription_request_message.as_bytes()
)?)
}
}
9 changes: 5 additions & 4 deletions cli/src/torii/utils.rs
@@ -1,7 +1,7 @@
use std::convert::Infallible;

use iroha_cli_derive::generate_endpoints;
use iroha_version::scale::DecodeVersioned;
use iroha_version::prelude::*;
use parity_scale_codec::Encode;
use warp::{hyper::body::Bytes, reply::Response, Filter, Rejection, Reply};

@@ -39,6 +39,7 @@ macro_rules! add_state {
pub mod body {
use iroha_core::smartcontracts::query::Error as QueryError;
use iroha_data_model::query::VersionedSignedQueryRequest;
use iroha_logger::warn;

use super::*;

@@ -57,8 +58,8 @@ pub mod body {
type Error = WarpQueryError;

fn try_from(body: &Bytes) -> Result<Self, Self::Error> {
let query = VersionedSignedQueryRequest::decode_versioned(body.as_ref())
.map_err(|e| WarpQueryError(Box::new(e).into()))?;
let res = try_decode_all_or_just_decode!(VersionedSignedQueryRequest, body.as_ref());
let query = res.map_err(|e| WarpQueryError(Box::new(e).into()))?;
let VersionedSignedQueryRequest::V1(query) = query;
Ok(Self::try_from(query)?)
}
@@ -74,7 +75,7 @@ pub mod body {
pub fn versioned<T: DecodeVersioned>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy
{
warp::body::bytes().and_then(|body: Bytes| async move {
DecodeVersioned::decode_versioned(body.as_ref()).map_err(warp::reject::custom)
try_decode_all_or_just_decode!(T as "Body", body.as_ref()).map_err(warp::reject::custom)
})
}
}
3 changes: 1 addition & 2 deletions client/Cargo.toml
@@ -35,15 +35,14 @@ tungstenite = { version = "0.16" }
base64 = "0.13.0"
thiserror = "1.0.30"
derive_more = "0.99.17"
parity-scale-codec = { version = "2.3.1", default-features = false, features = ["derive"] }


[dev-dependencies]
iroha_permissions_validators = { version = "=2.0.0-pre-rc.5", path = "../permissions_validators" }
iroha = { path = "../cli", features = ["dev-telemetry", "telemetry"] }

test_network = { version = "=2.0.0-pre-rc.5", path = "../core/test_network" }

parity-scale-codec = { version = "2.3.1", default-features = false, features = ["derive"] }
tokio = { version = "1.6.0", features = ["rt", "rt-multi-thread"]}
criterion = { version = "0.3.5", features = ["html_reports"] }
color-eyre = "0.5.11"
31 changes: 21 additions & 10 deletions client/src/client.rs
@@ -14,6 +14,7 @@ use iroha_data_model::{predicate::PredicateBox, prelude::*, query::SignedQueryRe
use iroha_logger::prelude::*;
use iroha_telemetry::metrics::Status;
use iroha_version::prelude::*;
use parity_scale_codec::DecodeAll;
use rand::Rng;
use serde::de::DeserializeOwned;
use small::SmallStr;
@@ -62,15 +63,25 @@
resp: &Response<Vec<u8>>,
) -> QueryHandlerResult<VersionedPaginatedQueryResult> {
match resp.status() {
StatusCode::OK => VersionedPaginatedQueryResult::decode_versioned(resp.body())
.wrap_err("Failed to decode response body as VersionedPaginatedQueryResult")
.map_err(Into::into),
StatusCode::OK => {
let res =
try_decode_all_or_just_decode!(VersionedPaginatedQueryResult, resp.body());
res.wrap_err(
"Failed to decode the whole response body as `VersionedPaginatedQueryResult`",
)
.map_err(Into::into)
}
StatusCode::BAD_REQUEST
| StatusCode::UNAUTHORIZED
| StatusCode::FORBIDDEN
| StatusCode::NOT_FOUND => {
let err = QueryError::decode(&mut resp.body().as_ref())
.wrap_err("Failed to decode response body as QueryError")?;
let mut res = QueryError::decode_all(resp.body().as_ref());
if res.is_err() {
warn!("Can't decode query error, not all bytes were consumed");
res = QueryError::decode(&mut resp.body().as_ref());
}
let err =
res.wrap_err("Failed to decode the whole response body as `QueryError`")?;
Err(ClientQueryError::QueryError(err))
}
_ => Err(ResponseReport::with_msg("Unexpected query response", resp).into()),
@@ -716,7 +727,7 @@ impl Client {

if response.status() == StatusCode::OK {
let pending_transactions =
VersionedPendingTransactions::decode_versioned(response.body())?;
try_decode_all_or_just_decode!(VersionedPendingTransactions, response.body())?;
let VersionedPendingTransactions::V1(pending_transactions) = pending_transactions;
let transaction = pending_transactions
.into_iter()
@@ -931,7 +942,8 @@ pub mod events_api {
Self::Next: FlowEvents,
{
if let EventPublisherMessage::SubscriptionAccepted =
VersionedEventPublisherMessage::decode_versioned(&message)?.into_v1()
try_decode_all_or_just_decode!(VersionedEventPublisherMessage, &message)?
.into_v1()
{
return Ok(Events);
}
@@ -948,9 +960,8 @@

fn message(&self, message: Vec<u8>) -> Result<EventData<Self::Event>> {
let event_socket_message =
VersionedEventPublisherMessage::decode_versioned(&message)
.map(iroha_data_model::events::VersionedEventPublisherMessage::into_v1)
.map_err(Into::<eyre::Error>::into)?;
try_decode_all_or_just_decode!(VersionedEventPublisherMessage, &message)?
.into_v1();
let event = match event_socket_message {
EventPublisherMessage::Event(event) => event,
msg => return Err(eyre!("Expected Event but got {:?}", msg)),
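The `client.rs` hunks above (and `p2p/src/peer.rs` further down) repeat the same fallback: try the strict decoder first, and only on failure warn and retry with the lenient one. A hypothetical free-standing helper, shown only to illustrate the pattern (the commit itself inlines it at each call site, and `decode_leniently` is not a name from the codebase):

```rust
// Hypothetical helper capturing the fallback pattern used in client.rs and p2p/src/peer.rs.
use iroha_logger::warn; // the changed files bring this macro into scope the same way
use parity_scale_codec::{Decode, DecodeAll};

fn decode_leniently<T: Decode + DecodeAll>(bytes: &[u8]) -> Result<T, parity_scale_codec::Error> {
    match T::decode_all(bytes) {
        Ok(value) => Ok(value),
        Err(_) => {
            // Keep accepting over-long payloads for now, but make the problem visible in the logs.
            warn!("Can't decode message, not all bytes were consumed");
            T::decode(&mut &bytes[..])
        }
    }
}
```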
9 changes: 7 additions & 2 deletions core/src/kura.rs
@@ -18,7 +18,10 @@ use futures::{Stream, StreamExt, TryStreamExt};
use iroha_actor::{broker::*, prelude::*};
use iroha_crypto::{HashOf, MerkleTree};
use iroha_logger::prelude::*;
use iroha_version::scale::{DecodeVersioned, EncodeVersioned};
use iroha_version::{
scale::{DecodeVersioned, EncodeVersioned},
try_decode_all_or_just_decode,
};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use tokio::{
@@ -404,7 +407,9 @@ impl<IO: DiskIO> BlockStore<IO> {
#[allow(clippy::cast_possible_truncation)]
buffer.resize(len as usize, 0);
let _len = file_stream.read_exact(&mut buffer).await?;
Ok(Some(VersionedCommittedBlock::decode_versioned(&buffer)?))

let block = try_decode_all_or_just_decode!(VersionedCommittedBlock, &buffer)?;
Ok(Some(block))
}

/// Converts raw file stream into stream of decoded blocks
2 changes: 1 addition & 1 deletion data_model/src/events/mod.rs
@@ -1,7 +1,7 @@
//! Events for streaming API.

#[cfg(not(feature = "std"))]
use alloc::{format, string::String, vec::Vec};
use alloc::{boxed::Box, format, string::String, vec::Vec};

use iroha_macro::FromVariant;
use iroha_schema::prelude::*;
2 changes: 1 addition & 1 deletion data_model/src/query.rs
@@ -3,7 +3,7 @@
#![allow(clippy::missing_inline_in_public_items)]

#[cfg(not(feature = "std"))]
use alloc::{format, string::String, vec::Vec};
use alloc::{boxed::Box, format, string::String, vec::Vec};

use iroha_crypto::SignatureOf;
use iroha_macro::FromVariant;
8 changes: 6 additions & 2 deletions p2p/src/peer.rs
@@ -12,7 +12,7 @@ use iroha_crypto::ursa::{
keys::{PrivateKey, PublicKey},
};
use iroha_logger::{debug, error, info, trace, warn};
use parity_scale_codec::{Decode, Encode};
use parity_scale_codec::{Decode, DecodeAll, Encode};
use rand::{Rng, RngCore};
use tokio::{
io,
@@ -546,7 +546,11 @@
}
}
};
let decoded: Result<T, _> = Decode::decode(&mut data.as_slice());
let mut decoded: Result<T, _> = DecodeAll::decode_all(data.as_slice());
if decoded.is_err() {
warn!("Error parsing message using all bytes");
decoded = Decode::decode(&mut data.as_slice());
}
match decoded {
Ok(decoded_data) => {
let message_with_data =
35 changes: 32 additions & 3 deletions version/derive/src/lib.rs
@@ -62,6 +62,8 @@ pub fn version_with_json(attr: TokenStream, item: TokenStream) -> TokenStream {
///
/// Adds support for both SCALE codec and JSON serialization. To declare with only JSON support use [`declare_versioned_with_json`](`declare_versioned_with_json()`); for SCALE only - [`declare_versioned_with_scale`](`declare_versioned_with_scale()`).
///
/// It is the user's responsibility to bring `Box` into scope so that this macro works properly
///
/// ### Arguments
/// 1. positional `versioned_enum_name`
/// 2. positional `supported_version_range`
@@ -246,7 +248,34 @@ fn impl_decode_versioned(enum_name: &Ident) -> proc_macro2::TokenStream {
let mut input = input.clone();
Ok(Self::decode(&mut input)?)
} else {
Err(Error::UnsupportedVersion(UnsupportedVersion::new(*version, RawVersioned::ScaleBytes(input.to_vec()))))
Err(Error::UnsupportedVersion(Box::new(UnsupportedVersion::new(
*version,
RawVersioned::ScaleBytes(input.to_vec())
))))
}
} else {
Err(Error::NotVersioned)
}
}

fn decode_all_versioned(input: &[u8]) -> iroha_version::error::Result<Self> {
use iroha_version::{error::Error, Version, UnsupportedVersion, RawVersioned};
use parity_scale_codec::Decode;

if let Some(version) = input.first() {
if Self::supported_versions().contains(version) {
let mut input = input.clone();
let obj = Self::decode(&mut input)?;
if input.is_empty() {
Ok(obj)
} else {
Err(Error::ExtraBytesLeft(input.len().try_into().expect("`usize` always fits in `u64`")))
}
} else {
Err(Error::UnsupportedVersion(Box::new(UnsupportedVersion::new(
*version,
RawVersioned::ScaleBytes(input.to_vec())
))))
}
} else {
Err(Error::NotVersioned)
@@ -278,9 +307,9 @@ fn impl_json(enum_name: &Ident, version_field_name: &str) -> proc_macro2::TokenS
if Self::supported_versions().contains(&version) {
Ok(serde_json::from_str(input)?)
} else {
Err(Error::UnsupportedVersion(
Err(Error::UnsupportedVersion(Box::new(
UnsupportedVersion::new(version, RawVersioned::Json(String::from(input)))
))
)))
}
} else {
Err(Error::NotVersioned)
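To make the generated `decode_all_versioned` above easier to follow, here is a simplified, hand-written equivalent for a type with a single supported version. `VersionedThing` and its version number are illustrative only, not types from the crate:

```rust
// Hand-rolled sketch of what the derived `decode_all_versioned` does:
// dispatch on the leading version byte, decode, then require that the
// input was fully consumed.
use parity_scale_codec::{Decode, Encode};

#[derive(Encode, Decode)]
enum VersionedThing {
    // The SCALE variant index doubles as the version byte.
    #[codec(index = 1)]
    V1(u32),
}

fn decode_all_versioned_like(input: &[u8]) -> Result<VersionedThing, String> {
    let version = *input.first().ok_or("Not a versioned object")?;
    if version != 1 {
        return Err(format!("Input version {} is unsupported", version));
    }
    let mut rest = input; // `decode` consumes the version byte as the enum discriminant
    let obj = VersionedThing::decode(&mut rest).map_err(|e| e.to_string())?;
    if rest.is_empty() {
        Ok(obj)
    } else {
        Err(format!("Buffer contains {} bytes after decoding", rest.len()))
    }
}
```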
72 changes: 61 additions & 11 deletions version/src/lib.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
/// Module which contains error and result for versioning
pub mod error {
#[cfg(not(feature = "std"))]
use alloc::{format, string::String, vec::Vec};
use alloc::{borrow::ToOwned, boxed::Box, format, string::String, vec::Vec};
use core::fmt;

use iroha_macro::FromVariant;
@@ -53,7 +53,9 @@ pub mod error {
/// Problem with parsing integers
ParseInt,
/// Input version unsupported
UnsupportedVersion(UnsupportedVersion),
UnsupportedVersion(Box<UnsupportedVersion>),
/// Buffer is not empty after decoding. Returned by `decode_all_versioned()`
ExtraBytesLeft(u64),
}

#[cfg(feature = "json")]
@@ -79,20 +81,23 @@
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let msg = match self {
Self::NotVersioned => "Not a versioned object",
Self::NotVersioned => "Not a versioned object".to_owned(),
Self::UnsupportedJsonEncode => {
"Cannot encode unsupported version from JSON to SCALE"
"Cannot encode unsupported version from JSON to SCALE".to_owned()
}
Self::ExpectedJson => "Expected JSON object",
Self::ExpectedJson => "Expected JSON object".to_owned(),
Self::UnsupportedScaleEncode => {
"Cannot encode unsupported version from SCALE to JSON"
"Cannot encode unsupported version from SCALE to JSON".to_owned()
}
#[cfg(feature = "json")]
Self::Serde => "JSON (de)serialization issue",
Self::Serde => "JSON (de)serialization issue".to_owned(),
#[cfg(feature = "scale")]
Self::ParityScale => "Parity SCALE (de)serialization issue",
Self::ParseInt => "Problem with parsing integers",
Self::UnsupportedVersion(_) => "Input version unsupported",
Self::ParityScale => "Parity SCALE (de)serialization issue".to_owned(),
Self::ParseInt => "Issue with parsing integers".to_owned(),
Self::UnsupportedVersion(v) => {
format!("Input version {} is unsupported", v.version)
}
Self::ExtraBytesLeft(n) => format!("Buffer contains {n} bytes after decoding"),
};

write!(f, "{}", msg)
@@ -196,15 +201,60 @@ pub mod scale {
/// Use this function for versioned objects instead of `decode`.
///
/// # Errors
/// Will return error if version is unsupported or if input won't have enough bytes for decoding.
/// - Version is unsupported
/// - Input won't have enough bytes for decoding
fn decode_versioned(input: &[u8]) -> Result<Self>;

/// Use this function for versioned objects instead of `decode_all`.
///
/// # Errors
/// - Version is unsupported
/// - Input won't have enough bytes for decoding
/// - Input has extra bytes
fn decode_all_versioned(input: &[u8]) -> Result<Self>;
}

/// [`Encode`] versioned analog.
pub trait EncodeVersioned: Encode + Version {
/// Use this function for versioned objects instead of `encode`.
fn encode_versioned(&self) -> Vec<u8>;
}

/// Try to decode type `t` from input `i` with [`DecodeVersioned::decode_all_versioned`]
/// and, if that fails, print a warning message to the log
/// and fall back to [`DecodeVersioned::decode_versioned`].
///
/// Implemented as a macro so that the warning message is reported
/// with the file name of the calling side.
///
/// Will be removed in favor of just [`DecodeVersioned::decode_all_versioned`] in the future.
///
/// # Example
///
/// ```no_run
/// // Will print `Can't decode `i32`, not all bytes were consumed`
/// let n = try_decode_all_or_just_decode!(i32, &bytes)?;
///
/// // Will print `Can't decode `Message`, not all bytes were consumed`
/// let t = try_decode_all_or_just_decode!(T as "Message", &message_bytes)?;
/// ```
#[macro_export]
macro_rules! try_decode_all_or_just_decode {
($t:ty, $i:expr) => {
try_decode_all_or_just_decode!(impl $t, $i, stringify!($t))
};
($t:ty as $l:literal, $i:expr) => {
try_decode_all_or_just_decode!(impl $t, $i, $l)
};
(impl $t:ty, $i:expr, $n:expr) => {{
let mut res = <$t as DecodeVersioned>::decode_all_versioned($i);
if let Err(iroha_version::error::Error::ExtraBytesLeft(left)) = res {
warn!(left_bytes = %left, "Can't decode `{}`, not all bytes were consumed", $n);
res = <$t as DecodeVersioned>::decode_versioned($i);
}
res
}};
}
}

/// JSON related versioned (de)serialization traits.
