Skip to content

Commit

Permalink
removes repeated bincode serialization of gossip CrdsValues
Browse files Browse the repository at this point in the history
Gossip CrdsValues are deserialized when received as a gossip push
message or pull response, but serialized again immediately to obtain a
value hash during Crds::insert and then again serialized repeatedly
every time the value is pushed to another node or returned as a response
to a pull request.

In order to avoid repeated serialization of a CrdsValue during its
lifetime, the commit manually implements bincode (de)serialization of
CrdsValue to hold on to bincode serialized bytes of CrdsData and reuses
that for serializing CrdsValue.
  • Loading branch information
behzadnouri committed Nov 13, 2024
1 parent ac72395 commit 330df94
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 80 deletions.
32 changes: 20 additions & 12 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ use {
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
fs::{self, File},
io::BufReader,
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
num::NonZeroUsize,
Expand Down Expand Up @@ -346,12 +345,13 @@ impl ClusterInfo {
return;
}

let num_nodes = nodes.len();
let filename = self.contact_info_path.join("contact-info.bin");
let tmp_filename = &filename.with_extension("tmp");

match File::create(tmp_filename) {
Ok(mut file) => {
if let Err(err) = bincode::serialize_into(&mut file, &nodes) {
if let Err(err) = CrdsValue::bincode_serialize_many(nodes, &mut file) {
warn!(
"Failed to serialize contact info info {}: {}",
tmp_filename.display(),
Expand All @@ -369,8 +369,7 @@ impl ClusterInfo {
match fs::rename(tmp_filename, &filename) {
Ok(()) => {
info!(
"Saved contact info for {} nodes into {}",
nodes.len(),
"Saved contact info for {num_nodes} nodes into {}",
filename.display()
);
}
Expand All @@ -394,13 +393,13 @@ impl ClusterInfo {
return;
}

let nodes: Vec<CrdsValue> = match File::open(&filename) {
Ok(file) => {
bincode::deserialize_from(&mut BufReader::new(file)).unwrap_or_else(|err| {
let nodes: Vec<CrdsValue> = match fs::read(&filename) {
Ok(bytes) => CrdsValue::bincode_deserialize_many(&bytes)
.collect::<Result<_, _>>()
.inspect_err(|err| {
warn!("Failed to deserialize {}: {}", filename.display(), err);
vec![]
})
}
.unwrap_or_default(),
Err(err) => {
warn!("Failed to open {}: {}", filename.display(), err);
vec![]
Expand Down Expand Up @@ -2315,7 +2314,14 @@ impl ClusterInfo {
}
}
let verify_packet = |packet: Packet| {
let protocol: Protocol = packet.deserialize_slice(..).ok()?;
let protocol = packet.data(..).map(Protocol::bincode_deserialize); // ?.ok()?;
if let Some(Err(err)) = &protocol {
error!(
"Protocol::bincode_deserialize: {:?}, {err:?}",
packet.data(..4)
);
}
let protocol = protocol?.ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify(&self.stats)?;
Some((packet.meta().socket_addr(), protocol))
Expand Down Expand Up @@ -3284,8 +3290,10 @@ mod tests {
) {
assert_eq!(packet.meta().socket_addr(), socket);
let bytes = serialize(&pong).unwrap();
match packet.deserialize_slice(..).unwrap() {
Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
match packet.data(..).map(Protocol::bincode_deserialize) {
Some(Ok(Protocol::PongMessage(pong))) => {
assert_eq!(serialize(&pong).unwrap(), bytes)
}
_ => panic!("invalid packet!"),
}
}
Expand Down
6 changes: 5 additions & 1 deletion gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,11 @@ pub(crate) mod tests {
for filter in Vec::<CrdsFilter>::from(filters) {
let request_bytes = 4 + request_bytes + bincode::serialized_size(&filter).unwrap();
let request = Protocol::PullRequest(filter, caller.clone());
let request = bincode::serialize(&request).unwrap();
let request = {
let mut buffer = Vec::<u8>::new();
request.bincode_serialize(&mut buffer).unwrap();
buffer
};
assert!(packet_data_size_range.contains(&request.len()));
assert_eq!(request.len() as u64, request_bytes);
}
Expand Down
163 changes: 129 additions & 34 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@ use {
duplicate_shred::DuplicateShredIndex,
epoch_slots::EpochSlots,
},
bincode::serialize,
bincode::Options,
itertools::Either,
rand::Rng,
serde::de::{Deserialize, Deserializer},
solana_sanitize::{Sanitize, SanitizeError},
solana_sdk::{
hash::Hash,
packet::Serialize,
pubkey::Pubkey,
signature::{Keypair, Signable, Signature, Signer},
signature::{Keypair, Signable, Signature, Signer, SIGNATURE_BYTES},
},
std::{
borrow::{Borrow, Cow},
io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Write},
},
std::borrow::{Borrow, Cow},
};

/// CrdsValue that is replicated across the cluster
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CrdsValue {
signature: Signature,
data: CrdsData,
#[serde(skip_serializing)]
hash: Hash, // Sha256 hash of [signature, data].
// Bincode serialized self.data.
bincode_serialized_data: Vec<u8>,
}

impl Sanitize for CrdsValue {
Expand All @@ -40,7 +45,7 @@ impl Signable for CrdsValue {
}

fn signable_data(&self) -> Cow<[u8]> {
Cow::Owned(serialize(&self.data).expect("failed to serialize CrdsData"))
Cow::Borrowed(&self.bincode_serialized_data)
}

fn get_signature(&self) -> Signature {
Expand Down Expand Up @@ -107,6 +112,7 @@ impl CrdsValue {
signature,
data,
hash,
bincode_serialized_data,
}
}

Expand All @@ -119,6 +125,7 @@ impl CrdsValue {
signature,
data,
hash,
bincode_serialized_data,
}
}

Expand Down Expand Up @@ -163,6 +170,94 @@ impl CrdsValue {
self.data.pubkey()
}

pub(crate) fn bincode_serialize<W: Write>(&self, writer: &mut W) -> Result<(), IoError> {
writer.write_all(self.signature.as_ref())?;
writer.write_all(&self.bincode_serialized_data)
}

pub(crate) fn bincode_serialize_many<I, T: Borrow<Self>, W: Write>(
values: I,
writer: &mut W,
) -> Result<(), IoError>
where
I: IntoIterator<Item = T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
let mut values = values.into_iter();
let size = u64::try_from(values.len()).unwrap();
writer.write_all(&size.to_le_bytes())?;
values.try_for_each(|value| value.borrow().bincode_serialize(writer))
}

pub(crate) fn bincode_deserialize(
bytes: &[u8],
allow_trailing_bytes: bool,
) -> Result<Self, bincode::Error> {
let (signature, bytes) = convert_fixed_bytes::<Signature, SIGNATURE_BYTES>(bytes)?;
let mut cursor = Cursor::new(bytes);
// Default bincode options:
// * unlimited byte limit
// * little endian
// * varint encoding
// * rejects trailing bytes
// https://docs.rs/bincode/1.3.3/bincode/fn.options.html
let options = bincode::options()
.with_limit(bytes.len() as u64)
.with_fixint_encoding();
let data: CrdsData = if allow_trailing_bytes {
options.allow_trailing_bytes().deserialize_from(&mut cursor)
} else {
options.deserialize_from(&mut cursor)
}?;
let offset = usize::try_from(cursor.position()).map_err(|err| {
let err = format!("{err:?}, cursor: {}", cursor.position());
bincode::ErrorKind::Custom(err)
})?;
let bincode_serialized_data = bytes.get(..offset).map(Vec::from).ok_or_else(|| {
let err = format!("Invalid offset: {offset}, bytes.len(): {}", bytes.len());
bincode::ErrorKind::Custom(err)
})?;
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
Ok(Self {
signature,
data,
hash,
bincode_serialized_data,
})
}

pub(crate) fn bincode_deserialize_many(
bytes: &[u8],
) -> impl Iterator<Item = Result<Self, bincode::Error>> + '_ {
// Decode number of items in the slice.
let (size, mut bytes) = match convert_fixed_bytes::<[u8; 8], 8>(bytes) {
Ok(out) => out,
Err(err) => {
return Either::Left(std::iter::once(Err(bincode::Error::from(err))));
}
};
let size = u64::from_le_bytes(size);
// If size is zero reject trailing bytes.
if size == 0 && !bytes.is_empty() {
return Either::Left(std::iter::once(Err(bincode::Error::from(
bincode::ErrorKind::Custom(String::from("Zero length sequence has trailing bytes")),
))));
}
// Decode exactly size many items.
let mut count = 0;
Either::Right(
std::iter::repeat_with(move || {
count += 1;
let allow_trailing_bytes = count < size;
Self::bincode_deserialize(bytes, allow_trailing_bytes).inspect(|value| {
let offset = SIGNATURE_BYTES + value.bincode_serialized_data.len();
bytes = bytes.get(offset..).unwrap();
})
})
.take(size as usize),
)
}

pub fn label(&self) -> CrdsValueLabel {
let pubkey = self.data.pubkey();
match self.data {
Expand Down Expand Up @@ -201,10 +296,7 @@ impl CrdsValue {

/// Returns the bincode serialized size (in bytes) of the CrdsValue.
pub fn bincode_serialized_size(&self) -> usize {
bincode::serialized_size(&self)
.map(usize::try_from)
.unwrap()
.unwrap()
solana_sdk::signature::SIGNATURE_BYTES + self.bincode_serialized_data.len()
}

/// Returns true if, regardless of prunes, this crds-value
Expand All @@ -214,35 +306,30 @@ impl CrdsValue {
}
}

// Manual implementation of Deserialize for CrdsValue in order to populate
// CrdsValue.hash which is skipped in serialization.
impl<'de> Deserialize<'de> for CrdsValue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct CrdsValue {
signature: Signature,
data: CrdsData,
}
let CrdsValue { signature, data } = CrdsValue::deserialize(deserializer)?;
let bincode_serialized_data = bincode::serialize(&data).unwrap();
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
Ok(Self {
signature,
data,
hash,
})
impl Serialize for CrdsValue {
fn bincode_serialize<W: Write>(&self, mut writer: W) -> Result<(), bincode::Error> {
Ok(self.bincode_serialize(&mut writer)?)
}
}

// Converts first N bytes into a value of type T: From<[u8; N]>,
// returning along with the remaining bytes.
pub(crate) fn convert_fixed_bytes<T, const N: usize>(bytes: &[u8]) -> Result<(T, &[u8]), IoError>
where
T: From<[u8; N]>,
{
let (bytes, rest) = (N <= bytes.len())
.then(|| bytes.split_at(N))
.ok_or_else(|| IoError::from(IoErrorKind::UnexpectedEof))?;
let value = <[u8; N]>::try_from(bytes).map(T::from).unwrap();
Ok((value, rest))
}

#[cfg(test)]
mod test {
use {
super::*,
crate::crds_data::{LowestSlot, NodeInstance, Vote},
bincode::deserialize,
solana_perf::test_tx::new_test_vote_tx,
solana_sdk::{
signature::{Keypair, Signer},
Expand Down Expand Up @@ -304,8 +391,16 @@ mod test {
value.sign(keypair);
let original_signature = value.get_signature();
for _ in 0..num_tries {
let serialized_value = serialize(value).unwrap();
let deserialized_value: CrdsValue = deserialize(&serialized_value).unwrap();
let serialized_value = {
let mut buffer = Vec::<u8>::new();
value.bincode_serialize(&mut buffer).unwrap();
buffer
};
let deserialized_value = CrdsValue::bincode_deserialize(
&serialized_value,
false, // allow_trailing_bytes
)
.unwrap();

// Signatures shouldn't change
let deserialized_signature = deserialized_value.get_signature();
Expand Down
Loading

0 comments on commit 330df94

Please sign in to comment.