From 7186d1f28dc4f04422fa9de22eb76b0c666de3ce Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Fri, 18 May 2018 10:13:26 -0700 Subject: [PATCH 1/3] Switch to Files.walkFileTree in PathFinder The handleFileDescendant method was recursively constructing a list of results by calling File.list and recursively calling back into itself for each directory that it found in the listed files. There were two performance bugs in this approach. One was that handleFileDescendant actually called File.list twice for each directory. The other was that after it got the second list of files, it had to filter the results based on whether or not it was a directory. When I called jstack on my sbt process during a slow test run, invariably one of the threads was bloced on File.isDirectory. The fix is simple, if verbose. Use Files.walkFileTree instead. It is verbose, but it recursively lists all of the files in the tree specified by the path. An alternative would be to use Files.walk. I didn't go this route because Files.walk is somewhat hard to reason about because the iterator can throw an exception at any time. This is a problem if the directory is being traversed and modified at the same time. With Files.walkFileTree, I know that if, for example, a directory disappears from out from underneath us, then we'll just move on without throwing an exception. --- io/src/main/scala/sbt/io/Path.scala | 37 +++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/io/src/main/scala/sbt/io/Path.scala b/io/src/main/scala/sbt/io/Path.scala index fa2862de..a25e962c 100644 --- a/io/src/main/scala/sbt/io/Path.scala +++ b/io/src/main/scala/sbt/io/Path.scala @@ -3,11 +3,20 @@ */ package sbt.io -import java.io.File +import java.io.{ File, IOException } import java.net.URL + import scala.collection.mutable import java.nio.file.attribute._ -import java.nio.file.{ Path => NioPath, LinkOption, FileSystem, Files } +import java.nio.file.{ + FileSystem, + FileVisitResult, + FileVisitor, + Files, + LinkOption, + Path => NioPath +} + import scala.collection.JavaConverters._ final class RichFile(val asFile: File) extends AnyVal with RichNioPath { @@ -439,9 +448,27 @@ private class DescendantOrSelfPathFinder(val parent: PathFinder, val filter: Fil } private def handleFileDescendant(file: File, fileSet: mutable.Set[File]): Unit = { - handleFile(file, fileSet) - for (childDirectory <- IO.wrapNull(file listFiles DirectoryFilter)) - handleFileDescendant(new File(file, childDirectory.getName), fileSet) + Files.walkFileTree( + file.toPath, + new FileVisitor[NioPath] { + override def preVisitDirectory(dir: NioPath, + attrs: BasicFileAttributes): FileVisitResult = { + val file = dir.toFile + if (filter.accept(file)) fileSet += file + FileVisitResult.CONTINUE + } + override def visitFile(file: NioPath, attrs: BasicFileAttributes): FileVisitResult = { + val ioFile = file.toFile + if (filter.accept(ioFile)) fileSet += ioFile + FileVisitResult.CONTINUE + } + override def visitFileFailed(file: NioPath, exc: IOException): FileVisitResult = + FileVisitResult.SKIP_SUBTREE + override def postVisitDirectory(dir: NioPath, exc: IOException): FileVisitResult = + FileVisitResult.CONTINUE + } + ) + () } } From 5d81e41b85248e4b95ac769633da3e25ea9f34a4 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Fri, 18 May 2018 11:06:32 -0700 Subject: [PATCH 2/3] Add synchronization This test sometimes spuriously fails and I'm pretty sure it's because of concurrent modifications of lines. --- .../scala/sbt/internal/io/SourceModificationWatchSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index 51caa0ff..8ca97356 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -331,7 +331,7 @@ abstract class SourceModificationWatchSpec( Source(parentDir.toPath.toRealPath().toFile, "*.scala", new SimpleFilter(_.startsWith(".")))) var lines: Seq[String] = Nil val logger = new EventMonitor.Logger { - override def debug(msg: => Any): Unit = { + override def debug(msg: => Any): Unit = lines.synchronized { lines :+= msg.toString } } From 14f5e5d1ff5c67fdc235e57565e7235428e3dcc4 Mon Sep 17 00:00:00 2001 From: Ethan Atkins Date: Tue, 15 May 2018 16:01:10 -0700 Subject: [PATCH 3/3] Handle overflows in EventMonitor While working on a related project, I realized that it's essential to handle the OVERFLOW case for a WatchEvent. The EventMonitor relies on subdirectory creation events to create a new watch key for each subdirectory. If those events are missed due to overflow, the EventMonitor will not be monitoring the newly created subdirectories. To fix this, when the EventMonitor detects an overflow, it will now poll the directory of the watch key repeatedly until it stabilizes. Once that happens, it will register all of the newly found directories (if any) with the watch service and will create events for all of the files in said directories. It is still possible that a trigger could be missed in the event that a lot of files in a watched directory cause an overflow before a valid source file is touched and before the EventMonitor handles the overflow. I am not particularly worried about this. I also discovered that the PollingWatchService was using an unbounded list of events. This seemed risky so I switched to using an ArrayBlockingQueue of size 256, which is what the MacOSXWatchService uses as well. Since the queue is now bounded, I added overflow events to the PollingWatchService as well. I also added synchronization for a few methods in PollingWatchey and MacOSXWatchKey. This is more consistent with the AbstractWatchKey in the jdk. The other methods return constant values so synchronization is pointless. I ran a number of travis builds against the content of this commit and none of them failed. --- build.sbt | 3 + .../scala/sbt/internal/io/EventMonitor.scala | 70 +++++++++++++++++-- .../scala/sbt/io/MacOSXWatchService.scala | 10 +-- .../scala/sbt/io/PollingWatchService.scala | 48 ++++++++----- .../internal/io/DefaultWatchServiceSpec.scala | 2 +- .../internal/io/PollingWatchServiceSpec.scala | 2 +- .../io/SourceModificationWatchSpec.scala | 61 ++++++++++------ 7 files changed, 143 insertions(+), 53 deletions(-) diff --git a/build.sbt b/build.sbt index 7142bb52..4ee2a7c3 100644 --- a/build.sbt +++ b/build.sbt @@ -51,6 +51,9 @@ val io = (project in file("io")) // method this(sbt.io.PollingWatchService,sbt.io.PollingWatchService#PollingThread,java.nio.file.Watchable,java.util.List)Unit in class sbt.io.PollingWatchService#PollingWatchKey does not have a correspondent in current version exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.this"), + // This is a private class + exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.events"), + // moved JavaMilli to sbt.io exclude[MissingClassProblem]("sbt.internal.io.JavaMilli$"), exclude[MissingClassProblem]("sbt.internal.io.JavaMilli"), diff --git a/io/src/main/scala/sbt/internal/io/EventMonitor.scala b/io/src/main/scala/sbt/internal/io/EventMonitor.scala index 664f7a8c..6a9db46a 100644 --- a/io/src/main/scala/sbt/internal/io/EventMonitor.scala +++ b/io/src/main/scala/sbt/internal/io/EventMonitor.scala @@ -1,6 +1,9 @@ package sbt.internal.io -import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey } +import java.io.IOException +import java.nio.file.StandardWatchEventKinds.OVERFLOW +import java.nio.file._ +import java.nio.file.attribute.BasicFileAttributes import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } @@ -8,6 +11,7 @@ import sbt.io.WatchService import scala.annotation.tailrec import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ /** @@ -135,8 +139,13 @@ private[sbt] object EventMonitor { k.reset() events } - val allEvents = rawEvents.flatMap { e => - Option(e.context).map(c => k.watchable.asInstanceOf[Path].resolve(c.asInstanceOf[Path])) + val keyPath = k.watchable.asInstanceOf[Path] + val allEvents = rawEvents.flatMap { + case e if e.kind.equals(OVERFLOW) => + handleOverflow(k) + case e if !e.kind.equals(OVERFLOW) && e.context != null => + Some(keyPath.resolve(e.context.asInstanceOf[Path])) + case _ => None } logger.debug(s"Received events:\n${allEvents.mkString("\n")}") val (exist, notExist) = allEvents.partition(Files.exists(_)) @@ -146,11 +155,62 @@ private[sbt] object EventMonitor { notExist.foreach(s.unregister) updatedFiles ++ newFiles ++ notExist } + + /* + * In the case of an overflow, we must poll the file system to find out if there are added + * or removed directories. When there are new directories, we also want to return file + * events for the files that are found therein. Because an overflow is likely to occur while + * a directory is still being modified, we poll repeatedly until we get the same list of + * files consecutively. We will not trigger for any files that are updated while the WatchKey + * is in the OVERFLOW state. There is no good way to fix this without caching mtimes for + * all of the files, which I don't think is worth doing at this juncture. + */ + private def handleOverflow(key: WatchKey): Vector[Path] = lock.synchronized { + val allFiles = new mutable.HashSet[Path] + def getNewFiles(): Unit = { + allFiles.clear() + val path = key.watchable.asInstanceOf[Path] + Files.walkFileTree( + path, + new FileVisitor[Path] { + override def preVisitDirectory(dir: Path, + attrs: BasicFileAttributes): FileVisitResult = { + allFiles += dir + if (!registered.contains(dir)) registered += dir -> s.register(dir) + FileVisitResult.CONTINUE + } + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + allFiles += file + FileVisitResult.CONTINUE + } + override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = + FileVisitResult.SKIP_SUBTREE + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = + FileVisitResult.CONTINUE + } + ) + () + } + + var oldFiles = mutable.Set.empty[Path] + do { + oldFiles = allFiles + getNewFiles() + } while (oldFiles != allFiles) + registered --= registered.collect { + case (d, k) if !Files.exists(d) => + k.reset() + k.cancel() + d + } + allFiles.toVector + } + /* * Returns new files found in new directory and any subdirectories, assuming that there is * a recursive source with a base that is parent to the directory. */ - def filesForNewDirectory(dir: Path): Iterator[Path] = { + private def filesForNewDirectory(dir: Path): Iterator[Path] = { lazy val recursive = s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive) if (!registered.contains(dir) && recursive) { @@ -164,7 +224,7 @@ private[sbt] object EventMonitor { * Triggers only if there is no pending Trigger and the file is not in an anti-entropy * quarantine. */ - def maybeTrigger(path: Path): Unit = + private def maybeTrigger(path: Path): Unit = if (s.accept(path)) { if (recentEvents.get(path).fold(false)(!_.isOverdue)) logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint") diff --git a/io/src/main/scala/sbt/io/MacOSXWatchService.scala b/io/src/main/scala/sbt/io/MacOSXWatchService.scala index 3a0fe5c4..f5076e8c 100644 --- a/io/src/main/scala/sbt/io/MacOSXWatchService.scala +++ b/io/src/main/scala/sbt/io/MacOSXWatchService.scala @@ -145,7 +145,7 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE override def isValid: Boolean = valid.get - override def pollEvents(): JList[WatchEvent[_]] = { + override def pollEvents(): JList[WatchEvent[_]] = this.synchronized { val result = new mutable.ArrayBuffer[WatchEvent[_]](events.size).asJava events.drainTo(result) val overflowCount = overflow.getAndSet(0) @@ -167,8 +167,10 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE private val overflow = new AtomicInteger() private val valid = new AtomicBoolean(true) - @inline def addEvent(event: Event[JPath]): Unit = if (!events.offer(event)) { - overflow.incrementAndGet() - () + @inline def addEvent(event: Event[JPath]): Unit = this.synchronized { + if (!events.offer(event)) { + overflow.incrementAndGet() + () + } } } diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala index 522b5a9f..7eab09c4 100644 --- a/io/src/main/scala/sbt/io/PollingWatchService.scala +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -9,7 +9,9 @@ import java.nio.file.{ Watchable, Path => JPath } +import java.nio.file.StandardWatchEventKinds.OVERFLOW import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.ArrayBlockingQueue import java.util.{ List => JList } import sbt.io.syntax._ @@ -63,7 +65,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { ensureNotClosed() - val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]]) + val key = new PollingWatchKey(path) keys += path -> key thread.setFileTimes(path) watched += path -> events @@ -80,11 +82,12 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg if (closed) throw new ClosedWatchServiceException private class PollingThread(delay: FiniteDuration) extends Thread { - private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] + private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey] private[this] val _initDone = new AtomicBoolean(false) private[this] var fileTimes: Map[JPath, Long] = Map.empty - private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R = + private[PollingWatchService] def withKeys[R]( + f: mutable.LinkedHashSet[PollingWatchKey] => R): R = _keysWithEvents.synchronized(f(_keysWithEvents)) @deprecated("The initDone variable should not be accessed externally", "1.1.17") @@ -92,7 +95,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg @deprecated("The initDone variable should not be set externally", "1.1.17") def initDone_=(initDone: Boolean) = _initDone.set(initDone) @deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17") - def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents + def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents override def run(): Unit = while (!closed) { @@ -124,7 +127,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized { keys.get(path).foreach { k => _keysWithEvents += k - k.events.add(ev) + k.offer(ev) _keysWithEvents.notifyAll() } } @@ -173,30 +176,37 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg } } - modifiedFiles.foreach { - case file => - val parent = file.getParent - if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) { - val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY) - addEvent(parent, ev) - } + modifiedFiles.foreach { file => + val parent = file.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) { + val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY) + addEvent(parent, ev) + } } } } - private class PollingWatchKey( - override val watchable: Watchable, - val events: JList[WatchEvent[_]] - ) extends WatchKey { + private object Overflow + extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]]) + private class PollingWatchKey(override val watchable: Watchable) extends WatchKey { + private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256) + private[this] val hasOverflow = new AtomicBoolean(false) override def cancel(): Unit = () override def isValid(): Boolean = true - override def pollEvents(): java.util.List[WatchEvent[_]] = { - val evs = new java.util.ArrayList[WatchEvent[_]](events) - events.clear() + override def pollEvents(): JList[WatchEvent[_]] = this.synchronized { + val evs = new java.util.ArrayList[WatchEvent[_]]() + val overflow = hasOverflow.getAndSet(false) + events.drainTo(evs) + if (overflow) evs.add(Overflow) evs } override def reset(): Boolean = true + def offer(ev: WatchEvent[_]): Unit = this.synchronized { + if (!hasOverflow.get && !events.offer(ev)) { + hasOverflow.set(true) + } + } } } diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala index 27af6f2f..60347f7e 100644 --- a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -13,6 +13,6 @@ object DefaultWatchServiceSpec { class DefaultWatchServiceSpec extends SourceModificationWatchSpec( - if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService, + _ => if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelay ) diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala index 54e72733..f83b9ba4 100644 --- a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -5,5 +5,5 @@ import sbt.io.PollingWatchService import scala.concurrent.duration._ class PollingWatchServiceSpec - extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds), + extends SourceModificationWatchSpec((d: FiniteDuration) => new PollingWatchService(d), DefaultWatchServiceSpec.pollDelay) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index 8ca97356..c61841ed 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -1,7 +1,7 @@ package sbt.internal.io import java.io.IOException -import java.nio.file.{ ClosedWatchServiceException, Paths } +import java.nio.file.{ ClosedWatchServiceException, Files, Paths } import org.scalatest.{ Assertion, FlatSpec, Matchers } import sbt.io.syntax._ @@ -11,10 +11,11 @@ import scala.annotation.tailrec import scala.concurrent.duration._ abstract class SourceModificationWatchSpec( - getService: => WatchService, + getServiceWithPollDelay: FiniteDuration => WatchService, pollDelay: FiniteDuration ) extends FlatSpec with Matchers { + def getService = getServiceWithPollDelay(10.milliseconds) val maxWait = 2 * pollDelay it should "detect modified files" in IO.withTemporaryDirectory { dir => val parentDir = dir / "src" / "watchme" @@ -348,28 +349,42 @@ abstract class SourceModificationWatchSpec( } finally monitor.close() } - it should "reset keys" in IO.withTemporaryDirectory { dir => - val parentDir = dir / "src" / "watchme" - val file = parentDir / "Foo.scala" - - writeNewFile(file, "foo") - // Longer timeout because there are many file system operations - val deadline = 5.seconds.fromNow - val monitor = defaultMonitor(getService, parentDir, tc = () => deadline.isOverdue) - try { - val n = 1000 - val triggered0 = watchTest(monitor) { - (0 to n).foreach(i => IO.write(parentDir / s"Foo$i.scala", s"foo$i")) - } - assert(triggered0) - assert(IO.read(file) == s"foo") + it should "handle rapid creation of many subdirectories and files" in IO.withTemporaryDirectory { + dir => + val parentDir = dir / "src" / "watchme" + Files.createDirectories(parentDir.toPath) + val subdirCount = 2000 + val subdirFileCount = 4 + var files = Seq.empty[File] + + // Longer timeout because there are many file system operations. This can be very expensive + // especially in the PollingWatchSpec since both the PollingWatchService and the EventMonitor + // overflow handler are hammering the file system. To minimize the conflicts, we set a long + // interval between polls in the PollingWatchService using getServiceWithPollDelay. + val deadline = 20.seconds.fromNow + val monitor = + defaultMonitor(getServiceWithPollDelay(1.second), parentDir, tc = () => deadline.isOverdue) + try { + val triggered0 = watchTest(monitor) { + val subdirs = + (1 to subdirCount).map(i => + Files.createDirectories(parentDir.toPath.resolve(s"subdir-$i"))) + files = subdirs.flatMap { subdir => + subdir.toFile +: (1 to subdirFileCount).map { j => + Files.write(subdir.resolve(s"file-$j.scala"), s"foo".getBytes).toFile + } + } + } + val lastFile = files.last + assert(triggered0) + assert(IO.read(lastFile) == s"foo") - val triggered1 = watchTest(monitor) { - IO.write(file, "baz") - } - assert(triggered1) - assert(IO.read(file) == "baz") - } finally monitor.close() + val triggered1 = watchTest(monitor) { + IO.write(lastFile, "baz") + } + assert(triggered1) + assert(IO.read(lastFile) == "baz") + } finally monitor.close() } "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in {