Skip to content

Commit

Permalink
fix rustfmt issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Schweiger authored and Aaron Schweiger committed Sep 15, 2023
1 parent 7baf04a commit 537b964
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 26 deletions.
19 changes: 8 additions & 11 deletions benches/sync_mpsc_recv_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn rt() -> tokio::runtime::Runtime {

// Simulate a use case of an actor that must update
// a resource, but resource only needs last value
fn publish_last_value(last_value:usize) -> usize {
fn publish_last_value(last_value: usize) -> usize {
std::thread::sleep(std::time::Duration::from_nanos(1));
last_value
}
Expand Down Expand Up @@ -64,15 +64,14 @@ fn contention_bounded_updater_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
last_value = buffer[buffer.len()-1]
last_value = buffer[buffer.len() - 1]
}
}
last_value
})
});
}


fn contention_bounded_updater_publish_recv(b: &mut Bencher) {
let rt = rt();

Expand All @@ -89,7 +88,7 @@ fn contention_bounded_updater_publish_recv(b: &mut Bencher) {
});
}

for _ in 0..1_000 {
for _ in 0..1_000 {
let Some(v) = rx.recv().await else {continue};
let _ = publish_last_value(v);
}
Expand Down Expand Up @@ -119,7 +118,7 @@ fn contention_bounded_updater_publish_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
publish_last_value(buffer[buffer.len()-1]);
publish_last_value(buffer[buffer.len() - 1]);
}
}
})
Expand Down Expand Up @@ -175,7 +174,7 @@ fn contention_bounded_full_updater_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
last_value = buffer[buffer.len()-1]
last_value = buffer[buffer.len() - 1]
}
}
last_value
Expand Down Expand Up @@ -232,7 +231,7 @@ fn contention_unbounded_updater_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
last_value = buffer[buffer.len()-1]
last_value = buffer[buffer.len() - 1]
}
}
last_value
Expand Down Expand Up @@ -279,15 +278,14 @@ fn uncontented_bounded_updater_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
last_value = buffer[buffer.len()-1]
last_value = buffer[buffer.len() - 1]
}
}
last_value
})
});
}


fn uncontented_unbounded_updater_recv(b: &mut Bencher) {
let rt = rt();

Expand Down Expand Up @@ -327,7 +325,7 @@ fn uncontented_unbounded_updater_recv_many(b: &mut Bencher) {
let count = rx.recv_many(&mut buffer).await;
total += count;
if count > 0 {
last_value = buffer[buffer.len()-1]
last_value = buffer[buffer.len() - 1]
}
}
last_value
Expand All @@ -347,7 +345,6 @@ bencher::benchmark_group!(
contention_bounded_updater_publish_recv_many
);


bencher::benchmark_group!(
contention_bounded_full_updater,
contention_bounded_full_updater_recv,
Expand Down
27 changes: 14 additions & 13 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) trait Semaphore {

fn add_permit(&self);

fn add_permits(&self, n:usize);
fn add_permits(&self, n: usize);

fn close(&self);

Expand Down Expand Up @@ -321,14 +321,17 @@ impl<T, S: Semaphore> Rx<T, S> {
}
buffer.push(value);
let mut next = rx_fields.list.peek();
while (buffer.len() < capacity && match next {
Some(Value(value)) => {
rx_fields.list.pop(&self.inner.tx);
buffer.push(value);
next = rx_fields.list.peek();
true }
_ => false
}) {}
while (buffer.len() < capacity
&& match next {
Some(Value(value)) => {
rx_fields.list.pop(&self.inner.tx);
buffer.push(value);
next = rx_fields.list.peek();
true
}
_ => false,
})
{}
self.inner.semaphore.add_permits(buffer.len());
coop.made_progress();
return Ready(buffer.len());
Expand Down Expand Up @@ -367,7 +370,6 @@ impl<T, S: Semaphore> Rx<T, S> {
})
}


/// Try to receive the next value.
pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
use super::list::TryPopResult;
Expand Down Expand Up @@ -464,7 +466,7 @@ impl Semaphore for bounded::Semaphore {
self.semaphore.release(1)
}

fn add_permits(&self, n:usize) {
fn add_permits(&self, n: usize) {
self.semaphore.release(n)
}

Expand Down Expand Up @@ -497,7 +499,7 @@ impl Semaphore for unbounded::Semaphore {
}
}

fn add_permits(&self, n:usize) {
fn add_permits(&self, n: usize) {
let prev = self.0.fetch_sub(n << 1, Release);

if prev >> 1 == 0 {
Expand All @@ -521,5 +523,4 @@ impl Semaphore for unbounded::Semaphore {
fn num_acquired(&self) -> usize {
self.0.load(Acquire) >> 1
}

}
3 changes: 1 addition & 2 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ async fn async_send_recv_many_with_buffer() {
assert_eq!(None, rx.recv().await);
}


#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
Expand Down Expand Up @@ -220,7 +219,7 @@ async fn send_recv_many_unbounded() {

let mut buffer = vec![0; 0];
assert_eq!(rx.recv_many(&mut buffer).await, 4);
assert_eq!(vec![7,13,100,1002], buffer);
assert_eq!(vec![7, 13, 100, 1002], buffer);
assert!(buffer.capacity() >= 4);

drop(tx);
Expand Down

0 comments on commit 537b964

Please sign in to comment.