From 49188f7e7361417cc9e06548dc1b0b7824e5e226 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sun, 28 Feb 2021 17:40:32 +0800 Subject: [PATCH 1/3] channel: discard messages eagerly If all receivers are dropped, the memory hold by channel will only be dropped when all senders are also dropped. If there are too many channels the memory leak can be significant. Signed-off-by: Jay Lee --- crossbeam-channel/src/channel.rs | 4 +- crossbeam-channel/src/flavors/list.rs | 60 ++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 7e337255e..8988235db 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -643,7 +643,7 @@ impl Drop for Sender { unsafe { match &self.flavor { SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), - SenderFlavor::List(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } } @@ -1135,7 +1135,7 @@ impl Drop for Receiver { unsafe { match &self.flavor { ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), - ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), ReceiverFlavor::At(_) => {} ReceiverFlavor::Tick(_) => {} diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index e17a6e912..d85f76b2e 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -530,10 +530,10 @@ impl Channel { None } - /// Disconnects the channel and wakes up all blocked receivers. + /// Disconnects senders and wakes up all blocked receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); if tail & MARK_BIT == 0 { @@ -544,6 +544,62 @@ impl Channel { } } + /// Disconnects receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect_receivers(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + // If receivers are dropped first, discard all messages to free + // memory eagerly. + self.discard_all_messages(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// This method should only be called when all receivers are dropped. + fn discard_all_messages(&self) { + let mut head = self.head.index.load(Ordering::Relaxed); + let tail = self.tail.index.load(Ordering::Relaxed); + let mut block = self.head.block.load(Ordering::Relaxed); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head >> SHIFT != tail >> SHIFT { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + (*block).wait_next(); + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Relaxed); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + head = head & !MARK_BIT; + self.head.block.store(ptr::null_mut(), Ordering::Release); + self.head.index.store(head, Ordering::Release); + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 From f28ea86f2ce1e2572215c853741d60dfaf44386d Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sun, 28 Feb 2021 18:40:59 +0800 Subject: [PATCH 2/3] make clippy happy Signed-off-by: Jay Lee --- crossbeam-channel/src/flavors/list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index d85f76b2e..4e3e90ba9 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -595,7 +595,7 @@ impl Channel { drop(Box::from_raw(block)); } } - head = head & !MARK_BIT; + head &= !MARK_BIT; self.head.block.store(ptr::null_mut(), Ordering::Release); self.head.index.store(head, Ordering::Release); } From f0e414d1c0b7f6a7b2058e708dd149f8c45f8db5 Mon Sep 17 00:00:00 2001 From: Jay Lee Date: Sun, 28 Feb 2021 22:25:07 +0800 Subject: [PATCH 3/3] wait at boundary Signed-off-by: Jay Lee --- crossbeam-channel/src/flavors/list.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 4e3e90ba9..5056aa431 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -564,9 +564,23 @@ impl Channel { /// /// This method should only be called when all receivers are dropped. fn discard_all_messages(&self) { - let mut head = self.head.index.load(Ordering::Relaxed); - let tail = self.tail.index.load(Ordering::Relaxed); - let mut block = self.head.block.load(Ordering::Relaxed); + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + loop { + let offset = (tail >> SHIFT) % LAP; + if offset != BLOCK_CAP { + break; + } + + // New updates to tail will be rejected by MARK_BIT and aborted unless it's + // at boundary. We need to wait for the updates take affect otherwise there + // can be memory leaks. + backoff.snooze(); + tail = self.tail.index.load(Ordering::Acquire); + } + + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); unsafe { // Drop all messages between head and tail and deallocate the heap-allocated blocks. @@ -582,7 +596,7 @@ impl Channel { } else { (*block).wait_next(); // Deallocate the block and move to the next one. - let next = (*block).next.load(Ordering::Relaxed); + let next = (*block).next.load(Ordering::Acquire); drop(Box::from_raw(block)); block = next; }