
Add CompressionLevel and make v2 Kafka sink default #19169

Merged 4 commits into main from kafka-v2-sink-updates on Dec 10, 2024

Conversation

@kathancox (Contributor) commented Nov 21, 2024

Fixes DOC-11339, DOC-10867, DOC-10830, DOC-10700

This PR:

  • Updates the kafka_sink_config option with the CompressionLevel field in v24.3.
  • Removes the note for the cluster setting to enable the v2 Kafka sink, because this is the default in v24.3.
  • Adds the cluster setting changefeed.sink-io-workers under Kafka for the default v2 sink.

Rendered Preview

https://deploy-preview-19169--cockroachdb-docs.netlify.app/docs/v24.3/changefeed-sinks.html#kafka-sink-configuration


@kathancox kathancox force-pushed the kafka-v2-sink-updates branch 2 times, most recently from c9da936 to de9c4ec on November 21, 2024 at 21:23
@kathancox kathancox marked this pull request as ready for review November 21, 2024 21:26
The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters.
You can configure flushing, acknowledgments, compression, and concurrency behavior of changefeeds running to a Kafka sink with the following:

- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Kafka sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [Google Cloud Pub/Sub](#google-cloud-pub-sub) sinks and [webhook sinks](#webhook-sink).
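
For illustration, applying this setting to an existing changefeed might look like the following sketch (the job ID and worker count are hypothetical):

```sql
-- Running changefeeds do not pick up the new value, so pause the job first,
-- change the cluster setting, then resume the job.
PAUSE JOB 1234567890;
SET CLUSTER SETTING changefeed.sink_io_workers = 32;
RESUME JOB 1234567890;
```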
@kathancox (Contributor, Author) commented:

Does it make sense to add this now that the v2 Kafka sink is the default? (This paragraph is included for Pub/Sub + Webhook too.)

Reviewer commented:

please add the caveat that this only applies if running with the v2 kafka sink

@kathancox (Contributor, Author) commented Dec 3, 2024:

We're not using the terminology v2 Kafka sink (as requested by Rachael), so I'll reference the cluster setting etc. here.

Reviewer commented:

sounds good thanks. should i update the linked issue to use that terminology too?

@kathancox kathancox requested a review from asg0451 November 22, 2024 17:18
@@ -154,6 +146,7 @@ Field | Type | Description | Default
`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
<a name="kafka-required-acks"></a>`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: <br><br>`"ONE"`: a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.<br><br>`"NONE"`: no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.<br><br>`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"`
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. The compression protocols have the following ranges:<br>`GZIP`:<ul><li>`0` no compression</li><li>`1` to `9` best speed to best compression</li><li>`-1` default</li><li>`-2` [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)</li></ul>`ZSTD`:<ul><li>`1` fastest</li><li>`2` default</li><li>`3` better compression</li><li>`4` best compression</li></ul>`LZ4`<ul><li>0 fast default</li><li>`512 * N` Level N, where N is between `1` and `9`. The higher the number, the better compression</li></ul>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
Reviewer commented:

would you mind cleaning this up a bit? capitalization, more words, english, etc. eg - 0: No Compression

also there's a gzip compression level -3: stateless compression

also, i just found that in kafkav2, it won't let you set compression level < 0. this should be a known issue i guess. just filed an issue for it: cockroachdb/cockroach#136492

Reviewer commented:

also the formula for lz4 is actually 2^(8 + N) (where N is between 1 and 9 inclusive), and 0 = "fast"

might be easier to just list the values tbh.

[Screenshot: the LZ4 compression level constants and their numeric values]

@kathancox (Contributor, Author) commented:

@asg0451 I changed this up a bit; the table for all the information wasn't working, so I added some subsections with the table acting more as a list. You'll have to excuse the diff noise of the other field subsections. I haven't added the LZ4 values — I've instead stuck to the same format as GZIP using the LZ4 levels; let me know what you think.

For the default GZIP value, I've left it as -1, but noted that you can't manually set it to that value — this is what I interpreted from your Slack message, so I may be wrong here.

I added the known limitation and tried to be clear about how this does/doesn't apply.

PTAL! Here's the preview: https://deploy-preview-19169--cockroachdb-docs.netlify.app/docs/v24.3/changefeed-sinks.html#kafka-sink-configuration

@asg0451 commented Dec 3, 2024:

sorry for the confusion -- in the screenshot above the values you specify are the ones in grey, not the constant names. so for "Level1" you specify 512, "Level2" 1024, etc.

@kathancox (Contributor, Author) commented Dec 3, 2024:

Ahhhh, I'm sorry, I didn't grasp that the values were what the user had to specify. I will update.

@kathancox kathancox requested a review from asg0451 December 3, 2024 15:41
@@ -0,0 +1 @@
Changefeeds created in v24.3 of CockroachDB that emit to [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), or changefeeds created in earlier versions with the `changefeed.new_kafka_sink.enabled` cluster setting enabled, do not support negative compression level values in the [`kafka_sink_config = {... "CompressionLevel" = ...}`]({% link {{ page.version.version }}/changefeed-sinks.md %}#compressionlevel) option field. [#136492](https://github.com/cockroachdb/cockroach/issues/136492)
Reviewer commented:

can you call out that this is specifically for GZIP pls


-------------------+---------------------+------------------+-------------------
`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to [`ClientID`](#clientid). | ""
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. For the compression protocol ranges, refer to [`CompressionLevel`](#compressionlevel).<br><br>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
Reviewer commented:

while accurate, i'm not sure how i feel about listing GZIP's default as -1 given that the user can't set that with the v2 sink. maybe there's a better way to explain the defaults than to list the magic values.

@kathancox (Contributor, Author) commented Dec 3, 2024:

I think this is going to be pretty awkward given there are "default" compression levels. Throughout the docs, we provide the default values for options/settings etc.

Reviewer commented:

yeah i know. kinda awkward either way i guess. up to you

@@ -0,0 +1 @@
Changefeeds created in v24.3 of CockroachDB that emit to [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), or changefeeds created in earlier versions with the `changefeed.new_kafka_sink.enabled` cluster setting enabled, do not support negative compression level values for `GZIP` compression in the [`kafka_sink_config = {... "CompressionLevel" = ...}`]({% link {{ page.version.version }}/changefeed-sinks.md %}#compressionlevel) option field. [#136492](https://github.com/cockroachdb/cockroach/issues/136492)
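
Concretely, under the new sink a statement like the following sketch would be rejected because of the negative `GZIP` level (the table name and broker URI are placeholders):

```sql
-- Fails with the v2 Kafka sink: negative CompressionLevel values,
-- such as GZIP's -1 default, are not accepted (cockroachdb/cockroach#136492).
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH kafka_sink_config = '{"Compression": "GZIP", "CompressionLevel": -1}';
```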
@kathancox (Contributor, Author) commented:

Added GZIP in here.


{{site.data.alerts.callout_info}}
`changefeed.sink_io_workers` only applies to Kafka sinks created in v24.2.1+, or if the `changefeed.new_kafka_sink.enabled` cluster setting has been enabled in CockroachDB clusters running v23.2.10+ and v24.1.4+.
{{site.data.alerts.end}}
@kathancox (Contributor, Author) commented:

Added this note above re: how the concurrent worker cluster setting interacts with the newer Kafka sink. I think I have the versioning and such correct here.

-------------------+---------------------+------------------+-------------------
`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to [`ClientID`](#clientid). | ""
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. For the compression protocol ranges, refer to [`CompressionLevel`](#compressionlevel).<br><br>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | Refer to [`CompressionLevel`](#compressionlevel)
@kathancox (Contributor, Author) commented:

For the Default column here, I took out the values and referred readers to the section with the fuller explanations (including the description re: the known limitation for GZIP). I hope that's a good compromise, i.e., removing the default values without context and listing the defaults only in the section with the context.

Comment on lines +163 to +168
{% comment %}
These values are not available yet per KL #136492
- `-1`: Default compression
- `-2`: [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)
- `-3`: Stateless compression
{% endcomment %}
@kathancox (Contributor, Author) commented:

I've commented this out, but can fully remove.

Comment on lines 175 to 185
- `LZ4`: The following list represents the supported values from fastest compression to best compression:
- `0`: Fastest compression (Default)
- `512`
- `1024`
- `2048`
- `4096`
- `8192`
- `16384`
- `32768`
- `65536`
- `131072`: Best compression
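
These values line up with the formula given above: level N corresponds to 2^(8 + N), so Level 1 = 2^9 = 512, Level 5 = 2^13 = 8192, and Level 9 = 2^17 = 131072, with `0` reserved for the fast default.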
@kathancox (Contributor, Author) commented:

Hope I have now understood this!

@kathancox kathancox requested a review from asg0451 December 4, 2024 18:36
@kathancox kathancox requested a review from rmloveland December 9, 2024 16:03
@rmloveland (Contributor) left a comment:

LGTM!

@kathancox kathancox force-pushed the kafka-v2-sink-updates branch from 4fdb65c to 3a37922 on December 10, 2024 at 17:04
@kathancox kathancox merged commit dc20fdc into main Dec 10, 2024
6 checks passed
@kathancox kathancox deleted the kafka-v2-sink-updates branch December 10, 2024 18:00