From ae23b8097da94c8978b5fd96e08a65956e743e0e Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sun, 6 Oct 2024 16:05:43 +0800 Subject: [PATCH] KAFKA-8779 Fix flaky tests introduced by dynamic log levels --- checkstyle/import-control-core.xml | 1 + .../api/PlaintextAdminIntegrationTest.scala | 183 +++++++----------- .../scala/unit/kafka/utils/TestUtils.scala | 14 ++ 3 files changed, 90 insertions(+), 108 deletions(-) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index f7caef0cef34f..6355e51f64dcd 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -113,6 +113,7 @@ + diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d04c9e6ee2224..011d237cebaf3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter} -import org.apache.kafka.common.requests.{DeleteRecordsRequest} +import org.apache.kafka.common.requests.DeleteRecordsRequest import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.{Time, Utils} @@ -54,7 +54,7 @@ import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs, ServerLogCon import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.slf4j.LoggerFactory @@ -81,7 +81,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val topicPartition = new TopicPartition(topic, partition) private var brokerLoggerConfigResource: ConfigResource = _ - private val changedBrokerLoggers = scala.collection.mutable.Set[String]() @BeforeEach override def setUp(testInfo: TestInfo): Unit = { @@ -92,7 +91,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @AfterEach override def tearDown(): Unit = { - teardownBrokerLoggers() + // Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists + // across test classes. We need to clean up the changes done after testing. 
+ resetLogging super.tearDown() } @@ -128,7 +129,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { try { val alterLogLevelsEntries = Seq( - new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) + new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL) ).asJavaCollection val exception = assertThrows(classOf[ExecutionException], () => { @@ -3064,127 +3065,122 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") val loggerConfig = describeBrokerLoggers() val kafkaLogLevel = loggerConfig.get("kafka").value() - val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica") + val clusterReplicaLogLevel = loggerConfig.get("kafka.cluster.Replica") // we expect the log level to be inherited from the first ancestor with a level configured - assertEquals(kafkaLogLevel, logCleanerLogLevelConfig.value()) - assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name()) - assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source()) - assertEquals(false, logCleanerLogLevelConfig.isReadOnly) - assertEquals(false, logCleanerLogLevelConfig.isSensitive) - assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty) + assertEquals(kafkaLogLevel, clusterReplicaLogLevel.value()) + assertEquals("kafka.cluster.Replica", clusterReplicaLogLevel.name()) + assertEquals(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, clusterReplicaLogLevel.source()) + assertEquals(false, clusterReplicaLogLevel.isReadOnly) + assertEquals(false, clusterReplicaLogLevel.isSensitive) + assertTrue(clusterReplicaLogLevel.synonyms().isEmpty) } @ParameterizedTest @ValueSource(strings = Array("kraft")) - @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = { client = createAdminClient + val ancestorLogger = "kafka"; val initialLoggerConfig = describeBrokerLoggers() - val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value() - assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) - assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value()) - assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value()) - - val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL - val alterRootLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) + val initialAncestorLogLevel = initialLoggerConfig.get("kafka").value() + val initialControllerServerLogLevel = initialLoggerConfig.get("kafka.server.ControllerServer").value() + val initialLogCleanerLogLevel = initialLoggerConfig.get("kafka.log.LogCleaner").value() + val initialReplicaManagerLogLevel = initialLoggerConfig.get("kafka.server.ReplicaManager").value() + + val newAncestorLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL + val alterAncestorLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET) ).asJavaCollection // Test validateOnly does not change anything - alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true) + alterBrokerLoggers(alterAncestorLoggerEntry, validateOnly = true) val validatedLoggerConfig = describeBrokerLoggers() - assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - 
assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value()) - assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value()) - assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value()) - assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + assertEquals(initialAncestorLogLevel, validatedLoggerConfig.get(ancestorLogger).value()) + assertEquals(initialControllerServerLogLevel, validatedLoggerConfig.get("kafka.server.ControllerServer").value()) + assertEquals(initialLogCleanerLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(initialReplicaManagerLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value()) // test that we can change them and unset loggers still use the root's log level - alterBrokerLoggers(alterRootLoggerEntry) - val changedRootLoggerConfig = describeBrokerLoggers() - assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value()) - assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value()) - assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value()) - assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) - - // alter the ZK client's logger so we can later test resetting it - val alterZKLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) + alterBrokerLoggers(alterAncestorLoggerEntry) + val changedAncestorLoggerConfig = describeBrokerLoggers() + assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get(ancestorLogger).value()) + assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ControllerServer").value()) + assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(newAncestorLogLevel, changedAncestorLoggerConfig.get("kafka.server.ReplicaManager").value()) + + // alter the LogCleaner's logger so we can later test resetting it + val alterLogCleanerLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) ).asJavaCollection - alterBrokerLoggers(alterZKLoggerEntry) + alterBrokerLoggers(alterLogCleanerLoggerEntry) val changedZKLoggerConfig = describeBrokerLoggers() - assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.log.LogCleaner").value()) // properly test various set operations and one delete val alterLogLevelsEntries = Seq( - new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET), - new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), 
AlterConfigOp.OpType.DELETE) // should reset to the root logger level ).asJavaCollection alterBrokerLoggers(alterLogLevelsEntries) val alteredLoggerConfig = describeBrokerLoggers() - assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(newAncestorLogLevel, alteredLoggerConfig.get(ancestorLogger).value()) + assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ControllerServer").value()) assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value()) assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value()) - assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) } /** - * 1. Assume ROOT logger == TRACE - * 2. Change kafka.controller.KafkaController logger to INFO - * 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE) - * 4. Change ROOT logger to ERROR - * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level) + * 1. Assume kafka logger == TRACE + * 2. Change kafka.server.ControllerServer logger to INFO + * 3. Unset kafka.server.ControllerServer via AlterConfigOp.OpType.DELETE (resets it to the kafka logger - TRACE) + * 4. Change kafka logger to ERROR + * 5. Ensure the kafka.server.ControllerServer logger's level is ERROR (the current kafka logger level) */ @ParameterizedTest @ValueSource(strings = Array("kraft")) - @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = { client = createAdminClient - // step 1 - configure root logger - val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL - val alterRootLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET) + val ancestorLogger = "kafka" + // step 1 - configure kafka logger + val initialAncestorLogLevel = LogLevelConfig.TRACE_LOG_LEVEL + val alterAncestorLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(ancestorLogger, initialAncestorLogLevel), AlterConfigOp.OpType.SET) ).asJavaCollection - alterBrokerLoggers(alterRootLoggerEntry) + alterBrokerLoggers(alterAncestorLoggerEntry) val initialLoggerConfig = describeBrokerLoggers() - assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(initialAncestorLogLevel, initialLoggerConfig.get(ancestorLogger).value()) + assertEquals(initialAncestorLogLevel, initialLoggerConfig.get("kafka.server.ControllerServer").value()) - // step 2 - change KafkaController logger to INFO + // step 2 - change ControllerServer logger to INFO val alterControllerLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET) + new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET) ).asJavaCollection alterBrokerLoggers(alterControllerLoggerEntry) val changedControllerLoggerConfig = describeBrokerLoggers() - assertEquals(initialRootLogLevel, 
changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(initialAncestorLogLevel, changedControllerLoggerConfig.get(ancestorLogger).value()) + assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.server.ControllerServer").value()) - // step 3 - unset KafkaController logger + // step 3 - unset ControllerServer logger val deleteControllerLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE) + new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", ""), AlterConfigOp.OpType.DELETE) ).asJavaCollection alterBrokerLoggers(deleteControllerLoggerEntry) val deletedControllerLoggerConfig = describeBrokerLoggers() - assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get(ancestorLogger).value()) + assertEquals(initialAncestorLogLevel, deletedControllerLoggerConfig.get("kafka.server.ControllerServer").value()) - val newRootLogLevel = LogLevelConfig.ERROR_LOG_LEVEL - val newAlterRootLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) + val newAncestorLogLevel = LogLevelConfig.ERROR_LOG_LEVEL + val newAlterAncestorLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(ancestorLogger, newAncestorLogLevel), AlterConfigOp.OpType.SET) ).asJavaCollection - alterBrokerLoggers(newAlterRootLoggerEntry) - val newRootLoggerConfig = describeBrokerLoggers() - assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) - assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value()) + alterBrokerLoggers(newAlterAncestorLoggerEntry) + val newAncestorLoggerConfig = describeBrokerLoggers() + assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get(ancestorLogger).value()) + assertEquals(newAncestorLogLevel, newAncestorLoggerConfig.get("kafka.server.ControllerServer").value()) } @ParameterizedTest @ValueSource(strings = Array("kraft")) - @Disabled // to be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = { client = createAdminClient val deleteRootLoggerEntry = Seq( @@ -3196,7 +3192,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ParameterizedTest @ValueSource(strings = Array("kraft")) - @Disabled // To be re-enabled once KAFKA-8779 is resolved def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = { client = createAdminClient val validLoggerName = "kafka.server.KafkaRequestHandler" @@ -3209,29 +3204,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported ).asJavaCollection - assertTrue(assertThrows(classOf[ExecutionException], - () => 
alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(appendLogLevelEntries)).getCause) assertLogLevelDidNotChange() val subtractLogLevelEntries = Seq( new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported ).asJavaCollection - assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertInstanceOf(classOf[InvalidRequestException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(subtractLogLevelEntries)).getCause) assertLogLevelDidNotChange() val invalidLogLevelLogLevelEntries = Seq( new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level ).asJavaCollection - assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause) assertLogLevelDidNotChange() val invalidLoggerNameLogLevelEntries = Seq( new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported ).asJavaCollection - assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertInstanceOf(classOf[InvalidConfigurationException], assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause) assertLogLevelDidNotChange() } @@ -3245,18 +3239,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client = createAdminClient val alterLogLevelsEntries = Seq( - new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) + new ConfigEntry("kafka.server.ControllerServer", LogLevelConfig.INFO_LOG_LEVEL) ).asJavaCollection val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException]) } def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = { - if (!validateOnly) { - for (entry <- entries.asScala) - changedBrokerLoggers.add(entry.configEntry().name()) - } - client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly)) .values.get(brokerLoggerConfigResource).get() } @@ -3264,28 +3253,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def describeBrokerLoggers(): Config = 
client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get() - /** - * Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes. - * We need to clean up the changes done while testing. - */ - private def teardownBrokerLoggers(): Unit = { - if (changedBrokerLoggers.nonEmpty) { - val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name.equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet - val unsetBrokerLoggersEntries = changedBrokerLoggers - .intersect(validLoggers) - .map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) } - .asJavaCollection - - // ensure that we first reset the root logger to an arbitrary log level. Note that we cannot reset it to its original value - alterBrokerLoggers(List( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET) - ).asJavaCollection) - alterBrokerLoggers(unsetBrokerLoggersEntries) - - changedBrokerLoggers.clear() - } - } - @ParameterizedTest @ValueSource(strings = Array("kraft")) def testAppendConfigToEmptyDefaultValue(ignored: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 326bb1ed9185a..cdf26121c1a99 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -68,6 +68,7 @@ import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.test.{TestUtils => JTestUtils} +import org.apache.log4j.PropertyConfigurator import org.apache.zookeeper.KeeperException.SessionExpiredException import org.apache.zookeeper.ZooDefs._ import org.apache.zookeeper.data.ACL @@ -1865,4 +1866,17 @@ object TestUtils extends Logging { timedOut.set(true) } } + + /** + * Resets the logging configuration after the test. + */ + def resetLogging[T] = { + org.apache.log4j.LogManager.resetConfiguration() + val stream = this.getClass.getResourceAsStream("/log4j.properties") + try { + PropertyConfigurator.configure(stream) + } finally { + stream.close() + } + } }
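
Note (not part of the patch): the broker-logger API that these re-enabled tests exercise via the alterBrokerLoggers/describeBrokerLoggers helpers can also be driven with a plain Admin client outside the test harness. The following is a minimal illustrative sketch only — the bootstrap address "localhost:9092" and broker id "0" are assumptions, not values taken from this patch:

  import java.util.{Arrays, Collections, Properties}
  import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
  import org.apache.kafka.common.config.ConfigResource

  object BrokerLoggerSketch {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
      val admin = Admin.create(props)
      try {
        // BROKER_LOGGER resources are addressed by broker id (assumed to be "0" here)
        val resource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "0")

        // SET raises kafka.server.ControllerServer to DEBUG on that broker only
        val set: java.util.Collection[AlterConfigOp] = Arrays.asList(
          new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", "DEBUG"), AlterConfigOp.OpType.SET))
        admin.incrementalAlterConfigs(Collections.singletonMap(resource, set)).all().get()

        // Read the effective level back
        val config = admin.describeConfigs(Collections.singletonList(resource))
          .values().get(resource).get()
        println(config.get("kafka.server.ControllerServer").value())

        // DELETE unsets the logger so it falls back to its nearest configured ancestor ("kafka")
        val reset: java.util.Collection[AlterConfigOp] = Arrays.asList(
          new AlterConfigOp(new ConfigEntry("kafka.server.ControllerServer", ""), AlterConfigOp.OpType.DELETE))
        admin.incrementalAlterConfigs(Collections.singletonMap(resource, reset)).all().get()
      } finally {
        admin.close()
      }
    }
  }

The DELETE fallback shown last is the behaviour the updated assertions check: an unset logger inherits the level of its nearest configured ancestor ("kafka"), rather than a root-logger value, which is why the tests now track the "kafka" ancestor instead of Log4jController.ROOT_LOGGER.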