Skip to content

Commit

Permalink
Merge pull request #155 from eatkins/overflows
Browse files Browse the repository at this point in the history
Overflows
  • Loading branch information
eed3si9n authored May 27, 2018
2 parents 0b1c1b0 + 14f5e5d commit 3328e02
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 59 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ val io = (project in file("io"))
// method this(sbt.io.PollingWatchService,sbt.io.PollingWatchService#PollingThread,java.nio.file.Watchable,java.util.List)Unit in class sbt.io.PollingWatchService#PollingWatchKey does not have a correspondent in current version
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.this"),

// This is a private class
exclude[DirectMissingMethodProblem]("sbt.io.PollingWatchService#PollingWatchKey.events"),

// moved JavaMilli to sbt.io
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli$"),
exclude[MissingClassProblem]("sbt.internal.io.JavaMilli"),
Expand Down
70 changes: 65 additions & 5 deletions io/src/main/scala/sbt/internal/io/EventMonitor.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package sbt.internal.io

import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey }
import java.io.IOException
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }

import sbt.io.WatchService

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -135,8 +139,13 @@ private[sbt] object EventMonitor {
k.reset()
events
}
val allEvents = rawEvents.flatMap { e =>
Option(e.context).map(c => k.watchable.asInstanceOf[Path].resolve(c.asInstanceOf[Path]))
val keyPath = k.watchable.asInstanceOf[Path]
val allEvents = rawEvents.flatMap {
case e if e.kind.equals(OVERFLOW) =>
handleOverflow(k)
case e if !e.kind.equals(OVERFLOW) && e.context != null =>
Some(keyPath.resolve(e.context.asInstanceOf[Path]))
case _ => None
}
logger.debug(s"Received events:\n${allEvents.mkString("\n")}")
val (exist, notExist) = allEvents.partition(Files.exists(_))
Expand All @@ -146,11 +155,62 @@ private[sbt] object EventMonitor {
notExist.foreach(s.unregister)
updatedFiles ++ newFiles ++ notExist
}

/*
* In the case of an overflow, we must poll the file system to find out if there are added
* or removed directories. When there are new directories, we also want to return file
* events for the files that are found therein. Because an overflow is likely to occur while
* a directory is still being modified, we poll repeatedly until we get the same list of
* files consecutively. We will not trigger for any files that are updated while the WatchKey
* is in the OVERFLOW state. There is no good way to fix this without caching mtimes for
* all of the files, which I don't think is worth doing at this juncture.
*/
private def handleOverflow(key: WatchKey): Vector[Path] = lock.synchronized {
val allFiles = new mutable.HashSet[Path]
def getNewFiles(): Unit = {
allFiles.clear()
val path = key.watchable.asInstanceOf[Path]
Files.walkFileTree(
path,
new FileVisitor[Path] {
override def preVisitDirectory(dir: Path,
attrs: BasicFileAttributes): FileVisitResult = {
allFiles += dir
if (!registered.contains(dir)) registered += dir -> s.register(dir)
FileVisitResult.CONTINUE
}
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
allFiles += file
FileVisitResult.CONTINUE
}
override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
FileVisitResult.SKIP_SUBTREE
override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
()
}

var oldFiles = mutable.Set.empty[Path]
do {
oldFiles = allFiles
getNewFiles()
} while (oldFiles != allFiles)
registered --= registered.collect {
case (d, k) if !Files.exists(d) =>
k.reset()
k.cancel()
d
}
allFiles.toVector
}

/*
* Returns new files found in new directory and any subdirectories, assuming that there is
* a recursive source with a base that is parent to the directory.
*/
def filesForNewDirectory(dir: Path): Iterator[Path] = {
private def filesForNewDirectory(dir: Path): Iterator[Path] = {
lazy val recursive =
s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive)
if (!registered.contains(dir) && recursive) {
Expand All @@ -164,7 +224,7 @@ private[sbt] object EventMonitor {
* Triggers only if there is no pending Trigger and the file is not in an anti-entropy
* quarantine.
*/
def maybeTrigger(path: Path): Unit =
private def maybeTrigger(path: Path): Unit =
if (s.accept(path)) {
if (recentEvents.get(path).fold(false)(!_.isOverdue))
logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint")
Expand Down
10 changes: 6 additions & 4 deletions io/src/main/scala/sbt/io/MacOSXWatchService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE

override def isValid: Boolean = valid.get

override def pollEvents(): JList[WatchEvent[_]] = {
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val result = new mutable.ArrayBuffer[WatchEvent[_]](events.size).asJava
events.drainTo(result)
val overflowCount = overflow.getAndSet(0)
Expand All @@ -167,8 +167,10 @@ private class MacOSXWatchKey(val watchable: JPath, queueSize: Int, kinds: WatchE
private val overflow = new AtomicInteger()
private val valid = new AtomicBoolean(true)

@inline def addEvent(event: Event[JPath]): Unit = if (!events.offer(event)) {
overflow.incrementAndGet()
()
@inline def addEvent(event: Event[JPath]): Unit = this.synchronized {
if (!events.offer(event)) {
overflow.incrementAndGet()
()
}
}
}
37 changes: 32 additions & 5 deletions io/src/main/scala/sbt/io/Path.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
*/
package sbt.io

import java.io.File
import java.io.{ File, IOException }
import java.net.URL

import scala.collection.mutable
import java.nio.file.attribute._
import java.nio.file.{ Path => NioPath, LinkOption, FileSystem, Files }
import java.nio.file.{
FileSystem,
FileVisitResult,
FileVisitor,
Files,
LinkOption,
Path => NioPath
}

import scala.collection.JavaConverters._

final class RichFile(val asFile: File) extends AnyVal with RichNioPath {
Expand Down Expand Up @@ -439,9 +448,27 @@ private class DescendantOrSelfPathFinder(val parent: PathFinder, val filter: Fil
}

private def handleFileDescendant(file: File, fileSet: mutable.Set[File]): Unit = {
handleFile(file, fileSet)
for (childDirectory <- IO.wrapNull(file listFiles DirectoryFilter))
handleFileDescendant(new File(file, childDirectory.getName), fileSet)
Files.walkFileTree(
file.toPath,
new FileVisitor[NioPath] {
override def preVisitDirectory(dir: NioPath,
attrs: BasicFileAttributes): FileVisitResult = {
val file = dir.toFile
if (filter.accept(file)) fileSet += file
FileVisitResult.CONTINUE
}
override def visitFile(file: NioPath, attrs: BasicFileAttributes): FileVisitResult = {
val ioFile = file.toFile
if (filter.accept(ioFile)) fileSet += ioFile
FileVisitResult.CONTINUE
}
override def visitFileFailed(file: NioPath, exc: IOException): FileVisitResult =
FileVisitResult.SKIP_SUBTREE
override def postVisitDirectory(dir: NioPath, exc: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)
()
}
}

Expand Down
48 changes: 29 additions & 19 deletions io/src/main/scala/sbt/io/PollingWatchService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import java.nio.file.{
Watchable,
Path => JPath
}
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ArrayBlockingQueue
import java.util.{ List => JList }

import sbt.io.syntax._
Expand Down Expand Up @@ -63,7 +65,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg

override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
ensureNotClosed()
val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]])
val key = new PollingWatchKey(path)
keys += path -> key
thread.setFileTimes(path)
watched += path -> events
Expand All @@ -80,19 +82,20 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
if (closed) throw new ClosedWatchServiceException

private class PollingThread(delay: FiniteDuration) extends Thread {
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey]
private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[PollingWatchKey]
private[this] val _initDone = new AtomicBoolean(false)
private[this] var fileTimes: Map[JPath, Long] = Map.empty

private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R =
private[PollingWatchService] def withKeys[R](
f: mutable.LinkedHashSet[PollingWatchKey] => R): R =
_keysWithEvents.synchronized(f(_keysWithEvents))

@deprecated("The initDone variable should not be accessed externally", "1.1.17")
def initDone: Boolean = _initDone.get()
@deprecated("The initDone variable should not be set externally", "1.1.17")
def initDone_=(initDone: Boolean) = _initDone.set(initDone)
@deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17")
def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents
def keysWithEvents: mutable.LinkedHashSet[PollingWatchKey] = _keysWithEvents

override def run(): Unit =
while (!closed) {
Expand Down Expand Up @@ -124,7 +127,7 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized {
keys.get(path).foreach { k =>
_keysWithEvents += k
k.events.add(ev)
k.offer(ev)
_keysWithEvents.notifyAll()
}
}
Expand Down Expand Up @@ -173,30 +176,37 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService with Unreg
}
}

modifiedFiles.foreach {
case file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
modifiedFiles.foreach { file =>
val parent = file.getParent
if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_MODIFY)) {
val ev = new PollingWatchEvent(parent.relativize(file), ENTRY_MODIFY)
addEvent(parent, ev)
}
}
}

}

private class PollingWatchKey(
override val watchable: Watchable,
val events: JList[WatchEvent[_]]
) extends WatchKey {
private object Overflow
extends PollingWatchEvent(null, OVERFLOW.asInstanceOf[WatchEvent.Kind[JPath]])
private class PollingWatchKey(override val watchable: Watchable) extends WatchKey {
private[this] val events = new ArrayBlockingQueue[WatchEvent[_]](256)
private[this] val hasOverflow = new AtomicBoolean(false)
override def cancel(): Unit = ()
override def isValid(): Boolean = true
override def pollEvents(): java.util.List[WatchEvent[_]] = {
val evs = new java.util.ArrayList[WatchEvent[_]](events)
events.clear()
override def pollEvents(): JList[WatchEvent[_]] = this.synchronized {
val evs = new java.util.ArrayList[WatchEvent[_]]()
val overflow = hasOverflow.getAndSet(false)
events.drainTo(evs)
if (overflow) evs.add(Overflow)
evs
}
override def reset(): Boolean = true
def offer(ev: WatchEvent[_]): Unit = this.synchronized {
if (!hasOverflow.get && !events.offer(ev)) {
hasOverflow.set(true)
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ object DefaultWatchServiceSpec {

class DefaultWatchServiceSpec
extends SourceModificationWatchSpec(
if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
_ => if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService,
DefaultWatchServiceSpec.pollDelay
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import sbt.io.PollingWatchService
import scala.concurrent.duration._

class PollingWatchServiceSpec
extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds),
extends SourceModificationWatchSpec((d: FiniteDuration) => new PollingWatchService(d),
DefaultWatchServiceSpec.pollDelay)
Loading

0 comments on commit 3328e02

Please sign in to comment.