Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2963 from mesosphere/pk/2957-less-use-of-io-threa…
Browse files Browse the repository at this point in the history
…dpool

Fixes #2957 - improve usage of io threadpool
  • Loading branch information
gkleiman committed Jan 13, 2016
2 parents afd82a2 + 56e1ad5 commit 9325b39
Show file tree
Hide file tree
Showing 30 changed files with 96 additions and 42 deletions.
47 changes: 47 additions & 0 deletions src/main/scala/mesosphere/marathon/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,60 @@ class MarathonApp extends App {
override lazy val conf = new AllConf(args)

def runDefault(): Unit = {
setConcurrentContextDefaults()

log.info(s"Starting Marathon ${BuildInfo.version} with ${args.mkString(" ")}")

run(
classOf[HttpService],
classOf[MarathonSchedulerService],
classOf[MetricsReporterService]
)
}

/**
* Make sure that we have more than one thread -- otherwise some unmarked blocking operations might cause trouble.
*
* See
* [The Global Execution
* Context](http://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context)
* in the scala documentation.
*
* Here is the relevant excerpt in case the link gets broken:
*
* # The Global Execution Context
*
* ExecutionContext.global is an ExecutionContext backed by a ForkJoinPool. It should be sufficient for most
* situations but requires some care. A ForkJoinPool manages a limited amount of threads (the maximum amount of
* thread being referred to as parallelism level). The number of concurrently blocking computations can exceed the
* parallelism level only if each blocking call is wrapped inside a blocking call (more on that below). Otherwise,
* there is a risk that the thread pool in the global execution context is starved, and no computation can proceed.
*
* By default the ExecutionContext.global sets the parallelism level of its underlying fork-join pool to the amount
* of available processors (Runtime.availableProcessors). This configuration can be overriden by setting one
* (or more) of the following VM attributes:
*
* scala.concurrent.context.minThreads - defaults to Runtime.availableProcessors
* scala.concurrent.context.numThreads - can be a number or a multiplier (N) in the form ‘xN’ ;
* defaults to Runtime.availableProcessors
* scala.concurrent.context.maxThreads - defaults to Runtime.availableProcessors
*
* The parallelism level will be set to numThreads as long as it remains within [minThreads; maxThreads].
*
* As stated above the ForkJoinPool can increase the amount of threads beyond its parallelismLevel in the
* presence of blocking computation.
*/
private[this] def setConcurrentContextDefaults(): Unit = {
def setIfNotDefined(property: String, value: String): Unit = {
if (!sys.props.contains(property)) {
sys.props += property -> value
}
}

setIfNotDefined("scala.concurrent.context.minThreads", "5")
setIfNotDefined("scala.concurrent.context.numThreads", "x2")
setIfNotDefined("scala.concurrent.context.maxThreads", "64")
}
}

object Main extends MarathonApp {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/mesosphere/marathon/MarathonScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MarathonScheduler @Inject() (

private[this] val log = LoggerFactory.getLogger(getClass.getName)

import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

implicit val zkTimeout = config.zkTimeoutDuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class MarathonSchedulerService @Inject() (
leadershipCallbacks: Seq[LeadershipCallback] = Seq.empty)
extends AbstractExecutionThreadService with Leader with LeadershipAbdication {

import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

implicit val zkTimeout = config.zkTimeoutDuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AppsResource @Inject() (
val authorizer: Authorizer,
groupManager: GroupManager) extends RestResource with AuthResource {

import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

private[this] val log = LoggerFactory.getLogger(getClass)
private[this] val ListApps = """^((?:.+/)|)\*$""".r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state._
import mesosphere.marathon.upgrade.DeploymentPlan
import mesosphere.marathon.{ ConflictingChangeException, MarathonConf }
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.json.{ Json, Writes }

@Path("v2/groups")
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/marathon/api/v2/TasksResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import play.api.libs.json.Json

import scala.collection.IterableView
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }

@Path("v2/tasks")
class TasksResource @Inject() (
Expand All @@ -39,7 +39,7 @@ class TasksResource @Inject() (
val authorizer: Authorizer) extends AuthResource {

val log = LoggerFactory.getLogger(getClass.getName)
implicit val ec = ThreadPoolContext.context
implicit val ec = ExecutionContext.Implicits.global

@GET
@Produces(Array(MarathonMediaType.PREFERRED_APPLICATION_JSON))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ class HttpEventStreamHandleActor(

private[this] def sendAllMessages(): Unit = {
if (outstanding.nonEmpty) {
implicit val ec = ThreadPoolContext.context
val toSend = outstanding.reverse
outstanding = List.empty[MarathonEvent]
context.become(stashEvents)
Future {
val sendFuture = Future {
toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
WorkDone
} pipeTo self
}(ThreadPoolContext.ioContext)

import context.dispatcher
sendFuture pipeTo self
}
else {
context.become(waitForEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import mesosphere.marathon.tasks.MarathonTasks

import akka.actor.{ Actor, ActorLogging, PoisonPill }
import akka.util.Timeout
import mesosphere.util.ThreadPoolContext

import spray.http._
import spray.client.pipelining._
Expand All @@ -29,7 +30,7 @@ class HealthCheckWorkerActor extends Actor with ActorLogging {
import HealthCheckWorker._

implicit val system = context.system
implicit val ec = mesosphere.util.ThreadPoolContext.context // execution context for futures
import context.dispatcher // execution context for futures

def receive: Receive = {
case HealthCheckJob(app, task, check) =>
Expand Down Expand Up @@ -114,10 +115,12 @@ class HealthCheckWorkerActor extends Actor with ActorLogging {
Future {
val address = new InetSocketAddress(host, port)
val socket = new Socket
socket.connect(address, timeoutMillis)
socket.close()
scala.concurrent.blocking {
socket.connect(address, timeoutMillis)
socket.close()
}
Some(Healthy(task.getId, task.getVersion, Timestamp.now()))
}
}(ThreadPoolContext.ioContext)
}

def https(app: AppDefinition, task: MarathonTask, check: HealthCheck, port: Int): Future[Option[HealthResult]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import mesosphere.marathon.state.{ AppDefinition, AppRepository, PathId, Timesta
import mesosphere.marathon.tasks.{ TaskIdUtil }
import mesosphere.marathon.{ ZookeeperConf, MarathonScheduler, MarathonSchedulerDriverHolder }
import mesosphere.util.RWLock
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.mesos.Protos.TaskStatus

import scala.collection.immutable.{ Map, Seq }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package mesosphere.marathon.io

import java.net.URL
import java.util.UUID
import scala.concurrent.Future

import mesosphere.marathon.CanceledActionException
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.util.Logging
import mesosphere.util.ThreadPoolContext.context
import mesosphere.util.{ Logging, ThreadPoolContext }

import scala.concurrent.Future

/**
* Download given url to given path of given storage provider.
Expand Down Expand Up @@ -38,7 +38,7 @@ final class CancelableDownload(val url: URL, val provider: StorageProvider, val
throw new CanceledActionException(s"Download of $path from $url has been canceled")
}
this
}
}(ThreadPoolContext.ioContext)

override def hashCode(): Int = url.hashCode()
override def equals(other: Any): Boolean = other match {
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/mesosphere/marathon/io/PathFun.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scala.concurrent.Future

import org.apache.commons.io.FilenameUtils.getName

import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

trait PathFun {

Expand Down Expand Up @@ -40,7 +40,8 @@ trait PathFun {
http
case other: URLConnection => other
}
connection.getHeaderFields.asScala.toMap.map { case (key, list) => (key, list.asScala.toList) }
scala.concurrent.blocking(connection.getHeaderFields)
.asScala.toMap.map { case (key, list) => (key, list.asScala.toList) }
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mesosphere.marathon.state

import mesosphere.marathon.metrics.Metrics
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.Future

/**
Expand All @@ -22,6 +21,7 @@ class AppRepository(
val maxVersions: Option[Int] = None,
val metrics: Metrics)
extends EntityRepository[AppDefinition] {
import scala.concurrent.ExecutionContext.Implicits.global

def allPathIds(): Future[Iterable[PathId]] = allIds().map(_.map(PathId.fromSafePath))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class DeploymentRepository(
val metrics: Metrics)
extends EntityRepository[DeploymentPlan] with StateMetrics {

import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

def store(plan: DeploymentPlan): Future[DeploymentPlan] =
storeWithVersion(plan.id, plan.version, plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mesosphere.marathon.state
import scala.concurrent.Future

trait EntityRepository[T <: MarathonState[_, T]] extends StateMetrics with VersionedEntry {
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

protected def store: EntityStore[T]
protected def maxVersions: Option[Int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class EntityStoreCache[T <: MarathonState[_, T]](store: EntityStore[T])

@volatile
private[state] var cacheOpt: Option[TrieMap[String, Option[T]]] = None
private[this] implicit val ec = ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
private[this] val log = LoggerFactory.getLogger(getClass)

override def fetch(key: String): Future[Option[T]] = directOrCached(store.fetch(key)) { cache =>
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/mesosphere/marathon/state/GroupManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import akka.event.EventStream
import com.google.inject.Singleton

import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.api.v2.json.V2Group
import mesosphere.marathon.event.{ EventModule, GroupChangeFailed, GroupChangeSuccess }
import mesosphere.marathon.io.PathFun
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.marathon.upgrade._
import mesosphere.marathon.{ MarathonConf, MarathonSchedulerService, ModuleNames, PortRangeExhaustedException }
import mesosphere.util.SerializeExecution
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
import org.slf4j.LoggerFactory

import scala.collection.immutable.Seq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MarathonStore[S <: MarathonState[_, S]](
newState: () => S,
prefix: String)(implicit ct: ClassTag[S]) extends EntityStore[S] {

import ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
private[this] val log = LoggerFactory.getLogger(getClass)

private[this] lazy val lockManager = LockManager.create()
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/mesosphere/marathon/state/Migration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.state.StorageVersions._
import mesosphere.marathon.{ BuildInfo, MarathonConf, MigrationFailedException }
import mesosphere.util.Logging
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global
import mesosphere.util.state.{ PersistentStore, PersistentStoreManagement }
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package mesosphere.marathon.state

import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.metrics.Metrics
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Future

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ class ResolveArtifactsActor(
with Logging {

import mesosphere.marathon.upgrade.ResolveArtifactsActor.DownloadFinished
import mesosphere.util.ThreadPoolContext.{ context => executionContext }

// all downloads that have to be performed by this actor
var downloads = url2Path.map { case (url, path) => new CancelableDownload(url, storage, path) }

override def preStart(): Unit = {
import context.dispatcher
downloads.map(_.get.map(DownloadFinished) pipeTo self)
if (url2Path.isEmpty) promise.success(true) // handle empty list
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/mesosphere/util/ThreadPoolContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import java.util.concurrent.Executors

object ThreadPoolContext {

private val numberOfThreads: Int = System.getProperty("numberOfIoThreads", "100").toInt

/**
* This execution context is backed by a cached thread pool.
* Use this context instead of the global execution context,
* if you do blocking IO operations.
*/
implicit lazy val context = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
implicit lazy val ioContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numberOfThreads))

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.{ ExecutionContext, Future }
* This is intended only for tests - do not use in production!
* @param ec the execution context to use.
*/
class InMemoryStore(implicit val ec: ExecutionContext = ThreadPoolContext.context) extends PersistentStore {
class InMemoryStore(implicit val ec: ExecutionContext = ExecutionContext.Implicits.global) extends PersistentStore {

private[this] val entities = TrieMap.empty[ID, InMemoryEntity]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import org.apache.mesos.state.{ State, Variable }
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

class MesosStateStore(state: State, timeout: Duration) extends PersistentStore {

private[this] val log = LoggerFactory.getLogger(getClass)
implicit val timeoutDuration = Timeout(timeout)
implicit val ec = ThreadPoolContext.context
implicit val ec = ExecutionContext.Implicits.global
import mesosphere.util.BackToTheFuture.futureToFuture

override def load(key: ID): Future[Option[PersistentEntity]] = {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/util/state/zk/ZKStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.{ NoNodeException, NodeExistsException }
import org.slf4j.LoggerFactory

import scala.concurrent.{ Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }

case class CompressionConf(enabled: Boolean, sizeLimit: Long)

class ZKStore(val client: ZkClient, root: ZNode, compressionConf: CompressionConf) extends PersistentStore
with PersistentStoreManagement {

private[this] val log = LoggerFactory.getLogger(getClass)
private[this] implicit val ec = ThreadPoolContext.context
private[this] implicit val ec = ExecutionContext.Implicits.global

/**
* Fetch data and return entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class HealthCheckWorkerActorTest
with Matchers {

import HealthCheckWorker._
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

test("A TCP health check should correctly resolve the hostname") {
val socket = new ServerSocket(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.concurrent.Await._
import scala.concurrent.duration._

class AppMock(appId: String, version: String, url: String) extends AbstractHandler {
import mesosphere.util.ThreadPoolContext.context
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
val pipeline = sendReceive
Expand Down
Loading

0 comments on commit 9325b39

Please sign in to comment.