diff --git a/.travis.yml b/.travis.yml index 66d86f04..961b68cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,21 +1,72 @@ language: scala -jdk: oraclejdk8 + +env: + global: + - SBT_VER=1.1.2 matrix: include: - scala: 2.10.7 env: CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + os: linux + dist: trusty + + - scala: 2.10.7 + os: osx + env: + CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + TRAVIS_SCALA_VERSION=2.10.7 + language: java + osx_image: xcode9.3 - scala: 2.11.12 env: CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + os: linux + dist: trusty + + - scala: 2.11.12 + os: osx + env: + CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + TRAVIS_SCALA_VERSION=2.11.12 + language: java + osx_image: xcode9.3 - scala: 2.12.4 env: CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + os: linux + dist: trusty + + - scala: 2.12.4 + os: osx + env: + CMD="mimaReportBinaryIssues scalafmt::test test:scalafmt::test sbt:scalafmt::test test" + TRAVIS_SCALA_VERSION=2.12.4 + language: java + osx_image: xcode9.3 - scala: 2.13.0-M3 env: CMD=compile + os: linux + dist: trusty + + - scala: 2.13.0-M3 + os: osx + env: + CMD="compile" + TRAVIS_SCALA_VERSION=2.13.0-M3 + language: java + osx_image: xcode9.3 + +script: sbt -Dsbt.version=$SBT_VER ++$TRAVIS_SCALA_VERSION $CMD -script: sbt ++$TRAVIS_SCALA_VERSION $CMD +before_install: + # https://github.com/travis-ci/travis-ci/issues/8408 + - unset _JAVA_OPTIONS + - if [[ "$TRAVIS_OS_NAME" = "osx" ]]; then + brew update; + brew install sbt; + fi cache: directories: diff --git a/io/src/main/scala/sbt/internal/io/EventMonitor.scala b/io/src/main/scala/sbt/internal/io/EventMonitor.scala new file mode 100644 index 00000000..8d1932b4 --- /dev/null +++ b/io/src/main/scala/sbt/internal/io/EventMonitor.scala @@ -0,0 +1,241 @@ +package sbt.internal.io + +import java.nio.file.{ ClosedWatchServiceException, Files, Path, WatchKey } +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } + +import sbt.io.WatchService + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + +/** + * Waits for build triggers. Builds can be triggered by source file updates as well as when the + * watch is terminated by user input. + * + * An EventMonitor can be made using the apply method in the EventMonitor companion. Using this + * handle, the user can block the current thread with awaitEvents until a build is triggered. + * SBT has a few keys (watchingMessage, triggeredMessage) and internal tasks that require a + * WatchState. The EventMonitor provides access to a WatchState via the state() method to ensure + * that the apis that + * use these keys continue working. + * + * No implementation details are specified so that the EventMonitor may be treated as a black box. + */ +private[sbt] sealed trait EventMonitor extends AutoCloseable { + + /** Block indefinitely until the monitor receives a file event or the user stops the watch. */ + def awaitEvent(): Boolean + + /** A snapshot of the WatchState that includes the number of build triggers and watch sources. */ + def state(): WatchState +} + +/** + * Provides factory methods for creating instances of EventMonitor. + */ +private[sbt] object EventMonitor { + private sealed trait Event + private case object Cancelled extends Event + private case class Triggered(path: Path) extends Event + + private class EventMonitorImpl private[EventMonitor] ( + private[this] val service: WatchService, + private[this] val events: ArrayBlockingQueue[Event], + private[this] val eventThread: Looper with HasWatchState, + private[this] val userInputThread: Looper, + private[this] val logger: Logger, + private[this] val closeService: Boolean) + extends EventMonitor { + + override def state(): WatchState = eventThread.state() + + override def awaitEvent(): Boolean = events.take() match { + case Cancelled => false + case Triggered(path) => + logger.debug(s"Triggered watch event due to updated path: $path") + eventThread.incrementCount() + true + } + + override def close(): Unit = { + if (closed.compareAndSet(false, true)) { + if (closeService) service.close() + userInputThread.close() + eventThread.close() + logger.debug("Closed EventMonitor") + } + } + + private[this] val closed = new AtomicBoolean(false) + } + + /** + * Create a new EventMonitor + * @param state The initial watch state for the monitor + * @param delay Maximum duration that the monitor will poll the watch service for events + * @param antiEntropy Minimum duration that must elapse before a build may by re-triggered by + * the same file + * @param terminationCondition Exits watch if evaluates to true + * @param logger + * @return The new EventMonitor + */ + def apply(state: WatchState, + delay: FiniteDuration, + antiEntropy: FiniteDuration, + terminationCondition: => Boolean, + logger: Logger = NullLogger): EventMonitor = + applyImpl(state, delay, antiEntropy, terminationCondition, logger, closeService = true) + + private[EventMonitor] def applyImpl(state: WatchState, + delay: FiniteDuration, + antiEntropy: FiniteDuration, + terminationCondition: => Boolean, + logger: Logger, + closeService: Boolean): EventMonitor = { + val events = new ArrayBlockingQueue[Event](1) + val eventThread = newEventsThread(delay, antiEntropy, state, events, logger) + val userInputThread = newUserInputThread(terminationCondition, events, logger) + new EventMonitorImpl(state.service, events, eventThread, userInputThread, logger, closeService) + } + + private[io] def legacy(state: WatchState, + delay: FiniteDuration, + terminationCondition: => Boolean): EventMonitor = { + val tc = () => { val res = terminationCondition; if (!res) Thread.sleep(10); res } + applyImpl(state, delay, 40.milliseconds, tc(), NullLogger, closeService = false) + } + + private trait HasWatchState { + def state(): WatchState + def incrementCount(): Unit + } + private def newEventsThread(delay: FiniteDuration, + antiEntropy: FiniteDuration, + s: WatchState, + events: ArrayBlockingQueue[Event], + logger: Logger): Looper with HasWatchState = { + var recentEvents = Map.empty[Path, Deadline] + var registered = s.registered + var count = s.count + val lock = new Object + new Looper(s"watch-state-event-thread-${eventThreadId.incrementAndGet()}") with HasWatchState { + def incrementCount(): Unit = lock.synchronized { count += 1 } + def state(): WatchState = lock.synchronized(s.withCount(count).withRegistered(registered)) + override def loop(): Unit = { + recentEvents = recentEvents.filterNot(_._2.isOverdue) + getFilesForKey(s.service.poll(delay)).foreach(maybeTrigger) + } + def getFilesForKey(key: WatchKey): Seq[Path] = key match { + case null => Nil + case k => + val allEvents = k.pollEvents.asScala + .map(e => k.watchable.asInstanceOf[Path].resolve(e.context.asInstanceOf[Path])) + logger.debug(s"Received events:\n${allEvents.mkString("\n")}") + val (exist, notExist) = allEvents.partition(Files.exists(_)) + val (updatedDirectories, updatedFiles) = exist.partition(Files.isDirectory(_)) + val newFiles = updatedDirectories.flatMap(filesForNewDirectory) + lock.synchronized { registered --= notExist } + notExist.foreach(s.unregister) + updatedFiles ++ newFiles ++ notExist + } + /* + * Returns new files found in new directory and any subdirectories, assuming that there is + * a recursive source with a base that is parent to the directory. + */ + def filesForNewDirectory(dir: Path): Seq[Path] = { + lazy val recursive = + s.sources.exists(src => dir.startsWith(src.base.toPath) && src.recursive) + if (!registered.contains(dir) && recursive) { + val dirs = Files.walk(dir).iterator.asScala.filter(Files.isDirectory(_)) + val newDirs = dirs.map(d => d -> s.register(d)).toIndexedSeq + lock.synchronized { registered ++= newDirs } + Files.walk(dir).iterator.asScala.toSeq + } else Nil + } + /* + * Triggers only if there is no pending Trigger and the file is not in an anti-entropy + * quarantine. + */ + def maybeTrigger(path: Path): Unit = + if (s.accept(path)) { + if (recentEvents.get(path).fold(false)(!_.isOverdue)) + logger.debug(s"Ignoring watch event for $path due to anti-entropy constraint") + else + events.peek() match { + case Cancelled => + logger.debug(s"Watch cancelled, not offering event for path $path") + case _ => + recentEvents += path -> antiEntropy.fromNow + if (!events.offer(Triggered(path))) { + logger.debug(s"Event already pending, dropping event for path: $path") + } + } + } + } + } + // Shutup the compiler about unused arguments + @inline private[this] def ignoreArg(arg: => Any): Unit = if (true) () else { arg; () } + trait Logger { + def debug(msg: => Any): Unit = ignoreArg(msg) + } + object NullLogger extends Logger + private def newUserInputThread(terminationCondition: => Boolean, + events: ArrayBlockingQueue[Event], + logger: Logger): Looper = + new Looper(s"watch-state-user-input-${userInputId.incrementAndGet}") { + override final def loop(): Unit = { + if (terminationCondition) { + logger.debug("Received termination condition. Stopping watch...") + events.peek match { + case Cancelled => + case _ => + while (!events.offer(Cancelled)) { + events.clear() + } + } + } else {} + } + } + + private abstract class Looper(name: String) extends Thread(name) with AutoCloseable { + private[this] var stopped = false + private[this] var started = false + private[this] val lock = new Object() + def isStopped: Boolean = this.synchronized(stopped) + def loop(): Unit + @tailrec + private final def runImpl(firstTime: Boolean): Unit = { + if (firstTime) { + started = true + lock.synchronized(lock.notifyAll) + } + try { + if (!isStopped) { + loop() + } + } catch { + case (_: ClosedWatchServiceException | _: InterruptedException) => + this.synchronized { stopped = true } + } + if (!isStopped) { + runImpl(firstTime = false) + } + } + override final def run(): Unit = runImpl(firstTime = true) + def close(): Unit = this.synchronized { + if (!stopped) { + stopped = true + this.interrupt() + this.join(5000) + } + } + setDaemon(true) + start() + lock.synchronized { if (!started) lock.wait() } + } + private val eventThreadId = new AtomicInteger(0) + private val userInputId = new AtomicInteger(0) + +} diff --git a/io/src/main/scala/sbt/internal/io/Milli.scala b/io/src/main/scala/sbt/internal/io/Milli.scala index ea38a313..6d2643dd 100644 --- a/io/src/main/scala/sbt/internal/io/Milli.scala +++ b/io/src/main/scala/sbt/internal/io/Milli.scala @@ -200,7 +200,7 @@ private trait Linux64 extends Library with Utimensat[Long] { } private object Linux64Milli extends PosixMilliLongUtim[Linux64] { protected final val AT_FDCWD: Int = -100 - protected final val UTIME_OMIT: Long = ((1 << 30) - 2) + protected final val UTIME_OMIT: Long = (1L << 30) - 2 protected def getModifiedTimeNative(filePath: String) = { val stat = new Linux64FileStat checkedIO(filePath) { libc.__xstat64(1, filePath, stat) } diff --git a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala index cb92c244..5c5bf175 100644 --- a/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala +++ b/io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala @@ -3,76 +3,33 @@ */ package sbt.internal.io -import java.nio.file.{ Files, Path, WatchEvent, WatchKey } import java.nio.file.StandardWatchEventKinds._ +import java.nio.file.{ WatchService => _, _ } -import sbt.io.{ DirectoryFilter, FileFilter, WatchService, AllPassFilter, NothingFilter } +import sbt.io._ import sbt.io.syntax._ -import scala.annotation.tailrec -import scala.concurrent.duration.FiniteDuration +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ private[sbt] object SourceModificationWatch { /** * Checks for modifications on the file system every `delay`, * until changes are detected or `terminationCondition` evaluates to `true`. + * Uses default anti-entropy time of 40.milliseconds. */ - @tailrec + @deprecated("This is superseded by EventMonitor.watch", "1.1.7") def watch(delay: FiniteDuration, state: WatchState)( - terminationCondition: => Boolean - ): (Boolean, WatchState) = { - if (state.count == 0) (true, state.withCount(1)) - else { - val events = - state.pollEvents().map(expandEvent) - - if (events.isEmpty) { - if (terminationCondition) { - (false, state) - } else { - Thread.sleep(delay.toMillis) - watch(delay, state)(terminationCondition) - } - } else { - val previousFiles = state.registered.keySet - val newFiles = state.sources.flatMap(_.getUnfilteredPaths()).toSet - val createdFiles = newFiles -- previousFiles - val deletedFiles = previousFiles -- newFiles - - // We may have events that are not relevant (e.g., created an empty directory.) - // We filter out those changes, so that we don't trigger unnecessarily. - val filteredDeleted = deletedFiles.filter(p => state.sources.exists(_.accept(p, false))) - val filteredCreated = createdFiles.filter(p => state.sources.exists(_.accept(p, false))) - val filteredModified = events.collect { - case (p, ENTRY_MODIFY) if state.sources.exists(_.accept(p, false)) => p - } - - // Register and remove _unfiltered_ files. This is correct because directories - // are likely to be filtered out (for instance), but we should still add them - // to the files that are watched. - // We don't increment count because we don't know yet if we'll trigger. - val newState = state ++ createdFiles -- deletedFiles - - if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) { - (true, newState.withCount(newState.count + 1)) - } else { - Thread.sleep(delay.toMillis) - watch(delay, newState)(terminationCondition) - } - } - } - } - - private def expandEvent(event: (Path, WatchEvent[_])): (Path, WatchEvent.Kind[Path]) = { - event match { - case (base, ev) => - val fullPath = Option(ev.context().asInstanceOf[Path]) match { - case Some(path) => base.resolve(path) - case None => base - } - val kind = ev.kind().asInstanceOf[WatchEvent.Kind[Path]] - (fullPath, kind) + terminationCondition: => Boolean): (Boolean, WatchState) = { + if (state.count == 0) { + (true, state.withCount(1)) + } else { + val eventMonitor = EventMonitor.legacy(state, delay, terminationCondition) + try { + val triggered = eventMonitor.awaitEvent() + (triggered, eventMonitor.state()) + } finally eventMonitor.close() } } } @@ -81,17 +38,22 @@ private[sbt] object SourceModificationWatch { private[sbt] final class WatchState private ( val count: Int, private[sbt] val sources: Seq[Source], - service: WatchService, + private[sbt] val service: WatchService, private[sbt] val registered: Map[Path, WatchKey] -) { +) extends AutoCloseable { + def accept(p: Path): Boolean = sources.exists(_.accept(p)) + def unregister(path: Path): Unit = service match { + case s: Unregisterable => s.unregister(path) + case _ => + } /** Removes all of `fs` from the watched paths. */ private[sbt] def --(fs: Iterable[Path]): WatchState = { for { - f <- fs; - wk <- registered.get(f); - if (registered.values.count(_ == wk)) <= 1 - } wk.cancel() + f <- fs + wk <- registered.get(f) + if registered.values.count(_ == wk) <= 1 + } unregister(wk.watchable().asInstanceOf[Path]) withRegistered(registered -- fs) } @@ -101,12 +63,11 @@ private[sbt] final class WatchState private ( fs.filter(Files.exists(_)).foldLeft(registered) { case (ks, d) if Files.isDirectory(d) => if (ks.contains(d)) ks - else ks + (d -> service.register(d, WatchState.events: _*)) - + else ks + (d -> register(d)) case (ks, f) => val parent = f.getParent - if (ks.contains(parent)) ks + (f -> ks(parent)) - else ks + (f -> service.register(parent, WatchState.events: _*)) + if (!ks.contains(parent)) ks + (parent -> register(parent)) + else ks } withRegistered(newKeys) } @@ -119,6 +80,9 @@ private[sbt] final class WatchState private ( } } + /** register a path with the watch service */ + private[sbt] def register(path: Path): WatchKey = service.register(path, WatchState.events: _*) + /** A new state, with a new `count`. */ private[sbt] def withCount(count: Int): WatchState = new WatchState(count, sources, service, registered) @@ -126,6 +90,11 @@ private[sbt] final class WatchState private ( /** A new state, with new keys registered. */ private[sbt] def withRegistered(registered: Map[Path, WatchKey]): WatchState = new WatchState(count, sources, service, registered) + + /** Shutsdown the EventMonitor and the watch service. */ + override def close(): Unit = { + service.close() + } } /** @@ -136,10 +105,10 @@ private[sbt] final class WatchState private ( * @param recursive Whether the lists is recursive or immediate children. */ final class Source( - base: File, - includeFilter: FileFilter, - excludeFilter: FileFilter, - recursive: Boolean + val base: File, + val includeFilter: FileFilter, + val excludeFilter: FileFilter, + val recursive: Boolean ) { def this(base: File, includeFilter: FileFilter, excludeFilter: FileFilter) = @@ -156,7 +125,8 @@ final class Source( if (includeDirs) DirectoryFilter || includeFilter else includeFilter - p.startsWith(base.toPath) && inc.accept(p.toFile) && !excludeFilter.accept(p.toFile) + (if (!recursive) p.getParent == base.toPath else p.startsWith(base.toPath)) && inc.accept( + p.toFile) && !excludeFilter.accept(p.toFile) } /** @@ -171,7 +141,7 @@ final class Source( def withRecursive(recursive: Boolean): Source = new Source(base, includeFilter, excludeFilter, recursive) - override def toString = + override def toString: String = s"""Source( | base = $base, | includeFilter = $includeFilter, @@ -201,9 +171,18 @@ private[sbt] object WatchState { * @return An initial `WatchState`. */ def empty(service: WatchService, sources: Seq[Source]): WatchState = { - val initFiles = sources.flatMap(_.getUnfilteredPaths()) + val initFiles = sources.flatMap { + case s if s.recursive => + val base = s.base.toPath + if (Files.exists(base)) { + Files.walk(base).iterator.asScala.collect { + case d if Files.isDirectory(d) => d.toRealPath() + } + } else Seq(base) + case s => Seq(s.base.toPath) + }.sorted assert(initFiles.nonEmpty) - val initState = new WatchState(0, sources, service, Map.empty) ++ initFiles + val initState = new WatchState(count = 1, sources, service, Map.empty) ++ initFiles service.init() initState } diff --git a/io/src/main/scala/sbt/io/MacOSXWatchService.scala b/io/src/main/scala/sbt/io/MacOSXWatchService.scala index c831cf60..bd67e169 100644 --- a/io/src/main/scala/sbt/io/MacOSXWatchService.scala +++ b/io/src/main/scala/sbt/io/MacOSXWatchService.scala @@ -1,65 +1,84 @@ package sbt.io import java.nio.file.StandardWatchEventKinds.{ ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY, OVERFLOW } -import java.nio.file.{ Files, WatchEvent, WatchKey, Path => JPath, Paths => JPaths } -import java.util.Collections +import java.nio.file.{ + ClosedWatchServiceException, + Files, + WatchEvent, + WatchKey, + Path => JPath, + Paths => JPaths +} import java.util.concurrent._ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } -import java.util.{ List => JList } +import java.util.{ Collections, List => JList } import com.swoval.concurrent.ThreadFactory import com.swoval.files.apple.FileEventsApi.Consumer import com.swoval.files.apple.{ FileEvent, FileEventsApi, Flags } -import scala.collection.mutable import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration._ -class MacOSXWatchService extends WatchService { +class MacOSXWatchService extends WatchService with Unregisterable { // The FsEvents api doesn't seem to report events at lower than 10 millisecond intervals. private[this] val watchLatency: Duration = 10.milliseconds private[this] val queueSize = 256 private[this] val executor = Executors.newSingleThreadExecutor(new ThreadFactory("sbt.io.MacOSXWatchService")) + private[this] val streams = mutable.Set.empty[JPath] + private[this] def async[R](f: => R): Unit = { + executor.submit(new Runnable() { override def run(): Unit = { f; () } }) + () + } private[this] val dropEvent = new Consumer[String] { - override def accept(s: String): Unit = {} + override def accept(s: String): Unit = async { + registered.synchronized { + val path = JPaths.get(s) + streams -= path + registered.get(path) match { + case None => + case Some((k, _)) => + registered += path -> (k -> -1) + } + } + } } private val onFileEvent = new Consumer[FileEvent] { - override def accept(fileEvent: FileEvent): Unit = { - executor.submit(new Runnable { - override def run(): Unit = { - val path = JPaths.get(fileEvent.fileName) - registered.synchronized(registered.get(path.getParent).foreach { key => - val exists = Files.exists(path) - if (exists && key.reportModifyEvents) createEvent(key, ENTRY_MODIFY, path) - else if (!exists && key.reportDeleteEvents) createEvent(key, ENTRY_DELETE, path) - }) - } + override def accept(fileEvent: FileEvent): Unit = async { + val path = JPaths.get(fileEvent.fileName) + registered.synchronized(registered.get(path) orElse registered.get(path.getParent) foreach { + case (key, _) => + val exists = Files.exists(path) + if (exists && key.reportModifyEvents) createEvent(key, ENTRY_MODIFY, path) + else if (!exists && key.reportDeleteEvents) createEvent(key, ENTRY_DELETE, path) }) - () } } private[this] val watcher: FileEventsApi = FileEventsApi.apply(onFileEvent, dropEvent) - override def close(): Unit = { + override def close(): Unit = watcher.synchronized { if (open.compareAndSet(true, false)) { watcher.close() executor.shutdownNow() + executor.awaitTermination(5, TimeUnit.SECONDS) () - } + } else {} } override def init(): Unit = {} - override def poll(timeout: Duration): WatchKey = { - readyKeys.poll(timeout.toNanos, TimeUnit.NANOSECONDS) - } + override def poll(timeout: Duration): WatchKey = + if (isOpen) { + readyKeys.poll(timeout.toNanos, TimeUnit.NANOSECONDS) + } else throw new ClosedWatchServiceException override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = registered .synchronized(registered.flatMap { - case (_, k) => + case (_, (k, _)) => val events = k.pollEvents() if (events.isEmpty) None else Some(k -> events.asScala.map(_.asInstanceOf[WatchEvent[JPath]])) @@ -67,19 +86,40 @@ class MacOSXWatchService extends WatchService { .toMap[WatchKey, Seq[WatchEvent[JPath]]] override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { - registered.synchronized { - registered get path match { - case Some(k) => k; - case _ => - val key = new MacOSXWatchKey(path, queueSize, events: _*) - registered += path -> key - val flags = new Flags.Create().setNoDefer().setFileEvents().value - watcher.createStream(path.toRealPath().toString, watchLatency.toMillis / 1000.0, flags) - key + if (isOpen) { + registered.synchronized { + val realPath = path.toRealPath() + registered get realPath match { + case Some((k, _)) => k; + case _ => + val key = new MacOSXWatchKey(realPath, queueSize, events: _*) + val flags = new Flags.Create().setNoDefer().setFileEvents().value + val id = + if (streams.exists(s => realPath.startsWith(s))) -1 + else { + streams += realPath + watcher.createStream(realPath.toString, watchLatency.toMillis / 1000.0, flags) + } + + registered += realPath -> (key -> id) + key + } } - } + } else throw new ClosedWatchServiceException } + override def unregister(path: JPath): Unit = + if (isOpen) registered.synchronized { + registered.get(path) match { + case Some((k, i)) => + if (i >= 0) watcher.stopStream(i) + k.cancel() + registered -= path + case None => + } + () + } else throw new ClosedWatchServiceException + private def createEvent(key: MacOSXWatchKey, kind: WatchEvent.Kind[JPath], file: JPath): Unit = { val event = Event(kind, 1, file) key.addEvent(event) @@ -93,7 +133,7 @@ class MacOSXWatchService extends WatchService { private[this] val open = new AtomicBoolean(true) private[this] val readyKeys = new LinkedBlockingQueue[MacOSXWatchKey] - private[this] val registered = mutable.Map.empty[JPath, MacOSXWatchKey] + private[this] val registered = mutable.Map.empty[JPath, (MacOSXWatchKey, Int)] } private case class Event[T](kind: WatchEvent.Kind[T], count: Int, context: T) extends WatchEvent[T] diff --git a/io/src/main/scala/sbt/io/PollingWatchService.scala b/io/src/main/scala/sbt/io/PollingWatchService.scala index 0669d78e..a83adb56 100644 --- a/io/src/main/scala/sbt/io/PollingWatchService.scala +++ b/io/src/main/scala/sbt/io/PollingWatchService.scala @@ -1,22 +1,24 @@ package sbt.io +import java.nio.file.StandardWatchEventKinds._ import java.nio.file.{ ClosedWatchServiceException, Files, - Path => JPath, - Watchable, + WatchEvent, WatchKey, - WatchEvent + Watchable, + Path => JPath } -import java.nio.file.StandardWatchEventKinds._ +import java.util.concurrent.atomic.AtomicBoolean import java.util.{ List => JList } import sbt.io.syntax._ + import scala.collection.mutable import scala.concurrent.duration.{ Duration, FiniteDuration } /** A `WatchService` that polls the filesystem every `delay`. */ -class PollingWatchService(delay: FiniteDuration) extends WatchService { +class PollingWatchService(delay: FiniteDuration) extends WatchService with Unregisterable { private var closed: Boolean = false private val thread: PollingThread = new PollingThread(delay) private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty @@ -36,32 +38,25 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService { override def init(): Unit = { ensureNotClosed() thread.start() - while (!thread.initDone) { - Thread.sleep(100) - } } - override def poll(timeout: Duration): WatchKey = thread.keysWithEvents.synchronized { + override def poll(timeout: Duration): WatchKey = thread.withKeys { keys => ensureNotClosed() - thread.keysWithEvents.synchronized { - if (thread.keysWithEvents.isEmpty) { - thread.keysWithEvents.wait(timeout.toMillis) - } + if (keys.isEmpty) { + keys.wait(timeout.toMillis) } - thread.keysWithEvents.headOption.map { k => - thread.keysWithEvents -= k + keys.headOption.map { k => + keys -= k k }.orNull } override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = - thread.keysWithEvents.synchronized { + thread.withKeys { keys => import scala.collection.JavaConverters._ ensureNotClosed() - val events = thread.keysWithEvents.map { k => - k -> k.pollEvents().asScala.asInstanceOf[Seq[WatchEvent[JPath]]] - } - thread.keysWithEvents.clear() + val events = keys.map(k => k -> k.pollEvents().asScala.asInstanceOf[Seq[WatchEvent[JPath]]]) + keys.clear() events.toMap } @@ -69,25 +64,52 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService { ensureNotClosed() val key = new PollingWatchKey(path, new java.util.ArrayList[WatchEvent[_]]) keys += path -> key + thread.setFileTimes(path) watched += path -> events key } + override def unregister(path: JPath): Unit = { + ensureNotClosed() + watched -= path + () + } + private def ensureNotClosed(): Unit = if (closed) throw new ClosedWatchServiceException private class PollingThread(delay: FiniteDuration) extends Thread { - private var fileTimes: Map[JPath, Long] = Map.empty - var initDone = false - val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] + private[this] val _keysWithEvents = mutable.LinkedHashSet.empty[WatchKey] + private[this] val _initDone = new AtomicBoolean(false) + private[this] var fileTimes: Map[JPath, Long] = Map.empty + + private[PollingWatchService] def withKeys[R](f: mutable.LinkedHashSet[WatchKey] => R): R = + _keysWithEvents.synchronized(f(_keysWithEvents)) + + @deprecated("The initDone variable should not be accessed externally", "1.1.17") + def initDone: Boolean = _initDone.get() + @deprecated("The initDone variable should not be set externally", "1.1.17") + def initDone_=(initDone: Boolean) = _initDone.set(initDone) + @deprecated("Use withKeys instead of directly accessing keysWithEvents", "1.1.17") + def keysWithEvents: mutable.LinkedHashSet[WatchKey] = _keysWithEvents override def run(): Unit = while (!closed) { populateEvents() - initDone = true + _initDone.synchronized { + _initDone.set(true) + _initDone.notify() + } Thread.sleep(delay.toMillis) } - + override def start(): Unit = { + super.start() + _initDone.synchronized { while (!_initDone.get()) _initDone.wait() } + } + private[PollingWatchService] def setFileTimes(path: JPath): Unit = { + val entries = path.toFile.allPaths.get.map(f => f.toPath -> IO.getModifiedTimeOrZero(f)) + fileTimes.synchronized(fileTimes ++= entries) + } def getFileTimes(): Map[JPath, Long] = { val results = mutable.Map.empty[JPath, Long] watched.toSeq.sortBy(_._1)(pathLengthOrdering).foreach { @@ -98,35 +120,40 @@ class PollingWatchService(delay: FiniteDuration) extends WatchService { results.toMap } - private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = keysWithEvents.synchronized { + private def addEvent(path: JPath, ev: WatchEvent[JPath]): Unit = _keysWithEvents.synchronized { keys.get(path).foreach { k => - keysWithEvents += k + _keysWithEvents += k k.events.add(ev) - keysWithEvents.notifyAll() + _keysWithEvents.notifyAll() } } private def populateEvents(): Unit = { - val newFileTimes = getFileTimes() - val newFiles = newFileTimes.keySet - val oldFiles = fileTimes.keySet + val (deletedFiles, createdFiles, modifiedFiles) = fileTimes.synchronized { + val newFileTimes = getFileTimes() + val newFiles = newFileTimes.keySet + val oldFiles = fileTimes.keySet - val deletedFiles = (oldFiles -- newFiles).toSeq - val createdFiles = (newFiles -- oldFiles).toSeq + val deletedFiles = (oldFiles -- newFiles).toSeq + val createdFiles = (newFiles -- oldFiles).toSeq - val modifiedFiles = fileTimes.collect { - case (p, oldTime) if newFileTimes.getOrElse(p, 0L) > oldTime => p + val modifiedFiles = fileTimes.collect { + case (p, oldTime) if newFileTimes.getOrElse(p, 0L) > oldTime => p + } + fileTimes = newFileTimes + (deletedFiles, createdFiles, modifiedFiles) } - fileTimes = newFileTimes - deletedFiles.foreach { deleted => - val parent = deleted.getParent - if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_DELETE)) { - val ev = new PollingWatchEvent(parent.relativize(deleted), ENTRY_DELETE) - addEvent(parent, ev) + deletedFiles + .map { deleted => + val parent = deleted.getParent + if (watched.getOrElse(parent, Seq.empty).contains(ENTRY_DELETE)) { + val ev = new PollingWatchEvent(parent.relativize(deleted), ENTRY_DELETE) + addEvent(parent, ev) + } + deleted } - watched -= deleted - } + .foreach(watched -= _) createdFiles.sorted(pathLengthOrdering).foreach { case dir if Files.isDirectory(dir) => diff --git a/io/src/main/scala/sbt/io/WatchService.scala b/io/src/main/scala/sbt/io/WatchService.scala index 9c8ad8c2..438b1754 100644 --- a/io/src/main/scala/sbt/io/WatchService.scala +++ b/io/src/main/scala/sbt/io/WatchService.scala @@ -20,19 +20,23 @@ object WatchService { * Adapts a Java `WatchService` to be used with sbt's `WatchService` infrastructure. * @param service The `WatchService` to use. */ - implicit final class WatchServiceAdapter(service: JWatchService) extends WatchService { + implicit final class WatchServiceAdapter(service: JWatchService) + extends WatchService + with Unregisterable { private var closed: Boolean = false - private val registered: mutable.Buffer[WatchKey] = mutable.Buffer.empty + private val registered: mutable.Map[JPath, WatchKey] = mutable.Map.empty override def init(): Unit = () - override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = - registered.flatMap { k => + override def pollEvents(): Map[WatchKey, Seq[WatchEvent[JPath]]] = { + val values = registered.synchronized(registered.values.toIndexedSeq) + values.flatMap { k => val events = k.pollEvents() if (events.isEmpty) None else Some((k, events.asScala.asInstanceOf[Seq[WatchEvent[JPath]]])) }.toMap + } @tailrec override def poll(timeout: Duration): WatchKey = @@ -48,12 +52,32 @@ object WatchService { override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = { if (closed) throw new ClosedWatchServiceException else { - val key = path.register(service, events: _*) - registered += key - key + registered.synchronized { + registered.get(path) match { + case None => + val key = path.register(service, events: _*) + registered += path -> key + key + case Some(key) => + key + } + } } } + override def unregister(path: JPath): Unit = { + if (closed) throw new ClosedWatchServiceException + registered.synchronized { + registered.get(path) match { + case Some(key) => + key.cancel() + registered -= path + case _ => + } + } + () + } + override def close(): Unit = { closed = true service.close() @@ -103,3 +127,12 @@ trait WatchService { */ def close(): Unit } + +trait Unregisterable { self: WatchService => + + /** + * Unregisters a monitored path. + * @param path The monitored path. + */ + def unregister(path: JPath): Unit +} diff --git a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala index dba5354d..27af6f2f 100644 --- a/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala @@ -2,21 +2,17 @@ package sbt.internal.io import java.nio.file.FileSystems +import sbt.io.MacOSXWatchService + import scala.concurrent.duration._ +import scala.util.Properties object DefaultWatchServiceSpec { - // java.nio's default watch service is much slower on MacOS at the moment. - // We give it more time to detect changes. - val (pollDelay, maxWaitTime) = - Option(sys.props("os.name")) match { - case Some("Mac OS X") => (1.second, 15.seconds) - case _ => (50.milliseconds, 3.seconds) - } + val pollDelay = 100.milliseconds } class DefaultWatchServiceSpec extends SourceModificationWatchSpec( - FileSystems.getDefault.newWatchService, - DefaultWatchServiceSpec.pollDelay, - DefaultWatchServiceSpec.maxWaitTime + if (Properties.isMac) new MacOSXWatchService else FileSystems.getDefault.newWatchService, + DefaultWatchServiceSpec.pollDelay ) diff --git a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala index 056a501f..54e72733 100644 --- a/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala +++ b/io/src/test/scala/sbt/internal/io/PollingWatchServiceSpec.scala @@ -5,8 +5,5 @@ import sbt.io.PollingWatchService import scala.concurrent.duration._ class PollingWatchServiceSpec - extends SourceModificationWatchSpec( - new PollingWatchService(500.milliseconds), - 500.milliseconds, - 3.seconds - ) + extends SourceModificationWatchSpec(new PollingWatchService(5.milliseconds), + DefaultWatchServiceSpec.pollDelay) diff --git a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala index ac5a849c..8140ea4b 100644 --- a/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala +++ b/io/src/test/scala/sbt/internal/io/SourceModificationWatchSpec.scala @@ -1,25 +1,26 @@ package sbt.internal.io +import java.io.IOException import java.nio.file.{ ClosedWatchServiceException, Paths } import org.scalatest.{ Assertion, FlatSpec, Matchers } import sbt.io.syntax._ import sbt.io.{ IO, SimpleFilter, WatchService } +import scala.annotation.tailrec import scala.concurrent.duration._ abstract class SourceModificationWatchSpec( getService: => WatchService, - pollDelay: FiniteDuration, - maxWait: FiniteDuration + pollDelay: FiniteDuration ) extends FlatSpec with Matchers { - + val maxWait = 2 * pollDelay it should "detect modified files" in IO.withTemporaryDirectory { dir => val parentDir = dir / "src" / "watchme" val file = parentDir / "Foo.scala" - IO.write(file, "foo") + writeNewFile(file, "foo") watchTest(parentDir) { IO.write(file, "bar") @@ -227,23 +228,33 @@ abstract class SourceModificationWatchSpec( dir => val parentDir = dir / "src" / "watchme" val subDir = parentDir / "subdir" - val service = getService IO.createDirectory(parentDir) - + val firstDeadline = maxWait.fromNow + val secondDeadline = (2 * maxWait).fromNow + var firstDeadlinePassed = false + + val tc = () => { + if (!firstDeadlinePassed) { + firstDeadlinePassed = firstDeadline.isOverdue() + firstDeadlinePassed + } else { + secondDeadline.isOverdue() + } + } + val monitor = defaultMonitor(getService, parentDir, tc = tc) try { - val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState) { + val triggered0 = watchTest(monitor) { IO.createDirectory(subDir) } triggered0 shouldBe false - newState0.count shouldBe 1 + monitor.state().count shouldBe 1 - val (triggered1, newState1) = watchTest(newState0) { + val triggered1 = watchTest(monitor) { IO.delete(subDir) } triggered1 shouldBe false - newState1.count shouldBe 1 - } finally service.close() + monitor.state().count shouldBe 1 + } finally monitor.close() } it should "detect deletion of a directory containing watched files" in IO.withTemporaryDirectory { @@ -251,25 +262,90 @@ abstract class SourceModificationWatchSpec( val parentDir = dir / "src" / "watchme" val subDir = parentDir / "subdir" val src = subDir / "src.scala" - val service = getService IO.createDirectory(parentDir) + val monitor = defaultMonitor(getService, parentDir) try { - val initState = emptyState(service, parentDir) - val (triggered0, newState0) = watchTest(initState) { + val triggered0 = watchTest(monitor) { IO.createDirectory(subDir) IO.touch(src) } triggered0 shouldBe true - newState0.count shouldBe 2 + monitor.state().count shouldBe 2 - val (triggered1, newState1) = watchTest(newState0) { + val triggered1 = watchTest(monitor) { IO.delete(subDir) } triggered1 shouldBe true - newState1.count shouldBe 3 - } finally service.close() + monitor.state().count shouldBe 3 + } finally monitor.close() + } + + it should "not generate multiple events for the same file within anti-entropy period" in IO + .withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val file = parentDir / "Foo.scala" + + writeNewFile(file, "foo") + val monitor = defaultMonitor(getService, parentDir, antiEntropy = maxWait * 2) + try { + val triggered0 = watchTest(monitor) { + IO.write(file, "bar") + } + assert(triggered0) + assert(IO.read(file) == "bar") + + val triggered1 = watchTest(monitor) { + IO.write(file, "baz") + } + assert(!triggered1) + assert(IO.read(file) == "baz") + } finally monitor.close() + } + + it should "ignore valid files in non-recursive subdirectories" in IO.withTemporaryDirectory { + dir => + val file = dir / "src" / "Foo.scala" + val source = + Source(dir.toPath.toRealPath().toFile, "*.scala", new SimpleFilter(_.startsWith("."))) + .withRecursive(false) + val tc = defaultTerminationCondition + val monitor = addMonitor(WatchState.empty(getService, Seq(source)), 0.seconds, tc = tc()) + try { + val triggered = watchTest(monitor) { + IO.write(file, "foo") + } + assert(!triggered) + assert(IO.read(file) == "foo") + } finally monitor.close() + } + + it should "log triggered files" in IO.withTemporaryDirectory { dir => + val parentDir = dir / "src" / "watchme" + val file = parentDir / "Foo.scala" + + writeNewFile(file, "foo") + + val sources = Seq( + Source(parentDir.toPath.toRealPath().toFile, "*.scala", new SimpleFilter(_.startsWith(".")))) + var lines: Seq[String] = Nil + val logger = new EventMonitor.Logger { + override def debug(msg: => Any): Unit = { + lines :+= msg.toString + } + } + val tc = defaultTerminationCondition + val monitor = + EventMonitor(WatchState.empty(getService, sources), pollDelay, 0.seconds, tc(), logger) + try { + val triggered = watchTest(monitor) { + IO.write(file, "bar") + } + assert(triggered) + assert(monitor.state().count == 2) + assert(lines.exists(_.startsWith("Triggered"))) + } finally monitor.close() } "WatchService.poll" should "throw a `ClosedWatchServiceException` if used after `close`" in { @@ -290,32 +366,50 @@ abstract class SourceModificationWatchSpec( service.close() } - private def watchTest(initState: WatchState)(modifier: => Unit): (Boolean, WatchState) = { - var started = false - val deadline = maxWait.fromNow - val modThread = new Thread { override def run() = modifier } - SourceModificationWatch.watch(pollDelay, initState) { - if (!started) { - started = true - modThread.start() - } - deadline.isOverdue() - } + private def watchTest(eventMonitor: EventMonitor)(modifier: => Unit): Boolean = { + modifier + eventMonitor.awaitEvent() } private def watchTest(base: File, expectedTrigger: Boolean = true)( modifier: => Unit): Assertion = { - val service = getService + val monitor = defaultMonitor(getService, base) try { - val initState = emptyState(service, base) - val (triggered, _) = watchTest(initState)(modifier) + val triggered = watchTest(monitor)(modifier) triggered shouldBe expectedTrigger - } finally service.close() + } finally monitor.close() } - private def emptyState(service: WatchService, base: File): WatchState = { - val sources = Seq(Source(base, "*.scala", new SimpleFilter(_.startsWith(".")))) - WatchState.empty(service, sources).withCount(1) + private def defaultTerminationCondition: () => Boolean = { + lazy val deadline = maxWait.fromNow + () => + { + val res = deadline.isOverdue() + if (!res) Thread.sleep(5) + res + } + } + private def addMonitor(s: WatchState, + antiEntropy: FiniteDuration, + tc: => Boolean): EventMonitor = { + EventMonitor(s, pollDelay, antiEntropy, tc) + } + private def defaultMonitor(service: WatchService, + base: File, + antiEntropy: FiniteDuration = 0.milliseconds, + tc: () => Boolean = defaultTerminationCondition): EventMonitor = { + val sources = Seq( + Source(base.toPath.toRealPath().toFile, "*.scala", new SimpleFilter(_.startsWith(".")))) + addMonitor(WatchState.empty(service, sources), antiEntropy, tc()) } + @tailrec + private def writeNewFile(file: File, content: String, attempt: Int = 0): Unit = { + if (attempt == 0) IO.write(file, content) + // IO.setModifiedTimeOrFalse sometimes throws an invalid argument exception + val res = try { + IO.setModifiedTimeOrFalse(file, (Deadline.now - 5.seconds).timeLeft.toMillis) + } catch { case _: IOException if attempt < 10 => false } + if (!res) writeNewFile(file, content, attempt + 1) + } } diff --git a/io/src/test/scala/sbt/internal/io/SourceSpec.scala b/io/src/test/scala/sbt/internal/io/SourceSpec.scala new file mode 100644 index 00000000..2e045cd9 --- /dev/null +++ b/io/src/test/scala/sbt/internal/io/SourceSpec.scala @@ -0,0 +1,34 @@ +package sbt.internal.io + +import java.io.File +import java.nio.file.Paths + +import org.scalatest.{ FlatSpec, Matchers } +import sbt.io.{ AllPassFilter, NothingFilter, SimpleFileFilter } + +class SourceSpec extends FlatSpec with Matchers { + it should "accept recursive paths" in { + val source = new Source(new File("/foo"), AllPassFilter, NothingFilter, true) + source.accept(Paths.get("/foo/bar/baz")) shouldBe true + } + it should "reject subdirectories without recursive flag" in { + val source = new Source(new File("/foo"), AllPassFilter, NothingFilter, false) + source.accept(Paths.get("/foo/bar/baz")) shouldBe false + } + it should "apply include filter" in { + val source = new Source(new File("/foo"), + new SimpleFileFilter(_.toString.endsWith(".scala")), + NothingFilter, + true) + source.accept(Paths.get("/foo/bar/baz.scala")) shouldBe true + source.accept(Paths.get("/foo/bar/baz.java")) shouldBe false + } + it should "apply exclude filter" in { + val source = new Source(new File("/foo"), + new SimpleFileFilter(_.toString.endsWith(".scala")), + new SimpleFileFilter(_.toString == "/foo/bar/buzz.scala"), + true) + source.accept(Paths.get("/foo/bar/baz.scala")) shouldBe true + source.accept(Paths.get("/foo/bar/buzz.scala")) shouldBe false + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0c9426b1..30bdc6c9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -13,5 +13,5 @@ object Dependencies { val scalatest = "org.scalatest" %% "scalatest" % "3.0.3" val jna = "net.java.dev.jna" % "jna" % "4.5.0" val jnaPlatform = "net.java.dev.jna" % "jna-platform" % "4.5.0" - val appleFileEvents = "com.swoval" % "apple-file-events" % "1.3.0" + val appleFileEvents = "com.swoval" % "apple-file-events" % "1.3.2" }