Kafka Sink

If you are not familiar with the basic characteristics of a sink, please refer to the sinks documentation.

WARNING: The Kafka sink is not exactly-once; it behaves as configured. For example, if you disable retries, you are explicitly requesting at-most-once behavior. When/if we upgrade to Kafka 0.11.x, we will intrinsically support read_committed for true exactly-once behavior. Until then, your mileage may vary, and you should typically design for at-least-once semantics.
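
As a concrete sketch, a job can opt into at-most-once delivery by disabling retries on the underlying producer via the kafka.* passthrough described under options below (the stream name and bootstrap servers are placeholders):

SAVE STREAM foo
TO KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='...',
  'kafka.retries'='0',
  'topic'='foo'
);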

Usage

The Kafka sink publishes messages to Kafka. When using the Kafka sink, you must ensure your transforms produce a final output table for the sink with a well-known schema. If you are familiar with using Kafka producers directly, this well-known schema should feel natural:

  • topic string: the topic this row's (key, value) should be written to.
    • if all rows are destined for the same topic, you can omit providing the topic column in your final output table, and instead provide the topic option
  • key [string | binary]: the serialized key bytes for this row's message (optional)
  • value [string | binary]: the serialized value bytes for this row's message

In the simplest form, you can specify just a value column of data type string or binary. Given this well-known schema, the user is responsible for serializing key(s) and value(s) correctly based on their desired message format.
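
To make the schema concrete, here is a minimal sketch of a final output table exercising all three columns at once (the foo stream and its id and name columns are hypothetical placeholders):

CREATE TEMPORARY VIEW out AS
SELECT
  concat('events-', id) topic,                -- per-row destination topic
  cast(id as string) key,                     -- key bytes used for partitioning
  to_json(named_struct('name', name)) value   -- serialized message payload
FROM foo;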

In the following example we demonstrate using SQL functions to process CSV files from a drop directory and transform those CSV rows to JSON records for Kafka. Assume we have input files in the format:

0,a,false
1,b,true

Let's implement the job to emit JSON records for Kafka:

CREATE STREAM foo(id int, name string, flag boolean)
FROM CSV
OPTIONS(
  'path'='...'
);

CREATE TEMPORARY VIEW bar AS
SELECT
  to_json(named_struct(
    'id', id,
    'name', name,
    'flag', flag
  )) value
FROM foo;

SAVE STREAM bar
TO KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='...',
  'topic'='bar'
);

As seen above, we can easily use built-in SQL functions to convert CSV rows to JSON records and publish them to Kafka.

Let's consider a slightly less trivial implementation. The id field in our CSV rows is actually a unique identifier for the tenant the data belongs to. We want to isolate each tenant's data in a unique topic.

CREATE STREAM foo(id int, name string, flag boolean)
FROM CSV
OPTIONS(
  'path'='...'
);

CREATE TEMPORARY VIEW bar AS
SELECT
  concat('tenant-', id) topic,
  to_json(named_struct(
    'name', name,
    'flag', flag
  )) value
FROM foo;

SAVE STREAM bar
TO KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='...'
);

As seen above, we can route data to a unique topic based on the data itself. Notice that when the topic column is provided, we do not need to specify topic as an option.

Let's modify this example further. Business is booming, and we soon realize a topic per tenant is not tenable at scale. Instead, we decide we want each tenant's data to route consistently to a specific partition within a single topic. We can provide a key column to reap the benefits of Kafka's key-based partitioning: the producer hashes each message's key to choose its partition, so rows with the same key land on the same partition.

CREATE STREAM foo(id int, name string, flag boolean)
FROM CSV
OPTIONS(
  'path'='...'
);

CREATE TEMPORARY VIEW bar AS
SELECT
  cast(id as string) key,
  to_json(named_struct(
    'id', id,
    'name', name,
    'flag', flag
  )) value
FROM foo;

SAVE STREAM bar
TO KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='...',
  'topic'='bar'
);

Features

Output Mode Append

Output Mode Update

Output Mode Complete

Partitioning

Options

kafka.*

The Kafka sink accepts most producer config keys in the sink options under the kafka. prefix. For example, to set the number of retries for the underlying producer(s), provide kafka.retries in the sink's options. Feel free to review the available producer configs in the javadocs or the standard Kafka documentation.
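
As a concrete sketch (the bootstrap servers are placeholders, and kafka.acks is simply another standard producer config passed through the same way):

SAVE STREAM foo
TO KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='...',
  'kafka.retries'='5',
  'kafka.acks'='all',
  'topic'='foo'
);

Here, retries and acks reach the underlying producer(s) with the kafka. prefix stripped.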

The Kafka sink does not allow setting the following producer configs:

  • key.serializer
  • value.serializer

The underlying implementation expects the user to transform their data to string or binary. See usage above for more details and examples of transforming rows for common use cases.
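
For instance, a minimal sketch of producing explicitly binary key and value columns, assuming the dialect exposes Spark SQL's built-in encode function (foo is a placeholder stream with id and name columns):

CREATE TEMPORARY VIEW baz AS
SELECT
  encode(cast(id as string), 'UTF-8') key,                      -- binary key bytes
  encode(to_json(named_struct('name', name)), 'UTF-8') value    -- binary value bytes
FROM foo;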

topic

Specifies the topic this Kafka sink should use when publishing messages. Required if the table provided to this sink does not contain a topic column; if each row of the provided table carries a topic column, you do not need to specify topic as an option.

Defaults to none.

SAVE STREAM foo
TO KAFKA
OPTIONS(
  'topic'='foo'
);