KAFKA-18026: KIP-1112, document new config and update the Streams upgrade guide #17906

Open · wants to merge 2 commits into base: trunk
Changes from 1 commit
53 changes: 39 additions & 14 deletions docs/streams/developer-guide/config-streams.html
@@ -84,6 +84,7 @@
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#processor-wrapper-class" id="id42">processor.wrapper.class</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">production.exception.handler</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
@@ -408,76 +409,83 @@ <h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="header
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+).</td>
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>production.exception.handler</td>
<tr class="row-odd"><td>processor.wrapper.class</td>
<td>Medium</td>
<td colspan="2">A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface.
Must be passed in when the topology is created; it will not be applied unless supplied to the appropriate constructor as a <code class="docutils literal"><span class="pre">TopologyConfig</span></code>. Use
the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.</td>
</tr>
<tr class="row-even"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>poll.ms</td>
<tr class="row-odd"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td>100 milliseconds</td>
</tr>
<tr class="row-odd"><td>rack.aware.assignment.tags</td>
<tr class="row-even"><td>rack.aware.assignment.tags</td>
<td>Medium</td>
<td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.</td>
<td>the empty list</td>
</tr>
<tr class="row-even"><td>replication.factor</td>
<tr class="row-odd"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr>
<tr class="row-odd"><td>retry.backoff.ms</td>
<tr class="row-even"><td>retry.backoff.ms</td>
<td>Medium</td>
<td colspan="2">The amount of time in milliseconds to wait before retrying a failed request.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>rocksdb.config.setter</td>
<tr class="row-odd"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td></td>
</tr>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<tr class="row-even"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-even"><td>state.dir</td>
<tr class="row-odd"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.assignor.class</td>
<tr class="row-even"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
<tr class="row-even"><td>task.timeout.ms</td>
<tr class="row-odd"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td>300000 milliseconds (5 minutes)</td>
</tr>
<tr class="row-odd"><td>topology.optimization</td>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code>NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-even"><td>upgrade.from</td>
<tr class="row-odd"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
</tr>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds (1 day)</td>
</tr>
<tr class="row-even"><td>window.size.ms</td>
<tr class="row-odd"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets the window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
Expand Down Expand Up @@ -998,6 +1006,23 @@ <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a clas
</dl>
</div></blockquote>
</div>
<div class="section" id="processor-wrapper-class">
<span id="streams-developer-guide-processor-wrapper-class"></span><h4><a class="toc-backref" href="#id42">processor.wrapper.class</a><a class="headerlink" href="#processor-wrapper-class" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface. This feature allows you to wrap any of the
processors in the compiled topology, including both custom processor implementations and those created by Streams for DSL operators. It can be useful for logging or tracing
implementations, since it provides access to the otherwise-hidden processor context of DSL operators, and it lets you inject additional debugging information into an entire
application topology with just a single config.
</p>
<p>
IMPORTANT: this config must be passed in when the topology is created; it will not be applied unless supplied to the appropriate topology-building constructor. Use
the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.
</p>
</div></blockquote>
</div>
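The wiring requirement above can be sketched as follows. This is a non-authoritative configuration sketch assuming the API shape proposed in KIP-1112 (`StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG`, the `TopologyConfig(StreamsConfig)` constructor); `MyProcessorWrapper` is a hypothetical user class, and the snippet requires the kafka-streams dependency to compile.

```java
// Sketch: wiring processor.wrapper.class so it is visible at topology-build time.
// MyProcessorWrapper is a hypothetical class implementing ProcessorWrapper.
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

public class WrapperWiring {
    public static StreamsBuilder newBuilder() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Setting the wrapper in the properties alone is NOT enough ...
        props.put(StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG, MyProcessorWrapper.class);

        // ... it must also reach the topology-building constructor as a TopologyConfig:
        TopologyConfig topoConfig = new TopologyConfig(new StreamsConfig(props));
        return new StreamsBuilder(topoConfig); // DSL; use new Topology(topoConfig) for the PAPI
    }
}
```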
<div class="section" id="replication-factor">
<span id="replication-factor-parm"></span><h4><a class="toc-backref" href="#id13">replication.factor</a><a class="headerlink" href="#replication-factor" title="Permalink to this headline"></a></h4>
<blockquote>
36 changes: 33 additions & 3 deletions docs/streams/upgrade-guide.html
@@ -67,6 +67,12 @@ <h1>Upgrade Guide and API Changes</h1>
<p>For a table that shows Streams API compatibility with Kafka broker versions, see <a href="#streams_api_broker_compat">Broker Compatibility</a>.</p>

<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>

<p>
Starting in version 4.0.0, Kafka Streams will only be compatible with brokers on version 2.1
or higher. Additionally, exactly-once semantics (EOS) will require brokers on version 2.5 or higher.
</p>

<p>
Downgrading from version 3.5.x or newer to version 3.4.x or older needs special attention:
since the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics.
@@ -155,6 +161,18 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
<code>TransformerSupplier</code>, <code>ValueTransformer</code>, and <code>ValueTransformerSupplier</code>.
</p>

<p>
You can now configure your topology with a <code>ProcessorWrapper</code>, which allows you to access and optionally wrap or replace
any processor in the topology by injecting an alternative <code>ProcessorSupplier</code> in its place. This can be used to peek at
records and access the processor context even for DSL operators, for example to implement a logging or tracing framework, or to
aid in testing or debugging. You must implement the <code>ProcessorWrapper</code> interface and then pass the class
or class name into the configs via the new <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> config. Note that this config is
applied during the topology-building phase, and therefore will not take effect unless it is passed in when creating
the <code>StreamsBuilder</code> (DSL) or <code>Topology</code> (PAPI) objects. You must use the <code>StreamsBuilder</code>/<code>Topology</code> constructor overload that
accepts a <code>TopologyConfig</code> parameter for the <code>StreamsConfig#PROCESSOR_WRAPPER_CLASS_CONFIG</code> to be picked up.
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>
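To make the wrapping mechanism concrete, here is a self-contained sketch of the pattern. The `Processor`, `ProcessorSupplier`, and `ProcessorWrapper` interfaces below are hypothetical, simplified stand-ins for the real Kafka Streams APIs, not the actual signatures; the point is only to show how a wrapper intercepts every supplier at build time and can peek at records before delegating.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperPatternDemo {
    // Hypothetical reductions of the real Kafka Streams interfaces:
    interface Processor { void process(String record); }
    interface ProcessorSupplier { Processor get(); }
    // The wrapper sees each supplier at topology-build time and may replace it.
    interface ProcessorWrapper { ProcessorSupplier wrap(String name, ProcessorSupplier inner); }

    static final List<String> trace = new ArrayList<>();

    // A tracing wrapper: records each input before delegating to the wrapped processor.
    static class TracingWrapper implements ProcessorWrapper {
        public ProcessorSupplier wrap(String name, ProcessorSupplier inner) {
            return () -> {
                Processor delegate = inner.get();
                return record -> {
                    trace.add(name + " saw " + record); // peek at the record
                    delegate.process(record);           // then delegate unchanged
                };
            };
        }
    }

    public static List<String> run() {
        ProcessorWrapper wrapper = new TracingWrapper();
        // At build time, the node's supplier is routed through the wrapper.
        ProcessorSupplier wrapped =
            wrapper.wrap("map-node", () -> record -> trace.add("processed " + record));
        Processor p = wrapped.get();
        p.process("a");
        p.process("b");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The same shape underlies the real feature: because wrapping happens while the topology is compiled, the wrapper can interpose on nodes the application code never constructed directly, such as those generated for DSL operators.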

<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
@@ -1634,27 +1652,30 @@ <h3 class="anchor-heading"><a id="streams_api_broker_compat" class="anchor-link"
<thead>
<tr>
<th></th>
<th colspan="3">Kafka Broker (columns)</th>
<th colspan="4">Kafka Broker (columns)</th>
</tr>
</thead>
<tbody>
<tr>
<td>Kafka Streams API (rows)</td>
<td>0.10.0.x</td>
<td>0.10.1.x and 0.10.2.x</td>
<td>0.11.0.x and<br>1.0.x and<br>1.1.x and<br>2.0.x and<br>2.1.x and<br>2.2.x and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x</td>
<td>0.11.0.x and<br>1.0.x and<br>1.1.x and<br>2.0.x</td>
<td>2.1.x and<br>2.2.x and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x and<br>4.0.x</td>
</tr>
<tr>
<td>0.10.0.x</td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
Contributor Author:
@ijuma just to confirm my reading of KIP-896, the change only affects older clients compatibility with newer brokers, not the other way around, right? In other words a 0.11 client could still run against a 4.0 broker?

Contributor:
In other words a 0.11 client could still run against a 4.0 broker?

I don't think so. According to KIP-896:

Users relying on clients not included in this document will have to ensure they are using a version that does not rely on protocol API versions that are being removed as part of this proposal.

This means that broker version 4.x can only communicate with clients version 2.1 and above.

Contributor:
Correct @chia7712. This raises the minimum version for everything to 2.1.

Contributor Author:
thanks for confirming -- glad I checked 🙂

</tr>
<tr>
<td>0.10.1.x and 0.10.2.x</td>
<td></td>
<td>compatible</td>
<td>compatible</td>
<td>compatible</td>
</tr>
<tr>
<td>0.11.0.x</td>
@@ -1667,12 +1688,21 @@ <h3 class="anchor-heading"><a id="streams_api_broker_compat" class="anchor-link"
<td></td>
<td>compatible with exactly-once turned off<br>(requires broker version 0.11.0.x or higher);<br>requires message format 0.10 or higher;<br>message headers are not supported<br>(requires broker version 0.11.0.x or higher<br>with message format 0.11 or higher)</td>
<td>compatible; requires message format 0.10 or higher;<br>if message headers are used, message format 0.11<br>or higher required</td>
<td>compatible</td>
</tr>
<tr>
<td>2.2.1 and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x</td>
<td>2.2.1 and<br>2.3.x and<br>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x</td>
<td></td>
<td></td>
<td>compatible; requires message format 0.11 or higher;<br>enabling exactly-once v2 requires 2.4.x or higher</td>
Contributor Author (@ableegoldman, Nov 22, 2024):

@mjsax is this right? I always thought eosv2 needed brokers to be 2.5 or higher, not 2.4. Am I misremembering this or is the matrix wrong?

This error message could also be wrong I suppose but this log in StreamThread at least seems to agree it's 2.5:

log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
                              "Setting {}=\"{}\" requires broker version 2.5 or higher.",

Contributor Author:
ok according to the KafkaProducer javadocs for #sendOffsetsToTransaction it's 2.5 so I think that's right. I'll fix it here but it might be good to go back and fix this for the previous javadocs

Member:
Yes, EOSv2 does require 2.5 brokers.

<td>compatible</td>
</tr>
<tr>
<td>4.0.x</td>
<td></td>
<td></td>
<td></td>
<td>compatible with exactly-once turned off<br>(enabling exactly-once requires broker version 2.5.x or higher)</td>
</tr>
</tbody>
</table>