Skip to content

Commit

Permalink
sync::broadcast: don't lock in channel()
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19 committed Jul 14, 2023
1 parent f5d2b5a commit d769972
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,12 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// than `usize::MAX / 2`.
#[track_caller]
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
let tx = Sender::new(capacity);
let rx = tx.subscribe();
// SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
let rx = Receiver {
shared: tx.shared.clone(),
next: 0,
};
(tx, rx)
}

Expand All @@ -464,9 +468,28 @@ impl<T> Sender<T> {
/// [`broadcast`]: crate::sync::broadcast
/// [`broadcast::channel`]: crate::sync::broadcast
#[track_caller]
pub fn new(mut capacity: usize) -> Self {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
pub fn new(capacity: usize) -> Self {
// SAFETY: We don't create extra receivers, so there are 0.
unsafe { Self::new_with_receiver_count(0, capacity) }
}

/// Creates the sending-half of the [`broadcast`] channel, and provide the receiver count.
///
/// See the documentation of [`broadcast::channel`] for more errors when calling this
/// function.
///
/// # Safety:
///
/// The caller must ensure that the amount of receivers for this Sender is correct before
/// the channel functionalities are used, the count is zero by default, as this function
/// does not create any receivers by itself.
#[track_caller]
unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
assert!(capacity > 0, "broadcast channel capacity cannot be zero");
assert!(
capacity <= usize::MAX >> 1,
"broadcast channel capacity exceeded `usize::MAX / 2`"
);

// Round to a power of two
capacity = capacity.next_power_of_two();
Expand All @@ -486,7 +509,7 @@ impl<T> Sender<T> {
mask: capacity - 1,
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: 0,
rx_cnt: receiver_count,
closed: false,
waiters: LinkedList::new(),
}),
Expand Down Expand Up @@ -1383,37 +1406,33 @@ mod tests {

#[test]
fn receiver_count_on_sender_constructor() {
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;

let sender = Sender::<i32>::new(16);
assert_eq!(count_of(&sender), 0);
assert_eq!(sender.receiver_count(), 0);

let rx_1 = sender.subscribe();
assert_eq!(count_of(&sender), 1);
assert_eq!(sender.receiver_count(), 1);

let rx_2 = rx_1.resubscribe();
assert_eq!(count_of(&sender), 2);
assert_eq!(sender.receiver_count(), 2);

let rx_3 = sender.subscribe();
assert_eq!(count_of(&sender), 3);
assert_eq!(sender.receiver_count(), 3);

drop(rx_3);
drop(rx_1);
assert_eq!(count_of(&sender), 1);
assert_eq!(sender.receiver_count(), 1);

drop(rx_2);
assert_eq!(count_of(&sender), 0);
assert_eq!(sender.receiver_count(), 0);
}

#[cfg(not(loom))]
#[test]
fn receiver_count_on_channel_constructor() {
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;

let (sender, rx) = channel::<i32>(16);
assert_eq!(count_of(&sender), 1);
assert_eq!(sender.receiver_count(), 1);

let _rx_2 = rx.resubscribe();
assert_eq!(count_of(&sender), 2);
assert_eq!(sender.receiver_count(), 2);
}
}

0 comments on commit d769972

Please sign in to comment.