Skip to content

Commit

Permalink
Fix up most of the formatting
Browse files Browse the repository at this point in the history
Signed-off-by: James Sturtevant <[email protected]>
  • Loading branch information
jsturtevant committed Feb 23, 2023
1 parent 5663531 commit d0ced61
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 122 deletions.
30 changes: 15 additions & 15 deletions src/sync/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@


use crate::error::{get_rpc_status, sock_error_msg, Error, Result};
use crate::sync::sys::{PipeConnection};
use crate::sync::sys::PipeConnection;
use crate::proto::{Code, MessageHeader, MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX};

fn read_count (fd: &PipeConnection, count: usize) -> Result<Vec<u8>> {
fn read_count(conn: &PipeConnection, count: usize) -> Result<Vec<u8>> {
let mut v: Vec<u8> = vec![0; count];
let mut len = 0;

Expand All @@ -26,7 +26,7 @@ fn read_count (fd: &PipeConnection, count: usize) -> Result<Vec<u8>> {
}

loop {
match fd.read(&mut v[len..]) {
match conn.read(&mut v[len..]) {
Ok(l) => {
len += l;
// when socket peer closed, it would return 0.
Expand All @@ -43,15 +43,15 @@ fn read_count (fd: &PipeConnection, count: usize) -> Result<Vec<u8>> {
Ok(v[0..len].to_vec())
}

fn write_count(fd: &PipeConnection, buf: &[u8], count: usize) -> Result<usize> {
fn write_count(conn: &PipeConnection, buf: &[u8], count: usize) -> Result<usize> {
let mut len = 0;

if count == 0 {
return Ok(0);
}

loop {
match fd.write(&buf[len..]){
match conn.write(&buf[len..]){
Ok(l) => {
len += l;
if len == count {
Expand All @@ -67,8 +67,8 @@ fn write_count(fd: &PipeConnection, buf: &[u8], count: usize) -> Result<usize> {
Ok(len)
}

fn read_message_header(fd: &PipeConnection) -> Result<MessageHeader> {
let buf = read_count(fd, MESSAGE_HEADER_LENGTH)?;
fn read_message_header(conn: &PipeConnection) -> Result<MessageHeader> {
let buf = read_count(conn, MESSAGE_HEADER_LENGTH)?;
let size = buf.len();
if size != MESSAGE_HEADER_LENGTH {
return Err(sock_error_msg(
Expand All @@ -82,8 +82,8 @@ fn read_message_header(fd: &PipeConnection) -> Result<MessageHeader> {
Ok(mh)
}

pub fn read_message(fd: &PipeConnection) -> Result<(MessageHeader, Vec<u8>)> {
let mh = read_message_header(fd)?;
pub fn read_message(conn: &PipeConnection) -> Result<(MessageHeader, Vec<u8>)> {
let mh = read_message_header(conn)?;
trace!("Got Message header {:?}", mh);

if mh.length > MESSAGE_LENGTH_MAX as u32 {
Expand All @@ -96,7 +96,7 @@ pub fn read_message(fd: &PipeConnection) -> Result<(MessageHeader, Vec<u8>)> {
));
}

let buf = read_count(fd, mh.length as usize)?;
let buf = read_count(conn, mh.length as usize)?;
let size = buf.len();
if size != mh.length as usize {
return Err(sock_error_msg(
Expand All @@ -109,10 +109,10 @@ pub fn read_message(fd: &PipeConnection) -> Result<(MessageHeader, Vec<u8>)> {
Ok((mh, buf))
}

fn write_message_header(fd: &PipeConnection, mh: MessageHeader) -> Result<()> {
fn write_message_header(conn: &PipeConnection, mh: MessageHeader) -> Result<()> {
let buf: Vec<u8> = mh.into();

let size = write_count(fd, &buf, MESSAGE_HEADER_LENGTH)?;
let size = write_count(conn, &buf, MESSAGE_HEADER_LENGTH)?;
if size != MESSAGE_HEADER_LENGTH {
return Err(sock_error_msg(
size,
Expand All @@ -123,10 +123,10 @@ fn write_message_header(fd: &PipeConnection, mh: MessageHeader) -> Result<()> {
Ok(())
}

pub fn write_message(fd: &PipeConnection, mh: MessageHeader, buf: Vec<u8>) -> Result<()> {
write_message_header(fd, mh)?;
pub fn write_message(conn: &PipeConnection, mh: MessageHeader, buf: Vec<u8>) -> Result<()> {
write_message_header(conn, mh)?;

let size = write_count(fd, &buf, buf.len())?;
let size = write_count(conn, &buf, buf.len())?;
if size != buf.len() {
return Err(sock_error_msg(
size,
Expand Down
36 changes: 16 additions & 20 deletions src/sync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Receiver = mpsc::Receiver<(Vec<u8>, mpsc::SyncSender<Result<Vec<u8>>>)>;
/// A ttrpc Client (sync).
#[derive(Clone)]
pub struct Client {
_fd: Arc<ClientConnection>,
_connection: Arc<ClientConnection>,
sender_tx: Sender,
}

Expand All @@ -60,30 +60,29 @@ impl Client {
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));

//Sender
let recver_map = recver_map_orig.clone();


let receiver_map = recver_map_orig.clone();
let connection = Arc::new(client.get_pipe_connection());
let sender_client = connection.clone();

let recieve_client = connection.clone();

//Sender
thread::spawn(move || {
let mut stream_id: u32 = 1;
for (buf, recver_tx) in rx.iter() {
let current_stream_id = stream_id;
stream_id += 2;
//Put current_stream_id and recver_tx to recver_map
{
let mut map = recver_map.lock().unwrap();
let mut map = receiver_map.lock().unwrap();
map.insert(current_stream_id, recver_tx.clone());
}
let mut mh = MessageHeader::new_request(0, buf.len() as u32);
mh.set_stream_id(current_stream_id);

if let Err(e) = write_message(&recieve_client, mh, buf) {
if let Err(e) = write_message(&sender_client, mh, buf) {
//Remove current_stream_id and recver_tx to recver_map
{
let mut map = recver_map.lock().unwrap();
let mut map = receiver_map.lock().unwrap();
map.remove(&current_stream_id);
}
recver_tx
Expand All @@ -95,14 +94,11 @@ impl Client {
});

//Recver
let reciever_connection = connection;
let reciever_client = client.clone();
let receiver_connection = connection;
let receiver_client = client.clone();
thread::spawn(move || {


loop {

match reciever_client.ready() {
match receiver_client.ready() {
Ok(None) => {
continue;
}
Expand All @@ -112,10 +108,10 @@ impl Client {
break;
}
}

let mh;
let buf;

match read_message(&reciever_connection) {
match read_message(&receiver_connection) {
Ok((x, y)) => {
mh = x;
buf = y;
Expand Down Expand Up @@ -164,7 +160,7 @@ impl Client {
map.remove(&mh.stream_id);
}

let _ = reciever_client.close_receiver().map_err(|e| {
let _ = receiver_client.close_receiver().map_err(|e| {
warn!(
"failed to close with error: {:?}", e
)
Expand All @@ -174,7 +170,7 @@ impl Client {
});

Client {
_fd: client,
_connection: client,
sender_tx,
}
}
Expand Down Expand Up @@ -214,6 +210,6 @@ impl Client {
impl Drop for ClientConnection {
fn drop(&mut self) {
self.close().unwrap();
trace!("All client is dropped");
trace!("Client is dropped");
}
}
45 changes: 20 additions & 25 deletions src/sync/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,27 @@ pub struct Server {
thread_count_max: usize,
}

struct Connection
{
fd: Arc<PipeConnection>,
struct Connection {
connection: Arc<PipeConnection>,
quit: Arc<AtomicBool>,
handler: Option<JoinHandle<()>>,
}

impl Connection
{
impl Connection {
fn close (&self) {
self.fd.close().unwrap_or(());
self.connection.close().unwrap_or(());
}

fn shutdown(&self) {
self.quit.store(true, Ordering::SeqCst);

// in case the connection had closed
self.fd.shutdown().unwrap_or(());
self.connection.shutdown().unwrap_or(());
}
}

struct ThreadS<'a>
{
fd: &'a Arc<PipeConnection>,
struct ThreadS<'a> {
connection: &'a Arc<PipeConnection>,
fdlock: &'a Arc<Mutex<()>>,
wtc: &'a Arc<AtomicUsize>,
quit: &'a Arc<AtomicBool>,
Expand All @@ -94,7 +91,7 @@ struct ThreadS<'a>

#[allow(clippy::too_many_arguments)]
fn start_method_handler_thread(
fd: Arc<PipeConnection>,
connection: Arc<PipeConnection>,
fdlock: Arc<Mutex<()>>,
wtc: Arc<AtomicUsize>,
quit: Arc<AtomicBool>,
Expand Down Expand Up @@ -122,7 +119,7 @@ fn start_method_handler_thread(
.unwrap_or_else(|err| trace!("Failed to send {:?}", err));
break;
}
result = read_message(&fd);
result = read_message(&connection);
}

if quit.load(Ordering::SeqCst) {
Expand Down Expand Up @@ -213,7 +210,7 @@ fn start_method_handler_thread(
continue;
};
let ctx = TtrpcContext {
fd: fd.id(),
fd: connection.id(),
mh,
res_tx: res_tx.clone(),
metadata: context::from_pb(&req.metadata),
Expand All @@ -234,14 +231,13 @@ fn start_method_handler_thread(
});
}

fn start_method_handler_threads(num: usize, ts: &ThreadS)
{
fn start_method_handler_threads(num: usize, ts: &ThreadS) {
for _ in 0..num {
if ts.quit.load(Ordering::SeqCst) {
break;
}
start_method_handler_thread(
ts.fd.clone(),
ts.connection.clone(),
ts.fdlock.clone(),
ts.wtc.clone(),
ts.quit.clone(),
Expand All @@ -254,8 +250,7 @@ fn start_method_handler_threads(num: usize, ts: &ThreadS)
}
}

fn check_method_handler_threads(ts: &ThreadS)
{
fn check_method_handler_threads(ts: &ThreadS) {
let c = ts.wtc.load(Ordering::SeqCst);
if c < ts.min {
start_method_handler_threads(ts.default - c, ts);
Expand Down Expand Up @@ -388,9 +383,6 @@ impl Server {
let handler = thread::Builder::new()
.name("listener_loop".into())
.spawn(move || {

let listener = listener;

loop {
trace!("listening...");
let pipe_connection = match listener.accept(&listener_quit_flag) {
Expand All @@ -405,7 +397,6 @@ impl Server {
break;
}
};


let methods = methods.clone();
let quit = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -438,7 +429,7 @@ impl Server {
let (control_tx, control_rx): (SyncSender<()>, Receiver<()>) =
sync_channel(0);
let ts = ThreadS {
fd: &pipe,
connection: &pipe,
fdlock: &Arc::new(Mutex::new(())),
wtc: &Arc::new(AtomicUsize::new(0)),
methods: &methods,
Expand Down Expand Up @@ -477,7 +468,7 @@ impl Server {
cns.insert(
id,
Connection {
fd: pipe_connection,
connection: pipe_connection,
handler: Some(handler),
quit: quit.clone(),
},
Expand Down Expand Up @@ -514,7 +505,11 @@ impl Server {
pub fn stop_listen(mut self) -> Self {
self.listener_quit_flag.store(true, Ordering::SeqCst);

self.listeners[0].close().unwrap();
self.listeners[0].close().unwrap_or_else(|e| {
warn!(
"failed to close connection with error: {}", e
)
});

info!("close monitor");
if let Some(handler) = self.handler.take() {
Expand Down
2 changes: 1 addition & 1 deletion src/sync/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pub use crate::sync::sys::unix::{PipeConnection, PipeListener, ClientConnection}
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use crate::sync::sys::windows::{PipeConnection, PipeListener, ClientConnection};
pub use crate::sync::sys::windows::{PipeConnection, PipeListener, ClientConnection};
2 changes: 1 addition & 1 deletion src/sync/sys/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mod net;
pub use net::{PipeConnection, PipeListener, ClientConnection};
pub use net::{PipeConnection, PipeListener, ClientConnection};
Loading

0 comments on commit d0ced61

Please sign in to comment.