Skip to content

Commit

Permalink
Remove containers gradually when disable invoker (#5253)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored Jun 2, 2022
1 parent 21b03a5 commit 80de54e
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 25 deletions.
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
"CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ case class ContainerPoolConfig(userMemory: ByteSize,
prewarmMaxRetryLimit: Int,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
batchDeletionSize: Int,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")

require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
require(batchDeletionSize > 0, "batch deletion size must be > 0")

/**
* The shareFactor indicates the number of containers that would share a single core, on average.
Expand Down
1 change: 1 addition & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ whisk {
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
memory-sync-interval: 1 second # period to sync memory info to etcd
batch-deletion-size: 10 # batch size for removing containers when disable invoker, too big value may cause docker/k8s overload
}

kubernetes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]

// for shutting down
private var disablingPool = immutable.Set.empty[ActorRef]

private var shuttingDown = false

private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
Expand Down Expand Up @@ -353,18 +356,12 @@ class FunctionPullingContainerPool(

// Container got removed
case ContainerRemoved(replacePrewarm) =>
inProgressPool.get(sender()).foreach { _ =>
inProgressPool = inProgressPool - sender()
}

warmedPool.get(sender()).foreach { _ =>
warmedPool = warmedPool - sender()
}
inProgressPool = inProgressPool - sender()
warmedPool = warmedPool - sender()
disablingPool -= sender()

// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
busyPool = busyPool - sender()

//in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
Expand Down Expand Up @@ -601,11 +598,26 @@ class FunctionPullingContainerPool(
* Make all busyPool's memoryQueue actor shutdown gracefully
*/
private def waitForPoolToClear(): Unit = {
busyPool.keys.foreach(_ ! GracefulShutdown)
warmedPool.keys.foreach(_ ! GracefulShutdown)
if (inProgressPool.nonEmpty) {
val pool = self
// how many busy containers will be removed in this term
val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
(busyPool.keySet &~ disablingPool)
.take(slotsForBusyPool)
.foreach(container => {
disablingPool += container
container ! GracefulShutdown
})
// how many warm containers will be removed in this term
val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
(warmedPool.keySet &~ disablingPool)
.take(slotsForWarmPool)
.foreach(container => {
disablingPool += container
container ! GracefulShutdown
})
if (inProgressPool.nonEmpty || busyPool.size + warmedPool.size > slotsForBusyPool + slotsForWarmPool) {
context.system.scheduler.scheduleOnce(5.seconds) {
waitForPoolToClear()
pool ! GracefulShutdown
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ContainerPoolTests
}

def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)

behavior of "ContainerPool"

Expand Down Expand Up @@ -818,7 +818,8 @@ class ContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val initialCount = 2
val pool =
system.actorOf(
Expand Down Expand Up @@ -864,7 +865,8 @@ class ContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val minCount = 0
val initialCount = 2
val maxCount = 4
Expand Down Expand Up @@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
}

it should "remove expired in order of expiration" in {
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
Expand All @@ -1261,7 +1263,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {

it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
Expand All @@ -1287,7 +1289,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {

it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2 - should leave at least 2 prewarms when removing expired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
prewarmMaxRetryLimit: Int = 3,
prewarmPromotion: Boolean = false,
batchDeletionSize: Int = 10,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
ContainerPoolConfig(
userMemory,
Expand All @@ -192,6 +193,7 @@ class FunctionPullingContainerPoolTests
prewarmMaxRetryLimit,
prewarmPromotion,
memorySyncInterval,
batchDeletionSize,
prewarmContainerCreationConfig)

def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
Expand Down Expand Up @@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
}
}

it should "stop containers gradually when shut down" in within(timeout * 20) {
val (containers, factory) = testContainers(10)
val doc = put(entityStore, bigWhiskAction)
val topic = s"creationAck${schedulerInstanceId.asString}"
val consumer = new TestConnector(topic, 4, true)
val pool = system.actorOf(
Props(new FunctionPullingContainerPool(
factory,
invokerHealthService.ref,
poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
invokerInstance,
List.empty,
sendAckToScheduler(consumer.getProducer()))))

(0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken)
(0 to 10).foreach(i => {
containers(i).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
// create 5 container in busy pool, and 6 in warmed pool
if (i < 5)
containers(i).send(pool, Initialized(initializedData)) // container is initialized
else
containers(i).send(
pool,
ContainerIsPaused(
WarmData(
stub[DockerContainer],
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
Instant.now,
TestProbe().ref)))
})

// disable
pool ! GracefulShutdown
// at first, 3 containers will be removed from busy pool, and left containers will not
var disablingContainers = Set.empty[Int]
(0 to 10).foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 3, "more than 3 containers is shutting down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
var completedContainer = -1
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
// only make one container complete shutting down
if (completedContainer == -1)
completedContainer = i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 6, "more than 3 containers is shutting down")
containers(completedContainer).send(pool, ContainerRemoved(false))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
// there should be only one more container going to shut down
assert(disablingContainers.size == 7, "more than 3 containers is shutting down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 10, "more than 3 containers is shutting down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 11, "unexpected containers is shutting down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
}

it should "create prewarmed containers on startup" in within(timeout) {
stream.reset()
val (containers, factory) = testContainers(1)
Expand Down Expand Up @@ -343,6 +457,7 @@ class FunctionPullingContainerPoolTests
3,
false,
FiniteDuration(10, TimeUnit.SECONDS),
10,
prewarmContainerCreationConfig)

val pool = system.actorOf(
Expand Down Expand Up @@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val initialCount = 2
val pool = system.actorOf(
Props(
Expand Down Expand Up @@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val minCount = 0
val initialCount = 2
val maxCount = 4
Expand Down Expand Up @@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
100,
maxRetryLimit,
false,
1.second)
1.second,
10)
val initialCount = 1
val pool = system.actorOf(
Props(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
100,
3,
false,
1.second)
1.second,
10)

val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)

Expand Down

0 comments on commit 80de54e

Please sign in to comment.