diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index b6b35ccc35790..547b233894cf8 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -155,6 +155,11 @@ impl Select { /// event could either be that data is available or the corresponding /// channel has been closed. pub fn wait(&self) -> uint { + self.wait2(false) + } + + /// Helper method for skipping the preflight checks during testing + fn wait2(&self, do_preflight_checks: bool) -> uint { // Note that this is currently an inefficient implementation. We in // theory have knowledge about all ports in the set ahead of time, so // this method shouldn't really have to iterate over all of them yet @@ -179,7 +184,7 @@ impl Select { let mut amt = 0; for p in self.iter() { amt += 1; - if (*p).packet.can_recv() { + if do_preflight_checks && (*p).packet.can_recv() { return (*p).id; } } @@ -507,7 +512,7 @@ mod test { let (p2, c2) = Chan::<()>::new(); let (p, c) = Chan::new(); spawn(proc() { - let mut s = Select::new(); + let s = Select::new(); let mut h1 = s.handle(&p1); let mut h2 = s.handle(&p2); unsafe { h2.add(); } @@ -521,4 +526,91 @@ mod test { c2.send(()); p.recv(); }) + + test!(fn preflight1() { + let (p, c) = Chan::new(); + c.send(()); + select!( + () = p.recv() => {}, + ) + }) + + test!(fn preflight2() { + let (p, c) = Chan::new(); + c.send(()); + c.send(()); + select!( + () = p.recv() => {}, + ) + }) + + test!(fn preflight3() { + let (p, c) = Chan::new(); + drop(c.clone()); + c.send(()); + select!( + () = p.recv() => {}, + ) + }) + + test!(fn preflight4() { + let (p, c) = Chan::new(); + c.send(()); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight5() { + let (p, c) = Chan::new(); + c.send(()); + c.send(()); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight6() { + let (p, c) = Chan::new(); + drop(c.clone()); + c.send(()); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight7() { + let (p, c) = Chan::<()>::new(); + drop(c); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight8() { + let (p, c) = Chan::new(); + c.send(()); + drop(c); + p.recv(); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight9() { + let (p, c) = Chan::new(); + drop(c.clone()); + c.send(()); + drop(c); + p.recv(); + let s = Select::new(); + let mut h = s.handle(&p); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) } diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 30e061bb7b916..77bf2d7a68d36 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -398,6 +398,17 @@ impl Packet { cnt == DISCONNECTED || cnt - self.steals > 0 } + // increment the count on the channel (used for selection) + fn bump(&mut self, amt: int) -> int { + match self.cnt.fetch_add(amt, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + DISCONNECTED + } + n => n + } + } + // Inserts the blocked task for selection on this port, returning it back if // the port already has data on it. // @@ -408,8 +419,8 @@ impl Packet { match self.decrement(task) { Ok(()) => Ok(()), Err(task) => { - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - assert!(prev >= 0); + let prev = self.bump(1); + assert!(prev == DISCONNECTED || prev >= 0); return Err(task); } } @@ -440,11 +451,10 @@ impl Packet { let cnt = self.cnt.load(atomics::SeqCst); if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} }; - let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + let prev = self.bump(steals + 1); if prev == DISCONNECTED { assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - self.cnt.store(DISCONNECTED, atomics::SeqCst); true } else { let cur = prev + steals + 1; diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 0e249a55f8707..9c972a3771c1a 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -333,6 +333,17 @@ impl Packet { } } + // increment the count on the channel (used for selection) + fn bump(&mut self, amt: int) -> int { + match self.cnt.fetch_add(amt, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + DISCONNECTED + } + n => n + } + } + // Attempts to start selecting on this port. Like a oneshot, this can fail // immediately because of an upgrade. pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult { @@ -351,8 +362,8 @@ impl Packet { }; // Undo our decrement above, and we should be guaranteed that the // previous value is positive because we're not going to sleep - let prev = self.cnt.fetch_add(1, atomics::SeqCst); - assert!(prev >= 0); + let prev = self.bump(1); + assert!(prev == DISCONNECTED || prev >= 0); return ret; } } @@ -384,13 +395,12 @@ impl Packet { // and in the stream case we can have at most one steal, so just assume // that we had one steal. let steals = 1; - let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + let prev = self.bump(steals + 1); // If we were previously disconnected, then we know for sure that there // is no task in to_wake, so just keep going let has_data = if prev == DISCONNECTED { assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - self.cnt.store(DISCONNECTED, atomics::SeqCst); true // there is data, that data is that we're disconnected } else { let cur = prev + steals + 1;