Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
fixed failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 committed Jan 31, 2019
1 parent a30a5d2 commit c7fb918
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/broadcast_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl Broadcast {

inc_new_counter_info!("streamer-broadcast-sent", blobs.len());

blob_sender.send(blobs.clone()).unwrap();
blob_sender.send(blobs.clone())?;

// don't count coding blobs in the blob indexes
self.blob_index += blobs.len() as u64;
Expand Down
11 changes: 4 additions & 7 deletions src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct Fullnode {
broadcast_socket: UdpSocket,
pub node_services: NodeServices,
pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver),
blob_sender: Option<BlobSender>,
blob_sender: BlobSender,
}

impl Fullnode {
Expand Down Expand Up @@ -277,7 +277,7 @@ impl Fullnode {
tpu_sockets: node.sockets.tpu,
broadcast_socket: node.sockets.broadcast,
role_notifiers: (to_leader_receiver, to_validator_receiver),
blob_sender: Some(blob_sender),
blob_sender,
}
}

Expand Down Expand Up @@ -337,7 +337,7 @@ impl Fullnode {
&last_id,
self.id,
&to_validator_sender,
&self.blob_sender.as_ref().unwrap(),
&self.blob_sender,
)
}

Expand Down Expand Up @@ -378,11 +378,8 @@ impl Fullnode {
self.node_services.exit()
}

pub fn close(mut self) -> Result<()> {
pub fn close(self) -> Result<()> {
self.exit();
if let Some(blob_sender) = self.blob_sender.take() {
drop(blob_sender);
}
self.join()
}

Expand Down
1 change: 1 addition & 0 deletions src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl Replicator {
leader_pubkey,
))),
done.clone(),
exit.clone(),
);

info!("window created, waiting for ledger download done");
Expand Down
4 changes: 3 additions & 1 deletion src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub struct RetransmitStage {
}

impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new(
bank: &Arc<Bank>,
db_ledger: Arc<DbLedger>,
Expand All @@ -134,6 +134,7 @@ impl RetransmitStage {
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel();

Expand All @@ -157,6 +158,7 @@ impl RetransmitStage {
repair_socket,
leader_scheduler,
done,
exit,
);

let thread_hdls = vec![t_retransmit, t_window];
Expand Down
1 change: 1 addition & 0 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl Tvu {
repair_socket,
blob_fetch_receiver,
bank.leader_scheduler.clone(),
exit.clone(),
);

let l_entry_height = Arc::new(RwLock::new(entry_height));
Expand Down
8 changes: 7 additions & 1 deletion src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
Expand Down Expand Up @@ -129,6 +129,7 @@ pub fn window_service(
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
Expand All @@ -139,6 +140,9 @@ pub fn window_service(
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = recv_window(
&db_ledger,
&id,
Expand Down Expand Up @@ -273,6 +277,7 @@ mod test {
Arc::new(tn.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
done,
exit.clone(),
);
let t_responder = {
let (s_responder, r_responder) = channel();
Expand Down Expand Up @@ -342,6 +347,7 @@ mod test {
Arc::new(tn.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
done,
exit.clone(),
);
let t_responder = {
let (s_responder, r_responder) = channel();
Expand Down

0 comments on commit c7fb918

Please sign in to comment.