diff --git a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala index 58634095..43bc0679 100644 --- a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala +++ b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala @@ -10,6 +10,7 @@ import sbt.io.{ DirectoryFilter, FileFilter, WatchService } import sbt.io.syntax._ import scala.annotation.tailrec +import scala.concurrent.duration.FiniteDuration private[sbt] object SourceModificationWatch { @@ -18,7 +19,7 @@ private[sbt] object SourceModificationWatch { * until changes are detected or `terminationCondition` evaluates to `true`. */ @tailrec - def watch(delayMillis: Long, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = { + def watch(delay: FiniteDuration, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = { if (state.count == 0) (true, state.withCount(1)) else { val events = @@ -28,8 +29,8 @@ private[sbt] object SourceModificationWatch { if (terminationCondition) { (false, state) } else { - Thread.sleep(delayMillis) - watch(delayMillis, state)(terminationCondition) + Thread.sleep(delay.toMillis) + watch(delay, state)(terminationCondition) } } else { val previousFiles = state.registered.keySet @@ -54,8 +55,8 @@ private[sbt] object SourceModificationWatch { if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) { (true, newState.withCount(newState.count + 1)) } else { - Thread.sleep(delayMillis) - watch(delayMillis, newState)(terminationCondition) + Thread.sleep(delay.toMillis) + watch(delay, newState)(terminationCondition) } } } @@ -72,7 +73,7 @@ private[sbt] object SourceModificationWatch { } /** The state of the file watch. */ -final class WatchState private ( +private[sbt] final class WatchState private ( val count: Int, private[sbt] val sources: Seq[Source], service: WatchService, @@ -150,7 +151,7 @@ final class Source(base: File, includeFilter: FileFilter, excludeFilter: FileFil base.allPaths.get.map(_.toPath) } -object WatchState { +private[sbt] object WatchState { /** What events should be monitored */ val events: Array[WatchEvent.Kind[Path]] = Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala index 3dba9b85..21693d84 100644 --- a/io/src/main/scala/sbt/io/PollingWatchService.scala +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -6,11 +6,12 @@ import java.util.{List => JList} import sbt.io.syntax._ import scala.collection.mutable +import scala.concurrent.duration.{Duration, FiniteDuration} -/** A `WatchService` that polls the filesystem every `delayMs` milliseconds. */ -class PollingWatchService(delayMs: Long) extends WatchService { +/** A `WatchService` that polls the filesystem every `delay`. */ +class PollingWatchService(delay: FiniteDuration) extends WatchService { private var closed: Boolean = false - private val thread: PollingThread = new PollingThread(delayMs) + private val thread: PollingThread = new PollingThread(delay) private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty private val pathLengthOrdering: Ordering[JPath] = Ordering.fromLessThan { @@ -33,8 +34,13 @@ class PollingWatchService(delayMs: Long) extends WatchService { } } - override def poll(timeoutMs: Long): WatchKey = thread.keysWithEvents.synchronized { + override def poll(timeout: Duration): WatchKey = thread.keysWithEvents.synchronized { ensureNotClosed() + thread.keysWithEvents.synchronized { + if (thread.keysWithEvents.isEmpty) { + thread.keysWithEvents.wait(timeout.toMillis) + } + } thread.keysWithEvents.headOption.map { k => thread.keysWithEvents -= k k @@ -62,7 +68,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { private def ensureNotClosed(): Unit = if (closed) throw new ClosedWatchServiceException - private class PollingThread(delayMs: Long) extends Thread { + private class PollingThread(delay: FiniteDuration) extends Thread { private var fileTimes: Map[JPath, Long] = Map.empty var initDone = false val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] @@ -71,7 +77,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { while (!closed) { populateEvents() initDone = true - Thread.sleep(delayMs) + Thread.sleep(delay.toMillis) } def getFileTimes(): Map[JPath, Long] = { @@ -88,6 +94,7 @@ class PollingWatchService(delayMs: Long) extends WatchService { keys.get(path).foreach { k => keysWithEvents += k k.events.add(ev) + keysWithEvents.notifyAll() } } diff --git a/io/src/main/scala/sbt/io/WatchService.scala b/io/src/main/scala/sbt/io/WatchService.scala index df74dd91..1999fc04 100644 --- a/io/src/main/scala/sbt/io/WatchService.scala +++ b/io/src/main/scala/sbt/io/WatchService.scala @@ -3,9 +3,10 @@ package sbt.io import java.nio.file.{ ClosedWatchServiceException, WatchEvent, WatchKey, Path => JPath, WatchService => JWatchService } import java.util.concurrent.TimeUnit +import scala.annotation.tailrec import scala.collection.mutable - import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration object WatchService { @@ -27,9 +28,16 @@ object WatchService { else Some((k, events.asScala.asInstanceOf[Seq[WatchEvent[JPath]]])) }.toMap - override def poll(timeoutMs: Long): WatchKey = { - service.poll(timeoutMs, TimeUnit.MILLISECONDS) - } + @tailrec + override def poll(timeout: Duration): WatchKey = + if (timeout.isFinite) { + service.poll(timeout.toMillis, TimeUnit.MILLISECONDS) + } else { + service.poll(1000L, TimeUnit.MILLISECONDS) match { + case null => poll(timeout) + case key => key + } + } override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { if (closed) throw new ClosedWatchServiceException @@ -69,11 +77,12 @@ trait WatchService { /** * Retrieves the next `WatchKey` that has a `WatchEvent` waiting. Waits - * up to `timeoutMs` milliseconds if no such key exists. - * @param timeoutMs Maximum time to wait, in milliseconds. - * @return The next `WatchKey` that received an event. + * until the `timeout` is expired is no such key exists. + * @param timeout Maximum time to wait + * @return The next `WatchKey` that received an event, or null if no such + * key exists. */ - def poll(timeoutMs: Long): WatchKey + def poll(timeout: Duration): WatchKey /** * Registers a path to be monitored. diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala index 7b453968..ec96c53c 100644 --- a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -2,12 +2,18 @@ package sbt.internal.io import java.nio.file.FileSystems +import scala.concurrent.duration._ + object DefaultWatchServiceSpec { - val (pollDelayMs, maxWaitTimeMs) = + // java.nio's default watch service is much slower on MacOS at the moment. + // We give it more time to detect changes. + val (pollDelay, maxWaitTime) = Option(sys.props("os.name")) match { - case Some("Mac OS X") => (200L, 15000L) - case _ => (50L, 3000L) + case Some("Mac OS X") => (1.second, 15.seconds) + case _ => (50.milliseconds, 3.seconds) } } -class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelayMs, DefaultWatchServiceSpec.maxWaitTimeMs) +class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, + DefaultWatchServiceSpec.pollDelay, + DefaultWatchServiceSpec.maxWaitTime) diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala index 464edee2..57901c5a 100644 --- a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -2,5 +2,9 @@ package sbt.internal.io import sbt.io.PollingWatchService -class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500L), 500L, 3000L) +import scala.concurrent.duration._ + +class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500.milliseconds), + 500.milliseconds, + 3.seconds) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index c409d09b..a8831243 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -6,7 +6,9 @@ import org.scalatest.{ Assertion, FlatSpec, Matchers } import sbt.io.syntax._ import sbt.io.{ IO, SimpleFilter, WatchService } -abstract class SourceModificationWatchSpec(getService: => WatchService, pollDelayMs: Long, maxWaitMs: Long) extends FlatSpec with Matchers { +import scala.concurrent.duration._ + +abstract class SourceModificationWatchSpec(getService: => WatchService, pollDelay: FiniteDuration, maxWait: FiniteDuration) extends FlatSpec with Matchers { it should "watch a directory for file creation" in IO.withTemporaryDirectory { dir => val parentDir = dir / "src" / "watchme" @@ -14,7 +16,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.write(created, "foo") } } @@ -25,7 +27,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -36,7 +38,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -47,7 +49,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -58,7 +60,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(parentDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -70,7 +72,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.write(created, "foo") } } @@ -82,7 +84,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -94,7 +96,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.touch(created) } } @@ -106,7 +108,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.createDirectory(created) } } @@ -116,7 +118,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / "WillBeDeleted.scala" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.delete(file) } } @@ -126,7 +128,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / "ignoreme" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(file) } } @@ -136,7 +138,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val file = parentDir / ".hidden.scala" IO.write(file, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(file) } } @@ -146,7 +148,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val subDir = parentDir / "ignoreme" IO.createDirectory(subDir) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(subDir) } } @@ -157,7 +159,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "WillBeDeleted.scala" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs) { + watchTest(parentDir)(pollDelay, maxWait) { IO.delete(willBeDeleted) } } @@ -168,7 +170,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "ignoreme" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -179,7 +181,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / ".hidden.scala" IO.write(willBeDeleted, "foo") - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -190,7 +192,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela val willBeDeleted = subDir / "ignoreme" IO.createDirectory(willBeDeleted) - watchTest(parentDir)(pollDelayMs, maxWaitMs, expectedTrigger = false) { + watchTest(parentDir)(pollDelay, maxWait, expectedTrigger = false) { IO.delete(willBeDeleted) } } @@ -203,13 +205,13 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela try { val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + val (triggered0, newState0) = watchTest(initState)(pollDelay, maxWait) { IO.createDirectory(subDir) } triggered0 shouldBe false newState0.count shouldBe 1 - val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + val (triggered1, newState1) = watchTest(newState0)(pollDelay, maxWait) { IO.delete(subDir) } triggered1 shouldBe false @@ -227,14 +229,14 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela try { val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState)(pollDelayMs, maxWaitMs) { + val (triggered0, newState0) = watchTest(initState)(pollDelay, maxWait) { IO.createDirectory(subDir) IO.touch(src) } triggered0 shouldBe true newState0.count shouldBe 2 - val (triggered1, newState1) = watchTest(newState0)(pollDelayMs, maxWaitMs) { + val (triggered1, newState1) = watchTest(newState0)(pollDelay, maxWait) { IO.delete(subDir) } triggered1 shouldBe true @@ -245,7 +247,7 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in { val service = getService service.close() - assertThrows[ClosedWatchServiceException](service.poll(1000L)) + assertThrows[ClosedWatchServiceException](service.poll(1.second)) } "WatchService.register" should "throw a `ClosedWatchServiceException` if used after `close`" in { @@ -260,28 +262,28 @@ abstract class SourceModificationWatchSpec(getService: => WatchService, pollDela service.close() } - private def watchTest(initState: WatchState)(pollDelayMs: Long, maxWaitMs: Long)(modifier: => Unit): (Boolean, WatchState) = { + private def watchTest(initState: WatchState)(pollDelay: FiniteDuration, maxWait: FiniteDuration)(modifier: => Unit): (Boolean, WatchState) = { var started = false - val startTime = System.currentTimeMillis() + val deadline = maxWait.fromNow val modThread = new Thread { override def run(): Unit = { modifier } } - SourceModificationWatch.watch(pollDelayMs, initState) { + SourceModificationWatch.watch(pollDelay, initState) { if (!started) { started = true modThread.start() } - System.currentTimeMillis() - startTime > maxWaitMs + deadline.isOverdue() } } - private def watchTest(base: File)(pollDelayMs: Long, maxWaitMs: Long, expectedTrigger: Boolean = true)(modifier: => Unit): Assertion = { + private def watchTest(base: File)(pollDelay: FiniteDuration, maxWait: FiniteDuration, expectedTrigger: Boolean = true)(modifier: => Unit): Assertion = { val service = getService try { val initState = emptyState(service, base) - val (triggered, _) = watchTest(initState)(pollDelayMs, maxWaitMs)(modifier) + val (triggered, _) = watchTest(initState)(pollDelay, maxWait)(modifier) triggered shouldBe expectedTrigger } finally service.close() }