Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
checks for duplicate validator instances using gossip (bp #14018) (#14028)
Browse files Browse the repository at this point in the history

* checks for duplicate validator instances using gossip

(cherry picked from commit 8cd5eb9)

# Conflicts:
#	core/src/cluster_info.rs

* pushes node-instance along with version early in gossip

(cherry picked from commit 5421981)

* removes RwLock on ClusterInfo.instance

(cherry picked from commit 895d7d6)

# Conflicts:
#	core/src/cluster_info.rs

* std::process::exit to kill all threads

(cherry picked from commit 1d267ea)

* removes backport merge conflicts

Co-authored-by: behzad nouri <[email protected]>
  • Loading branch information
mergify[bot] and behzadnouri authored Dec 9, 2020
1 parent c20e74a commit 07191dc
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 38 deletions.
107 changes: 74 additions & 33 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
Version, Vote, MAX_WALLCLOCK,
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
SnapshotHash, Version, Vote, MAX_WALLCLOCK,
},
data_budget::DataBudget,
epoch_slots::EpochSlots,
Expand Down Expand Up @@ -298,6 +298,7 @@ pub struct ClusterInfo {
stats: GossipStats,
socket: UdpSocket,
local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
instance: NodeInstance,
}

impl Default for ClusterInfo {
Expand Down Expand Up @@ -422,7 +423,7 @@ pub fn make_accounts_hashes_message(
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "8L3mKuv292LTa3XFCGNVdaFihWnsgYE4hf941p9gqUxF")]
#[frozen_abi(digest = "6PpTdBvyX37y5ERokb8DejgKobpsuTbFJC39f8Eqz7Vy")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
enum Protocol {
Expand Down Expand Up @@ -553,6 +554,7 @@ impl ClusterInfo {
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
local_message_pending_push_queue: RwLock::new(vec![]),
instance: NodeInstance::new(id, timestamp()),
};
{
let mut gossip = me.gossip.write().unwrap();
Expand Down Expand Up @@ -586,6 +588,7 @@ impl ClusterInfo {
.unwrap()
.clone(),
),
instance: NodeInstance::new(*new_id, timestamp()),
}
}

Expand All @@ -606,16 +609,24 @@ impl ClusterInfo {
) {
let now = timestamp();
self.my_contact_info.write().unwrap().wallclock = now;
let entry =
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
let entries: Vec<_> = vec![
CrdsData::ContactInfo(self.my_contact_info()),
CrdsData::NodeInstance(self.instance.with_wallclock(now)),
]
.into_iter()
.map(|v| CrdsValue::new_signed(v, &self.keypair))
.collect();
{
let mut local_message_pending_push_queue =
self.local_message_pending_push_queue.write().unwrap();
for entry in entries {
local_message_pending_push_queue.push((entry, now));
}
}
self.gossip
.write()
.unwrap()
.refresh_push_active_set(stakes, gossip_validators);
self.local_message_pending_push_queue
.write()
.unwrap()
.push((entry, now));
}

// TODO kill insert_info, only used by tests
Expand Down Expand Up @@ -1789,9 +1800,14 @@ impl ClusterInfo {
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = self.my_shred_version() == 0;
let recycler = PacketsRecycler::default();

let message = CrdsData::Version(Version::new(self.id()));
self.push_message(CrdsValue::new_signed(message, &self.keypair));
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
];
for value in crds_data {
let value = CrdsValue::new_signed(value, &self.keypair);
self.push_message(value);
}
let mut generate_pull_requests = true;
loop {
let start = timestamp();
Expand Down Expand Up @@ -2487,8 +2503,8 @@ impl ClusterInfo {
stakes: HashMap<Pubkey, u64>,
feature_set: Option<&FeatureSet>,
epoch_time_ms: u64,
) {
let mut timer = Measure::start("process_gossip_packets_time");
) -> Result<()> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
let packets: Vec<_> = thread_pool.install(|| {
packets
.into_par_iter()
Expand All @@ -2501,6 +2517,16 @@ impl ClusterInfo {
})
.collect()
});
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
for value in values {
if self.instance.check_duplicate(value) {
return Err(Error::DuplicateNodeInstance);
}
}
Ok(())
};
// Split packets based on their types.
let mut pull_requests = vec![];
let mut pull_responses = vec![];
Expand All @@ -2513,8 +2539,14 @@ impl ClusterInfo {
Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller))
}
Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
Protocol::PushMessage(from, data) => push_messages.push((from, data)),
Protocol::PullResponse(from, data) => {
check_duplicate_instance(&data)?;
pull_responses.push((from, data));
}
Protocol::PushMessage(from, data) => {
check_duplicate_instance(&data)?;
push_messages.push((from, data));
}
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
Expand All @@ -2539,9 +2571,7 @@ impl ClusterInfo {
response_sender,
feature_set,
);
self.stats
.process_gossip_packets_time
.add_measure(&mut timer);
Ok(())
}

/// Process messages from the network
Expand Down Expand Up @@ -2588,7 +2618,7 @@ impl ClusterInfo {
stakes,
feature_set.as_deref(),
epoch_time_ms,
);
)?;

self.print_reset_stats(last_print);

Expand Down Expand Up @@ -2853,25 +2883,36 @@ impl ClusterInfo {
.build()
.unwrap();
let mut last_print = Instant::now();
loop {
let e = self.run_listen(
while !exit.load(Ordering::Relaxed) {
if let Err(err) = self.run_listen(
&recycler,
bank_forks.as_ref(),
&requests_receiver,
&response_sender,
&thread_pool,
&mut last_print,
);
if exit.load(Ordering::Relaxed) {
return;
}
if e.is_err() {
let r_gossip = self.gossip.read().unwrap();
debug!(
"{}: run_listen timeout, table size: {}",
self.id(),
r_gossip.crds.len()
);
) {
match err {
Error::RecvTimeoutError(_) => {
let table_size = self.gossip.read().unwrap().crds.len();
debug!(
"{}: run_listen timeout, table size: {}",
self.id(),
table_size,
);
}
Error::DuplicateNodeInstance => {
error!(
"duplicate running instances of the same validator node: {}",
self.id()
);
exit.store(true, Ordering::Relaxed);
// TODO: Pass through ValidatorExit here so
// that this will exit cleanly.
std::process::exit(1);
}
_ => error!("gossip run_listen failed: {}", err),
}
}
thread_mem_usage::datapoint("solana-listen");
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl CrdsGossipPush {
for i in start..(start + push_fanout) {
let index = i % self.active_set.len();
let (peer, filter) = self.active_set.get_index(index).unwrap();
if !filter.contains(&origin) {
if !filter.contains(&origin) || value.should_force_push(peer) {
trace!("new_push_messages insert {} {:?}", *peer, value);
push_messages.entry(*peer).or_default().push(value.clone());
num_pushes += 1;
Expand Down
Loading

0 comments on commit 07191dc

Please sign in to comment.