Add a buffer in ContainerPool to make sure that the order of activations stays the same.
cbickel committed Jun 19, 2018
1 parent bfff7cf commit 09e569b
Showing 2 changed files with 171 additions and 61 deletions.
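For orientation, a minimal standalone sketch of the idea behind the change (not the OpenWhisk code; `Request`, `MemoryPool` and the capacities are made-up illustration names): requests that cannot run yet are parked in an immutable.Queue, and a later, smaller request is parked as well instead of overtaking the waiting one, so a freed-up container always serves the head of the buffer first.

import scala.collection.immutable

// Hypothetical stand-ins for Run messages and the invoker's memory pool.
final case class Request(name: String, memoryMB: Long)

final class MemoryPool(capacityMB: Long) {
  private var usedMB = 0L
  private var buffer = immutable.Queue.empty[Request]

  // Run the request only if it is next in line and fits into the free memory;
  // otherwise buffer it so that it cannot be overtaken by later requests.
  def submit(r: Request): Unit = {
    val isNext = buffer.isEmpty || buffer.head == r
    if (isNext && usedMB + r.memoryMB <= capacityMB) {
      buffer = buffer.dequeueOption.map(_._2).getOrElse(buffer)
      usedMB += r.memoryMB
      println(s"running ${r.name}")
      buffer.headOption.foreach(submit) // give the next waiting request a chance too
    } else if (!buffer.exists(_ == r)) {
      println(s"buffering ${r.name}")
      buffer = buffer.enqueue(r)
    }
  }

  // A request finished: free its memory and retry the head of the buffer.
  def finished(r: Request): Unit = {
    usedMB -= r.memoryMB
    buffer.headOption.foreach(submit)
  }
}

object OrderingDemo extends App {
  val pool = new MemoryPool(capacityMB = 512)
  val first = Request("first", 256)
  pool.submit(first)                  // runs, 256 MB left
  pool.submit(Request("big", 512))    // does not fit -> buffered
  pool.submit(Request("small", 256))  // would fit, but is buffered to preserve order
  pool.finished(first)                // "big" runs next; "small" keeps waiting
}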
@@ -63,6 +63,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
// If all memory slots are occupied and there is currently no container to be removed, then the actions will be buffered here to keep the order of computation.
// Otherwise actions with small memory limits could block actions with large memory limits.
var runBuffer = immutable.Queue.empty[Run]
val logMessageInterval = 10.seconds

prewarmConfig.foreach { config =>
@@ -92,72 +95,89 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
val createdContainer =
if (busyPool.map(_._2.memoryLimit.toMB).sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
.map(container => {
(container, "warm")
})
.orElse {
if (busyPool
.map(_._2.memoryLimit.toMB)
.sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
takePrewarmContainer(r.action)
.map(container => {
(container, "prewarmed")
})
.orElse {
Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
}
} else None
}
.orElse {
// Remove a container and create a new one for the given job
ContainerPool
.remove(freePool, r.action.limits.memory.megabytes.MB)
.map(removeContainer)
// If the list had at least one entry, enough containers were removed to start the new container. After
// removing the containers, we are not interested anymore in the containers that have been removed.
.headOption
.map { _ =>
// Only process the request if there are no other requests waiting for free slots, or if the current request is the next request to process
if (runBuffer.isEmpty || runBuffer.dequeueOption.exists(_._1.msg == r.msg)) {
val createdContainer =
if (busyPool
.map(_._2.memoryLimit.toMB)
.sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
.map(container => {
(container, "warm")
})
.orElse {
if (busyPool
.map(_._2.memoryLimit.toMB)
.sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
takePrewarmContainer(r.action)
.map(container => {
(container, "recreated")
(container, "prewarmed")
})
.getOrElse {
(createContainer(r.action.limits.memory.megabytes.MB), "recreated")
.orElse {
Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
}
}
}
} else None
} else None
}
.orElse {
// Remove a container and create a new one for the given job
ContainerPool
.remove(freePool, r.action.limits.memory.megabytes.MB)
.map(removeContainer)
// If the list had at least one entry, enough containers were removed to start the new container. After
// removing the containers, we are not interested anymore in the containers that have been removed.
.headOption
.map { _ =>
takePrewarmContainer(r.action)
.map(container => {
(container, "recreated")
})
.getOrElse {
(createContainer(r.action.limits.memory.megabytes.MB), "recreated")
}
}
}
} else None

createdContainer match {
case Some(((actor, data), containerState)) =>
busyPool = busyPool + (actor -> data)
freePool = freePool - actor
actor ! r // forwards the run request to the container
logContainerStart(r, containerState)
case None =>
// this can also happen if createContainer fails to start a new container, or
// if a job is rescheduled but the container it was allocated to has not yet destroyed itself
// (and a new container would over commit the pool)
val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
val retryLogDeadline = if (isErrorLogged) {
logging.error(
this,
s"Rescheduling Run message, too many message in the pool, " +
s"freePoolSize: ${freePool.size} containers and ${freePool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"busyPoolSize: ${busyPool.size} containers and ${busyPool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"maxContainersMemory ${poolConfig.userMemory}, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid)
Some(logMessageInterval.fromNow)
} else {
r.retryLogDeadline
}
self ! Run(r.action, r.msg, retryLogDeadline)
createdContainer match {
case Some(((actor, data), containerState)) =>
busyPool = busyPool + (actor -> data)
freePool = freePool - actor
// Remove the action that gets executed now from the buffer and execute the next one afterwards.
runBuffer = runBuffer.dequeueOption.map(_._2).getOrElse(runBuffer)
runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
actor ! r // forwards the run request to the container
logContainerStart(r, containerState)
case None =>
// this can also happen if createContainer fails to start a new container, or
// if a job is rescheduled but the container it was allocated to has not yet destroyed itself
// (and a new container would over commit the pool)
val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
val retryLogDeadline = if (isErrorLogged) {
logging.error(
this,
s"Rescheduling Run message, too many message in the pool, " +
s"freePoolSize: ${freePool.size} containers and ${freePool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"busyPoolSize: ${busyPool.size} containers and ${busyPool.map(_._2.memoryLimit.toMB).sum} MB, " +
s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid)
Some(logMessageInterval.fromNow)
} else {
r.retryLogDeadline
}
if (!runBuffer.exists(_.msg == r.msg)) {
// Add this request to the buffer, as it is not there yet.
runBuffer = runBuffer.enqueue(r)
}
// As this request is the first one in the buffer, try again to execute it.
self ! Run(r.action, r.msg, retryLogDeadline)
}
} else {
// There are currently actions in the buffer that have to be executed before this action.
// These waiting actions were not yet able to get enough free memory.
runBuffer = runBuffer.enqueue(r)
}

// Container is free to take more work
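The runBuffer handling above relies on a few scala.collection.immutable.Queue operations. As a side note, a small self-contained check of their standard-library semantics (plain library behaviour, not code from this commit):

import scala.collection.immutable.Queue

object QueueSemanticsDemo extends App {
  // dequeueOption is safe on an empty queue: it returns None instead of throwing.
  assert(Queue.empty[String].dequeueOption.isEmpty)

  // enqueue appends at the tail, so elements come out in arrival order (FIFO).
  val q = Queue.empty[String].enqueue("a").enqueue("b").enqueue("c")
  val (head, rest) = q.dequeue
  assert(head == "a" && rest == Queue("b", "c"))

  // The pattern `q.dequeueOption.map(_._2).getOrElse(q)` used above drops the
  // head if there is one and leaves an empty queue untouched.
  assert(q.dequeueOption.map(_._2).getOrElse(q) == Queue("b", "c"))

  // exists is used in the diff to avoid enqueueing the same message twice.
  assert(q.exists(_ == "b"))

  println("queue semantics as expected")
}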
@@ -383,6 +383,96 @@ class ContainerPoolTests
pool ! runMessage
containers(1).expectMsg(runMessage)
}

/*
* Run buffer
*/
it should "first put messages into the queue and retrying them and then put messages only into the queue" in within(
timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()

// Pool with 512 MB user memory
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))

// Send action that blocks the pool
pool ! runMessageLarge
containers(0).expectMsg(runMessageLarge)

// Send action that should be written to the queue and retried in invoker
pool ! runMessage
containers(1).expectNoMessage(100.milliseconds)

// Send another message that should not be retried, but put into the queue as well
pool ! runMessageDifferentAction
containers(2).expectNoMessage(100.milliseconds)

// Action with 512 MB is finished
containers(0).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)

// Action 1 should start immediately
containers(0).expectMsgPF() {
// The `Some` asserts that the message has been retried while the first action was still blocking the invoker.
case Run(runMessage.action, runMessage.msg, Some(_)) => true
}
// Action 2 should start immediately as well (without any retries, as there is already enough space in the pool)
containers(1).expectMsg(runMessageDifferentAction)
}

it should "process activations in the order they are arriving" in within(timeout) {
val (containers, factory) = testContainers(4)
val feed = TestProbe()

// Pool with 512 MB user memory
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))

// Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with 256 MB each and Action 1 with 512 MB)
pool ! runMessage
containers(0).expectMsg(runMessage)
pool ! runMessageLarge
containers(1).expectNoMessage(100.milliseconds)
pool ! runMessageDifferentNamespace
containers(2).expectNoMessage(100.milliseconds)
pool ! runMessageDifferentAction
containers(3).expectNoMessage(100.milliseconds)

// Action 0 is finished -> Large action should be executed now
containers(0).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
containers(1).expectMsgPF() {
// The `Some` asserts that the message has been retried while the first action was still blocking the invoker.
case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true
}

// Send another action to the container pool, that would fit memory-wise
pool ! runMessageDifferentEverything
containers(4).expectNoMessage(100.milliseconds)

// Action 1 is finished -> Action 2 and Action 3 should be executed now
containers(1).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
containers(2).expectMsgPF() {
// The `Some` asserts that the message has been retried while the first action was still blocking the invoker.
case Run(runMessageDifferentNamespace.action, runMessageDifferentNamespace.msg, Some(_)) => true
}
// The absence of a retryLogDeadline checks that this request was served from the buffer instead of being retried in the system
containers(3).expectMsg(runMessageDifferentAction)

// Action 3 is finished -> Action 4 should start
containers(3).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
containers(4).expectMsgPF() {
// The `Some` asserts that the message has been retried while the first action was still blocking the invoker.
case Run(runMessageDifferentEverything.action, runMessageDifferentEverything.msg, Some(_)) => true
}

// Action 2 and 4 are finished
containers(2).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
containers(4).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
}
}

/**
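The tests above use Akka TestKit probes as stand-in containers. For readers unfamiliar with that pattern, a rough standalone sketch (assuming akka-testkit on the classpath; the forwarder actor is made up for illustration) of how a TestProbe observes message order:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import scala.concurrent.duration._

object ProbeDemo extends App {
  implicit val system: ActorSystem = ActorSystem("probe-demo")

  val probe = TestProbe()
  // Forwards every message to the probe, roughly like the tests' container
  // factory hands Run messages to the per-container probes.
  val forwarder = system.actorOf(Props(new Actor {
    def receive = { case msg => probe.ref ! msg }
  }))

  forwarder ! "first"
  forwarder ! "second"

  probe.expectMsg("first")                // messages are observed in order
  probe.expectMsg("second")
  probe.expectNoMessage(100.milliseconds) // and nothing else arrives

  system.terminate()
}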
