From 860442e0028cc88230b23a6824f95440290db8a7 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 3 Nov 2024 15:40:13 -0600 Subject: [PATCH] removes repeated bincode serialization of gossip CrdsValues Gossip CrdsValues are deserialized when received as a gossip push message or pull response, but serialized again immediately to obtain a value hash during Crds::insert and then again serialized repeatedly every time the value is pushed to another node or returned as a response to a pull request. In order to avoid repeated serialization of a CrdsValue during its lifetime, the commit manually implements bincode (de)serialization of CrdsValue to hold on to bincode serialized bytes of CrdsData and reuses that for serializing CrdsValue. --- gossip/src/cluster_info.rs | 32 ++++--- gossip/src/crds_gossip_pull.rs | 6 +- gossip/src/crds_value.rs | 163 ++++++++++++++++++++++++++------- gossip/src/protocol.rs | 144 ++++++++++++++++++++++++----- 4 files changed, 273 insertions(+), 72 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b6293da0487729..61542238b76634 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -88,7 +88,6 @@ use { collections::{HashMap, HashSet, VecDeque}, fmt::Debug, fs::{self, File}, - io::BufReader, iter::repeat, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, num::NonZeroUsize, @@ -346,12 +345,13 @@ impl ClusterInfo { return; } + let num_nodes = nodes.len(); let filename = self.contact_info_path.join("contact-info.bin"); let tmp_filename = &filename.with_extension("tmp"); match File::create(tmp_filename) { Ok(mut file) => { - if let Err(err) = bincode::serialize_into(&mut file, &nodes) { + if let Err(err) = CrdsValue::bincode_serialize_many(nodes, &mut file) { warn!( "Failed to serialize contact info info {}: {}", tmp_filename.display(), @@ -369,8 +369,7 @@ impl ClusterInfo { match fs::rename(tmp_filename, &filename) { Ok(()) => { info!( - "Saved contact info for {} nodes into {}", - nodes.len(), + "Saved contact info for {num_nodes} nodes into {}", filename.display() ); } @@ -394,13 +393,13 @@ impl ClusterInfo { return; } - let nodes: Vec = match File::open(&filename) { - Ok(file) => { - bincode::deserialize_from(&mut BufReader::new(file)).unwrap_or_else(|err| { + let nodes: Vec = match fs::read(&filename) { + Ok(bytes) => CrdsValue::bincode_deserialize_many(&bytes) + .collect::>() + .inspect_err(|err| { warn!("Failed to deserialize {}: {}", filename.display(), err); - vec![] }) - } + .unwrap_or_default(), Err(err) => { warn!("Failed to open {}: {}", filename.display(), err); vec![] @@ -2315,7 +2314,14 @@ impl ClusterInfo { } } let verify_packet = |packet: Packet| { - let protocol: Protocol = packet.deserialize_slice(..).ok()?; + let protocol = packet.data(..).map(Protocol::bincode_deserialize)?.ok()?; + // if let Some(Err(err)) = &protocol { + // error!( + // "Protocol::bincode_deserialize: {:?}, {err:?}", + // packet.data(..) + // ); + // } + // let protocol = protocol?.ok()?; protocol.sanitize().ok()?; let protocol = protocol.par_verify(&self.stats)?; Some((packet.meta().socket_addr(), protocol)) @@ -3284,8 +3290,10 @@ mod tests { ) { assert_eq!(packet.meta().socket_addr(), socket); let bytes = serialize(&pong).unwrap(); - match packet.deserialize_slice(..).unwrap() { - Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes), + match packet.data(..).map(Protocol::bincode_deserialize) { + Some(Ok(Protocol::PongMessage(pong))) => { + assert_eq!(serialize(&pong).unwrap(), bytes) + } _ => panic!("invalid packet!"), } } diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index f650d91497fbb5..eec1277880f3ec 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -1457,7 +1457,11 @@ pub(crate) mod tests { for filter in Vec::::from(filters) { let request_bytes = 4 + request_bytes + bincode::serialized_size(&filter).unwrap(); let request = Protocol::PullRequest(filter, caller.clone()); - let request = bincode::serialize(&request).unwrap(); + let request = { + let mut buffer = Vec::::new(); + request.bincode_serialize(&mut buffer).unwrap(); + buffer + }; assert!(packet_data_size_range.contains(&request.len())); assert_eq!(request.len() as u64, request_bytes); } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index fb3061d1499f85..ca878316179129 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -5,26 +5,31 @@ use { duplicate_shred::DuplicateShredIndex, epoch_slots::EpochSlots, }, - bincode::serialize, + bincode::Options, + itertools::Either, rand::Rng, - serde::de::{Deserialize, Deserializer}, solana_sanitize::{Sanitize, SanitizeError}, solana_sdk::{ hash::Hash, + packet::Encode, pubkey::Pubkey, - signature::{Keypair, Signable, Signature, Signer}, + signature::{Keypair, Signable, Signature, Signer, SIGNATURE_BYTES}, + }, + std::{ + borrow::{Borrow, Cow}, + io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Write}, }, - std::borrow::{Borrow, Cow}, }; /// CrdsValue that is replicated across the cluster #[cfg_attr(feature = "frozen-abi", derive(AbiExample))] -#[derive(Serialize, Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct CrdsValue { signature: Signature, data: CrdsData, - #[serde(skip_serializing)] hash: Hash, // Sha256 hash of [signature, data]. + // Bincode serialized self.data. + bincode_serialized_data: Vec, } impl Sanitize for CrdsValue { @@ -40,7 +45,7 @@ impl Signable for CrdsValue { } fn signable_data(&self) -> Cow<[u8]> { - Cow::Owned(serialize(&self.data).expect("failed to serialize CrdsData")) + Cow::Borrowed(&self.bincode_serialized_data) } fn get_signature(&self) -> Signature { @@ -107,6 +112,7 @@ impl CrdsValue { signature, data, hash, + bincode_serialized_data, } } @@ -119,6 +125,7 @@ impl CrdsValue { signature, data, hash, + bincode_serialized_data, } } @@ -163,6 +170,94 @@ impl CrdsValue { self.data.pubkey() } + pub(crate) fn bincode_serialize(&self, writer: &mut W) -> Result<(), IoError> { + writer.write_all(self.signature.as_ref())?; + writer.write_all(&self.bincode_serialized_data) + } + + pub(crate) fn bincode_serialize_many, W: Write>( + values: I, + writer: &mut W, + ) -> Result<(), IoError> + where + I: IntoIterator, + ::IntoIter: ExactSizeIterator, + { + let mut values = values.into_iter(); + let size = u64::try_from(values.len()).unwrap(); + writer.write_all(&size.to_le_bytes())?; + values.try_for_each(|value| value.borrow().bincode_serialize(writer)) + } + + pub(crate) fn bincode_deserialize( + bytes: &[u8], + allow_trailing_bytes: bool, + ) -> Result { + let (signature, bytes) = convert_fixed_bytes::(bytes)?; + let mut cursor = Cursor::new(bytes); + // Default bincode options: + // * unlimited byte limit + // * little endian + // * varint encoding + // * rejects trailing bytes + // https://docs.rs/bincode/1.3.3/bincode/fn.options.html + let options = bincode::options() + .with_limit(bytes.len() as u64) + .with_fixint_encoding(); + let data: CrdsData = if allow_trailing_bytes { + options.allow_trailing_bytes().deserialize_from(&mut cursor) + } else { + options.deserialize_from(&mut cursor) + }?; + let offset = usize::try_from(cursor.position()).map_err(|err| { + let err = format!("{err:?}, cursor: {}", cursor.position()); + bincode::ErrorKind::Custom(err) + })?; + let bincode_serialized_data = bytes.get(..offset).map(Vec::from).ok_or_else(|| { + let err = format!("Invalid offset: {offset}, bytes.len(): {}", bytes.len()); + bincode::ErrorKind::Custom(err) + })?; + let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]); + Ok(Self { + signature, + data, + hash, + bincode_serialized_data, + }) + } + + pub(crate) fn bincode_deserialize_many( + bytes: &[u8], + ) -> impl Iterator> + '_ { + // Decode number of items in the slice. + let (size, mut bytes) = match convert_fixed_bytes::<[u8; 8], 8>(bytes) { + Ok(out) => out, + Err(err) => { + return Either::Left(std::iter::once(Err(bincode::Error::from(err)))); + } + }; + let size = u64::from_le_bytes(size); + // If size is zero reject trailing bytes. + if size == 0 && !bytes.is_empty() { + return Either::Left(std::iter::once(Err(bincode::Error::from( + bincode::ErrorKind::Custom(String::from("Zero length sequence has trailing bytes")), + )))); + } + // Decode exactly size many items. + let mut count = 0; + Either::Right( + std::iter::repeat_with(move || { + count += 1; + let allow_trailing_bytes = count < size; + Self::bincode_deserialize(bytes, allow_trailing_bytes).inspect(|value| { + let offset = SIGNATURE_BYTES + value.bincode_serialized_data.len(); + bytes = bytes.get(offset..).unwrap(); + }) + }) + .take(size as usize), + ) + } + pub fn label(&self) -> CrdsValueLabel { let pubkey = self.data.pubkey(); match self.data { @@ -201,10 +296,7 @@ impl CrdsValue { /// Returns the bincode serialized size (in bytes) of the CrdsValue. pub fn bincode_serialized_size(&self) -> usize { - bincode::serialized_size(&self) - .map(usize::try_from) - .unwrap() - .unwrap() + SIGNATURE_BYTES + self.bincode_serialized_data.len() } /// Returns true if, regardless of prunes, this crds-value @@ -214,35 +306,30 @@ impl CrdsValue { } } -// Manual implementation of Deserialize for CrdsValue in order to populate -// CrdsValue.hash which is skipped in serialization. -impl<'de> Deserialize<'de> for CrdsValue { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - struct CrdsValue { - signature: Signature, - data: CrdsData, - } - let CrdsValue { signature, data } = CrdsValue::deserialize(deserializer)?; - let bincode_serialized_data = bincode::serialize(&data).unwrap(); - let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]); - Ok(Self { - signature, - data, - hash, - }) +impl Encode for CrdsValue { + fn encode(&self, mut writer: W) -> Result<(), bincode::Error> { + Ok(self.bincode_serialize(&mut writer)?) } } +// Converts first N bytes into a value of type T: From<[u8; N]>, +// returning along with the remaining bytes. +pub(crate) fn convert_fixed_bytes(bytes: &[u8]) -> Result<(T, &[u8]), IoError> +where + T: From<[u8; N]>, +{ + let (bytes, rest) = (N <= bytes.len()) + .then(|| bytes.split_at(N)) + .ok_or_else(|| IoError::from(IoErrorKind::UnexpectedEof))?; + let value = <[u8; N]>::try_from(bytes).map(T::from).unwrap(); + Ok((value, rest)) +} + #[cfg(test)] mod test { use { super::*, crate::crds_data::{LowestSlot, NodeInstance, Vote}, - bincode::deserialize, solana_perf::test_tx::new_test_vote_tx, solana_sdk::{ signature::{Keypair, Signer}, @@ -304,8 +391,16 @@ mod test { value.sign(keypair); let original_signature = value.get_signature(); for _ in 0..num_tries { - let serialized_value = serialize(value).unwrap(); - let deserialized_value: CrdsValue = deserialize(&serialized_value).unwrap(); + let serialized_value = { + let mut buffer = Vec::::new(); + value.bincode_serialize(&mut buffer).unwrap(); + buffer + }; + let deserialized_value = CrdsValue::bincode_deserialize( + &serialized_value, + false, // allow_trailing_bytes + ) + .unwrap(); // Signatures shouldn't change let deserialized_signature = deserialized_value.get_signature(); diff --git a/gossip/src/protocol.rs b/gossip/src/protocol.rs index c169bee4fb15b9..06e326d093dc81 100644 --- a/gossip/src/protocol.rs +++ b/gossip/src/protocol.rs @@ -3,21 +3,23 @@ use { cluster_info_metrics::GossipStats, crds_data::MAX_WALLCLOCK, crds_gossip_pull::CrdsFilter, - crds_value::CrdsValue, + crds_value::{convert_fixed_bytes, CrdsValue}, ping_pong::{self, Pong}, }, - bincode::serialize, + bincode::{serialize, Options}, rayon::prelude::*, - serde::Serialize, + serde::{de::DeserializeOwned, Serialize}, solana_perf::packet::PACKET_DATA_SIZE, solana_sanitize::{Sanitize, SanitizeError}, solana_sdk::{ - pubkey::Pubkey, + packet::Encode, + pubkey::{Pubkey, PUBKEY_BYTES}, signature::{Signable, Signature}, }, std::{ borrow::{Borrow, Cow}, fmt::Debug, + io::{Cursor, Write}, result::Result, }, }; @@ -43,12 +45,7 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32; pub(crate) const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; // TODO These messages should go through the gpu pipeline for spam filtering -#[cfg_attr( - feature = "frozen-abi", - derive(AbiExample, AbiEnumVisitor), - frozen_abi(digest = "DBz7mUjknfMiea98WMx9b5jYyT9X9bmQV2JxmMN1VggR") -)] -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { /// Gossip protocol messages @@ -81,13 +78,105 @@ pub(crate) struct PruneData { } impl Protocol { + pub(crate) fn bincode_serialize(&self, writer: &mut W) -> Result<(), bincode::Error> { + match self { + Self::PullRequest(filter, caller) => { + writer.write_all(&0u32.to_le_bytes())?; + bincode::serialize_into(&mut *writer, filter)?; + Ok(caller.bincode_serialize(writer)?) + } + Self::PullResponse(pubkey, values) => { + writer.write_all(&1u32.to_le_bytes())?; + writer.write_all(pubkey.as_ref())?; + Ok(CrdsValue::bincode_serialize_many(values, writer)?) + } + Self::PushMessage(pubkey, values) => { + writer.write_all(&2u32.to_le_bytes())?; + writer.write_all(pubkey.as_ref())?; + Ok(CrdsValue::bincode_serialize_many(values, writer)?) + } + Self::PruneMessage(pubkey, prune_data) => { + writer.write_all(&3u32.to_le_bytes())?; + writer.write_all(pubkey.as_ref())?; + bincode::serialize_into(writer, prune_data) + } + Self::PingMessage(ping) => { + writer.write_all(&4u32.to_le_bytes())?; + bincode::serialize_into(writer, ping) + } + Self::PongMessage(pong) => { + writer.write_all(&5u32.to_le_bytes())?; + bincode::serialize_into(writer, pong) + } + } + } + + pub(crate) fn bincode_deserialize(bytes: &[u8]) -> Result { + fn bincode_deserialize(bytes: &[u8]) -> Result { + bincode::options() + .with_limit(bytes.len() as u64) + .with_fixint_encoding() + .reject_trailing_bytes() + .deserialize(bytes) + } + let (tag, bytes) = convert_fixed_bytes::<[u8; 4], 4>(bytes)?; + let tag = u32::from_le_bytes(tag); + match tag { + // PullRequest(CrdsFilter, CrdsValue) + 0 => { + let mut cursor = Cursor::new(bytes); + let filter: CrdsFilter = bincode::options() + .with_limit(bytes.len() as u64) + .with_fixint_encoding() + .allow_trailing_bytes() + .deserialize_from(&mut cursor)?; + let offset = usize::try_from(cursor.position()).map_err(|err| { + let err = format!("{err:?}, cursor: {}", cursor.position()); + bincode::ErrorKind::Custom(err) + })?; + let bytes = bytes.get(offset..).ok_or_else(|| { + let err = format!("Invalid offset: {offset}, bytes.len(): {}", bytes.len()); + bincode::ErrorKind::Custom(err) + })?; + CrdsValue::bincode_deserialize(bytes, /*allow_trailing_bytes:*/ false) + .map(|caller| Self::PullRequest(filter, caller)) + } + // PullResponse(Pubkey, Vec) + 1 => { + let (pubkey, bytes) = convert_fixed_bytes::(bytes)?; + CrdsValue::bincode_deserialize_many(bytes) + .collect::>() + .map(|values| Self::PullResponse(pubkey, values)) + } + // PushMessage(Pubkey, Vec) + 2 => { + let (pubkey, bytes) = convert_fixed_bytes::(bytes)?; + CrdsValue::bincode_deserialize_many(bytes) + .collect::>() + .map(|values| Self::PushMessage(pubkey, values)) + } + // PruneMessage(Pubkey, PruneData) + 3 => { + let (pubkey, bytes) = convert_fixed_bytes::(bytes)?; + bincode_deserialize(bytes).map(|prune_data| Self::PruneMessage(pubkey, prune_data)) + } + // PingMessage(Ping) + 4 => bincode_deserialize(bytes).map(Self::PingMessage), + // PongMessage(Pong) + 5 => bincode_deserialize(bytes).map(Self::PongMessage), + // Invalid enum tag. + _ => Err(bincode::Error::from( + bincode::ErrorKind::InvalidTagEncoding(tag as usize), + )), + } + } + /// Returns the bincode serialized size (in bytes) of the Protocol. #[cfg(test)] fn bincode_serialized_size(&self) -> usize { - bincode::serialized_size(self) - .map(usize::try_from) - .unwrap() - .unwrap() + let mut buffer = Vec::::new(); + self.bincode_serialize(&mut buffer).unwrap(); + buffer.len() } pub(crate) fn par_verify(self, stats: &GossipStats) -> Option { @@ -260,18 +349,29 @@ impl Signable for PruneData { } } +impl Encode for Protocol { + fn encode(&self, mut writer: W) -> Result<(), bincode::Error> { + Protocol::bincode_serialize(self, &mut writer) + } +} + +impl Encode for &Protocol { + fn encode(&self, mut writer: W) -> Result<(), bincode::Error> { + Protocol::bincode_serialize(self, &mut writer) + } +} + /// Splits an input feed of serializable data into chunks where the sum of /// serialized size of values within each chunk is no larger than /// max_chunk_size. /// Note: some messages cannot be contained within that size so in the worst case this returns /// N nested Vecs with 1 item each. -pub(crate) fn split_gossip_messages( +pub(crate) fn split_gossip_messages( max_chunk_size: usize, data_feed: I, -) -> impl Iterator> +) -> impl Iterator> where - T: Serialize + Debug, - I: IntoIterator, + I: IntoIterator, { let mut data_feed = data_feed.into_iter().fuse(); let mut buffer = vec![]; @@ -286,13 +386,7 @@ where }; } Some(data) => { - let data_size = match bincode::serialized_size(&data) { - Ok(size) => size as usize, - Err(err) => { - error!("serialized_size failed: {}", err); - continue; - } - }; + let data_size = data.bincode_serialized_size(); if buffer_size + data_size <= max_chunk_size { buffer_size += data_size; buffer.push(data);