Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removes repeated bincode serialization of gossip CrdsValues #3575

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ use {
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
fs::{self, File},
io::BufReader,
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
num::NonZeroUsize,
Expand Down Expand Up @@ -343,12 +342,13 @@ impl ClusterInfo {
return;
}

let num_nodes = nodes.len();
let filename = self.contact_info_path.join("contact-info.bin");
let tmp_filename = &filename.with_extension("tmp");

match File::create(tmp_filename) {
Ok(mut file) => {
if let Err(err) = bincode::serialize_into(&mut file, &nodes) {
if let Err(err) = CrdsValue::bincode_serialize_many(nodes, &mut file) {
warn!(
"Failed to serialize contact info info {}: {}",
tmp_filename.display(),
Expand All @@ -366,8 +366,7 @@ impl ClusterInfo {
match fs::rename(tmp_filename, &filename) {
Ok(()) => {
info!(
"Saved contact info for {} nodes into {}",
nodes.len(),
"Saved contact info for {num_nodes} nodes into {}",
filename.display()
);
}
Expand All @@ -391,13 +390,13 @@ impl ClusterInfo {
return;
}

let nodes: Vec<CrdsValue> = match File::open(&filename) {
Ok(file) => {
bincode::deserialize_from(&mut BufReader::new(file)).unwrap_or_else(|err| {
let nodes: Vec<CrdsValue> = match fs::read(&filename) {
Ok(bytes) => CrdsValue::bincode_deserialize_many(&bytes)
.collect::<Result<_, _>>()
.unwrap_or_else(|err| {
warn!("Failed to deserialize {}: {}", filename.display(), err);
vec![]
})
}
}),
Err(err) => {
warn!("Failed to open {}: {}", filename.display(), err);
vec![]
Expand Down Expand Up @@ -2316,7 +2315,7 @@ impl ClusterInfo {
}
}
let verify_packet = |packet: Packet| {
let protocol: Protocol = packet.deserialize_slice(..).ok()?;
let protocol = packet.data(..).map(Protocol::bincode_deserialize)?.ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify(&self.stats)?;
Some((packet.meta().socket_addr(), protocol))
Expand Down Expand Up @@ -3284,8 +3283,10 @@ mod tests {
) {
assert_eq!(packet.meta().socket_addr(), socket);
let bytes = serialize(&pong).unwrap();
match packet.deserialize_slice(..).unwrap() {
Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
match packet.data(..).map(Protocol::bincode_deserialize) {
Some(Ok(Protocol::PongMessage(pong))) => {
assert_eq!(serialize(&pong).unwrap(), bytes)
}
_ => panic!("invalid packet!"),
}
}
Expand Down
6 changes: 5 additions & 1 deletion gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,11 @@ pub(crate) mod tests {
for filter in Vec::<CrdsFilter>::from(filters) {
let request_bytes = 4 + request_bytes + bincode::serialized_size(&filter).unwrap();
let request = Protocol::PullRequest(filter, caller.clone());
let request = bincode::serialize(&request).unwrap();
let request = {
let mut buffer = Vec::<u8>::new();
request.bincode_serialize(&mut buffer).unwrap();
buffer
};
assert!(packet_data_size_range.contains(&request.len()));
assert_eq!(request.len() as u64, request_bytes);
}
Expand Down
214 changes: 178 additions & 36 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@ use {
duplicate_shred::DuplicateShredIndex,
epoch_slots::EpochSlots,
},
bincode::serialize,
bincode::Options,
itertools::Either,
rand::Rng,
serde::de::{Deserialize, Deserializer},
solana_sanitize::{Sanitize, SanitizeError},
solana_sdk::{
hash::Hash,
packet::Encode,
pubkey::Pubkey,
signature::{Keypair, Signable, Signature, Signer},
signature::{Keypair, Signable, Signature, Signer, SIGNATURE_BYTES},
},
std::{
borrow::{Borrow, Cow},
io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Write},
},
std::borrow::{Borrow, Cow},
};

/// CrdsValue that is replicated across the cluster
#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CrdsValue {
signature: Signature,
data: CrdsData,
#[serde(skip_serializing)]
hash: Hash, // Sha256 hash of [signature, data].
// Bincode serialized self.data.
bincode_serialized_data: Vec<u8>,
}

impl Sanitize for CrdsValue {
Expand All @@ -40,7 +45,7 @@ impl Signable for CrdsValue {
}

fn signable_data(&self) -> Cow<[u8]> {
Cow::Owned(serialize(&self.data).expect("failed to serialize CrdsData"))
Cow::Borrowed(&self.bincode_serialized_data)
}

fn get_signature(&self) -> Signature {
Expand Down Expand Up @@ -107,6 +112,7 @@ impl CrdsValue {
signature,
data,
hash,
bincode_serialized_data,
}
}

Expand All @@ -119,6 +125,7 @@ impl CrdsValue {
signature,
data,
hash,
bincode_serialized_data,
}
}

Expand Down Expand Up @@ -163,6 +170,113 @@ impl CrdsValue {
self.data.pubkey()
}

// Implements bincode::serialize_into for CrdsValue.
pub(crate) fn bincode_serialize<W: Write>(&self, writer: &mut W) -> Result<(), IoError> {
writer.write_all(self.signature.as_ref())?;
writer.write_all(&self.bincode_serialized_data)
}

// Implements bincode::serialize_into for Vec<CrdsValue>.
pub(crate) fn bincode_serialize_many<I, T: Borrow<Self>, W: Write>(
values: I,
writer: &mut W,
) -> Result<(), IoError>
where
I: IntoIterator<Item = T>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
let mut values = values.into_iter();
let size = u64::try_from(values.len()).unwrap();
writer.write_all(&size.to_le_bytes())?;
values.try_for_each(|value| value.borrow().bincode_serialize(writer))
}

// Implements bincode::deserialize_from for CrdsValue.
pub(crate) fn bincode_deserialize(
bytes: &[u8],
allow_trailing_bytes: bool,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain trailing bytes? Not sure I understand why we would allow these?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the same thing as in bincode:
https://docs.rs/bincode/1.3.3/bincode/config/trait.Options.html#method.allow_trailing_bytes

If you are only deserializing a single CrdsValue then generally you do not want to allow trailing bytes.
But you may be deserializing a struct Foo(CrdsValue, Bar) or a Vec<CrdsValue> in which case the trailing bytes are the next value to be deserialized, so you would want to allow trailing bytes.

) -> Result<Self, bincode::Error> {
let (signature, bytes) = convert_fixed_bytes::<Signature, SIGNATURE_BYTES>(bytes)?;
let mut cursor = Cursor::new(bytes);
// Default bincode options:
// * unlimited byte limit
// * little endian
// * varint encoding
// * rejects trailing bytes
// https://docs.rs/bincode/1.3.3/bincode/fn.options.html
let options = bincode::options()
.with_limit(bytes.len() as u64)
.with_fixint_encoding();
let data: CrdsData = if allow_trailing_bytes {
options.allow_trailing_bytes().deserialize_from(&mut cursor)
} else {
options.deserialize_from(&mut cursor).and_then(|data| {
// We have to manually check if all bytes are read because
// options.reject_trailing_bytes() does not really work.
// https://github.com/bincode-org/bincode/issues/732
if cursor.position() == cursor.get_ref().len() as u64 {
Ok(data)
} else {
Err(bincode::Error::from(bincode::ErrorKind::Custom(
String::from("Slice had bytes remaining after deserialization"),
)))
}
})
}?;
let offset = usize::try_from(cursor.position()).map_err(|err| {
let err = format!("{err:?}, cursor: {}", cursor.position());
bincode::ErrorKind::Custom(err)
})?;
let bincode_serialized_data = bytes.get(..offset).map(Vec::from).ok_or_else(|| {
let err = format!("Invalid offset: {offset}, bytes.len(): {}", bytes.len());
bincode::ErrorKind::Custom(err)
})?;
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
Ok(Self {
signature,
data,
hash,
bincode_serialized_data,
})
}

// Implements bincode::deserialize_from for Vec<CrdsValue>.
// Verifies that there are exactly as many entries as the encoded size.
// Rejects trailing bytes.
pub(crate) fn bincode_deserialize_many(
bytes: &[u8],
) -> impl Iterator<Item = Result<Self, bincode::Error>> + '_ {
// Decode number of items in the slice.
// bincode::serialize{,_into} serializes sequence length as
// a u64 (8 bytes) with fixint encoding in little endian.
let (size, mut bytes) = match convert_fixed_bytes::<[u8; 8], 8>(bytes) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convert_fixed_bytes::<[u8; 8], 8> what exactly is the last 8 doing here? can we be more descriptive here on what these 8s represent?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size is just the first 8 bytes of the serialized struct? and dictates how long the Vec<CrdsValue> is in terms of items (not bytes)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment to clarify.

// bincode::serialize{,_into} serializes sequence length as
// a u64 (8 bytes) with fixint encoding in little endian.

Ok(out) => out,
Err(err) => {
return Either::Left(std::iter::once(Err(bincode::Error::from(err))));
}
};
let size = u64::from_le_bytes(size);
// If size is zero reject trailing bytes.
if size == 0 && !bytes.is_empty() {
return Either::Left(std::iter::once(Err(bincode::Error::from(
bincode::ErrorKind::Custom(String::from("Zero length sequence has trailing bytes")),
))));
}
// Decode exactly size many items.
let mut count = 0;
Either::Right(
std::iter::repeat_with(move || {
count += 1;
let allow_trailing_bytes = count < size;
Self::bincode_deserialize(bytes, allow_trailing_bytes).inspect(|value| {
let offset = value.bincode_serialized_size();
bytes = bytes.get(offset..).unwrap();
})
})
.take(size as usize),
)
}

pub fn label(&self) -> CrdsValueLabel {
let pubkey = self.data.pubkey();
match self.data {
Expand Down Expand Up @@ -201,10 +315,7 @@ impl CrdsValue {

/// Returns the bincode serialized size (in bytes) of the CrdsValue.
pub fn bincode_serialized_size(&self) -> usize {
bincode::serialized_size(&self)
.map(usize::try_from)
.unwrap()
.unwrap()
SIGNATURE_BYTES + self.bincode_serialized_data.len()
}

/// Returns true if, regardless of prunes, this crds-value
Expand All @@ -214,35 +325,30 @@ impl CrdsValue {
}
}

// Manual implementation of Deserialize for CrdsValue in order to populate
// CrdsValue.hash which is skipped in serialization.
impl<'de> Deserialize<'de> for CrdsValue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct CrdsValue {
signature: Signature,
data: CrdsData,
}
let CrdsValue { signature, data } = CrdsValue::deserialize(deserializer)?;
let bincode_serialized_data = bincode::serialize(&data).unwrap();
let hash = solana_sdk::hash::hashv(&[signature.as_ref(), &bincode_serialized_data]);
Ok(Self {
signature,
data,
hash,
})
impl Encode for CrdsValue {
fn encode<W: Write>(&self, mut writer: W) -> Result<(), bincode::Error> {
Ok(self.bincode_serialize(&mut writer)?)
}
}

// Converts first N bytes into a value of type T: From<[u8; N]>,
// returning along with the remaining bytes.
pub(crate) fn convert_fixed_bytes<T, const N: usize>(bytes: &[u8]) -> Result<(T, &[u8]), IoError>
where
T: From<[u8; N]>,
{
let (bytes, rest) = (N <= bytes.len())
.then(|| bytes.split_at(N))
.ok_or_else(|| IoError::from(IoErrorKind::UnexpectedEof))?;
let value = <[u8; N]>::try_from(bytes).map(T::from).unwrap();
Ok((value, rest))
}

#[cfg(test)]
mod test {
use {
super::*,
crate::crds_data::{LowestSlot, NodeInstance, Vote},
bincode::deserialize,
rand0_7::{Rng, SeedableRng},
rand_chacha0_2::ChaChaRng,
solana_perf::test_tx::new_test_vote_tx,
Expand Down Expand Up @@ -309,8 +415,16 @@ mod test {
value.sign(keypair);
let original_signature = value.get_signature();
for _ in 0..num_tries {
let serialized_value = serialize(value).unwrap();
let deserialized_value: CrdsValue = deserialize(&serialized_value).unwrap();
let serialized_value = {
let mut buffer = Vec::<u8>::new();
value.bincode_serialize(&mut buffer).unwrap();
buffer
};
let deserialized_value = CrdsValue::bincode_deserialize(
&serialized_value,
false, // allow_trailing_bytes
)
.unwrap();

// Signatures shouldn't change
let deserialized_signature = deserialized_value.get_signature();
Expand Down Expand Up @@ -426,15 +540,43 @@ mod test {
CrdsValue::new(CrdsData::Vote(5, vote), &keypair)
},
];
let bytes = bincode::serialize(&values).unwrap();
let mut bytes = Vec::<u8>::new();
CrdsValue::bincode_serialize_many(&values, &mut bytes).unwrap();
// Serialized bytes are fixed and should never change.
assert_eq!(
solana_sdk::hash::hash(&bytes),
Hash::from_str("7gtcoafccWE964njbs2bA1QuVFeV34RaoY781yLx2A8N").unwrap()
);
// serialize -> deserialize should round trip.
assert_eq!(
bincode::deserialize::<Vec<CrdsValue>>(&bytes).unwrap(),
CrdsValue::bincode_deserialize_many(&bytes)
.collect::<Result<Vec<_>, _>>()
.unwrap(),
values
);
// More entries than the encoded size should fail deserialization.
for size in 0..2u64 {
bytes[..8].copy_from_slice(&size.to_le_bytes());
assert_matches!(
CrdsValue::bincode_deserialize_many(&bytes).collect::<Result<Vec<_>, _>>(),
Err(err) if matches!(*err, bincode::ErrorKind::Custom(_))
);
}
// Fewer entries than the encoded size should fail deserialization.
for size in 3..5u64 {
bytes[..8].copy_from_slice(&size.to_le_bytes());
assert_matches!(
CrdsValue::bincode_deserialize_many(&bytes)
.collect::<Result<Vec<_>, _>>(),
Err(err) if matches!(*err, bincode::ErrorKind::Io(_))
);
}
// Back to the right size.
bytes[..8].copy_from_slice(&2u64.to_le_bytes());
assert_eq!(
CrdsValue::bincode_deserialize_many(&bytes)
.collect::<Result<Vec<_>, _>>()
.unwrap(),
values
);
}
Expand Down
Loading
Loading