diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 5c349c06586..3d47daf1733 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -159,8 +159,6 @@ invoker:
   port: 12001
   heap: "{{ invoker_heap | default('2g') }}"
   arguments: "{{ invoker_arguments | default('') }}"
-  numcore: 2
-  coreshare: 2
   userMemory: "{{ invoker_user_memory | default('4096 m') }}"
   instances: "{{ groups['invokers'] | length }}"
   # Specify if it is allowed to deploy more than 1 invoker on a single machine.
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 0f8fd1e783c..9d1f5f1b911 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -195,8 +195,7 @@
       "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}"
       "CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}"
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}"
-      "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}"
-      "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}"
+      "CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}"
       "INVOKER_NAME": "{{ groups['invokers'].index(inventory_hostname) }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2
index c3f4f6bbbd4..2f705fc1fdf 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -61,8 +61,6 @@ controller.protocol={{ controller.protocol }}
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
 invoker.container.dns={{ invoker_container_network_dns_servers | default()}}
-invoker.numcore={{ invoker.numcore }}
-invoker.coreshare={{ invoker.coreshare }}
 invoker.useRunc={{ invoker.useRunc }}
 
 main.docker.endpoint={{ hostvars[groups["controllers"]|first].ansible_host }}:{{ docker.port }}
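Note on how the renamed setting reaches the code (a sketch, not part of the patch; the object name is hypothetical, and ConfigKeys.containerPool is assumed to resolve to the "whisk.container-pool" path, consistent with the pureconfig imports visible in ContainerProxy.scala below): the CONFIG_whisk_containerPool_userMemory environment variable surfaces as the "whisk.container-pool.user-memory" Typesafe-config key, which pureconfig binds to the one-field ContainerPoolConfig introduced by this change.

import pureconfig.loadConfigOrThrow
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerPoolConfig

object PoolConfigLoadSketch {
  // Binds "whisk.container-pool.user-memory" (e.g. "4096 m") to ContainerPoolConfig(userMemory),
  // assuming the ByteSize config reader the project already ships.
  val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
}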
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
index 35d3b8bb0cf..26b5b3d77b3 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala
@@ -18,25 +18,18 @@
 package whisk.core.containerpool
 
 import akka.actor.ActorSystem
-import scala.concurrent.Future
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
 import whisk.core.WhiskConfig
-import whisk.core.entity.ByteSize
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.entity.{ByteSize, ExecManifest, InstanceId}
 import whisk.spi.Spi
 
+import scala.concurrent.Future
+
 case class ContainerArgsConfig(network: String,
                                dnsServers: Seq[String] = Seq.empty,
                                extraArgs: Map[String, Set[String]] = Map.empty)
 
-case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
-
-  /**
-   * The total number of containers is simply the number of cores dilated by the cpu sharing.
-   */
-  def maxActiveContainers = numCore * coreShare
+case class ContainerPoolConfig(userMemory: ByteSize) {
 
   /**
    * The shareFactor indicates the number of containers that would share a single core, on average.
@@ -45,7 +38,8 @@ case class ContainerPoolConfig(numCore: Int, coreShare: Int) {
    * On an idle/underloaded system, a container will still get to use underutilized CPU shares.
    */
   private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value.
-  def cpuShare = (totalShare / maxActiveContainers).toInt
+  // Grant more CPU to a container if it allocates more memory.
+  def cpuShare(reservedMemory: ByteSize) = (totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt
 }
 
 /**
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index ebd45e58387..deb76065a5c 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -26,11 +26,6 @@ whisk {
     use-runc: true
   }
 
-  container-pool {
-    num-core: 4 # used for computing --cpushares, and max number of containers allowed
-    core-share: 2 # used for computing --cpushares, and max number of containers allowed
-  }
-
   kubernetes {
     # Timeouts for k8s commands. Set to "Inf" to disable timeout.
     timeouts {
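A quick sanity check of the new cpuShare formula (a standalone sketch; the object name and values are illustrative, and plain Longs stand in for the project's ByteSize): with 4096 MB of user memory, a 256 MB action implies 4096 / 256 = 16 concurrent slots and therefore 1024 / 16 = 64 shares, while a 512 MB action gets 128 shares, i.e. twice the CPU for twice the memory.

object CpuShareCheck extends App {
  val totalShare = 1024.0 // docker's default number of CPU shares, as in the patch

  // Mirrors cpuShare: shares are inversely proportional to how many
  // containers of this size fit into the invoker's user memory.
  def cpuShare(userMemoryMb: Long, reservedMemoryMb: Long): Int =
    (totalShare / (userMemoryMb / reservedMemoryMb)).toInt

  println(cpuShare(4096, 256)) // 64
  println(cpuShare(4096, 512)) // 128
}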
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 90f5d734569..a0e48a925fb 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -17,14 +17,15 @@
 
 package whisk.core.containerpool
 
-import scala.collection.immutable
-import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import whisk.core.connector.MessageFeed
 import whisk.core.entity._
 import whisk.core.entity.size._
-import whisk.core.connector.MessageFeed
 
+import scala.collection.immutable
 import scala.concurrent.duration._
+import scala.util.Try
 
 sealed trait WorkerState
 case object Busy extends WorkerState
@@ -91,39 +92,46 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
-      val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) {
-
-        // Schedule a job to a warm container
-        ContainerPool
-          .schedule(r.action, r.msg.user.namespace.name, freePool)
-          .map(container => {
-            (container, "warm")
-          })
-          .orElse {
-            if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) {
-              takePrewarmContainer(r.action)
-                .map(container => {
-                  (container, "prewarmed")
-                })
-                .orElse {
-                  Some(createContainer(), "cold")
-                }
-            } else None
-          }
-          .orElse {
-            // Remove a container and create a new one for the given job
-            ContainerPool.remove(freePool).map { toDelete =>
-              removeContainer(toDelete)
-              takePrewarmContainer(r.action)
-                .map(container => {
-                  (container, "recreated")
-                })
-                .getOrElse {
-                  (createContainer(), "recreated")
-                }
-            }
-          }
-      } else None
+      val createdContainer =
+        if (busyPool.map(_._2.memoryLimit.toMB).sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
+          // Schedule a job to a warm container
+          ContainerPool
+            .schedule(r.action, r.msg.user.namespace.name, freePool)
+            .map(container => {
+              (container, "warm")
+            })
+            .orElse {
+              if (busyPool
+                    .map(_._2.memoryLimit.toMB)
+                    .sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
+                takePrewarmContainer(r.action)
+                  .map(container => {
+                    (container, "prewarmed")
+                  })
+                  .orElse {
+                    Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
+                  }
+              } else None
+            }
+            .orElse {
+              // Remove a container and create a new one for the given job
+              ContainerPool
+                .remove(freePool, r.action.limits.memory.megabytes.MB)
+                .map(removeContainer)
+                // If the list has at least one entry, enough containers were removed to make room for the new one.
+                // Once removed, those containers are of no further interest here.
+                .headOption
+                .map { _ =>
+                  takePrewarmContainer(r.action)
+                    .map(container => {
+                      (container, "recreated")
+                    })
+                    .getOrElse {
+                      (createContainer(r.action.limits.memory.megabytes.MB), "recreated")
+                    }
+                }
+            }
+        } else None
 
       createdContainer match {
         case Some(((actor, data), containerState)) =>
@@ -139,9 +147,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
           val retryLogDeadline = if (isErrorLogged) {
             logging.error(
               this,
-              s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " +
-                s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " +
-                s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}")(r.msg.transid)
+              s"Rescheduling Run message, too many messages in the pool, " +
+                s"freePoolSize: ${freePool.size} containers and ${freePool.map(_._2.memoryLimit.toMB).sum} MB, " +
+                s"busyPoolSize: ${busyPool.size} containers and ${busyPool.map(_._2.memoryLimit.toMB).sum} MB, " +
+                s"maxContainersMemory ${poolConfig.userMemory}, " +
+                s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
+                s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid)
             Some(logMessageInterval.fromNow)
           } else {
             r.retryLogDeadline
@@ -181,9 +192,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   }
 
   /** Creates a new container and updates state accordingly. */
-  def createContainer(): (ActorRef, ContainerData) = {
+  def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
-    val data = NoData()
+    val data = MemoryData(memoryLimit)
     freePool = freePool + (ref -> data)
     ref -> data
   }
@@ -262,15 +273,24 @@ object ContainerPool {
   * @param pool a map of all free containers in the pool
   * @return a container to be removed iff found
   */
-  protected[containerpool] def remove[A](pool: Map[A, ContainerData]): Option[A] = {
+  protected[containerpool] def remove[A](pool: Map[A, ContainerData], memory: ByteSize): List[A] = {
     val freeContainers = pool.collect {
+      // Only warm containers can be removed; prewarmed containers always stay in the pool.
      case (ref, w: WarmedData) => ref -> w
    }
 
-    if (freeContainers.nonEmpty) {
-      val (ref, _) = freeContainers.minBy(_._2.lastUsed)
-      Some(ref)
-    } else None
+    if (freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
+      if (memory > 0.B) {
+        val (ref, data) = freeContainers.minBy(_._2.lastUsed)
+        // ByteSize subtraction throws if the result would be negative; treat that as no memory left to free.
+        val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+        List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
+      } else {
+        // Enough containers have been found to free the required memory -> abort the recursion.
+        List.empty
+      }
+      // else case: all containers are currently in use, or removing every free container would still not free enough memory.
+    } else List.empty
   }
 
   def props(factory: ActorRefFactory => ActorRef,
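The recursion in remove() above is the heart of the change: evict the least-recently-used warm containers until the requested memory is freed, or evict nothing at all if the free pool can never satisfy the request. A self-contained model of that logic (simplified stand-in types and names, not the project's classes):

object EvictionModel extends App {
  final case class Warm(id: String, memoryMb: Long, lastUsed: Long)

  def remove(free: List[Warm], neededMb: Long): List[String] =
    if (free.map(_.memoryMb).sum < neededMb) List.empty // can never free enough: evict nothing
    else if (neededMb <= 0) List.empty // enough freed: stop the recursion
    else {
      val oldest = free.minBy(_.lastUsed)
      oldest.id :: remove(free.filterNot(_ == oldest), neededMb - oldest.memoryMb)
    }

  val pool = List(Warm("a", 256, lastUsed = 1), Warm("b", 256, lastUsed = 0))
  println(remove(pool, 512))  // List(b, a): both evicted to fit a 512 MB action
  println(remove(pool, 1024)) // List(): the request can never be satisfied
}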
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 0a7c1aca295..688f30d636b 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -19,27 +19,25 @@ package whisk.core.containerpool
 
 import java.time.Instant
 
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.Success
-import scala.util.Failure
-import akka.actor.FSM
-import akka.actor.Props
-import akka.actor.Stash
+import akka.actor.{FSM, Props, Stash}
 import akka.actor.Status.{Failure => FailureMessage}
+import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
-import spray.json._
+import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol._
+import spray.json._
 import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
+import whisk.core.ConfigKeys
 import whisk.core.connector.ActivationMessage
 import whisk.core.containerpool.logging.LogCollectingException
+import whisk.core.entity.ExecManifest.ImageName
 import whisk.core.entity._
 import whisk.core.entity.size._
-import whisk.core.entity.ExecManifest.ImageName
 import whisk.http.Messages
-import akka.event.Logging.InfoLevel
-import pureconfig.loadConfigOrThrow
-import whisk.core.ConfigKeys
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
 // States
 sealed trait ContainerState
@@ -53,14 +51,16 @@ case object Paused extends ContainerState
 case object Removing extends ContainerState
 
 // Data
-sealed abstract class ContainerData(val lastUsed: Instant)
-case class NoData() extends ContainerData(Instant.EPOCH)
-case class PreWarmedData(container: Container, kind: String, memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH)
+sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit: ByteSize)
+case class NoData() extends ContainerData(Instant.EPOCH, 0.B)
+case class MemoryData(override val memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH, memoryLimit)
+case class PreWarmedData(container: Container, kind: String, override val memoryLimit: ByteSize)
+    extends ContainerData(Instant.EPOCH, memoryLimit)
 case class WarmedData(container: Container,
                       invocationNamespace: EntityName,
                       action: ExecutableWhiskAction,
                       override val lastUsed: Instant)
-    extends ContainerData(lastUsed)
+    extends ContainerData(lastUsed, action.limits.memory.megabytes.MB)
 
 // Events received by the actor
 case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
@@ -119,7 +119,7 @@ class ContainerProxy(
        job.exec.image,
        job.exec.pull,
        job.memoryLimit,
-        poolConfig.cpuShare)
+        poolConfig.cpuShare(job.memoryLimit))
        .map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit))
        .pipeTo(self)
 
@@ -136,7 +136,7 @@ class ContainerProxy(
        job.action.exec.image,
        job.action.exec.pull,
        job.action.limits.memory.megabytes.MB,
-        poolConfig.cpuShare)
+        poolConfig.cpuShare(job.action.limits.memory.megabytes.MB))
 
    // container factory will either yield a new container ready to execute the action, or
    // starting up the container failed; for the latter, it's either an internal error starting
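With memoryLimit on every ContainerData variant, the pool can account for memory uniformly: a reserved-but-not-yet-started slot (MemoryData), a prewarmed stem cell, and a warmed container all report the space they occupy, and WarmedData derives it from the action's own memory limit. A simplified stand-in (hypothetical names, not the project's classes) showing the bookkeeping this enables:

object MemoryAccounting extends App {
  sealed abstract class Data(val memoryMb: Long)
  final case class Reserved(override val memoryMb: Long) extends Data(memoryMb)  // like MemoryData
  final case class Prewarmed(override val memoryMb: Long) extends Data(memoryMb) // like PreWarmedData
  final case class Warmed(actionMemoryMb: Long) extends Data(actionMemoryMb)     // like WarmedData

  val busyPool: List[Data] = List(Reserved(256), Warmed(512))
  // The pool's admission check sums memory limits instead of counting slots:
  println(busyPool.map(_.memoryMb).sum) // 768
}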
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 4ad6bbf988b..e7411148401 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -25,21 +25,22 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
+import spray.json.DefaultJsonProtocol._
 import spray.json._
 import whisk.common._
-import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
 import whisk.core.containerpool._
 import whisk.core.containerpool.logging.LogStoreProvider
 import whisk.core.database._
 import whisk.core.entity._
+import whisk.core.entity.size._
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 
-import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
-import DefaultJsonProtocol._
 
 class InvokerReactive(
   config: WhiskConfig,
@@ -97,7 +98,7 @@ class InvokerReactive(
 
   /** Initialize message consumers */
   private val topic = s"invoker${instance.toInt}"
-  private val maximumContainers = poolConfig.maxActiveContainers
+  private val maximumContainers = (poolConfig.userMemory / MemoryLimit.minMemory).toInt
   private val msgProvider = SpiLoader.get[MessagingProvider]
   private val consumer = msgProvider.getConsumer(
    config,
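The consumer's maximum concurrency is now the number of smallest-possible actions that could fit into the invoker, rather than a core-derived count. Illustrative arithmetic (a sketch assuming a 4096 MB invoker and a 128 MB minimum action memory; the real value comes from MemoryLimit.minMemory):

object MaxContainersCheck extends App {
  val userMemoryMb = 4096L // example invoker user memory
  val minMemoryMb = 128L   // assumed MemoryLimit.minMemory
  // The Kafka feed never needs more in-flight messages than containers that could fit.
  println((userMemoryMb / minMemoryMb).toInt) // 32
}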
diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 51d899c865a..69aa606bb03 100644
--- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -78,9 +78,10 @@ class MesosContainerFactoryTest
    lastTaskId
  }
 
-  val poolConfig = ContainerPoolConfig(8, 10)
-  val dockerCpuShares = poolConfig.cpuShare
-  val mesosCpus = poolConfig.cpuShare / 1024.0
+  // 80 slots, each 265 MB
+  val poolConfig = ContainerPoolConfig(21200.MB)
+  val actionMemory = 265.MB
+  val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
  val containerArgsConfig =
    new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
@@ -134,8 +135,8 @@ class MesosContainerFactoryTest
      "mesosContainer",
      ImageName("fakeImage"),
      false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
 
    expectMsg(
      SubmitTask(TaskDef(
@@ -143,7 +144,7 @@ class MesosContainerFactoryTest
        "mesosContainer",
        "fakeImage",
        mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
        List(8080),
        Some(0),
        false,
@@ -184,15 +185,15 @@ class MesosContainerFactoryTest
      "mesosContainer",
      ImageName("fakeImage"),
      false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
    probe.expectMsg(
      SubmitTask(TaskDef(
        lastTaskId,
        "mesosContainer",
        "fakeImage",
        mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
        List(8080),
        Some(0),
        false,
@@ -255,8 +256,8 @@ class MesosContainerFactoryTest
      "mesosContainer",
      ImageName("fakeImage"),
      false,
-      1.MB,
-      poolConfig.cpuShare)
+      actionMemory,
+      poolConfig.cpuShare(actionMemory))
 
    probe.expectMsg(
      SubmitTask(TaskDef(
@@ -264,7 +265,7 @@ class MesosContainerFactoryTest
        "mesosContainer",
        "fakeImage",
        mesosCpus,
-        1,
+        actionMemory.toMB.toInt,
        List(8080),
        Some(0),
        false,
@@ -293,7 +294,7 @@ class MesosContainerFactoryTest
    implicit val tid = TransactionId.testing
    implicit val m = ActorMaterializer()
    val logs = container
-      .logs(1.MB, false)
+      .logs(actionMemory, false)
      .via(DockerToActivationLogStore.toFormattedString)
      .runWith(Sink.seq)
    await(logs)(0) should endWith
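Checking the fixture's arithmetic (a standalone sketch with a hypothetical object name): 21200 MB / 265 MB gives the advertised 80 slots, so cpuShare(265.MB) is (1024.0 / 80).toInt = 12 shares, and mesosCpus is 12 / 1024.0, roughly 0.0117.

object MesosFixtureCheck extends App {
  val slots = 21200 / 265 // 80 slots
  println((1024.0 / slots).toInt)          // 12 CPU shares per container (truncated from 12.8)
  println((1024.0 / slots).toInt / 1024.0) // 0.01171875 Mesos CPUs
}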
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 46f4559f829..179415b98f4 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -88,8 +88,13 @@ class ContainerPoolTests
  val differentInvocationNamespace = EntityName("invocationSpace2")
  val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec)
  val differentAction = action.copy(name = EntityName("actionName2"))
+  val largeAction =
+    action.copy(
+      name = EntityName("largeAction"),
+      limits = ActionLimits(memory = MemoryLimit(MemoryLimit.stdMemory * 2)))
 
  val runMessage = createRunMessage(action, invocationNamespace)
+  val runMessageLarge = createRunMessage(largeAction, invocationNamespace)
  val runMessageDifferentAction = createRunMessage(differentAction, invocationNamespace)
  val runMessageDifferentVersion = createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
  val runMessageDifferentNamespace = createRunMessage(action, differentInvocationNamespace)
@@ -124,7 +129,7 @@ class ContainerPoolTests
  it should "reuse a warm container" in within(timeout) {
    val (containers, factory) = testContainers(2)
    val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
    pool ! runMessage
    containers(0).expectMsg(runMessage)
@@ -138,7 +143,7 @@ class ContainerPoolTests
  it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
    val (containers, factory) = testContainers(2)
    val feed = TestProbe()
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
    pool ! runMessage
    containers(0).expectMsg(runMessage)
@@ -153,7 +158,7 @@ class ContainerPoolTests
    val (containers, factory) = testContainers(2)
    val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
    pool ! runMessage
    containers(0).expectMsg(runMessage)
    // Note that the container doesn't respond, thus it's not free to take work
@@ -167,7 +172,7 @@ class ContainerPoolTests
    val feed = TestProbe()
 
    // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory), feed.ref))
    pool ! runMessage
    containers(0).expectMsg(runMessage)
    containers(0).send(pool, NeedWork(warmedData()))
@@ -177,12 +182,35 @@ class ContainerPoolTests
    containers(1).expectMsg(runMessageDifferentEverything)
  }
 
+  it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within(
+    timeout) {
+    val (containers, factory) = testContainers(3)
+    val feed = TestProbe()
+
+    // a pool with slots for 512 MB
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(512.MB), feed.ref))
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+    pool ! runMessageDifferentAction
+    containers(1).expectMsg(runMessageDifferentAction)
+
+    containers(0).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+    containers(1).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+
+    pool ! runMessageLarge
+    containers(0).expectMsg(Remove)
+    containers(1).expectMsg(Remove)
+    containers(2).expectMsg(runMessageLarge)
+  }
+
  it should "cache a container if there is still space in the pool" in within(timeout) {
    val (containers, factory) = testContainers(2)
    val feed = TestProbe()
 
    // a pool with only 1 active slot but 2 slots in total
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))
 
    // Run the first container
    pool ! runMessage
@@ -208,7 +236,7 @@ class ContainerPoolTests
    val feed = TestProbe()
 
    // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory), feed.ref))
    pool ! runMessage
    containers(0).expectMsg(runMessage)
    containers(0).send(pool, NeedWork(warmedData()))
@@ -223,7 +251,7 @@ class ContainerPoolTests
    val feed = TestProbe()
 
    // a pool with only 1 slot
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory), feed.ref))
    pool ! runMessage
    containers(0).expectMsg(runMessage)
    containers(0).send(pool, RescheduleJob) // emulate container failure ...
@@ -232,6 +260,34 @@ class ContainerPoolTests
    containers(1).expectMsg(runMessage) // job resent to new actor
  }
 
+  it should "not start a new container if there is not enough space in the pool" in within(timeout) {
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+
+    // Start first action
+    pool ! runMessage
+    containers(0).expectMsg(runMessage)
+
+    // Send second action to the pool
+    pool ! runMessageLarge
+    containers(1).expectNoMessage(100.milliseconds)
+
+    // First action is finished
+    containers(0).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+
+    // Second action should run now
+    containers(1).expectMsgPF() {
+      // The `Some` asserts that the message was retried while the first action was still blocking the invoker.
+      case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
+    }
+
+    containers(1).send(pool, NeedWork(warmedData()))
+    feed.expectMsg(MessageFeed.Processed)
+  }
+
  /*
   * CONTAINER PREWARMING
   */
@@ -241,7 +297,7 @@ class ContainerPoolTests
 
    val pool =
      system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool.props(factory, ContainerPoolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
    containers(0).expectMsg(Start(exec, memoryLimit))
  }
 
@@ -251,7 +307,11 @@ class ContainerPoolTests
 
    val pool =
      system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool.props(
+          factory,
+          ContainerPoolConfig(MemoryLimit.stdMemory),
+          feed.ref,
+          List(PrewarmingConfig(1, exec, memoryLimit))))
    containers(0).expectMsg(Start(exec, memoryLimit))
    containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
    pool ! runMessage
@@ -266,7 +326,11 @@ class ContainerPoolTests
    val pool =
      system.actorOf(
        ContainerPool
-          .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
+          .props(
+            factory,
+            ContainerPoolConfig(MemoryLimit.stdMemory),
+            feed.ref,
+            List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
    containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
    containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
    pool ! runMessage
@@ -282,7 +346,11 @@ class ContainerPoolTests
    val pool =
      system.actorOf(
        ContainerPool
-          .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit))))
+          .props(
+            factory,
+            ContainerPoolConfig(MemoryLimit.stdMemory),
+            feed.ref,
+            List(PrewarmingConfig(1, exec, alternativeLimit))))
    containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
    containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit)))
    pool ! runMessage
@@ -296,7 +364,7 @@ class ContainerPoolTests
    val (containers, factory) = testContainers(2)
    val feed = TestProbe()
 
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
 
    // container0 is created and used
    pool ! runMessage
@@ -415,18 +483,18 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
  behavior of "ContainerPool remove()"
 
  it should "not provide a container if pool is empty" in {
-    ContainerPool.remove(Map()) shouldBe None
+    ContainerPool.remove(Map(), MemoryLimit.stdMemory) shouldBe List.empty
  }
 
  it should "not provide a container from busy pool with non-warm containers" in {
    val pool = Map('none -> noData(), 'pre -> preWarmedData())
-    ContainerPool.remove(pool) shouldBe None
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty
  }
 
  it should "provide a container from pool with one single free container" in {
    val data = warmedData()
    val pool = Map('warm -> data)
-    ContainerPool.remove(pool) shouldBe Some('warm)
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('warm)
  }
 
  it should "provide oldest container from busy pool with multiple containers" in {
@@ -437,6 +505,18 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
    val pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
 
-    ContainerPool.remove(pool) shouldBe Some('oldest)
+    ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('oldest)
+  }
+
+  it should "provide a list of the oldest containers from pool, if several containers have to be removed" in {
+    val namespace = differentNamespace.asString
+    val first = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(1))
+    val second = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(2))
+    val third = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(3))
+    val oldest = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(0))
+
+    val pool = Map('first -> first, 'second -> second, 'third -> third, 'oldest -> oldest)
+
+    ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List('oldest, 'first)
  }
 }
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 8bbf80665a7..76594eeddd5 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -167,7 +167,7 @@ class ContainerProxyTests
    Future.successful(())
  }
 
-  val poolConfig = ContainerPoolConfig(1, 2)
+  val poolConfig = ContainerPoolConfig(2.MB)
 
  behavior of "ContainerProxy"