Skip to content

Commit

Permalink
KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferre…
Browse files Browse the repository at this point in the history
…d replica as the new leader, if possible

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes apache#1338 from ijuma/kafka-3670-controlled-shutdown-leader-selector-preferred-replica
  • Loading branch information
ijuma authored and junrao committed May 8, 2016
1 parent 8fe2552 commit 51f7a35
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
liveAssignedReplicas.filter(newIsr.contains).headOption match {
case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d"
.format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext}
import org.easymock.EasyMock
import org.junit.{Assert, Test}
import Assert._
import kafka.cluster.Broker
import kafka.utils.ZkUtils

import scala.collection.mutable

class ControlledShutdownLeaderSelectorTest {

@Test
def testSelectLeader() {
val topicPartition = TopicAndPartition("topic", 1)
val assignment = Seq(6, 5, 4, 3, 2, 1)
val preferredReplicaId = assignment.head

val firstIsr = List(1, 3, 6)
val firstLeader = 1

val zkUtils = EasyMock.mock(classOf[ZkUtils])
val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000)
controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet
controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)

val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr)
val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)

assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)
assertEquals(Seq(1, 6), secondLeaderAndIsr.isr)
assertEquals(1, secondLeaderAndIsr.zkVersion)
assertEquals(1, secondLeaderAndIsr.leaderEpoch)
assertEquals(assignment, secondReplicas)

controllerContext.shuttingDownBrokerIds += preferredReplicaId

val deadBrokerId = 2
controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId)
controllerContext.shuttingDownBrokerIds -= deadBrokerId

val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr)

assertEquals(1, thirdLeaderAndIsr.leader)
assertEquals(Seq(1), thirdLeaderAndIsr.isr)
assertEquals(2, thirdLeaderAndIsr.zkVersion)
assertEquals(2, thirdLeaderAndIsr.leaderEpoch)
assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas)

}

}

0 comments on commit 51f7a35

Please sign in to comment.