Skip to content

Commit

Permalink
feat(serde): kafka format (#3065)
Browse files Browse the repository at this point in the history
* feat(serde): kafka format

A new ``KAFKA`` format that supports ``INT``, ``BIGINT``, ``DOUBLE`` and ``STRING`` fields that have been serialized using the standard Kafka serializers,
  e.g. ``org.apache.kafka.common.serialization.LongSerializer``, or equivalent.

 The format only supports single values, i.e. only single field, being primarily intended for use as a key format.
  • Loading branch information
big-andy-coates authored Jul 16, 2019
1 parent 9f812df commit 2b5c3d1
Show file tree
Hide file tree
Showing 37 changed files with 1,339 additions and 121 deletions.
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ KSQL 5.4.0 includes new features, including:

* New ``UNIX_TIMESTAMP()`` and ``UNIX_DATE()`` functions.

* A new ``KAFKA`` format that supports ``INT``, ``BIGINT``, ``DOUBLE`` and ``STRING`` fields that
have been serialized using the standard Kafka serializers,
e.g. ``org.apache.kafka.common.serialization.LongSerializer``.

The format only supports single values, i.e. only single field, being primarily intended for use
as a key format.

KSQL 5.4.0 includes the following misc. changes:

* Require either the value for a ``@UdfParameter`` or for the UDF JAR to be compiled with
Expand Down
3 changes: 2 additions & 1 deletion docs/developer-guide/create-a-stream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ The following example creates a stream that has three columns from the

KSQL can't infer the topic's data format, so you must provide the format of the
values that are stored in the topic. In this example, the data format is
``DELIMITED``. Other options are ``Avro`` and ``JSON``.
``DELIMITED``. Other options are ``Avro``, ``JSON`` or ``KAFKA``.
See :ref:`ksql_formats` for more details.

In the KSQL CLI, paste the following CREATE STREAM statement:

Expand Down
3 changes: 2 additions & 1 deletion docs/developer-guide/create-a-table.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ Also, the ``userid`` field is assigned as the table's KEY property.

KSQL can't infer the topic's data format, so you must provide the format of the
values that are stored in the topic. In this example, the data format is
``JSON``. Other options are ``Avro`` and ``DELIMITED``.
``JSON``. Other options are ``Avro``, ``DELIMITED`` or ``KAFKA``.
See :ref:`ksql_formats` for more details.

In the KSQL CLI, paste the following CREATE TABLE statement:

Expand Down
58 changes: 53 additions & 5 deletions docs/developer-guide/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ KSQL provides some additional configuration that allows serialization to be cont
Single field (un)wrapping
-------------------------

.. note:: The ``DELIMITED`` format is not effected by single field unwrapping.
.. note:: The ``DELIMITED`` and ``KAFKA`` formats do not support single field unwrapping.

Controlling deserializing of single fields
==========================================
Expand Down Expand Up @@ -201,6 +201,7 @@ KSQL currently supports three serialization formats:
*. ``DELIMITED`` supports comma separated values. See :ref:`delimited_format` below.
*. ``JSON`` supports JSON values. See :ref:`json_format` below.
*. ``AVRO`` supports AVRO serialized values. See :ref:`avro_format` below.
*. ``KAFKA`` supports primitives serialized using the standard Kafka serializers. See :ref:`kafka_format` below.
.. _delimited_format
Expand Down Expand Up @@ -262,7 +263,6 @@ And a JSON value of:
KSQL deserializes the JSON object's fields into the corresponding fields of the stream.

-------------------------------------
Top-level primitives, arrays and maps
-------------------------------------

Expand Down Expand Up @@ -292,6 +292,12 @@ the ``WRAP_SINGLE_VALUE`` is set to ``false``, for example:
For more information, see :ref:`ksql_single_field_wrapping`.

Field Name Case Sensitivity
---------------------------

The format is case-insensitive when matching a KSQL field name with a JSON document's property name.
The first case-insensitive match is used.

.. _avro_format
----
Expand All @@ -304,7 +310,6 @@ including records and top-level primitives, arrays, and maps.
The format requires KSQL to be configured to store and retrieve the Avro schemas from the |sr-long|.
For more information, see :ref:`install_ksql-avro-schema`.

------------
Avro Records
------------

Expand Down Expand Up @@ -333,7 +338,6 @@ And an Avro record serialized with the schema:
KSQL deserializes the Avro record's fields into the corresponding fields of the stream.

-------------------------------------
Top-level primitives, arrays and maps
-------------------------------------

Expand Down Expand Up @@ -365,5 +369,49 @@ the ``WRAP_SINGLE_VALUE`` is set to ``false``, for example:
For more information, see :ref:`ksql_single_field_wrapping`.

===========================
Field Name Case Sensitivity
---------------------------

The format is case-insensitive when matching a KSQL field name with an Avro record's field name.
The first case-insensitive match is used.

.. _kafka_format
-----
KAFKA
-----

The ``KAFKA`` format supports``INT``, ``BIGINT``, ``DOUBLE`` and ``STRING`` primitives that have
been serialized using Kafka's standard set of serializers.

The format is designed primarily to support primitive message keys. It can be used as a value format,
though certain operations aren't supported when this is the case.

Unlike some other formats, the ``KAFKA`` format does not perform any type coercion, so it's important
to correctly match the field type to the underlying serialized form to avoid deserialization errors.

The table below details the SQL types the format supports, including details of the associated Kafka
Java Serializer, Deserializer and Connect Converter classes you would need to use to write the key
to Kafka, read the key from Kafka, or use to configure Apache Connect to work with the ``KAFKA`` format,
respectively.

+------------------+--------------------------------+-------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------+
| KSQL Field Type | Kafka Type | Kafka Serializer | Kafka Deserializer | Connect Converter |
+==================+================================+=============================================================+===============================================================+=======================================================+
| INT / INTEGER | A 32-bit signed integer | ``org.apache.kafka.common.serialization.IntegerSerializer`` | ``org.apache.kafka.common.serialization.IntegerDeserializer`` | ``org.apache.kafka.connect.storage.IntegerConverter`` |
+------------------+--------------------------------+-------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------+
| BIGINT | A 64-bit signed integer | ``org.apache.kafka.common.serialization.LongSerializer`` | ``org.apache.kafka.common.serialization.LongDeserializer`` | ``org.apache.kafka.connect.storage.LongConverter`` |
+------------------+--------------------------------+-------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------+
| DOUBLE | A 64-bit floating point number | ``org.apache.kafka.common.serialization.DoubleSerializer`` |``org.apache.kafka.common.serialization.DoubleDeserializer`` | ``org.apache.kafka.connect.storage.DoubleConverter`` |
+------------------+--------------------------------+-------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------+
| STRING / VARCHAR | A UTF-8 encoded text string | ``org.apache.kafka.common.serialization.StringSerializer`` |``org.apache.kafka.common.serialization.StringDeserializer`` | ``org.apache.kafka.connect.storage.StringConverter`` |
+------------------+--------------------------------+-------------------------------------------------------------+---------------------------------------------------------------+-------------------------------------------------------+

Because the format supports only primitive types, you can only use it when the schema contains a single field.

For example, if your Kafka messages have a ``long`` key, you can make them available to KSQL a statement
similar to:

.. code:: sql
CREATE STREAM USERS (ROWKEY BIGINT KEY, NAME STRING) WITH (KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', ...);
22 changes: 12 additions & 10 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ The WITH clause supports the following properties:
| | exists with different partition/replica counts. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_FORMAT (required) | Specifies the serialization format of the message value in the topic. Supported formats: |
| | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. |
| | For more information, see :ref:`ksql_serialization`. |
| | ``JSON``, ``DELIMITED`` (comma-separated value), ``AVRO`` and ``KAFKA``. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+--------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a |
| | STREAM without an existing topic (the command will fail if the topic does not exist). |
Expand All @@ -364,8 +364,8 @@ The WITH clause supports the following properties:
| | the implicit ``ROWKEY`` column (message key). |
| | If set, KSQL uses it as an optimization hint to determine if repartitioning can be avoided |
| | when performing aggregations and joins. |
| | You can only use this if the key format in kafka is ``VARCHAR`` or ``STRING``. Do not use |
| | this hint if the message key format in kafka is AVRO or JSON. |
| | You can only use this if the key format in Kafka is ``VARCHAR`` or ``STRING``. Do not use |
| | this hint if the message key format in Kafka is ``AVRO`` or ``JSON``. |
| | See :ref:`ksql_key_requirements` for more information. |
+-------------------------+--------------------------------------------------------------------------------------------+
| TIMESTAMP | By default, the implicit ``ROWTIME`` column is the timestamp of the message in the Kafka |
Expand Down Expand Up @@ -464,8 +464,8 @@ The WITH clause supports the following properties:
| | exists with different partition/replica counts. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_FORMAT (required) | Specifies the serialization format of message values in the topic. Supported formats: |
| | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. |
| | For more information, see :ref:`ksql_serialization`. |
| | ``JSON``, ``DELIMITED`` (comma-separated value), ``AVRO`` and ``KAFKA``. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+--------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. This property must be set if creating a |
| | TABLE without an existing topic (the command will fail if the topic does not exist). |
Expand Down Expand Up @@ -592,8 +592,9 @@ The WITH clause for the result supports the following properties:
| | name of the stream in upper case will be used as default. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_FORMAT | Specifies the serialization format of the message value in the topic. Supported formats: |
| | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. If this property is not |
| | set, then the format of the input stream/table is used. |
| | ``JSON``, ``DELIMITED`` (comma-separated value), ``AVRO`` and ``KAFKA``. |
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
Expand Down Expand Up @@ -698,8 +699,9 @@ The WITH clause supports the following properties:
| | name of the table will be used as default. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_FORMAT | Specifies the serialization format of the message value in the topic. Supported formats: |
| | ``JSON``, ``DELIMITED`` (comma-separated value), and ``AVRO``. If this property is not |
| | set, then the format of the input stream or table is used. |
| | ``JSON``, ``DELIMITED`` (comma-separated value), ``AVRO`` and ``KAFKA``. |
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
Expand Down
3 changes: 2 additions & 1 deletion docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ KSQL currently supports formats:
- JSON
- Avro message values are supported. Avro keys are not yet supported. Requires |sr| and ``ksql.schema.registry.url`` in the
KSQL server configuration file. For more information, see :ref:`install_ksql-avro-schema`.
- KAFKA (for example, a ``BIGINT`` that's serialized using Kafka's standard ``LongSerializer``).

See :ref:`data-types` for more details.
See :ref:`ksql_formats` for more details.

====================================
Is KSQL fully compliant to ANSI SQL?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.schema.ksql;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.parser.tree.Type;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlMap;
Expand All @@ -31,14 +30,13 @@
import org.apache.kafka.connect.data.SchemaBuilder;

/**
* Util class for converting between KSQL's {@link LogicalSchema} and it's SQL types, i.e. those
* derived from {@link Type}.
* Util class for converting between KSQL's {@link LogicalSchema} and its SQL types.
*
* <p>KSQL the following main type systems / schema types:
*
* <ul>
* <li>
* <b>SQL {@link Type}:</b>
* <b>{@link SqlType}:</b>
* - the SQL type system, e.g. INTEGER, BIGINT, ARRAY&lt;something&gt;, etc
* </li>
* <li>
Expand All @@ -47,7 +45,7 @@
* or table.
* </li>
* <li>
* <b>{@link PhysicalSchema}:</b>
* <b>PhysicalSchema</b>
* - the schema of how all the row's parts, e.g. key, value, are serialized to/from Kafka.
* </li>
* <li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public enum Format {

JSON(true),
AVRO(true),
DELIMITED(false);
DELIMITED(false),
KAFKA(false);

private final boolean supportsUnwrapping;

Expand Down
49 changes: 49 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,30 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Field;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
class Analyzer {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final String KAFKA_VALUE_FORMAT_LIMITATION_DETAILS = ""
+ "The KAFKA format is primarily intended for use as a key format. "
+ "It can be used as a value format, but can not be used in any operation that "
+ "requires a repartition or changelog topic." + System.lineSeparator()
+ "Removing this limitation requires enhancements to the core of KSQL. "
+ "This will come in a future release. Until then, avoid using the KAFKA format for values."
+ System.lineSeparator() + "If you have an existing topic with "
+ "KAFKA formatted values you can duplicate the data and serialize using Avro or JSON with a "
+ "statement such as: "
+ System.lineSeparator()
+ System.lineSeparator()
+ "'CREATE STREAM <new-stream-name> WITH(VALUE_FORMAT='Avro') AS "
+ "SELECT * FROM <existing-kafka-formated-stream-name>;'"
+ System.lineSeparator()
+ "For more info see https://github.com/confluentinc/ksql/issues/3060";

private final MetaStore metaStore;
private final String topicPrefix;
private final SerdeFactories serdeFactories;
Expand Down Expand Up @@ -127,6 +144,8 @@ Analysis analyze(

visitor.analyzeSink(sink, sqlExpression);

visitor.validate();

return visitor.analysis;
}

Expand All @@ -135,6 +154,8 @@ private final class Visitor extends DefaultTraversalVisitor<Node, Void> {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final Analysis analysis = new Analysis();
private boolean isJoin = false;
private boolean isGroupBy = false;

private void analyzeSink(
final Optional<Sink> sink,
Expand Down Expand Up @@ -306,6 +327,8 @@ private void throwOnUnknownColumnReference() {

@Override
protected Node visitJoin(final Join node, final Void context) {
isJoin = true;

process(node.getLeft(), context);
process(node.getRight(), context);

Expand Down Expand Up @@ -478,6 +501,8 @@ private void analyzeWhere(final Node node) {
}

private void analyzeGroupBy(final GroupBy groupBy) {
isGroupBy = true;

for (final GroupingElement groupingElement : groupBy.getGroupingElements()) {
final Set<Expression> groupingSet = groupingElement.enumerateGroupingSets().get(0);
analysis.addGroupByExpressions(groupingSet);
Expand Down Expand Up @@ -525,6 +550,30 @@ private void visitSelectStar(final AllColumns allColumns) {
}
}
}

public void validate() {
final String kafkaSources = analysis.getFromDataSources().stream()
.filter(s -> s.getDataSource().getKsqlTopic().getValueSerdeFactory().getFormat()
== Format.KAFKA)
.map(AliasedDataSource::getAlias)
.collect(Collectors.joining(", "));

if (kafkaSources.isEmpty()) {
return;
}

if (isJoin) {
throw new KsqlException("Source(s) " + kafkaSources + " are using the 'KAFKA' value format."
+ " This format does not yet support JOIN."
+ System.lineSeparator() + KAFKA_VALUE_FORMAT_LIMITATION_DETAILS);
}

if (isGroupBy) {
throw new KsqlException("Source(s) " + kafkaSources + " are using the 'KAFKA' value format."
+ " This format does not yet support GROUP BY."
+ System.lineSeparator() + KAFKA_VALUE_FORMAT_LIMITATION_DETAILS);
}
}
}

@FunctionalInterface
Expand Down
Loading

0 comments on commit 2b5c3d1

Please sign in to comment.