Skip to content

Commit

Permalink
Address comments from Jun
Browse files Browse the repository at this point in the history
  • Loading branch information
kowshik committed Oct 6, 2020
1 parent b69f7fe commit e1c79ce
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 9 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ class KafkaController(val config: KafkaConfig,
case FeatureZNodeStatus.Disabled =>
if (!existingFeatureZNode.features.empty()) {
warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
" contains non-empty features.")
s" contains non-empty features: ${existingFeatureZNode.features}")
}
Features.emptyFinalizedFeatures
}
Expand Down Expand Up @@ -420,7 +420,7 @@ class KafkaController(val config: KafkaConfig,
if (existingFeatureZNode.status == FeatureZNodeStatus.Disabled &&
!existingFeatureZNode.features.empty()) {
warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
" contains non-empty features.")
s" contains non-empty features: ${existingFeatureZNode.features}")
}
if (!newNode.equals(existingFeatureZNode)) {
updateFeatureZNode(newNode)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/zk/ZkData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ object DelegationTokenInfoZNode {
* is less than KAFKA_2_7_IV0.
*/
object FeatureZNodeStatus extends Enumeration {
type FeatureZNodeStatus = Value
val Disabled, Enabled = Value

def withNameOpt(value: Int): Option[Value] = {
Expand All @@ -812,7 +813,7 @@ object FeatureZNodeStatus extends Enumeration {
* @param status the status of the ZK node
* @param features the cluster-wide finalized features
*/
case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
case class FeatureZNode(status: FeatureZNodeStatus.FeatureZNodeStatus, features: Features[FinalizedVersionRange]) {
}

object FeatureZNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class BrokerEndPointTest {
"endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
"listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
"rack":"dc1",
"features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}}
"features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}}
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
testControllerMove(() => zkClient.createPartitionReassignment(reassignment))
}


@Test
def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithNonExistingFeatureZNode(): Unit = {
testControllerFeatureZNodeSetup(Option.empty, KAFKA_2_7_IV0)
Expand Down Expand Up @@ -761,6 +760,20 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
}
servers = makeServers(1, interBrokerProtocolVersion = Some(interBrokerProtocolVersion))
TestUtils.waitUntilControllerElected(zkClient)
// Below we wait on a dummy event to finish processing in the controller event thread.
// We schedule this dummy event only after the controller is elected, which is a sign that the
// controller has already started processing the Startup event. Waiting on the dummy event is
// used to make sure that the event thread has completed processing Startup event, that triggers
// the setup of FeatureZNode.
val controller = getController().kafkaController
val latch = new CountDownLatch(1)
controller.eventManager.put(new MockEvent(ControllerState.TopicChange) {
override def process(): Unit = {
latch.countDown()
}
override def preempt(): Unit = {}
})
latch.await()

val (mayBeFeatureZNodeBytes, versionAfter) = zkClient.getDataAndVersion(FeatureZNode.path)
val newZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
* Tests whether an invalid feature update does not get processed on the server as expected,
* and raises the ExceptionType on the client side as expected.
*
* @param feature the feature to be updated
* @param invalidUpdate the invalid feature update to be sent in the
* updateFeatures request to the server
* @param exceptionMsgPattern a pattern for the expected exception message
Expand Down Expand Up @@ -222,7 +223,7 @@ class UpdateFeaturesTest extends BaseRequestTest {

/**
* Tests that an UpdateFeatures request fails in the Controller, when, for a feature the downgrade
* is attempted to a max version level thats higher than the existing max version level.
* is attempted to a max version level higher than the existing max version level.
*/
@Test
def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
Expand Down Expand Up @@ -285,13 +286,13 @@ class UpdateFeaturesTest extends BaseRequestTest {
def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_non_existing",
new FeatureUpdate(0, true),
".*Can not delete non-existing finalized feature.*".r)
new FeatureUpdate(3, true),
".*Could not apply finalized feature update because the provided feature is not supported.*".r)
}

/**
* Tests that an UpdateFeatures request fails in the Controller, when, a feature version level
* upgrade is attempted to a version level thats the same as the existing max version level.
* upgrade is attempted to a version level same as the existing max version level.
*/
@Test
def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
Expand Down

0 comments on commit e1c79ce

Please sign in to comment.