Skip to content

Commit

Permalink
KAFKA-3579; Reference both old and new consumer properties in `TopicC…
Browse files Browse the repository at this point in the history
…ommand`

Add references to the new consumer property 'max.partition.fetch.bytes' along with the old consumer property 'fetch.message.max.bytes' in the corresponding warning messages of TopicCommand.
Also, create and leverage a static variable for the default value of the new consumer property.
Also, use 'DEFAULT_...' for default property constant names in the code instead of '..._DEFAULT'.

Author: Vahid Hashemian <[email protected]>

Reviewers: Manikumar reddy O <[email protected]>, Ashish Singh <[email protected]>, Grant Henke <[email protected]>, Ismael Juma <[email protected]>

Closes apache#1239 from vahidhashemian/KAFKA-3579
  • Loading branch information
vahidhashemian authored and ijuma committed May 8, 2016
1 parent 51f7a35 commit 62b9fa2
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;

/** <code>send.buffer.bytes</code> */
public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
Expand Down Expand Up @@ -184,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig {
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. "
+ "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Expand Down Expand Up @@ -231,7 +232,7 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.CLIENT_ID_DOC)
.define(MAX_PARTITION_FETCH_BYTES_CONFIG,
Type.INT,
1 * 1024 * 1024,
DEFAULT_MAX_PARTITION_FETCH_BYTES,
atLeast(0),
Importance.HIGH,
MAX_PARTITION_FETCH_BYTES_DOC)
Expand Down Expand Up @@ -332,7 +333,7 @@ public class ConsumerConfig extends AbstractConfig {
MAX_POLL_RECORDS_DOC)
.define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
Type.BOOLEAN,
EXCLUDE_INTERNAL_TOPICS_DEFAULT,
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
Importance.MEDIUM,
EXCLUDE_INTERNAL_TOPICS_DOC)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() {
this.partitionAssignor.clear();

client.setNode(node);
this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled);
this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled);
}

@After
Expand Down Expand Up @@ -735,7 +735,7 @@ public void testAutoCommitDynamicAssignment() {
final String consumerId = "consumer";

ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);

subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
Expand All @@ -761,7 +761,7 @@ public void testAutoCommitDynamicAssignmentRebalance() {
final String consumerId = "consumer";

ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);

subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
subscriptions.needReassignment();
Expand Down Expand Up @@ -789,7 +789,7 @@ public void testAutoCommitDynamicAssignmentRebalance() {
@Test
public void testAutoCommitManualAssignment() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);

subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 100);
Expand All @@ -807,7 +807,7 @@ public void testAutoCommitManualAssignment() {
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);

subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 100);
Expand Down Expand Up @@ -1096,7 +1096,7 @@ public void testProtocolMetadataOrder() {

try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(roundRobin.name(), metadata.get(0).name());
Expand All @@ -1105,7 +1105,7 @@ public void testProtocolMetadataOrder() {

try (Metrics metrics = new Metrics(time)) {
ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(range.name(), metadata.get(0).name());
Expand Down
35 changes: 20 additions & 15 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
import kafka.consumer.{ConsumerConfig, Whitelist}
import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
import kafka.coordinator.GroupCoordinator
import kafka.log.{Defaults, LogConfig}
import kafka.server.ConfigType
Expand All @@ -31,6 +31,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConversions._
import scala.collection._
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
import org.apache.kafka.common.internals.TopicConstants


Expand Down Expand Up @@ -383,30 +384,34 @@ object TopicCommand extends Logging {
def shortMessageSizeWarning(maxMessageBytes: Int): String = {
"\n\n" +
"*****************************************************************************************************\n" +
"*** WARNING: you are creating a topic where the max.message.bytes is greater than the consumer ***\n" +
"*** default. This operation is potentially dangerous. Consumers will get failures if their ***\n" +
"*** fetch.message.max.bytes < the value you are using. ***\n" +
"*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" +
"*** default max.message.bytes. This operation is potentially dangerous. Consumers will get ***\n" +
s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} ***\n"+
"*** (new consumer) < the value you are using. ***\n" +
"*****************************************************************************************************\n" +
s"- value set here: $maxMessageBytes\n" +
s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" +
s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" +
s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n"
}

def longMessageSizeWarning(maxMessageBytes: Int): String = {
"\n\n" +
"****************************************************************************************************\n" +
"*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker ***\n" +
"*** default. This operation is dangerous. There are two potential side effects: ***\n" +
"*** - Consumers will get failures if their fetch.message.max.bytes < the value you are using ***\n" +
"*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" +
"*** a higher risk of data loss ***\n" +
"*** You should ensure both of these settings are greater than the value set here before using ***\n" +
"*** this topic. ***\n" +
"****************************************************************************************************\n" +
"*****************************************************************************************************\n" +
"*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" +
"*** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***\n" +
"*** - Consumers will get failures if their fetch.message.max.bytes (old consumer) or ***\n" +
s"*** ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new consumer) < the value you are using ***\n" +
"*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" +
"*** a higher risk of data loss ***\n" +
"*** You should ensure both of these settings are greater than the value set here before using ***\n" +
"*** this topic. ***\n" +
"*****************************************************************************************************\n" +
s"- value set here: $maxMessageBytes\n" +
s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" +
s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" +
s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
}
}

0 comments on commit 62b9fa2

Please sign in to comment.