From d78265602eab82039d593cc55bc20c47df1b98d8 Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Mon, 27 Jun 2022 11:55:31 +0300 Subject: [PATCH] [feature] #2127: Add sanity check to ensure that all data decoded by `parity_scale_codec` is consumed (#2378) Signed-off-by: Daniil Polyakov Signed-off-by: BAStos525 --- cli/src/stream.rs | 6 ++- cli/src/torii/utils.rs | 9 +++-- client/Cargo.toml | 3 +- client/src/client.rs | 31 +++++++++++----- core/src/kura.rs | 9 ++++- data_model/src/events/mod.rs | 2 +- data_model/src/query.rs | 2 +- p2p/src/peer.rs | 8 +++- version/derive/src/lib.rs | 35 ++++++++++++++++-- version/src/lib.rs | 72 ++++++++++++++++++++++++++++++------ 10 files changed, 139 insertions(+), 38 deletions(-) diff --git a/cli/src/stream.rs b/cli/src/stream.rs index f9c94dd2d33..b0abafcd4a3 100644 --- a/cli/src/stream.rs +++ b/cli/src/stream.rs @@ -6,6 +6,7 @@ use core::result::Result; use std::time::Duration; use futures::{SinkExt, StreamExt}; +use iroha_logger::prelude::*; use iroha_version::prelude::*; #[cfg(test)] @@ -113,8 +114,9 @@ pub trait Stream: return Err(Error::NonBinaryMessage); } - Ok(R::decode_versioned( - subscription_request_message.as_bytes(), + Ok(try_decode_all_or_just_decode!( + R as "Message", + subscription_request_message.as_bytes() )?) } } diff --git a/cli/src/torii/utils.rs b/cli/src/torii/utils.rs index f59988904c7..f8072016fd5 100644 --- a/cli/src/torii/utils.rs +++ b/cli/src/torii/utils.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; use iroha_cli_derive::generate_endpoints; -use iroha_version::scale::DecodeVersioned; +use iroha_version::prelude::*; use parity_scale_codec::Encode; use warp::{hyper::body::Bytes, reply::Response, Filter, Rejection, Reply}; @@ -39,6 +39,7 @@ macro_rules! add_state { pub mod body { use iroha_core::smartcontracts::query::Error as QueryError; use iroha_data_model::query::VersionedSignedQueryRequest; + use iroha_logger::warn; use super::*; @@ -57,8 +58,8 @@ pub mod body { type Error = WarpQueryError; fn try_from(body: &Bytes) -> Result { - let query = VersionedSignedQueryRequest::decode_versioned(body.as_ref()) - .map_err(|e| WarpQueryError(Box::new(e).into()))?; + let res = try_decode_all_or_just_decode!(VersionedSignedQueryRequest, body.as_ref()); + let query = res.map_err(|e| WarpQueryError(Box::new(e).into()))?; let VersionedSignedQueryRequest::V1(query) = query; Ok(Self::try_from(query)?) 
} @@ -74,7 +75,7 @@ pub mod body { pub fn versioned() -> impl Filter + Copy { warp::body::bytes().and_then(|body: Bytes| async move { - DecodeVersioned::decode_versioned(body.as_ref()).map_err(warp::reject::custom) + try_decode_all_or_just_decode!(T as "Body", body.as_ref()).map_err(warp::reject::custom) }) } } diff --git a/client/Cargo.toml b/client/Cargo.toml index b5cc6af2c23..2cb73ea80cf 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -35,6 +35,7 @@ tungstenite = { version = "0.16" } base64 = "0.13.0" thiserror = "1.0.30" derive_more = "0.99.17" +parity-scale-codec = { version = "2.3.1", default-features = false, features = ["derive"] } [dev-dependencies] @@ -42,8 +43,6 @@ iroha_permissions_validators = { version = "=2.0.0-pre-rc.5", path = "../permiss iroha = { path = "../cli", features = ["dev-telemetry", "telemetry"] } test_network = { version = "=2.0.0-pre-rc.5", path = "../core/test_network" } - -parity-scale-codec = { version = "2.3.1", default-features = false, features = ["derive"] } tokio = { version = "1.6.0", features = ["rt", "rt-multi-thread"]} criterion = { version = "0.3.5", features = ["html_reports"] } color-eyre = "0.5.11" diff --git a/client/src/client.rs b/client/src/client.rs index 58409e3f4b3..064d082c039 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -14,6 +14,7 @@ use iroha_data_model::{predicate::PredicateBox, prelude::*, query::SignedQueryRe use iroha_logger::prelude::*; use iroha_telemetry::metrics::Status; use iroha_version::prelude::*; +use parity_scale_codec::DecodeAll; use rand::Rng; use serde::de::DeserializeOwned; use small::SmallStr; @@ -62,15 +63,25 @@ where resp: &Response>, ) -> QueryHandlerResult { match resp.status() { - StatusCode::OK => VersionedPaginatedQueryResult::decode_versioned(resp.body()) - .wrap_err("Failed to decode response body as VersionedPaginatedQueryResult") - .map_err(Into::into), + StatusCode::OK => { + let res = + try_decode_all_or_just_decode!(VersionedPaginatedQueryResult, resp.body()); + res.wrap_err( + "Failed to decode the whole response body as `VersionedPaginatedQueryResult`", + ) + .map_err(Into::into) + } StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => { - let err = QueryError::decode(&mut resp.body().as_ref()) - .wrap_err("Failed to decode response body as QueryError")?; + let mut res = QueryError::decode_all(resp.body().as_ref()); + if res.is_err() { + warn!("Can't decode query error, not all bytes were consumed"); + res = QueryError::decode(&mut resp.body().as_ref()); + } + let err = + res.wrap_err("Failed to decode the whole response body as `QueryError`")?; Err(ClientQueryError::QueryError(err)) } _ => Err(ResponseReport::with_msg("Unexpected query response", resp).into()), @@ -716,7 +727,7 @@ impl Client { if response.status() == StatusCode::OK { let pending_transactions = - VersionedPendingTransactions::decode_versioned(response.body())?; + try_decode_all_or_just_decode!(VersionedPendingTransactions, response.body())?; let VersionedPendingTransactions::V1(pending_transactions) = pending_transactions; let transaction = pending_transactions .into_iter() @@ -931,7 +942,8 @@ pub mod events_api { Self::Next: FlowEvents, { if let EventPublisherMessage::SubscriptionAccepted = - VersionedEventPublisherMessage::decode_versioned(&message)?.into_v1() + try_decode_all_or_just_decode!(VersionedEventPublisherMessage, &message)? 
+ .into_v1() { return Ok(Events); } @@ -948,9 +960,8 @@ pub mod events_api { fn message(&self, message: Vec) -> Result> { let event_socket_message = - VersionedEventPublisherMessage::decode_versioned(&message) - .map(iroha_data_model::events::VersionedEventPublisherMessage::into_v1) - .map_err(Into::::into)?; + try_decode_all_or_just_decode!(VersionedEventPublisherMessage, &message)? + .into_v1(); let event = match event_socket_message { EventPublisherMessage::Event(event) => event, msg => return Err(eyre!("Expected Event but got {:?}", msg)), diff --git a/core/src/kura.rs b/core/src/kura.rs index 45a051ad962..f1b112aef31 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -18,7 +18,10 @@ use futures::{Stream, StreamExt, TryStreamExt}; use iroha_actor::{broker::*, prelude::*}; use iroha_crypto::{HashOf, MerkleTree}; use iroha_logger::prelude::*; -use iroha_version::scale::{DecodeVersioned, EncodeVersioned}; +use iroha_version::{ + scale::{DecodeVersioned, EncodeVersioned}, + try_decode_all_or_just_decode, +}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use tokio::{ @@ -404,7 +407,9 @@ impl BlockStore { #[allow(clippy::cast_possible_truncation)] buffer.resize(len as usize, 0); let _len = file_stream.read_exact(&mut buffer).await?; - Ok(Some(VersionedCommittedBlock::decode_versioned(&buffer)?)) + + let block = try_decode_all_or_just_decode!(VersionedCommittedBlock, &buffer)?; + Ok(Some(block)) } /// Converts raw file stream into stream of decoded blocks diff --git a/data_model/src/events/mod.rs b/data_model/src/events/mod.rs index e23637eceac..a7efb507f2c 100644 --- a/data_model/src/events/mod.rs +++ b/data_model/src/events/mod.rs @@ -1,7 +1,7 @@ //! Events for streaming API. #[cfg(not(feature = "std"))] -use alloc::{format, string::String, vec::Vec}; +use alloc::{boxed::Box, format, string::String, vec::Vec}; use iroha_macro::FromVariant; use iroha_schema::prelude::*; diff --git a/data_model/src/query.rs b/data_model/src/query.rs index e77409fc9a9..2d7154c59e8 100644 --- a/data_model/src/query.rs +++ b/data_model/src/query.rs @@ -3,7 +3,7 @@ #![allow(clippy::missing_inline_in_public_items)] #[cfg(not(feature = "std"))] -use alloc::{format, string::String, vec::Vec}; +use alloc::{boxed::Box, format, string::String, vec::Vec}; use iroha_crypto::SignatureOf; use iroha_macro::FromVariant; diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index eaf9e9e6c6c..8063ad0f9ca 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -12,7 +12,7 @@ use iroha_crypto::ursa::{ keys::{PrivateKey, PublicKey}, }; use iroha_logger::{debug, error, info, trace, warn}; -use parity_scale_codec::{Decode, Encode}; +use parity_scale_codec::{Decode, DecodeAll, Encode}; use rand::{Rng, RngCore}; use tokio::{ io, @@ -546,7 +546,11 @@ where } } }; - let decoded: Result = Decode::decode(&mut data.as_slice()); + let mut decoded: Result = DecodeAll::decode_all(data.as_slice()); + if decoded.is_err() { + warn!("Error parsing message using all bytes"); + decoded = Decode::decode(&mut data.as_slice()); + } match decoded { Ok(decoded_data) => { let message_with_data = diff --git a/version/derive/src/lib.rs b/version/derive/src/lib.rs index f373cfc4afa..91b84ecb6c0 100644 --- a/version/derive/src/lib.rs +++ b/version/derive/src/lib.rs @@ -62,6 +62,8 @@ pub fn version_with_json(attr: TokenStream, item: TokenStream) -> TokenStream { /// /// Adds support for both scale codec and json serialization. 
To declare only with json support use [`declare_versioned_with_json`](`declare_versioned_with_json()`), for scale - [`declare_versioned_with_scale`](`declare_versioned_with_scale()`).
 ///
+/// It is the user's responsibility to bring `Box` into scope so that this macro works properly.
+///
 /// ### Arguments
 /// 1. positional `versioned_enum_name`
 /// 2. positional `supported_version_range`
@@ -246,7 +248,34 @@ fn impl_decode_versioned(enum_name: &Ident) -> proc_macro2::TokenStream {
                     let mut input = input.clone();
                     Ok(Self::decode(&mut input)?)
                 } else {
-                    Err(Error::UnsupportedVersion(UnsupportedVersion::new(*version, RawVersioned::ScaleBytes(input.to_vec()))))
+                    Err(Error::UnsupportedVersion(Box::new(UnsupportedVersion::new(
+                        *version,
+                        RawVersioned::ScaleBytes(input.to_vec())
+                    ))))
                 }
             } else {
                 Err(Error::NotVersioned)
             }
         }
+
+        fn decode_all_versioned(input: &[u8]) -> iroha_version::error::Result<Self> {
+            use iroha_version::{error::Error, Version, UnsupportedVersion, RawVersioned};
+            use parity_scale_codec::Decode;
+
+            if let Some(version) = input.first() {
+                if Self::supported_versions().contains(version) {
+                    let mut input = input.clone();
+                    let obj = Self::decode(&mut input)?;
+                    if input.is_empty() {
+                        Ok(obj)
+                    } else {
+                        Err(Error::ExtraBytesLeft(input.len().try_into().expect("`usize` always fits into `u64`")))
+                    }
+                } else {
+                    Err(Error::UnsupportedVersion(Box::new(UnsupportedVersion::new(
+                        *version,
+                        RawVersioned::ScaleBytes(input.to_vec())
+                    ))))
+                }
+            } else {
+                Err(Error::NotVersioned)
+            }
+        }
@@ -278,9 +307,9 @@ fn impl_json(enum_name: &Ident, version_field_name: &str) -> proc_macro2::TokenStream {
             if Self::supported_versions().contains(&version) {
                 Ok(serde_json::from_str(input)?)
             } else {
-                Err(Error::UnsupportedVersion(
+                Err(Error::UnsupportedVersion(Box::new(
                     UnsupportedVersion::new(version, RawVersioned::Json(String::from(input)))
-                ))
+                )))
             }
         } else {
             Err(Error::NotVersioned)
diff --git a/version/src/lib.rs b/version/src/lib.rs
index a9d4a1d4206..b822d870cec 100644
--- a/version/src/lib.rs
+++ b/version/src/lib.rs
@@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize};
 /// Module which contains error and result for versioning
 pub mod error {
     #[cfg(not(feature = "std"))]
-    use alloc::{format, string::String, vec::Vec};
+    use alloc::{borrow::ToOwned, boxed::Box, format, string::String, vec::Vec};
     use core::fmt;
 
     use iroha_macro::FromVariant;
@@ -53,7 +53,9 @@ pub mod error {
         /// Problem with parsing integers
         ParseInt,
         /// Input version unsupported
-        UnsupportedVersion(UnsupportedVersion),
+        UnsupportedVersion(Box<UnsupportedVersion>),
+        /// Buffer is not empty after decoding. Returned by `decode_all_versioned()`
+        ExtraBytesLeft(u64),
     }
 
     #[cfg(feature = "json")]
@@ -79,20 +81,23 @@ pub mod error {
     impl fmt::Display for Error {
         fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
             let msg = match self {
-                Self::NotVersioned => "Not a versioned object",
+                Self::NotVersioned => "Not a versioned object".to_owned(),
                 Self::UnsupportedJsonEncode => {
-                    "Cannot encode unsupported version from JSON to SCALE"
+                    "Cannot encode unsupported version from JSON to SCALE".to_owned()
                 }
-                Self::ExpectedJson => "Expected JSON object",
+                Self::ExpectedJson => "Expected JSON object".to_owned(),
                 Self::UnsupportedScaleEncode => {
-                    "Cannot encode unsupported version from SCALE to JSON"
+                    "Cannot encode unsupported version from SCALE to JSON".to_owned()
                 }
                 #[cfg(feature = "json")]
-                Self::Serde => "JSON (de)serialization issue",
+                Self::Serde => "JSON (de)serialization issue".to_owned(),
                 #[cfg(feature = "scale")]
-                Self::ParityScale => "Parity SCALE (de)serialization issue",
-                Self::ParseInt => "Problem with parsing integers",
-                Self::UnsupportedVersion(_) => "Input version unsupported",
+                Self::ParityScale => "Parity SCALE (de)serialization issue".to_owned(),
+                Self::ParseInt => "Issue with parsing integers".to_owned(),
+                Self::UnsupportedVersion(v) => {
+                    format!("Input version {} is unsupported", v.version)
+                }
+                Self::ExtraBytesLeft(n) => format!("Buffer contains {n} bytes after decoding"),
             };
 
             write!(f, "{}", msg)
@@ -196,8 +201,17 @@ pub mod scale {
        /// Use this function for versioned objects instead of `decode`.
        ///
        /// # Errors
-        /// Will return error if version is unsupported or if input won't have enough bytes for decoding.
+        /// - Version is unsupported
+        /// - Input doesn't have enough bytes for decoding
        fn decode_versioned(input: &[u8]) -> Result<Self>;
+
+        /// Use this function for versioned objects instead of `decode_all`.
+        ///
+        /// # Errors
+        /// - Version is unsupported
+        /// - Input doesn't have enough bytes for decoding
+        /// - Input has extra bytes
+        fn decode_all_versioned(input: &[u8]) -> Result<Self>;
     }
 
     /// [`Encode`] versioned analog.
        /// Use this function for versioned objects instead of `encode`.
        fn encode_versioned(&self) -> Vec<u8>;
    }
+
+    /// Try to decode type `$t` from input `$i` with [`DecodeVersioned::decode_all_versioned`]
+    /// and, if that fails because not all bytes were consumed, log a warning
+    /// and fall back to [`DecodeVersioned::decode_versioned`].
+    ///
+    /// Implemented as a macro so that the warning is reported
+    /// with the file name of the calling site.
+    ///
+    /// Will be removed in favor of just [`DecodeVersioned::decode_all_versioned`] in the future.
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// // Logs "Can't decode `i32`, not all bytes were consumed" if there are leftover bytes
+    /// let n = try_decode_all_or_just_decode!(i32, &bytes)?;
+    ///
+    /// // Logs "Can't decode `Message`, not all bytes were consumed" if there are leftover bytes
+    /// let t = try_decode_all_or_just_decode!(T as "Message", &message_bytes)?;
+    /// ```
+    #[macro_export]
+    macro_rules! try_decode_all_or_just_decode {
+        ($t:ty, $i:expr) => {
+            try_decode_all_or_just_decode!(impl $t, $i, stringify!($t))
+        };
+        ($t:ty as $l:literal, $i:expr) => {
+            try_decode_all_or_just_decode!(impl $t, $i, $l)
+        };
+        (impl $t:ty, $i:expr, $n:expr) => {{
+            let mut res = <$t as DecodeVersioned>::decode_all_versioned($i);
+            if let Err(iroha_version::error::Error::ExtraBytesLeft(left)) = res {
+                warn!(left_bytes = %left, "Can't decode `{}`, not all bytes were consumed", $n);
+                res = <$t as DecodeVersioned>::decode_versioned($i);
+            }
+            res
+        }};
+    }
 }
 
 /// JSON related versioned (de)serialization traits.
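Note (not part of the patch): a minimal sketch of what a `try_decode_all_or_just_decode!(T as "Payload", bytes)` call site expands to, written as a hand-rolled helper. The helper name `decode_leniently` and the `"Payload"` label are illustrative; the `DecodeVersioned` methods, the `ExtraBytesLeft` variant, and the `iroha_logger::warn` import all come from the changes above.

```rust
use iroha_logger::warn;
use iroha_version::{
    error::{Error, Result},
    scale::DecodeVersioned,
};

/// Illustrative equivalent of `try_decode_all_or_just_decode!(T as "Payload", bytes)`:
/// try the strict, all-consuming decode first; on leftover bytes, warn and retry leniently.
fn decode_leniently<T: DecodeVersioned>(bytes: &[u8]) -> Result<T> {
    match T::decode_all_versioned(bytes) {
        // Strict decode left trailing bytes: log the leftover count, then fall back.
        Err(Error::ExtraBytesLeft(left)) => {
            warn!(left_bytes = %left, "Can't decode `Payload`, not all bytes were consumed");
            T::decode_versioned(bytes)
        }
        // Success, or any other error, is returned unchanged.
        res => res,
    }
}
```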