diff --git a/build.sbt b/build.sbt index 7142bb52..4ee2a7c3 100644 --- a/build.sbt +++ b/build.sbt @@ -51,6 +51,9 @@ val io = (project in file("io")) // method this(sbt.io.PollingWatchService,sbt.io.PollingWatchService#PollingThread,java.nio.file.Watchable,java.util.List)Unit in class sbt.io.PollingWatchService#PollingWatchKey does not have a correspondent in current version exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.this"), + // This is a private class + exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.events"), + // moved JavaMilli to sbt.io exclude[MissingClassProblem]("sbt.internal.io.JavaMilli$"), exclude[MissingClassProblem]("sbt.internal.io.JavaMilli"), diff --git a/io/src/main/scala/sbt/internal/io/EventMonitor.scala b/io/src/main/scala/sbt/internal/io/EventMonitor.scala index 664f7a8c..6a9db46a 100644 --- a/io/src/main/scala/sbt/internal/io/EventMonitor.scala +++ b/io/src/main/scala/sbt/internal/io/EventMonitor.scala @@ -1,6 +1,9 @@ package sbt.internal.io -import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey } +import java.io.IOException +import java.nio.file.StandardWatchEventKinds.OVERFLOW +import java.nio.file._ +import java.nio.file.attribute.BasicFileAttributes import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } @@ -8,6 +11,7 @@ import sbt.io.WatchService import scala.annotation.tailrec import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ /** @@ -135,8 +139,13 @@ private[sbt] object EventMonitor { k.reset() events } - val allEvents = rawEvents.flatMap { e => - Option(e.context).map(c => k.watchable.asInstanceOf[Path].resolve(c.asInstanceOf[Path])) + val keyPath = k.watchable.asInstanceOf[Path] + val allEvents = rawEvents.flatMap { + case e if e.kind.equals(OVERFLOW) => + handleOverflow(k) + case e if !e.kind.equals(OVERFLOW) && e.context != null => + Some(keyPath.resolve(e.context.asInstanceOf[Path])) + case _ => None } logger.debug(s"Received events:\n${allEvents.mkString("\n")}") val (exist, notExist) = allEvents.partition(Files.exists(_)) @@ -146,11 +155,62 @@ private[sbt] object EventMonitor { notExist.foreach(s.unregister) updatedFiles ++ newFiles ++ notExist } + + /* + * In the case of an overflow, we must poll the file system to find out if there are added + * or removed directories. When there are new directories, we also want to return file + * events for the files that are found therein. Because an overflow is likely to occur while + * a directory is still being modified, we poll repeatedly until we get the same list of + * files consecutively. We will not trigger for any files that are updated while the WatchKey + * is in the OVERFLOW state. There is no good way to fix this without caching mtimes for + * all of the files, which I don't think is worth doing at this juncture. + */ + private def handleOverflow(key: WatchKey): Vector[Path] = lock.synchronized { + val allFiles = new mutable.HashSet[Path] + def getNewFiles(): Unit = { + allFiles.clear() + val path = key.watchable.asInstanceOf[Path] + Files.walkFileTree( + path, + new FileVisitor[Path] { + override def preVisitDirectory(dir: Path, + attrs: BasicFileAttributes): FileVisitResult = { + allFiles += dir + if (!registered.contains(dir)) registered += dir -> s.register(dir) + FileVisitResult.CONTINUE + } + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + allFiles += file + FileVisitResult.CONTINUE + } + override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = + FileVisitResult.SKIP_SUBTREE + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = + FileVisitResult.CONTINUE + } + ) + () + } + + var oldFiles = mutable.Set.empty[Path] + do { + oldFiles = allFiles + getNewFiles() + } while (oldFiles != allFiles) + registered --= registered.collect { + case (d, k) if !Files.exists(d) => + k.reset() + k.cancel() + d + } + allFiles.toVector + } + /* * Returns new files found in new directory and any subdirectories, assuming that there is * a recursive source with a base that is parent to the directory. */ - def filesForNewDirectory(dir: Path): Iterator[Path] = { + private def filesForNewDirectory(dir: Path): Iterator[Path] = { lazy val recursive = s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive) if (!registered.contains(dir) && recursive) { @@ -164,7 +224,7 @@ private[sbt] object EventMonitor { * Triggers only if there is no pending Trigger and the file is not in an anti-entropy * quarantine. */ - def maybeTrigger(path: Path): Unit = + private def maybeTrigger(path: Path): Unit = if (s.accept(path)) { if (recentEvents.get(path).fold(false)(!_.isOverdue)) logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint") diff --git a/io/src/main/scala/sbt/io/MacOSXWatchService.scala b/io/src/main/scala/sbt/io/MacOSXWatchService.scala index 3a0fe5c4..f5076e8c 100644 --- a/io/src/main/scala/sbt/io/MacOSXWatchService.scala +++ b/io/src/main/scala/sbt/io/MacOSXWatchService.scala @@ -145,7 +145,7 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE override def isValid: Boolean = valid.get - override def pollEvents(): JList[WatchEvent[_]] = { + override def pollEvents(): JList[WatchEvent[_]] = this.synchronized { val result = new mutable.ArrayBuffer[WatchEvent[_]](events.size).asJava events.drainTo(result) val overflowCount = overflow.getAndSet(0) @@ -167,8 +167,10 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE private val overflow = new AtomicInteger() private val valid = new AtomicBoolean(true) - @inline def addEvent(event: Event[JPath]): Unit = if (!events.offer(event)) { - overflow.incrementAndGet() - () + @inline def addEvent(event: Event[JPath]): Unit = this.synchronized { + if (!events.offer(event)) { + overflow.incrementAndGet() + () + } } } diff --git a/io/src/main/scala/sbt/io/Path.scala b/io/src/main/scala/sbt/io/Path.scala index fa2862de..a25e962c 100644 --- a/io/src/main/scala/sbt/io/Path.scala +++ b/io/src/main/scala/sbt/io/Path.scala @@ -3,11 +3,20 @@ */ package sbt.io -import java.io.File +import java.io.{ File, IOException } import java.net.URL + import scala.collection.mutable import java.nio.file.attribute._ -import java.nio.file.{ Path => NioPath, LinkOption, FileSystem, Files } +import java.nio.file.{ + FileSystem, + FileVisitResult, + FileVisitor, + Files, + LinkOption, + Path => NioPath +} + import scala.collection.JavaConverters._ final class RichFile(val asFile: File) extends AnyVal with RichNioPath { @@ -439,9 +448,27 @@ private class DescendantOrSelfPathFinder(val parent: PathFinder, val filter: Fil } private def handleFileDescendant(file: File, fileSet: mutable.Set[File]): Unit = { - handleFile(file, fileSet) - for (childDirectory <- IO.wrapNull(file listFiles DirectoryFilter)) - handleFileDescendant(new File(file, childDirectory.getName), fileSet) + Files.walkFileTree( + file.toPath, + new FileVisitor[NioPath] { + override def preVisitDirectory(dir: NioPath, + attrs: BasicFileAttributes): FileVisitResult = { + val file = dir.toFile + if (filter.accept(file)) fileSet += file + FileVisitResult.CONTINUE + } + override def visitFile(file: NioPath, attrs: BasicFileAttributes): FileVisitResult = { + val ioFile = file.toFile + if (filter.accept(ioFile)) fileSet += ioFile + FileVisitResult.CONTINUE + } + override def visitFileFailed(file: NioPath, exc: IOException): FileVisitResult = + FileVisitResult.SKIP_SUBTREE + override def postVisitDirectory(dir: NioPath, exc: IOException): FileVisitResult = + FileVisitResult.CONTINUE + } + ) + () } } diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala index 522b5a9f..7eab09c4 100644 --- a/io/src/main/scala/sbt/io/PollingWatchService.scala +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -9,7 +9,9 @@ import java.nio.file.{ Watchable, Path => JPath } +import java.nio.file.StandardWatchEventKinds.OVERFLOW import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ArrayBlockingQueue import java.util.{ List => JList } import sbt.io.syntax._ @@ -63,7 +65,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { ensureNotClosed() - val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]]) + val key = new PollingWatchKey(path) keys += path -> key thread.setFileTimes(path) watched += path -> events @@ -80,11 +82,12 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg if (closed) throw new ClosedWatchServiceException private class PollingThread(delay: FiniteDuration) extends Thread { - private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] + private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey] private[this] val _initDone = new AtomicBoolean(false) private[this] var fileTimes: Map[JPath, Long] = Map.empty - private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R = + private[PollingWatchService] def withKeys[R]( + f: mutable.LinkedHashSet[PollingWatchKey] => R): R = _keysWithEvents.synchronized(f(_keysWithEvents)) @deprecated("The initDone variable should not be accessed externally", "1.1.17") @@ -92,7 +95,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg @deprecated("The initDone variable should not be set externally", "1.1.17") def initDone_=(initDone: Boolean) = _initDone.set(initDone) @deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17") - def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents + def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents override def run(): Unit = while (!closed) { @@ -124,7 +127,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized { keys.get(path).foreach { k => _keysWithEvents += k - k.events.add(ev) + k.offer(ev) _keysWithEvents.notifyAll() } } @@ -173,30 +176,37 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg } } - modifiedFiles.foreach { - case file => - val parent = file.getParent - if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) { - val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY) - addEvent(parent, ev) - } + modifiedFiles.foreach { file => + val parent = file.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) { + val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY) + addEvent(parent, ev) + } } } } - private class PollingWatchKey( - override val watchable: Watchable, - val events: JList[WatchEvent[_]] - ) extends WatchKey { + private object Overflow + extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]]) + private class PollingWatchKey(override val watchable: Watchable) extends WatchKey { + private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256) + private[this] val hasOverflow = new AtomicBoolean(false) override def cancel(): Unit = () override def isValid(): Boolean = true - override def pollEvents(): java.util.List[WatchEvent[_]] = { - val evs = new java.util.ArrayList[WatchEvent[_]](events) - events.clear() + override def pollEvents(): JList[WatchEvent[_]] = this.synchronized { + val evs = new java.util.ArrayList[WatchEvent[_]]() + val overflow = hasOverflow.getAndSet(false) + events.drainTo(evs) + if (overflow) evs.add(Overflow) evs } override def reset(): Boolean = true + def offer(ev: WatchEvent[_]): Unit = this.synchronized { + if (!hasOverflow.get && !events.offer(ev)) { + hasOverflow.set(true) + } + } } } diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala index 27af6f2f..60347f7e 100644 --- a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -13,6 +13,6 @@ object DefaultWatchServiceSpec { class DefaultWatchServiceSpec extends SourceModificationWatchSpec( - if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService, + _ => if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelay ) diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala index 54e72733..f83b9ba4 100644 --- a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -5,5 +5,5 @@ import sbt.io.PollingWatchService import scala.concurrent.duration._ class PollingWatchServiceSpec - extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds), + extends SourceModificationWatchSpec((d: FiniteDuration) => new PollingWatchService(d), DefaultWatchServiceSpec.pollDelay) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index 51caa0ff..c61841ed 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -1,7 +1,7 @@ package sbt.internal.io import java.io.IOException -import java.nio.file.{ ClosedWatchServiceException, Paths } +import java.nio.file.{ ClosedWatchServiceException, Files, Paths } import org.scalatest.{ Assertion, FlatSpec, Matchers } import sbt.io.syntax._ @@ -11,10 +11,11 @@ import scala.annotation.tailrec import scala.concurrent.duration._ abstract class SourceModificationWatchSpec( - getService: => WatchService, + getServiceWithPollDelay: FiniteDuration => WatchService, pollDelay: FiniteDuration ) extends FlatSpec with Matchers { + def getService = getServiceWithPollDelay(10.milliseconds) val maxWait = 2 * pollDelay it should "detect modified files" in IO.withTemporaryDirectory { dir => val parentDir = dir / "src" / "watchme" @@ -331,7 +332,7 @@ abstract class SourceModificationWatchSpec( Source(parentDir.toPath.toRealPath().toFile, "*.scala", new SimpleFilter(_.startsWith(".")))) var lines: Seq[String] = Nil val logger = new EventMonitor.Logger { - override def debug(msg: => Any): Unit = { + override def debug(msg: => Any): Unit = lines.synchronized { lines :+= msg.toString } } @@ -348,28 +349,42 @@ abstract class SourceModificationWatchSpec( } finally monitor.close() } - it should "reset keys" in IO.withTemporaryDirectory { dir => - val parentDir = dir / "src" / "watchme" - val file = parentDir / "Foo.scala" - - writeNewFile(file, "foo") - // Longer timeout because there are many file system operations - val deadline = 5.seconds.fromNow - val monitor = defaultMonitor(getService, parentDir, tc = () => deadline.isOverdue) - try { - val n = 1000 - val triggered0 = watchTest(monitor) { - (0 to n).foreach(i => IO.write(parentDir / s"Foo$i.scala", s"foo$i")) - } - assert(triggered0) - assert(IO.read(file) == s"foo") + it should "handle rapid creation of many subdirectories and files" in IO.withTemporaryDirectory { + dir => + val parentDir = dir / "src" / "watchme" + Files.createDirectories(parentDir.toPath) + val subdirCount = 2000 + val subdirFileCount = 4 + var files = Seq.empty[File] + + // Longer timeout because there are many file system operations. This can be very expensive + // especially in the PollingWatchSpec since both the PollingWatchService and the EventMonitor + // overflow handler are hammering the file system. To minimize the conflicts, we set a long + // interval between polls in the PollingWatchService using getServiceWithPollDelay. + val deadline = 20.seconds.fromNow + val monitor = + defaultMonitor(getServiceWithPollDelay(1.second), parentDir, tc = () => deadline.isOverdue) + try { + val triggered0 = watchTest(monitor) { + val subdirs = + (1 to subdirCount).map(i => + Files.createDirectories(parentDir.toPath.resolve(s"subdir-$i"))) + files = subdirs.flatMap { subdir => + subdir.toFile +: (1 to subdirFileCount).map { j => + Files.write(subdir.resolve(s"file-$j.scala"), s"foo".getBytes).toFile + } + } + } + val lastFile = files.last + assert(triggered0) + assert(IO.read(lastFile) == s"foo") - val triggered1 = watchTest(monitor) { - IO.write(file, "baz") - } - assert(triggered1) - assert(IO.read(file) == "baz") - } finally monitor.close() + val triggered1 = watchTest(monitor) { + IO.write(lastFile, "baz") + } + assert(triggered1) + assert(IO.read(lastFile) == "baz") + } finally monitor.close() } "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in {