Memory based loadbalancing #3747
Conversation
Would love to see this get pushed through. There are some compensating changes we will need in kube-deploy; I will be happy to take care of them when it is time.
Codecov Report

@@           Coverage Diff            @@
##           master    #3747    +/-  ##
========================================
- Coverage   85.41%   80.92%   -4.5%
========================================
  Files         147      147
  Lines        7070     7093     +23
  Branches      423      408     -15
========================================
- Hits         6039     5740    -299
- Misses       1031     1353    +322

Continue to review the full report at Codecov.
Will need a pass over the tests, great stuff 🎉
@@ -339,6 +345,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
@tailrec
def schedule(invokers: IndexedSeq[InvokerHealth],
             dispatched: IndexedSeq[ForcableSemaphore],
             memory: ByteSize,
Shall we make this slotsNeeded and keep it as an integer?
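Applied to the signature quoted above, that would presumably read (remaining parameters unchanged):
@tailrec
def schedule(invokers: IndexedSeq[InvokerHealth],
             dispatched: IndexedSeq[ForcableSemaphore],
             slotsNeeded: Int,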
*/
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: ByteSize)
Rename invokerBusyThreshold to something more meaningful?
.getOrElse {
(createContainer(), "recreated")
// Only process request, if there are no other requests waiting for free slots, or if the current request is the next request to process
if (runBuffer.size == 0 || runBuffer.headOption.map(_.msg == r.msg).getOrElse(false)) {
runBuffer.headOption.map(_.msg == r.msg).getOrElse(true)
} else {
r.retryLogDeadline
}
if (!runBuffer.map(_.msg).contains(r.msg)) {
if(!runBuffer.exists(_.msg == r.msg))
.schedule(r.action, r.msg.user.namespace.name, freePool)
.map(container => {
(container, "warm")
})
.map(container => (container, "warm"))
Some(ref)
} else None
if (freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
if (memory > 0.B) {
You can collapse these ifs to get rid of one level of nesting.
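As an illustration, a collapsed form of the two conditions quoted above could look like this (sketch only):
if (memory > 0.B && freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
  // decide which free containers to remove
}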
@@ -91,62 +95,89 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) {
// Only process request, if there are no other requests waiting for free slots, or if the current request is the next request to process
if (runBuffer.isEmpty || runBuffer.dequeueOption.exists(_._1.msg == r.msg)) {
Could be collapsed to only one condition
runBuffer.dequeueOption.map(_._1.msg == r.msg).getOrElse(true)
Thinking about it, it makes sense to keep it separate for readability!
On another note: we could extract this into a value isResentFromBuffer and use it to branch the execution later. There are checks further down which implicitly check for this condition; naming it would make that clearer.
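A small sketch of that extraction, reusing the condition from the diff above:
// name the buffer check so that later branches can reuse it
val isResentFromBuffer = runBuffer.dequeueOption.exists(_._1.msg == r.msg)
if (runBuffer.isEmpty || isResentFromBuffer) {
  // schedule the run message as before
}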
.orElse {
if (busyPool
.map(_._2.memoryLimit.toMB)
.sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
I think we need to add the action's memory limit to this condition as well.
On a broader note: should we make this a method?
def hasCapacityFor(pool: Map[ActorRef, ContainerData], memory: ByteSize): Boolean =
  pool.map(_._2.memoryLimit.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
It would then be usable like hasCapacityFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB).
})
.getOrElse {
(createContainer(r.action.limits.memory.megabytes.MB), "recreated")
}
Can we collapse the map, orElse etc. statements a bit by using () instead of {}?
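For example, the getOrElse quoted above could become a single expression (hypothetical rewrite):
.getOrElse((createContainer(r.action.limits.memory.megabytes.MB), "recreated"))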
busyPool = busyPool + (actor -> data)
freePool = freePool - actor
// Remove the action that gets executed now from the buffer and execute the next one afterwards.
runBuffer = runBuffer.dequeueOption.map(_._2).getOrElse(runBuffer)
Some more information in the comment would be nice, like:
// It is guaranteed that the currently executed message is == the head of the queue, if the queue has any entries
Love the tests ❤️! One should be added to cover an edge case of ContainerPool.remove. Great job!
}

it should "not provide a container from busy pool with non-warm containers" in {
val pool = Map('none -> noData(), 'pre -> preWarmedData())
ContainerPool.remove(pool) shouldBe None
ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty
Would it make sense to add a test verifying that the list is empty if enough capacity cannot be freed?
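A sketch of what such a test could look like, assuming a warmedData() helper that creates a warm container of stdMemory and assuming remove returns an empty list when it cannot free the requested amount:
it should "not provide any containers if not enough capacity can be freed" in {
  val pool = Map('warm -> warmedData())
  ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List.empty
}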
@@ -124,7 +129,7 @@ class ContainerPoolTests
it should "reuse a warm container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
Please add a comment somewhere that an action is created with stdMemory by default, so this preserves the previous behavior (4 actions can be scheduled).
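For example, a hypothetical comment next to the pool config:
// actions are created with stdMemory by default, so a pool of 4 * stdMemory preserves the previous behavior (4 actions can be scheduled)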
val feed = TestProbe()

// a pool with slots for 512MB
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(512.MB), feed.ref))
Should be 2 * stdMemory
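Applied to the line quoted above, that would presumably read:
// a pool with slots for 512MB (2 actions of stdMemory each)
val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))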
pool ! runMessage
containers(0).expectMsg(runMessage)
pool ! runMessageDifferentAction
containers(1).expectMsg(runMessageDifferentAction)
Comments in here would be nice, like
containers(0).expectMsg(runMessage) // 1 * stdMemory taken
pool ! runMessageDifferentAction
containers(1).expectMsg(runMessageDifferentAction) // 2 * stdMemory taken -> full
...
pool ! runMessageLarge
// need to remove both actions to make space for the large action (needs 2 * stdMemory)
containers(0).expectMsg(Remove)
containers(1).expectMsg(Remove)
containers(2).expectMsg(runMessageLarge)
}
// Action 2 should start immediately as well (without any retries, as there is already enough space in the pool)
containers(1).expectMsg(runMessageDifferentAction)
}
Very nice test! 🎉
The gatling-tests
I've checked the code part only, looks good given my experience in this corner.
The description in ShardingContainerPoolBalancer.scala should be updated to describe the algorithm.
}

def /(other: ByteSize): Double = {
// Without throwing the exception the result would be Infinity here
We could consider making this return a Try instead.
What are the reasons for using a Try here? When dividing Ints, you also get the result directly instead of a Try, don't you?
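For reference, a sketch of the two variants being discussed (assuming ByteSize.toBytes returns the size as a Long, and with scala.util.Try imported for the second variant):
// current approach: guard explicitly, otherwise the Double division would yield Infinity
def /(other: ByteSize): Double = {
  if (other.toBytes == 0) throw new ArithmeticException("Divide by zero")
  toBytes.toDouble / other.toBytes.toDouble
}

// alternative: wrap the computation in a Try and let the caller handle the failure case
def /(other: ByteSize): Try[Double] = Try {
  if (other.toBytes == 0) throw new ArithmeticException("Divide by zero")
  toBytes.toDouble / other.toBytes.toDouble
}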
@@ -456,13 +461,15 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider {
*
* @param invokers a list of available invokers to search in, including their state
* @param dispatched semaphores for each invoker to give the slots away from
* @param slots Number of slots, that need to be aquired (e.g. memory in MB)
acquired (typo)
Adapt invoker-tests.
…tions will keep the same.
PG2#3520 🔵
@@ -6,7 +6,7 @@ whisk {
use-cluster-bootstrap: false
}
loadbalancer {
invoker-busy-threshold: 4
user-memory: 1024 m
@cbickel Should this be named invoker-user-memory? I see the following exception on startup, as in my case CONFIG_whisk_loadbalancer_invokerUserMemory was not defined:
Exception in thread "main" pureconfig.error.ConfigReaderException: Cannot convert configuration to a whisk.core.loadBalancer.ShardingContainerPoolBalancerConfig. Failures are:
at 'whisk.loadbalancer':
- Key not found: 'invoker-user-memory'.
at pureconfig.package$.getResultOrThrow(package.scala:138)
at pureconfig.package$.loadConfigOrThrow(package.scala:160)
at whisk.core.loadBalancer.ShardingContainerPoolBalancer.<init>(ShardingContainerPoolBalancer.scala:159)
at whisk.core.loadBalancer.ShardingContainerPoolBalancer$.instance(ShardingContainerPoolBalancer.scala:437)
at whisk.core.controller.Controller.<init>(Controller.scala:117)
at whisk.core.controller.Controller$.main(Controller.scala:258)
at whisk.core.controller.Controller.main(Controller.scala)
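Assuming the Scala-side field name stays invokerUserMemory, the config key would presumably need to be renamed along these lines (hypothetical snippet):
loadbalancer {
  # renamed from user-memory so that pureconfig finds the expected key
  invoker-user-memory: 1024 m
}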
You are right.
I'll open a PR to correct this.
Thank you.
#3993 has the fix.
This PR implements the idea that has been discussed in the following mail thread:
https://lists.apache.org/thread.html/dfccf972bc1419fe48dbc23119441108c45f85d53625fd6f8fc04fcb@%3Cdev.openwhisk.apache.org%3E
It bases the number of containers an invoker can spawn on the available memory rather than on the CPU.
In addition, it makes the loadbalancer aware of the amount of available memory on each invoker, and it allows the invoker to create user containers only if there is enough free memory.
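As a rough sketch of the idea (names taken from the diffs discussed above; ForcableSemaphore.tryAcquire(n) is assumed to acquire n permits at once): each invoker's semaphore is sized to its available user memory in MB, and an activation has to acquire as many permits as its memory limit before it may be scheduled on that invoker.
val slotsNeeded = action.limits.memory.megabytes
if (dispatched(invokerIndex).tryAcquire(slotsNeeded)) {
  // enough free memory on this invoker: schedule the activation here
} else {
  // not enough memory left: try the next invoker in the ring
}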