Skip to content

Commit

Permalink
KAFKA-16368: Update default linger.ms to 5 for KIP-1030
Browse files Browse the repository at this point in the history
  • Loading branch information
jayteej committed Dec 9, 2024
1 parent 31d97bc commit 05d0dd1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ public class ProducerConfig extends AbstractConfig {
+ "<p>"
+ "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
+ "for this partition, we will 'linger' for the <code>linger.ms</code> time waiting for more records to show up. "
+ "This <code>linger.ms</code> setting defaults to 0, which means we'll immediately send out a record even the accumulated "
+ "batch size is under this <code>batch.size</code> setting.";
+ "This <code>linger.ms</code> setting defaults to 5, which means we'll wait 5ms for the accumulated "
+ "<code>batch.size</code> to fill as much as it can in this timeframe. This value previously defaulted"
+ "to 0, but over time we have observed that the IO overhead caused by smaller batches negates any latency gains.";

/** <code>partitioner.adaptive.partitioning.enable</code> */
public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
Expand Down Expand Up @@ -147,8 +148,9 @@ public class ProducerConfig extends AbstractConfig {
+ "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
+ "we get <code>" + BATCH_SIZE_CONFIG + "</code> worth of records for a partition it will be sent immediately regardless of this "
+ "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
+ "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>" + LINGER_MS_CONFIG + "=5</code>, "
+ "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
+ "specified time waiting for more records to show up. This setting defaults to 5 (i.e. 5ms delay). Increasing <code>" + LINGER_MS_CONFIG + "=50</code>, "
+ "for example, would have the effect of reducing the number of requests sent but would add up to 50ms of latency to records sent in the absence of load."
+ "This value previously defaulted to 0, but over time we have observed that the IO overhead caused by smaller batches negates any latency gains.";

/** <code>request.timeout.ms</code> */
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
Expand Down Expand Up @@ -383,7 +385,7 @@ public class ProducerConfig extends AbstractConfig {
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import scala.jdk.CollectionConverters._
class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
val producerCount: Int = 1
val brokerCount: Int = 2
val defaultLingerMs: Int = 5;

serverConfig.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 2.toString)
serverConfig.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, 2.toString)
serverConfig.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString)

producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, (1000 + defaultLingerMs).toString)

/**
* Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,8 @@ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new Con
private var _retries = Int.MaxValue
private var _acks = -1
private var _requestTimeoutMs = 30000
private var _deliveryTimeoutMs = 30000
private val defaultLingerMs = 5;
private var _deliveryTimeoutMs = 30000 + defaultLingerMs

def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
Expand Down

0 comments on commit 05d0dd1

Please sign in to comment.