From e1c79cee2ab243d95647935d2b3e7abe371bf6ea Mon Sep 17 00:00:00 2001 From: Kowshik Prakasam Date: Mon, 5 Oct 2020 13:31:01 -0700 Subject: [PATCH] Address comments from Jun --- .../scala/kafka/controller/KafkaController.scala | 4 ++-- core/src/main/scala/kafka/zk/ZkData.scala | 3 ++- .../unit/kafka/cluster/BrokerEndPointTest.scala | 2 +- .../controller/ControllerIntegrationTest.scala | 15 ++++++++++++++- .../unit/kafka/server/UpdateFeaturesTest.scala | 9 +++++---- 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b13971ff73c31..37b5c4d77399c 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -383,7 +383,7 @@ class KafkaController(val config: KafkaConfig, case FeatureZNodeStatus.Disabled => if (!existingFeatureZNode.features.empty()) { warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" + - " contains non-empty features.") + s" contains non-empty features: ${existingFeatureZNode.features}") } Features.emptyFinalizedFeatures } @@ -420,7 +420,7 @@ class KafkaController(val config: KafkaConfig, if (existingFeatureZNode.status == FeatureZNodeStatus.Disabled && !existingFeatureZNode.features.empty()) { warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" + - " contains non-empty features.") + s" contains non-empty features: ${existingFeatureZNode.features}") } if (!newNode.equals(existingFeatureZNode)) { updateFeatureZNode(newNode) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 18f364a8bcaac..67fbef39321f6 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -799,6 +799,7 @@ object DelegationTokenInfoZNode { * is less than KAFKA_2_7_IV0. */ object FeatureZNodeStatus extends Enumeration { + type FeatureZNodeStatus = Value val Disabled, Enabled = Value def withNameOpt(value: Int): Option[Value] = { @@ -812,7 +813,7 @@ object FeatureZNodeStatus extends Enumeration { * @param status the status of the ZK node * @param features the cluster-wide finalized features */ -case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) { +case class FeatureZNode(status: FeatureZNodeStatus.FeatureZNodeStatus, features: Features[FinalizedVersionRange]) { } object FeatureZNode { diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index d3b52ae5cf5f9..3708f73aedb64 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -185,7 +185,7 @@ class BrokerEndPointTest { "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, "rack":"dc1", - "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}} + "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} }""" val broker = parseBrokerJson(1, json) assertEquals(1, broker.id) diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 9ceedf80cb0ed..47c271fca0b29 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -596,7 +596,6 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { testControllerMove(() => zkClient.createPartitionReassignment(reassignment)) } - @Test def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithNonExistingFeatureZNode(): Unit = { testControllerFeatureZNodeSetup(Option.empty, KAFKA_2_7_IV0) @@ -761,6 +760,20 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { } servers = makeServers(1, interBrokerProtocolVersion = Some(interBrokerProtocolVersion)) TestUtils.waitUntilControllerElected(zkClient) + // Below we wait on a dummy event to finish processing in the controller event thread. + // We schedule this dummy event only after the controller is elected, which is a sign that the + // controller has already started processing the Startup event. Waiting on the dummy event is + // used to make sure that the event thread has completed processing Startup event, that triggers + // the setup of FeatureZNode. + val controller = getController().kafkaController + val latch = new CountDownLatch(1) + controller.eventManager.put(new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = { + latch.countDown() + } + override def preempt(): Unit = {} + }) + latch.await() val (mayBeFeatureZNodeBytes, versionAfter) = zkClient.getDataAndVersion(FeatureZNode.path) val newZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala index 3822c4238ff7a..a16ff30d543e5 100644 --- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala +++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala @@ -148,6 +148,7 @@ class UpdateFeaturesTest extends BaseRequestTest { * Tests whether an invalid feature update does not get processed on the server as expected, * and raises the ExceptionType on the client side as expected. * + * @param feature the feature to be updated * @param invalidUpdate the invalid feature update to be sent in the * updateFeatures request to the server * @param exceptionMsgPattern a pattern for the expected exception message @@ -222,7 +223,7 @@ class UpdateFeaturesTest extends BaseRequestTest { /** * Tests that an UpdateFeatures request fails in the Controller, when, for a feature the downgrade - * is attempted to a max version level thats higher than the existing max version level. + * is attempted to a max version level higher than the existing max version level. */ @Test def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = { @@ -285,13 +286,13 @@ class UpdateFeaturesTest extends BaseRequestTest { def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = { testWithInvalidFeatureUpdate[InvalidRequestException]( "feature_non_existing", - new FeatureUpdate(0, true), - ".*Can not delete non-existing finalized feature.*".r) + new FeatureUpdate(3, true), + ".*Could not apply finalized feature update because the provided feature is not supported.*".r) } /** * Tests that an UpdateFeatures request fails in the Controller, when, a feature version level - * upgrade is attempted to a version level thats the same as the existing max version level. + * upgrade is attempted to a version level same as the existing max version level. */ @Test def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {