Skip to content

Commit

Permalink
Drop support for EntryInfo subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
garious committed May 14, 2018
1 parent 0ae69bd commit cc447c0
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 86 deletions.
10 changes: 2 additions & 8 deletions src/entry_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use accountant::Accountant;
use entry::Entry;
use ledger;
use packet;
use request_processor::RequestProcessor;
use result::Result;
use serde_json;
use std::collections::VecDeque;
Expand All @@ -17,16 +16,12 @@ use streamer;

pub struct EntryWriter<'a> {
accountant: &'a Accountant,
request_processor: &'a RequestProcessor,
}

impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self {
EntryWriter {
accountant,
request_processor,
}
pub fn new(accountant: &'a Accountant) -> Self {
EntryWriter { accountant }
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
Expand All @@ -37,7 +32,6 @@ impl<'a> EntryWriter<'a> {
"{}",
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
).expect("writeln! in fn write_entry");
self.request_processor.notify_entry_info_subscribers(&entry);
}

fn write_entries<W: Write>(
Expand Down
14 changes: 0 additions & 14 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,6 @@ pub enum Request {
GetBalance { key: PublicKey },
GetLastId,
GetTransactionCount,
Subscribe { subscriptions: Vec<Subscription> },
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}

impl Request {
Expand All @@ -44,7 +31,6 @@ pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
LastId { id: Hash },
TransactionCount { transaction_count: u64 },
EntryInfo(EntryInfo),
}

pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
Expand Down
45 changes: 4 additions & 41 deletions src/request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,25 @@ use event_processor::EventProcessor;
use packet;
use packet::SharedPackets;
use rayon::prelude::*;
use request::{EntryInfo, Request, Response, Subscription};
use request::{Request, Response};
use result::Result;
use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use streamer;
use timing;

pub struct RequestProcessor {
accountant: Arc<Accountant>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}

impl RequestProcessor {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: Arc<Accountant>) -> Self {
RequestProcessor {
accountant,
entry_info_subscribers: Mutex::new(vec![]),
}
RequestProcessor { accountant }
}

/// Process Request items sent by clients.
Expand Down Expand Up @@ -59,16 +55,6 @@ impl RequestProcessor {
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}

Expand All @@ -81,29 +67,6 @@ impl RequestProcessor {
.collect()
}

pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");

// copy subscribers to avoid taking lock while doing io
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}

fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
p.packets
.par_iter()
Expand Down
4 changes: 1 addition & 3 deletions src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ impl Rpu {

fn write_service<W: Write + Send + 'static>(
accountant: Arc<Accountant>,
request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || loop {
let entry_writer = EntryWriter::new(&accountant, &request_processor);
let entry_writer = EntryWriter::new(&accountant);
let _ = entry_writer.write_and_send_entries(
&broadcast,
&blob_recycler,
Expand Down Expand Up @@ -99,7 +98,6 @@ impl Rpu {
let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
self.event_processor.accountant.clone(),
request_stage.request_processor.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
Expand Down
18 changes: 1 addition & 17 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use bincode::{deserialize, serialize};
use futures::future::{ok, FutureResult};
use hash::Hash;
use request::{Request, Response, Subscription};
use request::{Request, Response};
use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
Expand All @@ -18,7 +18,6 @@ pub struct ThinClient {
pub requests_socket: UdpSocket,
pub events_socket: UdpSocket,
last_id: Option<Hash>,
num_events: u64,
transaction_count: u64,
balances: HashMap<PublicKey, Option<i64>>,
}
Expand All @@ -33,22 +32,12 @@ impl ThinClient {
requests_socket,
events_socket,
last_id: None,
num_events: 0,
transaction_count: 0,
balances: HashMap::new(),
};
client.init();
client
}

pub fn init(&self) {
let subscriptions = vec![Subscription::EntryInfo];
let req = Request::Subscribe { subscriptions };
let data = serialize(&req).expect("serialize Subscribe in thin_client");
trace!("subscribing to {}", self.addr);
let _res = self.requests_socket.send_to(&data, &self.addr);
}

pub fn recv_response(&self) -> io::Result<Response> {
let mut buf = vec![0u8; 1024];
info!("start recv_from");
Expand All @@ -72,11 +61,6 @@ impl ThinClient {
info!("Response transaction count {:?}", transaction_count);
self.transaction_count = transaction_count;
}
Response::EntryInfo(entry_info) => {
trace!("Response entry_info {:?}", entry_info.id);
self.last_id = Some(entry_info.id);
self.num_events += entry_info.num_events;
}
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ impl Tvu {

fn drain_service(
accountant: Arc<Accountant>,
request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || {
let entry_writer = EntryWriter::new(&accountant, &request_processor);
let entry_writer = EntryWriter::new(&accountant);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -183,7 +182,6 @@ impl Tvu {

let t_write = Self::drain_service(
obj.event_processor.accountant.clone(),
request_stage.request_processor.clone(),
exit.clone(),
request_stage.entry_receiver,
);
Expand Down

0 comments on commit cc447c0

Please sign in to comment.