Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TPU cleanup #204

Merged
merged 25 commits into from
May 12, 2018
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
95bf68f
Correct some strange naming
garious May 11, 2018
600a1f8
Initialize thin client with events port
garious May 11, 2018
5510085
Better names
garious May 11, 2018
f0be595
Create function for thin client thread
garious May 11, 2018
0488d0a
Extract sig verify functions
garious May 12, 2018
3cedbc4
Reorder to reflect the pipeline order
garious May 12, 2018
765d901
Better names
garious May 12, 2018
b781fdb
Reorganize
garious May 12, 2018
3c11a91
Cleanup verifier error handling
garious May 12, 2018
1960788
Move sig verification stage into its own module
garious May 12, 2018
ca80bc3
Move the writer stage's utilities to its own module
garious May 12, 2018
cd96843
Free up name ThinClientService
garious May 12, 2018
d2f95d5
Move thin client service thread into thin_client_service.rs
garious May 12, 2018
2376dfc
Let thin client own the receiver channel
garious May 12, 2018
73abea0
No need for TPU dependency
garious May 12, 2018
b4ca414
More object-oriented
garious May 12, 2018
7ab3331
Move validation processor to its own module
garious May 12, 2018
898f497
Free up name 'thin_client_service'
garious May 12, 2018
421d9aa
Free up the name 'tpu'
garious May 12, 2018
4180571
Don't pass events_socket to RPU
garious May 12, 2018
3d82807
Delete dead code
garious May 12, 2018
1511dc4
Move RequestProcessor out of Rpu/Tvu state
garious May 12, 2018
a3d2831
Free up the name 'accounting_stage'
garious May 12, 2018
a3869dd
Move entry_receiver to RequestStage
garious May 12, 2018
6264508
Consistent naming of senders and receivers
garious May 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Extract sig verify functions
garious committed May 12, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 0488d0a82f655eeb29ddf8cadecc07e87abe8bd2
61 changes: 30 additions & 31 deletions src/tpu.rs
Original file line number Diff line number Diff line change
@@ -220,6 +220,31 @@ impl Tpu {
})
}

fn verifier_service(
exit: Arc<AtomicBool>,
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}

fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
}

/// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
@@ -257,23 +282,10 @@ impl Tpu {
blob_recycler.clone(),
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();

let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let (verified_sender, verified_receiver) = channel();
let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);

let (broadcast_sender, broadcast_receiver) = channel();

@@ -422,22 +434,9 @@ impl Tpu {
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();
let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);

let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());

let t_thin_client = Self::thin_client_service(