
Limit running user containers on invokers.
Adapt invoker-tests.
cbickel committed Jun 19, 2018
1 parent 59eda69 commit bfff7cf
Showing 11 changed files with 206 additions and 120 deletions.
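
In short, the invoker no longer caps activity at a fixed container count derived from numcore * coreshare; it now tracks the memory claimed by containers against a configurable userMemory budget, and an action is admitted only if its memory limit still fits. The following is a minimal sketch of the before/after admission check, with hypothetical helper names and plain Long megabyte values in place of the project's ByteSize:

// Sketch only, not the project's code: contrasts the old and new capacity checks.
// Before: capacity was a fixed container count derived from cores.
def hasCapacityOld(busyCount: Int, numCore: Int, coreShare: Int): Boolean =
  busyCount < numCore * coreShare

// After: capacity is the memory already claimed by busy containers plus the
// memory the incoming action needs, compared against the configured user memory.
def hasCapacityNew(busyMemoryMB: Long, actionMemoryMB: Long, userMemoryMB: Long): Boolean =
  busyMemoryMB + actionMemoryMB <= userMemoryMB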
2 changes: 0 additions & 2 deletions ansible/group_vars/all
@@ -159,8 +159,6 @@ invoker:
port: 12001
heap: "{{ invoker_heap | default('2g') }}"
arguments: "{{ invoker_arguments | default('') }}"
numcore: 2
coreshare: 2
userMemory: "{{ invoker_user_memory | default('4096 m') }}"
instances: "{{ groups['invokers'] | length }}"
# Specify if it is allowed to deploy more than 1 invoker on a single machine.
3 changes: 1 addition & 2 deletions ansible/roles/invoker/tasks/deploy.yml
@@ -195,8 +195,7 @@
"CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
"CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
"CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
"CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
"CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}"
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
"INVOKER_NAME": "{{ groups['invokers'].index(inventory_hostname) }}"
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
2 changes: 0 additions & 2 deletions ansible/templates/whisk.properties.j2
@@ -61,8 +61,6 @@ controller.protocol={{ controller.protocol }}
invoker.container.network=bridge
invoker.container.policy={{ invoker_container_policy_name | default()}}
invoker.container.dns={{ invoker_container_network_dns_servers | default()}}
invoker.numcore={{ invoker.numcore }}
invoker.coreshare={{ invoker.coreshare }}
invoker.useRunc={{ invoker.useRunc }}

main.docker.endpoint={{ hostvars[groups["controllers"]|first].ansible_host }}:{{ docker.port }}
@@ -18,25 +18,18 @@
package whisk.core.containerpool

import akka.actor.ActorSystem
import scala.concurrent.Future
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.common.{Logging, TransactionId}
import whisk.core.WhiskConfig
import whisk.core.entity.ByteSize
import whisk.core.entity.ExecManifest
import whisk.core.entity.InstanceId
import whisk.core.entity.{ByteSize, ExecManifest, InstanceId}
import whisk.spi.Spi

import scala.concurrent.Future

case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
extraArgs: Map[String, Set[String]] = Map.empty)

case class ContainerPoolConfig(numCore: Int, coreShare: Int) {

/**
* The total number of containers is simply the number of cores dilated by the cpu sharing.
*/
def maxActiveContainers = numCore * coreShare
case class ContainerPoolConfig(userMemory: ByteSize) {

/**
* The shareFactor indicates the number of containers that would share a single core, on average.
@@ -45,7 +38,8 @@ case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
* On an idle/underloaded system, a container will still get to use underutilized CPU shares.
*/
private val totalShare = 1024.0 // This is a predefined value coming from Docker, not a value chosen here.
def cpuShare = (totalShare / maxActiveContainers).toInt
// Grant more CPU to a container if it allocates more memory.
def cpuShare(reservedMemory: ByteSize) = (totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt
}

/**
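
cpuShare is now a function of the memory an action reserves: a container's slice of Docker's fixed total of 1024 shares grows in proportion to its slice of the configured user memory. A small worked example with assumed values (4096 MB of user memory, matching the default above):

// Worked example with assumed values; mirrors the formula above.
val totalShare = 1024.0                        // Docker's predefined total of CPU shares
val userMemoryMB = 4096L                       // assumed invoker user memory
def cpuShareFor(reservedMB: Long): Int = (totalShare / (userMemoryMB / reservedMB)).toInt

val small  = cpuShareFor(256)   // 1024 / (4096 / 256)  = 64 shares
val medium = cpuShareFor(512)   // 1024 / (4096 / 512)  = 128 shares
val large  = cpuShareFor(2048)  // 1024 / (4096 / 2048) = 512 shares

So a 512 MB action gets twice the CPU weight of a 256 MB action, whereas previously every container received the same share regardless of its memory limit.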
5 changes: 0 additions & 5 deletions core/invoker/src/main/resources/application.conf
@@ -26,11 +26,6 @@ whisk {
use-runc: true
}

container-pool {
num-core: 4 # used for computing --cpushares, and max number of containers allowed
core-share: 2 # used for computing --cpushares, and max number of containers allowed
}

kubernetes {
# Timeouts for k8s commands. Set to "Inf" to disable timeout.
timeouts {
@@ -17,14 +17,15 @@

package whisk.core.containerpool

import scala.collection.immutable
import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import whisk.core.connector.MessageFeed
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.core.connector.MessageFeed

import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.Try

sealed trait WorkerState
case object Busy extends WorkerState
@@ -91,39 +92,46 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) {

// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
.map(container => {
(container, "warm")
})
.orElse {
if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) {
takePrewarmContainer(r.action)
.map(container => {
(container, "prewarmed")
})
.orElse {
Some(createContainer(), "cold")
}
} else None
}
.orElse {
// Remove a container and create a new one for the given job
ContainerPool.remove(freePool).map { toDelete =>
removeContainer(toDelete)
takePrewarmContainer(r.action)
.map(container => {
(container, "recreated")
})
.getOrElse {
(createContainer(), "recreated")
val createdContainer =
if (busyPool.map(_._2.memoryLimit.toMB).sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
.map(container => {
(container, "warm")
})
.orElse {
if (busyPool
.map(_._2.memoryLimit.toMB)
.sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
takePrewarmContainer(r.action)
.map(container => {
(container, "prewarmed")
})
.orElse {
Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
}
} else None
}
.orElse {
// Remove a container and create a new one for the given job
ContainerPool
.remove(freePool, r.action.limits.memory.megabytes.MB)
.map(removeContainer)
// If the list has at least one entry, enough containers were removed to make room for the new one.
// After removal, the removed containers themselves are no longer of interest.
.headOption
.map { _ =>
takePrewarmContainer(r.action)
.map(container => {
(container, "recreated")
})
.getOrElse {
(createContainer(r.action.limits.memory.megabytes.MB), "recreated")
}
}
}
}
} else None
} else None

createdContainer match {
case Some(((actor, data), containerState)) =>
@@ -139,9 +147,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
val retryLogDeadline = if (isErrorLogged) {
logging.error(
this,
s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " +
s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}")(r.msg.transid)
s"Rescheduling Run message, too many message in the pool, " +
s"freePoolSize: ${freePool.size} containers and ${freePool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"busyPoolSize: ${busyPool.size} containers and ${busyPool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"maxContainersMemory ${poolConfig.userMemory}, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid)
Some(logMessageInterval.fromNow)
} else {
r.retryLogDeadline
@@ -181,9 +192,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}

/** Creates a new container and updates state accordingly. */
def createContainer(): (ActorRef, ContainerData) = {
def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
val ref = childFactory(context)
val data = NoData()
val data = MemoryData(memoryLimit)
freePool = freePool + (ref -> data)
ref -> data
}
@@ -262,15 +273,24 @@ object ContainerPool {
* @param pool a map of all free containers in the pool
* @return a container to be removed iff found
*/
protected[containerpool] def remove[A](pool: Map[A, ContainerData]): Option[A] = {
protected[containerpool] def remove[A](pool: Map[A, ContainerData], memory: ByteSize): List[A] = {
val freeContainers = pool.collect {
// Only warm containers will be removed. Prewarmed containers always stay in the pool.
case (ref, w: WarmedData) => ref -> w
}

if (freeContainers.nonEmpty) {
val (ref, _) = freeContainers.minBy(_._2.lastUsed)
Some(ref)
} else None
if (freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
if (memory > 0.B) {
val (ref, data) = freeContainers.minBy(_._2.lastUsed)
// ByteSize cannot go negative, so fall back to 0.B if this container frees more memory than is still needed
val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
} else {
// Enough containers have been found to free the required memory, so stop the recursion.
List.empty
}
// Otherwise: either all containers are currently in use, or removing every free container would still not free enough memory.
} else List.empty
}

def props(factory: ActorRefFactory => ActorRef,
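
ContainerPool.remove therefore changes from returning the single least-recently-used warm container to returning as many least-recently-used warm containers as are needed to cover the requested memory, or an empty list if that is impossible. A simplified, self-contained sketch of the same recursion, using plain tuples instead of the pool's ContainerData (names are hypothetical):

// Simplified sketch of the eviction recursion, not the project's code.
// free: containerId -> (lastUsedEpochMillis, memoryLimitMB)
def evict(free: Map[String, (Long, Long)], neededMB: Long): List[String] =
  if (free.nonEmpty && free.values.map(_._2).sum >= neededMB) {
    if (neededMB > 0) {
      val (id, (_, memMB)) = free.minBy(_._2._1)            // least recently used first
      id :: evict(free - id, math.max(neededMB - memMB, 0L))
    } else Nil                                              // enough memory freed, stop recursing
  } else Nil                                                // cannot free enough memory at all

With three free 256 MB containers and a 512 MB request, for example, the two least-recently-used entries are returned; with a 1024 MB request, nothing is returned and the Run message is rescheduled.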
@@ -19,27 +19,25 @@ package whisk.core.containerpool

import java.time.Instant

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Failure
import akka.actor.FSM
import akka.actor.Props
import akka.actor.Stash
import akka.actor.{FSM, Props, Stash}
import akka.actor.Status.{Failure => FailureMessage}
import akka.event.Logging.InfoLevel
import akka.pattern.pipe
import spray.json._
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
import whisk.core.ConfigKeys
import whisk.core.connector.ActivationMessage
import whisk.core.containerpool.logging.LogCollectingException
import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.core.entity.ExecManifest.ImageName
import whisk.http.Messages
import akka.event.Logging.InfoLevel
import pureconfig.loadConfigOrThrow
import whisk.core.ConfigKeys

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// States
sealed trait ContainerState
@@ -53,14 +51,16 @@ case object Paused extends ContainerState
case object Removing extends ContainerState

// Data
sealed abstract class ContainerData(val lastUsed: Instant)
case class NoData() extends ContainerData(Instant.EPOCH)
case class PreWarmedData(container: Container, kind: String, memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH)
sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit: ByteSize)
case class NoData() extends ContainerData(Instant.EPOCH, 0.B)
case class MemoryData(override val memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH, memoryLimit)
case class PreWarmedData(container: Container, kind: String, override val memoryLimit: ByteSize)
extends ContainerData(Instant.EPOCH, memoryLimit)
case class WarmedData(container: Container,
invocationNamespace: EntityName,
action: ExecutableWhiskAction,
override val lastUsed: Instant)
extends ContainerData(lastUsed)
extends ContainerData(lastUsed, action.limits.memory.megabytes.MB)

// Events received by the actor
case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
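
Every ContainerData variant now exposes a memoryLimit, so the pool can compute the claimed memory as a simple sum, including containers that are still being created (MemoryData) and have no real container behind them yet. A standalone sketch of that accounting idea, using simplified stand-in types rather than the project's classes:

// Stand-in types for illustration only.
sealed trait PoolEntry { def memoryLimitMB: Long }
case object Empty extends PoolEntry { val memoryLimitMB = 0L }                        // like NoData
final case class Reserved(memoryLimitMB: Long) extends PoolEntry                      // like MemoryData: claimed before the container exists
final case class Warmed(memoryLimitMB: Long, lastUsedMillis: Long) extends PoolEntry  // like WarmedData

def claimedMB(pool: Iterable[PoolEntry]): Long = pool.map(_.memoryLimitMB).sum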
@@ -119,7 +119,7 @@ class ContainerProxy(
job.exec.image,
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare)
poolConfig.cpuShare(job.memoryLimit))
.map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit))
.pipeTo(self)

@@ -136,7 +136,7 @@
job.action.exec.image,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare)
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB))

// container factory will either yield a new container ready to execute the action, or
// starting up the container failed; for the latter, it's either an internal error starting
@@ -25,21 +25,22 @@ import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
import whisk.core.containerpool._
import whisk.core.containerpool.logging.LogStoreProvider
import whisk.core.database._
import whisk.core.entity._
import whisk.core.entity.size._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.http.Messages
import whisk.spi.SpiLoader

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import DefaultJsonProtocol._

class InvokerReactive(
config: WhiskConfig,
@@ -97,7 +98,7 @@ class InvokerReactive(

/** Initialize message consumers */
private val topic = s"invoker${instance.toInt}"
private val maximumContainers = poolConfig.maxActiveContainers
private val maximumContainers = (poolConfig.userMemory / MemoryLimit.minMemory).toInt
private val msgProvider = SpiLoader.get[MessagingProvider]
private val consumer = msgProvider.getConsumer(
config,
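
The consumer's maximum-peek size is re-derived in the same spirit: instead of numcore * coreshare, it is the largest number of containers that could run if every action used the smallest allowed memory limit. A quick worked example with assumed defaults (4096 MB user memory and a 128 MB minimum action memory):

// Worked example with assumed defaults.
val userMemoryMB = 4096L        // assumed invoker user memory
val minActionMemoryMB = 128L    // assumed minimum action memory limit
val maximumContainers = (userMemoryMB / minActionMemoryMB).toInt  // 32 activations can be in flight at once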