KAFKA-15853 Move KafkaConfig log properties and docs out of core (apache#15569)

Reviewers: Mickael Maison <[email protected]>, Nikolay <[email protected]>, Federico Valeri <[email protected]>, Chia-Ping Tsai <[email protected]>
OmniaGM authored Apr 19, 2024
1 parent 76e0891 commit ecb2dd4
Showing 67 changed files with 602 additions and 518 deletions.
4 changes: 4 additions & 0 deletions checkstyle/import-control-server-common.xml
@@ -111,9 +111,13 @@
<allow class="org.apache.kafka.server.util.ShutdownableThread" />
</subpackage>
</subpackage>
+<subpackage name="config">
+<allow pkg="org.apache.kafka.server"/>
+</subpackage>
+</subpackage>

<subpackage name="admin">
<allow pkg="org.apache.kafka.server.common" />
</subpackage>

</import-control>
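
The new rule permits classes under org.apache.kafka.server.config to import from the parent org.apache.kafka.server package, which the relocated config holders need. A hypothetical sketch of an import the rule now allows (the imported class is illustrative, not taken from this commit):

    package org.apache.kafka.server.config;

    // Allowed by the new <allow pkg="org.apache.kafka.server"/> rule above;
    // import-control would previously have rejected this from the config subpackage.
    import org.apache.kafka.server.ProcessRole;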
@@ -94,6 +94,8 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;

/**
* Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
@@ -163,7 +165,7 @@ private void doStart() {
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) brokers.length);
-putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
+putIfAbsent(brokerConfig, AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
// reduce the size of the log cleaner map to reduce test memory usage
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);

@@ -177,7 +179,7 @@ private void doStart() {
for (int i = 0; i < brokers.length; i++) {
brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
+brokerConfig.put(LOG_DIR_CONFIG, currentBrokerLogDirs[i]);
if (!hasListenerConfig)
brokerConfig.put(KafkaConfig.ListenersProp(), listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
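
A minimal sketch of the migrated pattern in this test utility, assuming the relocated constants keep the same property-key strings as the old KafkaConfig.AutoCreateTopicsEnableProp() and KafkaConfig.LogDirProp() accessors (the helper class is hypothetical):

    import java.util.Properties;

    import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
    import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;

    final class EmbeddedClusterConfigSketch {
        static Properties baseBrokerConfig(String logDir) {
            Properties brokerConfig = new Properties();
            // Same keys as before the move; only the defining class changed.
            brokerConfig.put(AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
            brokerConfig.put(LOG_DIR_CONFIG, logDir);
            return brokerConfig;
        }
    }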
3 changes: 2 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -120,6 +120,7 @@

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;

/**
* This class is responsible for
@@ -282,7 +283,7 @@ private void configureRLMM() {
rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());

rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
-rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+rlmmProps.put(LOG_DIR_CONFIG, logDir);
rlmmProps.put("cluster.id", clusterId);

remoteLogMetadataManager.configure(rlmmProps);
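
The RLMM bootstrap properties keep their shape; only the log-dir key now resolves from server-common. A hedged sketch of the hand-off above (the helper class and its parameters are stand-ins for the RemoteLogManager fields):

    import java.util.HashMap;
    import java.util.Map;

    import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;

    final class RlmmPropsSketch {
        static Map<String, Object> rlmmProps(int brokerId, String logDir, String clusterId) {
            Map<String, Object> props = new HashMap<>();
            props.put("broker.id", brokerId); // still set via KafkaConfig.BrokerIdProp() in the diff
            props.put(LOG_DIR_CONFIG, logDir);
            props.put("cluster.id", clusterId);
            return props;
        }
    }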
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/MetadataLogConfig.scala
@@ -18,7 +18,7 @@ package kafka.raft

import kafka.server.KafkaConfig
import org.apache.kafka.common.config.AbstractConfig
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.server.config.ServerLogConfigs

final case class MetadataLogConfig(
logSegmentBytes: Int,
@@ -42,7 +42,7 @@ object MetadataLogConfig {
config.getLong(KafkaConfig.MetadataMaxRetentionMillisProp),
maxBatchSizeInBytes,
maxFetchSizeInBytes,
-LogConfig.DEFAULT_FILE_DELETE_DELAY_MS,
+ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
config.getInt(KafkaConfig.NodeIdProp)
)
}
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
import org.apache.kafka.storage.internals
@@ -553,7 +554,7 @@ object KafkaMetadataLog extends Logging {
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
-props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, LogConfig.DEFAULT_FILE_DELETE_DELAY_MS.toString)
+props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString)

// Disable time and byte retention when deleting segments
props.setProperty(TopicConfig.RETENTION_MS_CONFIG, "-1")
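
Both metadata-log call sites previously read the default file-delete delay from the storage layer's LogConfig; after this change they share the server-common constant. A small sketch of the equivalence being relied on (the 60-second figure is Kafka's documented default for file.delete.delay.ms, stated here as an assumption rather than taken from this diff):

    import org.apache.kafka.server.config.ServerLogConfigs;

    final class DeleteDelaySketch {
        public static void main(String[] args) {
            // Assumed equal to the old LogConfig.DEFAULT_FILE_DELETE_DELAY_MS,
            // making the rename behavior-preserving.
            long fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
            System.out.println(fileDeleteDelayMs); // expected: 60000
        }
    }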
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -21,7 +21,6 @@ import kafka.metrics.LinuxIoMetricsCollector
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
-import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
@@ -46,6 +45,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.{CredentialProvider, PasswordEncoder}
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -207,9 +207,9 @@ class ControllerServer(
sharedServer.startForController()

createTopicPolicy = Option(config.
-getConfiguredInstance(CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
+getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy]))
alterConfigPolicy = Option(config.
-getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
+getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy]))

val voterConnections = FutureUtils.waitWithLogging(logger.underlying, logIdent,
"controller quorum voters future",
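
The controller still materializes its policy plugins through AbstractConfig.getConfiguredInstance; only the key constants moved. A hedged Java sketch of the same lookup (the wrapper class is hypothetical):

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.server.policy.CreateTopicPolicy;

    import static org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG;

    final class PolicySketch {
        static CreateTopicPolicy createTopicPolicy(AbstractConfig config) {
            // Returns null when the key is unset, which is why the Scala code
            // above wraps the call in Option(...).
            return config.getConfiguredInstance(
                CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CreateTopicPolicy.class);
        }
    }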
32 changes: 16 additions & 16 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -39,8 +39,8 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
-import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
+import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
@@ -120,14 +120,14 @@ object DynamicBrokerConfig {

def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
name match {
-case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
-List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp)
-case KafkaConfig.LogRollTimeJitterMillisProp | KafkaConfig.LogRollTimeJitterHoursProp =>
-List(KafkaConfig.LogRollTimeJitterMillisProp, KafkaConfig.LogRollTimeJitterHoursProp)
-case KafkaConfig.LogFlushIntervalMsProp => // LogFlushSchedulerIntervalMsProp is used as default
-List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
-case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
-List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
+case ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG | ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG =>
+List(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG)
+case ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG | ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG =>
+List(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG)
+case ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG => // KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default
+List(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)
+case ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG | ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG | ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG =>
+List(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG)
case ListenerConfigRegex(baseName) if matchListenerOverride =>
// `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName>
// and other listener configs are specified as listenerPrefix.<configName>
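
The synonym groups themselves are unchanged; only the constant names in the match moved. Restated as plain data, assuming the constants still resolve to Kafka's standard key strings and keeping the millis-first ordering of the original lists:

    import java.util.List;
    import java.util.Map;

    final class LogSynonymsSketch {
        // Key strings are assumptions based on Kafka's standard broker configs.
        static final Map<String, List<String>> SYNONYMS = Map.of(
            "log.roll.ms",    List.of("log.roll.ms", "log.roll.hours"),
            "log.roll.hours", List.of("log.roll.ms", "log.roll.hours"),
            "log.flush.interval.ms",
                List.of("log.flush.interval.ms", "log.flush.scheduler.interval.ms"));
    }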
@@ -674,7 +674,7 @@ object DynamicLogConfig {
// Exclude message.format.version for now since we need to check that the version
// is supported on all brokers in the cluster.
@nowarn("cat=deprecation")
-val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
+val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)

val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
@@ -697,11 +697,11 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
if (logLocalRetentionMs == -1L) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be -1 as ${KafkaConfig.LogRetentionTimeMillisProp} value is set as $logRetentionMs.")
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} value is set as $logRetentionMs.")
}
if (logLocalRetentionMs > logRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be more than ${KafkaConfig.LogRetentionTimeMillisProp} property value: $logRetentionMs")
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} property value: $logRetentionMs")
}
}
}
@@ -712,11 +712,11 @@
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
if (logLocalRetentionBytes == -1) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be -1 as ${KafkaConfig.LogRetentionBytesProp} value is set as $logRetentionBytes.")
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} value is set as $logRetentionBytes.")
}
if (logLocalRetentionBytes > logRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be more than ${KafkaConfig.LogRetentionBytesProp} property value: $logRetentionBytes")
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} property value: $logRetentionBytes")
}
}
}
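
A hedged sketch of the rule pair enforced above, assuming the usual sentinel semantics (-1 means unlimited total retention, -2 means local retention follows total retention); the bytes branch is analogous to the millis branch shown:

    import org.apache.kafka.common.config.ConfigException;

    final class LocalRetentionCheckSketch {
        // The key string below is an assumption used for illustration.
        static void validate(long retentionMs, long localRetentionMs) {
            if (retentionMs != -1L && localRetentionMs != -2L) {
                if (localRetentionMs == -1L || localRetentionMs > retentionMs) {
                    throw new ConfigException("log.local.retention.ms", localRetentionMs,
                        "must be in [0, " + retentionMs + "] or -2");
                }
            }
        }
    }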
@@ -771,7 +771,7 @@ object DynamicThreadPool {
val ReconfigurableConfigs = Set(
KafkaConfig.NumIoThreadsProp,
ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG,
-KafkaConfig.NumRecoveryThreadsPerDataDirProp,
+ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG,
KafkaConfig.BackgroundThreadsProp)

def validateReconfiguration(currentConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -796,7 +796,7 @@
name match {
case KafkaConfig.NumIoThreadsProp => config.numIoThreads
case ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG => config.numReplicaFetchers
-case KafkaConfig.NumRecoveryThreadsPerDataDirProp => config.numRecoveryThreadsPerDataDir
+case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG => config.numRecoveryThreadsPerDataDir
case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
case n => throw new IllegalStateException(s"Unexpected config $n")
}
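
DynamicThreadPool now references the relocated recovery-thread constant in both its reconfigurable set and its getValue match. Restating the set with assumed key strings for the entries that did not move:

    import java.util.Set;

    import static org.apache.kafka.server.config.ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG;

    final class ThreadPoolKeysSketch {
        static final Set<String> RECONFIGURABLE = Set.of(
            "num.io.threads",                         // KafkaConfig.NumIoThreadsProp (assumed key)
            "num.replica.fetchers",                   // ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG (assumed key)
            NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG, // moved in this commit
            "background.threads");                    // KafkaConfig.BackgroundThreadsProp (assumed key)
    }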
Some files were not shown because too many files changed in this diff.