diff --git a/src/current/v24.3/changefeed-sinks.md b/src/current/v24.3/changefeed-sinks.md
index 82d0dd06308..dfced793de0 100644
--- a/src/current/v24.3/changefeed-sinks.md
+++ b/src/current/v24.3/changefeed-sinks.md
@@ -40,17 +40,6 @@ To set a different sink URI to an existing changefeed, use the [`sink` option]({
 
 ## Kafka
 
-{{site.data.alerts.callout_info}}
-CockroachDB uses a different version of the Kafka sink that is implemented with the [franz-go](https://github.com/twmb/franz-go) Kafka client library. If you are using a [testing release]({% link releases/index.md %}#patch-releases) of v24.2 or v24.2.0, we recommend that you enable this updated version of the Kafka sink to avoid a potential bug in the previous version of the CockroachDB Kafka sink; for more details, refer to the [technical advisory 122372]({% link advisories/a122372.md %}). You can enable this Kafka sink with the cluster setting [`changefeed.new_kafka_sink.enabled`]({% link v24.2/show-cluster-setting.md %}).
-
-{% include_cached copy-clipboard.html %}
-~~~ sql
-SET CLUSTER SETTING changefeed.new_kafka_sink.enabled = true;
-~~~
-
-If you are running v24.2.1 and later, the `changefeed.new_kafka_sink.enabled` cluster setting is enabled by default.
-{{site.data.alerts.end}}
-
 ### Kafka sink connection
 
 Example of a Kafka sink URI using `SCRAM-SHA-256` authentication:
@@ -117,14 +106,17 @@ Kafka has the following topic limitations:
 
 ### Kafka sink configuration
 
-The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters.
+You can configure flushing, acknowledgments, compression, and concurrency behavior of changefeeds running to a Kafka sink with the following:
+
+- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Kafka sink. Changing `changefeed.sink_io_workers` does not affect running changefeeds; to apply the new value, [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}), as shown in the example after this list. Note that this cluster setting also affects changefeeds running to [Google Cloud Pub/Sub](#google-cloud-pub-sub) sinks and [webhook sinks](#webhook-sink).
+- The `kafka_sink_config` option allows configuration of a changefeed's message delivery, Kafka server version, and batching parameters.
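+
+For example, a minimal sketch of applying a new `changefeed.sink_io_workers` value to an existing changefeed (the job ID and worker count are illustrative):
+
+{% include_cached copy-clipboard.html %}
+~~~ sql
+-- 12345 is a placeholder for your changefeed's job ID.
+PAUSE JOB 12345;
+SET CLUSTER SETTING changefeed.sink_io_workers = 32;
+RESUME JOB 12345;
+~~~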
 
 {{site.data.alerts.callout_danger}}
 Each of the following settings has a significant impact on a changefeed's behavior, such as latency. For example, it is possible to configure batching parameters to be very high, which would negatively impact changefeed latency. As a result, it would take a long time to see messages coming through to the sink. Also, large batches may be rejected by the Kafka server unless it's separately configured to accept a high [`max.message.bytes`](https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes).
 {{site.data.alerts.end}}
 
 ~~~
-kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP" }'
+kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}'
 ~~~
 
 `"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, the changefeed will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, a `"1s"` frequency will add 1 second of latency. However, if there is a larger influx of messages, these will be flushed more quickly.
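+
+For example, a minimal sketch of attaching these batching parameters when creating a changefeed (the table name and Kafka URI are placeholders):
+
+{% include_cached copy-clipboard.html %}
+~~~ sql
+CREATE CHANGEFEED FOR TABLE movr.rides
+  INTO 'kafka://localhost:9092'
+  WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000, "Frequency": "1s"}}';
+~~~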
`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"` `"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"` +New in v24.3:`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. The compression protocols have the following ranges:
`GZIP`:`ZSTD`:`LZ4`**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`

`ZSTD`: `2`

`LZ4`: `0` ### Kafka sink messages @@ -359,7 +352,7 @@ For a list of compatible parameters and options, refer to [Parameters]({% link { You can configure flushing, retry, and concurrency behavior of changefeeds running to a Pub/Sub sink with the following: -- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [webhook sinks](#webhook-sink). +- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [webhook sinks](#webhook-sink) and [Kafka](#kafka). - Set the `pubsub_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. For details on the `pubsub_sink_config` option's configurable fields, refer to the following table and examples. Field | Type | Description | Default @@ -562,7 +555,7 @@ The following are considerations when using the webhook sink: You can configure flushing, retry, and concurrency behavior of changefeeds running to a webhook sink with the following: -- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [Google Cloud Pub/Sub sinks](#google-cloud-pub-sub). +- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. When you set `changefeed.sink_io_workers`, it will not affect running changefeeds; [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting will also affect changefeeds running to [Google Cloud Pub/Sub sinks](#google-cloud-pub-sub) and [Kafka](#kafka). - Set the `webhook_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. 
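+
+For example, a minimal sketch of a `webhook_sink_config` value that batches messages and retries failed requests (the field values are illustrative):
+
+~~~
+webhook_sink_config='{"Flush": {"Messages": 10, "Frequency": "5s"}, "Retry": {"Max": "inf"}}'
+~~~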
 
 For details on the `webhook_sink_config` option's configurable fields, refer to the following table and examples.
 
 Field | Type | Description | Default