Skip to content

Commit

Permalink
Use Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
Duhemm committed Jun 29, 2017
1 parent 3f7488b commit 75afc79
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 55 deletions.
15 changes: 8 additions & 7 deletions io/src/main/scala/sbt/internal/io/SourceModificationWatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import sbt.io.{ DirectoryFilter, FileFilter, WatchService }
import sbt.io.syntax._

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration

private[sbt] object SourceModificationWatch {

Expand All @@ -18,7 +19,7 @@ private[sbt] object SourceModificationWatch {
* until changes are detected or `terminationCondition` evaluates to `true`.
*/
@tailrec
def watch(delayMillis: Long, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = {
def watch(delay: FiniteDuration, state: WatchState)(terminationCondition: => Boolean): (Boolean, WatchState) = {
if (state.count == 0) (true, state.withCount(1))
else {
val events =
Expand All @@ -28,8 +29,8 @@ private[sbt] object SourceModificationWatch {
if (terminationCondition) {
(false, state)
} else {
Thread.sleep(delayMillis)
watch(delayMillis, state)(terminationCondition)
Thread.sleep(delay.toMillis)
watch(delay, state)(terminationCondition)
}
} else {
val previousFiles = state.registered.keySet
Expand All @@ -54,8 +55,8 @@ private[sbt] object SourceModificationWatch {
if (filteredCreated.nonEmpty || filteredDeleted.nonEmpty || filteredModified.nonEmpty) {
(true, newState.withCount(newState.count + 1))
} else {
Thread.sleep(delayMillis)
watch(delayMillis, newState)(terminationCondition)
Thread.sleep(delay.toMillis)
watch(delay, newState)(terminationCondition)
}
}
}
Expand All @@ -72,7 +73,7 @@ private[sbt] object SourceModificationWatch {
}

/** The state of the file watch. */
final class WatchState private (
private[sbt] final class WatchState private (
val count: Int,
private[sbt] val sources: Seq[Source],
service: WatchService,
Expand Down Expand Up @@ -150,7 +151,7 @@ final class Source(base: File, includeFilter: FileFilter, excludeFilter: FileFil
base.allPaths.get.map(_.toPath)
}

object WatchState {
private[sbt] object WatchState {
/** What events should be monitored */
val events: Array[WatchEvent.Kind[Path]] = Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)

Expand Down
19 changes: 13 additions & 6 deletions io/src/main/scala/sbt/io/PollingWatchService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import java.util.{List => JList}

import sbt.io.syntax._
import scala.collection.mutable
import scala.concurrent.duration.{Duration, FiniteDuration}

/** A `WatchService` that polls the filesystem every `delayMs` milliseconds. */
class PollingWatchService(delayMs: Long) extends WatchService {
/** A `WatchService` that polls the filesystem every `delay`. */
class PollingWatchService(delay: FiniteDuration) extends WatchService {
private var closed: Boolean = false
private val thread: PollingThread = new PollingThread(delayMs)
private val thread: PollingThread = new PollingThread(delay)
private val keys: mutable.Map[JPath, PollingWatchKey] = mutable.Map.empty
private val pathLengthOrdering: Ordering[JPath] =
Ordering.fromLessThan {
Expand All @@ -33,8 +34,13 @@ class PollingWatchService(delayMs: Long) extends WatchService {
}
}

override def poll(timeoutMs: Long): WatchKey = thread.keysWithEvents.synchronized {
override def poll(timeout: Duration): WatchKey = thread.keysWithEvents.synchronized {
ensureNotClosed()
thread.keysWithEvents.synchronized {
if (thread.keysWithEvents.isEmpty) {
thread.keysWithEvents.wait(timeout.toMillis)
}
}
thread.keysWithEvents.headOption.map { k =>
thread.keysWithEvents -= k
k
Expand Down Expand Up @@ -62,7 +68,7 @@ class PollingWatchService(delayMs: Long) extends WatchService {
private def ensureNotClosed(): Unit =
if (closed) throw new ClosedWatchServiceException

private class PollingThread(delayMs: Long) extends Thread {
private class PollingThread(delay: FiniteDuration) extends Thread {
private var fileTimes: Map[JPath, Long] = Map.empty
var initDone = false
val keysWithEvents = mutable.LinkedHashSet.empty[WatchKey]
Expand All @@ -71,7 +77,7 @@ class PollingWatchService(delayMs: Long) extends WatchService {
while (!closed) {
populateEvents()
initDone = true
Thread.sleep(delayMs)
Thread.sleep(delay.toMillis)
}

def getFileTimes(): Map[JPath, Long] = {
Expand All @@ -88,6 +94,7 @@ class PollingWatchService(delayMs: Long) extends WatchService {
keys.get(path).foreach { k =>
keysWithEvents += k
k.events.add(ev)
keysWithEvents.notifyAll()
}
}

Expand Down
25 changes: 17 additions & 8 deletions io/src/main/scala/sbt/io/WatchService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package sbt.io
import java.nio.file.{ ClosedWatchServiceException, WatchEvent, WatchKey, Path => JPath, WatchService => JWatchService }
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.collection.mutable

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

object WatchService {

Expand All @@ -27,9 +28,16 @@ object WatchService {
else Some((k, events.asScala.asInstanceOf[Seq[WatchEvent[JPath]]]))
}.toMap

override def poll(timeoutMs: Long): WatchKey = {
service.poll(timeoutMs, TimeUnit.MILLISECONDS)
}
@tailrec
override def poll(timeout: Duration): WatchKey =
if (timeout.isFinite) {
service.poll(timeout.toMillis, TimeUnit.MILLISECONDS)
} else {
service.poll(1000L, TimeUnit.MILLISECONDS) match {
case null => poll(timeout)
case key => key
}
}

override def register(path: JPath, events: WatchEvent.Kind[JPath]*): WatchKey = {
if (closed) throw new ClosedWatchServiceException
Expand Down Expand Up @@ -69,11 +77,12 @@ trait WatchService {

/**
* Retrieves the next `WatchKey` that has a `WatchEvent` waiting. Waits
* up to `timeoutMs` milliseconds if no such key exists.
* @param timeoutMs Maximum time to wait, in milliseconds.
* @return The next `WatchKey` that received an event.
* until the `timeout` is expired is no such key exists.
* @param timeout Maximum time to wait
* @return The next `WatchKey` that received an event, or null if no such
* key exists.
*/
def poll(timeoutMs: Long): WatchKey
def poll(timeout: Duration): WatchKey

/**
* Registers a path to be monitored.
Expand Down
14 changes: 10 additions & 4 deletions io/src/test/scala/sbt/internal/io/DefaultWatchServiceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package sbt.internal.io

import java.nio.file.FileSystems

import scala.concurrent.duration._

object DefaultWatchServiceSpec {
val (pollDelayMs, maxWaitTimeMs) =
// java.nio's default watch service is much slower on MacOS at the moment.
// We give it more time to detect changes.
val (pollDelay, maxWaitTime) =
Option(sys.props("os.name")) match {
case Some("Mac OS X") => (200L, 15000L)
case _ => (50L, 3000L)
case Some("Mac OS X") => (1.second, 15.seconds)
case _ => (50.milliseconds, 3.seconds)
}
}
class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService, DefaultWatchServiceSpec.pollDelayMs, DefaultWatchServiceSpec.maxWaitTimeMs)
class DefaultWatchServiceSpec extends SourceModificationWatchSpec(FileSystems.getDefault.newWatchService,
DefaultWatchServiceSpec.pollDelay,
DefaultWatchServiceSpec.maxWaitTime)

Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@ package sbt.internal.io

import sbt.io.PollingWatchService

class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500L), 500L, 3000L)
import scala.concurrent.duration._

class PollingWatchServiceSpec extends SourceModificationWatchSpec(new PollingWatchService(500.milliseconds),
500.milliseconds,
3.seconds)

Loading

0 comments on commit 75afc79

Please sign in to comment.