From 51f7a35c929d9aa04d821098a2266902f9178d7c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 8 May 2016 10:45:47 -0700 Subject: [PATCH] KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible Author: Ismael Juma Reviewers: Jun Rao Closes #1338 from ijuma/kafka-3670-controlled-shutdown-leader-selector-preferred-replica --- .../controller/PartitionLeaderSelector.scala | 9 +-- ...ControlledShutdownLeaderSelectorTest.scala | 73 +++++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 5eed3829ff3c..9d8b0b6f71e2 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -185,13 +185,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) - val newLeaderOpt = newIsr.headOption - newLeaderOpt match { + liveAssignedReplicas.filter(newIsr.contains).headOption match { case Some(newLeader) => - debug("Partition %s : current leader = %d, new leader = %d" - .format(topicAndPartition, currentLeader, newLeader)) - (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), - liveAssignedReplicas) + debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader)) + (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) case None => throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala new file mode 100644 index 000000000000..f032eb68524b --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.api.LeaderAndIsr +import kafka.common.TopicAndPartition +import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext} +import org.easymock.EasyMock +import org.junit.{Assert, Test} +import Assert._ +import kafka.cluster.Broker +import kafka.utils.ZkUtils + +import scala.collection.mutable + +class ControlledShutdownLeaderSelectorTest { + + @Test + def testSelectLeader() { + val topicPartition = TopicAndPartition("topic", 1) + val assignment = Seq(6, 5, 4, 3, 2, 1) + val preferredReplicaId = assignment.head + + val firstIsr = List(1, 3, 6) + val firstLeader = 1 + + val zkUtils = EasyMock.mock(classOf[ZkUtils]) + val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000) + controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet + controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3) + controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment) + + val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext) + val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr) + val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr) + + assertEquals(preferredReplicaId, secondLeaderAndIsr.leader) + assertEquals(Seq(1, 6), secondLeaderAndIsr.isr) + assertEquals(1, secondLeaderAndIsr.zkVersion) + assertEquals(1, secondLeaderAndIsr.leaderEpoch) + assertEquals(assignment, secondReplicas) + + controllerContext.shuttingDownBrokerIds += preferredReplicaId + + val deadBrokerId = 2 + controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId) + controllerContext.shuttingDownBrokerIds -= deadBrokerId + + val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr) + + assertEquals(1, thirdLeaderAndIsr.leader) + assertEquals(Seq(1), thirdLeaderAndIsr.isr) + assertEquals(2, thirdLeaderAndIsr.zkVersion) + assertEquals(2, thirdLeaderAndIsr.leaderEpoch) + assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas) + + } + +}