Skip to content

Commit

Permalink
adds new contact-info with forward compatible sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Jan 9, 2023
1 parent 3234af4 commit b74c948
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 6 deletions.
1 change: 1 addition & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Sanitize for Protocol {
fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
values.retain(|value| {
match value.data {
CrdsData::ContactInfo(_) => true,
CrdsData::LegacyContactInfo(_) => true,
// May Impact new validators starting up without any stake yet.
CrdsData::Vote(_, _) => true,
Expand Down
4 changes: 4 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[10],
i64
),
("ContactInfo-push", crds_stats.push.counts[11], i64),
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -686,6 +688,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[10],
i64
),
("ContactInfo-push", crds_stats.push.fails[11], i64),
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
139 changes: 139 additions & 0 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
pub use crate::legacy_contact_info::LegacyContactInfo;
use {
crate::crds_value::MAX_WALLCLOCK,
serde::{Deserialize, Deserializer, Serialize, Serializer},
solana_sdk::{
pubkey::Pubkey,
sanitize::{Sanitize, SanitizeError},
serde_varint, short_vec,
},
std::{
borrow::Cow,
net::{IpAddr, SocketAddr},
},
};

const SOCKET_TAG_GOSSIP: u8 = 0u8;
const SOCKET_TAG_REPAIR: u8 = 1u8;
const SOCKET_TAG_RPC: u8 = 2u8;
const SOCKET_TAG_RPC_PUBSUB: u8 = 3u8;
const SOCKET_TAG_SERVE_REPAIR: u8 = 4u8;
const SOCKET_TAG_TPU: u8 = 5u8;
const SOCKET_TAG_TPU_FORWARDS: u8 = 6u8;
const SOCKET_TAG_TPU_VOTE: u8 = 7u8;
const SOCKET_TAG_TVU: u8 = 8u8;
const SOCKET_TAG_TVU_FORWARDS: u8 = 9u8;

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ContactInfo {
pubkey: Pubkey,
wallclock: u64,
shred_version: u16,
addrs: Vec<IpAddr>,
// TODO: use port offset with varint? should be sorted!
sockets: Vec<(/*tag:*/ u8, /*addr-idx:*/ u8, /*port:*/ u16)>,
// TODO: should this also include Version?
gossip: Option<SocketAddr>,
repair: Option<SocketAddr>,
rpc: Option<SocketAddr>,
rpc_pubsub: Option<SocketAddr>,
serve_repair: Option<SocketAddr>,
tpu: Option<SocketAddr>,
tpu_forwards: Option<SocketAddr>,
tpu_vote: Option<SocketAddr>,
tvu: Option<SocketAddr>,
tvu_forwards: Option<SocketAddr>,
}

// Workaround since serde does not have an initializer for skipped fields.
// https://github.com/serde-rs/serde/issues/642
#[derive(Deserialize, Serialize)]
struct ContactInfoLite<'a> {
pubkey: Cow<'a, Pubkey>,
#[serde(with = "serde_varint")]
wallclock: u64,
shred_version: u16,
#[serde(with = "short_vec")]
addrs: Cow<'a, [IpAddr]>,
#[serde(with = "short_vec")]
sockets: Cow<'a, [(u8, u8, u16)]>,
}

impl ContactInfo {
#[inline]
pub fn pubkey(&self) -> &Pubkey {
&self.pubkey
}

#[inline]
pub(crate) fn wallclock(&self) -> u64 {
self.wallclock
}

fn get_socket(&self, tag: u8) -> Option<SocketAddr> {
let &(_, ix, port) = self.sockets.iter().find(|(k, _, _)| *k == tag)?;
let addr = self.addrs.get(usize::from(ix))?;
(port != 0u16 && !addr.is_unspecified() && !addr.is_multicast())
.then_some(SocketAddr::new(*addr, port))
}
}

impl Sanitize for ContactInfo {
fn sanitize(&self) -> Result<(), SanitizeError> {
if self.wallclock >= MAX_WALLCLOCK {
return Err(SanitizeError::ValueOutOfBounds);
}
Ok(())
}
}

impl Serialize for ContactInfo {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
ContactInfoLite {
pubkey: Cow::Borrowed(&self.pubkey),
wallclock: self.wallclock,
shred_version: self.shred_version,
addrs: Cow::Borrowed(&self.addrs),
sockets: Cow::Borrowed(&self.sockets),
}
.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for ContactInfo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let ContactInfoLite {
pubkey,
wallclock,
shred_version,
addrs,
sockets,
} = ContactInfoLite::deserialize(deserializer)?;
// TODO: sanity check on the fields.
let mut node = ContactInfo {
pubkey: pubkey.into_owned(),
wallclock,
shred_version,
addrs: addrs.into_owned(),
sockets: sockets.into_owned(),
..ContactInfo::default()
};
node.gossip = node.get_socket(SOCKET_TAG_GOSSIP);
node.repair = node.get_socket(SOCKET_TAG_REPAIR);
node.rpc = node.get_socket(SOCKET_TAG_RPC);
node.rpc_pubsub = node.get_socket(SOCKET_TAG_RPC_PUBSUB);
node.serve_repair = node.get_socket(SOCKET_TAG_SERVE_REPAIR);
node.tpu = node.get_socket(SOCKET_TAG_TPU);
node.tpu_forwards = node.get_socket(SOCKET_TAG_TPU_FORWARDS);
node.tpu_vote = node.get_socket(SOCKET_TAG_TPU_VOTE);
node.tvu = node.get_socket(SOCKET_TAG_TVU);
node.tvu_forwards = node.get_socket(SOCKET_TAG_TVU_FORWARDS);
Ok(node)
}
}
4 changes: 3 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub enum GossipRoute {
PushMessage,
}

type CrdsCountsArray = [usize; 11];
type CrdsCountsArray = [usize; 12];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -700,6 +700,8 @@ impl CrdsDataStats {
CrdsData::NodeInstance(_) => 8,
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::IncrementalSnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
// Update CrdsCountsArray if new items are added here.
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::{
cluster_info::MAX_SNAPSHOT_HASHES,
contact_info::ContactInfo,
deprecated,
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -92,6 +93,7 @@ pub enum CrdsData {
NodeInstance(NodeInstance),
DuplicateShred(DuplicateShredIndex, DuplicateShred),
IncrementalSnapshotHashes(IncrementalSnapshotHashes),
ContactInfo(ContactInfo),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -129,6 +131,7 @@ impl Sanitize for CrdsData {
}
}
CrdsData::IncrementalSnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
}
}
}
Expand Down Expand Up @@ -492,6 +495,7 @@ pub enum CrdsValueLabel {
NodeInstance(Pubkey),
DuplicateShred(DuplicateShredIndex, Pubkey),
IncrementalSnapshotHashes(Pubkey),
ContactInfo(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -512,6 +516,7 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::IncrementalSnapshotHashes(_) => {
write!(f, "IncrementalSnapshotHashes({})", self.pubkey())
}
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
}
}
}
Expand All @@ -530,6 +535,7 @@ impl CrdsValueLabel {
CrdsValueLabel::NodeInstance(p) => *p,
CrdsValueLabel::DuplicateShred(_, p) => *p,
CrdsValueLabel::IncrementalSnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
}
}
}
Expand Down Expand Up @@ -579,6 +585,7 @@ impl CrdsValue {
CrdsData::NodeInstance(node) => node.wallclock,
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
CrdsData::IncrementalSnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -594,6 +601,7 @@ impl CrdsValue {
CrdsData::NodeInstance(node) => node.from,
CrdsData::DuplicateShred(_, shred) => shred.from,
CrdsData::IncrementalSnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -611,6 +619,7 @@ impl CrdsValue {
CrdsData::IncrementalSnapshotHashes(_) => {
CrdsValueLabel::IncrementalSnapshotHashes(self.pubkey())
}
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down
1 change: 1 addition & 0 deletions gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

pub mod cluster_info;
pub mod cluster_info_metrics;
pub mod contact_info;
pub mod crds;
pub mod crds_entry;
pub mod crds_gossip;
Expand Down
7 changes: 5 additions & 2 deletions sdk/program/src/short_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,16 @@ where
///
/// #[serde(with = "short_vec")]
///
pub fn deserialize<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
pub fn deserialize<'de, D, T, R>(deserializer: D) -> Result<R, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de>,
R: From<Vec<T>>,
{
let visitor = ShortVecVisitor { _t: PhantomData };
deserializer.deserialize_tuple(std::usize::MAX, visitor)
deserializer
.deserialize_tuple(std::usize::MAX, visitor)
.map(R::from)
}

pub struct ShortVec<T>(pub Vec<T>);
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ pub use solana_program::{
decode_error, ed25519_program, epoch_schedule, fee_calculator, impl_sysvar_get, incinerator,
instruction, keccak, lamports, loader_instruction, loader_upgradeable_instruction, message,
msg, native_token, nonce, program, program_error, program_memory, program_option, program_pack,
rent, sanitize, sdk_ids, secp256k1_program, secp256k1_recover, serialize_utils, short_vec,
slot_hashes, slot_history, stake, stake_history, syscalls, system_instruction, system_program,
sysvar, unchecked_div_by_const, vote, wasm_bindgen,
rent, sanitize, sdk_ids, secp256k1_program, secp256k1_recover, serde_varint, serialize_utils,
short_vec, slot_hashes, slot_history, stake, stake_history, syscalls, system_instruction,
system_program, sysvar, unchecked_div_by_const, vote, wasm_bindgen,
};

pub mod account;
Expand Down

0 comments on commit b74c948

Please sign in to comment.