Skip to content

Commit

Permalink
KAFKA-17615 Remove KafkaServer from tests (#18271)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Dec 30, 2024
1 parent 6737178 commit 4080f19
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
for (serverIdx <- brokerServers.indices) {
killBroker(serverIdx)
val config = newConfigs(serverIdx)
servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId))
servers(serverIdx) = createBroker(config, time = brokerTime(config.brokerId))
restartDeadBrokers()
}

Expand Down
56 changes: 0 additions & 56 deletions core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.{lang, util}
import java.util.{Optional, Properties, Map => JMap}
import java.util.concurrent.{CompletionStage, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
Expand Down Expand Up @@ -98,37 +97,6 @@ class DynamicBrokerConfigTest {
}
}

@Test
def testEnableDefaultUncleanLeaderElection(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")

val config = KafkaConfig(origProps)
val serverMock = Mockito.mock(classOf[KafkaServer])
val controllerMock = Mockito.mock(classOf[KafkaController])
val logManagerMock = Mockito.mock(classOf[LogManager])

Mockito.when(serverMock.config).thenReturn(config)
Mockito.when(serverMock.kafkaController).thenReturn(controllerMock)
Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)

val currentDefaultLogConfig = new AtomicReference(new LogConfig(new Properties))
Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get())
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
.thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0)))

config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock))

val props = new Properties()

props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
config.dynamicConfig.updateDefaultConfig(props)
assertTrue(config.uncleanLeaderElectionEnable)
Mockito.verify(controllerMock).enableDefaultUncleanLeaderElection()
}

@Test
def testUpdateDynamicThreadPool(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
Expand Down Expand Up @@ -434,7 +402,7 @@ class DynamicBrokerConfigTest {
validProps.foreach { case (k, v) => props.put(k, v) }
invalidProps.foreach { case (k, v) => props.put(k, v) }

// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided
// in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true))

Expand Down Expand Up @@ -513,7 +481,7 @@ class DynamicBrokerConfigTest {
props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092")
new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))

// it is illegal to update non-reconfiguable configs of existent listeners
// it is illegal to update non-reconfigurable configs of existent listeners
props.put("listener.name.plaintext.you.should.not.pass", "failure")
val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
assertThrows(classOf[ConfigException], () => dynamicListenerConfig.validateReconfiguration(KafkaConfig(props)))
Expand Down Expand Up @@ -1114,7 +1082,7 @@ class DynamicBrokerConfigTest {
}
}

class TestDynamicThreadPool() extends BrokerReconfigurable {
class TestDynamicThreadPool extends BrokerReconfigurable {

override def reconfigurableConfigs: Set[String] = {
DynamicThreadPool.ReconfigurableConfigs
Expand Down
17 changes: 0 additions & 17 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.LeaderAndIsr
Expand Down Expand Up @@ -151,22 +150,6 @@ object TestUtils extends Logging {
JTestUtils.tempFile(content)
}

/**
* Create a kafka server instance with appropriate test settings
* USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
*
* @param config The configuration of the server
*/
def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
createServer(config, time, None, startup = true)
}

def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = {
val server = new KafkaServer(config, time, threadNamePrefix)
if (startup) server.startup()
server
}

/**
* Create a test config for the provided parameters.
*
Expand Down

0 comments on commit 4080f19

Please sign in to comment.