KAFKA-6074; Use ZookeeperClient in ReplicaManager and Partition
Replaced ZkUtils with KafkaZkClient in ReplicaManager and Partition.

Relying on existing tests.

Author: tedyu <[email protected]>

Reviewers: Jun Rao <[email protected]>, Manikumar Reddy <[email protected]>, Ismael Juma <[email protected]>

Closes #4254 from tedyu/trunk
tedyu authored and ijuma committed Nov 24, 2017
1 parent ac17ab4 commit 0bc2d0e
Showing 15 changed files with 79 additions and 108 deletions.
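All of the hunks below apply the same substitution: call sites that previously read ZooKeeper state through ZkUtils (often via the AdminUtils or ReplicationUtils helpers) now go through KafkaZkClient, with admin-style config reads wrapped in AdminZkClient. A minimal sketch of the before/after shape, using the topic-config lookup from the Partition.scala hunk (identifiers as they appear in the diff):

// before: static helper that takes a ZkUtils handle
val prop = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)

// after: AdminZkClient wraps the shared KafkaZkClient instance
val adminZkClient = new AdminZkClient(zkClient)
val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)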
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -58,7 +58,7 @@ trait AdminUtilities {
}
}

def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties
}

object AdminUtils extends Logging with AdminUtilities {
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,6 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock

import com.yammer.metrics.core.Gauge
import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
import kafka.api.Request
import kafka.controller.KafkaController
@@ -29,6 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk.AdminZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
import org.apache.kafka.common.protocol.Errors
@@ -55,7 +55,7 @@ class Partition(val topic: String,
// Do not use replicaManager if this partition is ReplicaManager.OfflinePartition
private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1
private val logManager = if (!isOffline) replicaManager.logManager else null
private val zkUtils = if (!isOffline) replicaManager.zkUtils else null
private val zkClient = if (!isOffline) replicaManager.zkClient else null
// allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement
private val allReplicasMap = new Pool[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
@@ -171,8 +171,10 @@ class Partition(val topic: String,
def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
allReplicasMap.getAndMaybePut(replicaId, {
if (isReplicaLocal(replicaId)) {
val adminZkClient = new AdminZkClient(zkClient)
val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
prop)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
val offsetMap = checkpoint.read()
@@ -661,7 +663,7 @@ class Partition(val topic: String,

private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
newLeaderAndIsr, controllerEpoch, zkVersion)

if(updateSucceeded) {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -319,7 +319,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}

protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)

private def initZk(): ZkUtils = {
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -20,12 +20,12 @@ package kafka.server
import java.util

import AbstractFetcherThread.ResultWithPartitions
import kafka.admin.AdminUtils
import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache
import kafka.zk.AdminZkClient
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
@@ -155,7 +155,8 @@ class ReplicaFetcherThread(name: String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,6 +30,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -132,7 +133,7 @@ object ReplicaManager {
class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
val zkUtils: ZkUtils,
val zkClient: KafkaZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
@@ -148,7 +149,7 @@
def this(config: KafkaConfig,
metrics: Metrics,
time: Time,
zkUtils: ZkUtils,
zkClient: KafkaZkClient,
scheduler: Scheduler,
logManager: LogManager,
isShuttingDown: AtomicBoolean,
@@ -157,7 +158,7 @@
metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel,
threadNamePrefix: Option[String] = None) {
this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", brokerId = config.brokerId,
@@ -265,7 +266,7 @@ class ReplicaManager(val config: KafkaConfig,
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
@@ -1433,7 +1434,7 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
zkClient.propagateLogDirEvent(localBrokerId)
info(s"Stopped serving replicas in dir $dir")
}

60 changes: 0 additions & 60 deletions core/src/main/scala/kafka/utils/LogDirUtils.scala

This file was deleted.

17 changes: 8 additions & 9 deletions core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -20,6 +20,7 @@ package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat

@@ -29,28 +30,26 @@ object ReplicationUtils extends Logging {

private val IsrChangeNotificationPrefix = "isr_change_"

def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
def updateLeaderAndIsr(zkClient: KafkaZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
zkVersion: Int): (Boolean,Int) = {
debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}")
val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
val newLeaderData = LeaderAndIsrZNode.encode(newLeaderAndIsr, controllerEpoch)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
updatePersistentPath
}

def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath(
def propagateIsrChanges(zkClient: KafkaZkClient, isrChangeSet: Set[TopicPartition]): Unit = {
val isrChangeNotificationPath: String = zkClient.createSequentialPersistentPath(
ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
generateIsrChangeJson(isrChangeSet))
debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
}

private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
try {
val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path)
val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
val writtenStat = writtenLeaderAndIsrInfo._2
val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
writtenLeaderOpt.foreach { writtenData =>
val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
15 changes: 15 additions & 0 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -25,6 +25,7 @@ import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContex
import kafka.log.LogConfig
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}

import kafka.server.ConfigType
import kafka.utils._
import kafka.zookeeper._
@@ -49,6 +50,13 @@ import scala.collection.{Seq, mutable}
class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
import KafkaZkClient._

def createSequentialPersistentPath(path: String, data: String = ""): String = {
val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
val createResponse = retryRequestUntilConnected(createRequest)
createResponse.resultException.foreach(e => throw e)
createResponse.path
}

/**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want ot get states.
@@ -933,6 +941,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
createResponse.resultException.foreach(e => throw e)
}

def propagateLogDirEvent(brokerId: Int) {
val logDirEventNotificationPath: String = createSequentialPersistentPath(
LogDirEventNotificationZNode.path + "/" + LogDirEventNotificationSequenceZNode.SequenceNumberPrefix,
new String(LogDirEventNotificationSequenceZNode.encode(brokerId), UTF_8))
debug(s"Added $logDirEventNotificationPath for broker $brokerId")
}

/**
* Deletes all Acl change notifications.
* @throws KeeperException if there is an error while deleting Acl change notifications
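The two methods added to KafkaZkClient above back the ZooKeeper writes that the rest of this commit relies on; a short sketch of the call sites, taken from the ReplicationUtils and ReplicaManager hunks:

// ISR change notification (ReplicationUtils.propagateIsrChanges)
zkClient.createSequentialPersistentPath(
  ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
  generateIsrChangeJson(isrChangeSet))

// log directory failure notification (ReplicaManager.handleLogDirFailure)
zkClient.propagateLogDirEvent(localBrokerId)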
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/zk/ZkData.scala
@@ -30,6 +30,13 @@ import org.apache.zookeeper.data.Stat

// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).

object LeaderAndIsrZNode {
def encode(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
}
}

object ControllerZNode {
def path = "/controller"
def encode(brokerId: Int, timestamp: Long): Array[Byte] =
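For reference, the LeaderAndIsrZNode.encode helper added above serializes its Map through Json.encode, so with illustrative values (leader = 1, leaderEpoch = 5, controllerEpoch = 3, isr = List(1, 2)) the znode payload written by updateLeaderAndIsr has roughly this form (field order may vary):

{"version":1,"leader":1,"leader_epoch":5,"controller_epoch":3,"isr":[1,2]}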
@@ -109,7 +109,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
val server = new KafkaServer(config, time) {

override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {

override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
@@ -25,7 +25,8 @@ import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
import kafka.cluster.Replica
import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ class HighwatermarkPersistenceTest {

val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"
val zkUtils = EasyMock.createMock(classOf[ZkUtils])
val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)),
@@ -54,15 +55,15 @@
@Test
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
EasyMock.replay(zkUtils)
EasyMock.replay(zkClient)

// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
val metrics = new Metrics
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler,
logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
@@ -86,7 +87,7 @@ replicaManager.checkpointHighWatermarks()
replicaManager.checkpointHighWatermarks()
fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
EasyMock.verify(zkUtils)
EasyMock.verify(zkClient)
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)
@@ -100,14 +101,14 @@ class HighwatermarkPersistenceTest {
val topic1 = "foo1"
val topic2 = "foo2"
// mock zkclient
EasyMock.replay(zkUtils)
EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
val metrics = new Metrics
val time = new MockTime
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
@@ -155,7 +156,7 @@ class HighwatermarkPersistenceTest {
// verify checkpointed hw for topic 1
topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(10L, topic1Partition0Hw)
EasyMock.verify(zkUtils)
EasyMock.verify(zkClient)
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)