
feat(serde): kafka format #3065

Merged · 19 commits · Jul 16, 2019

Conversation

big-andy-coates
Contributor

@big-andy-coates big-andy-coates commented Jul 11, 2019

Description

Note: contains PR #3066

Part of the work to introduce primitive, then structured, keys.

Keys are currently assumed to be Kafka-serialized strings. When we introduce a way to specify a KEY_FORMAT, there must be a way to declare that the key is a Kafka-serialized string if we're to maintain backwards compatibility. Also, many companies use Kafka-serialized ints or longs as keys.

This PR brings a new KAFKA format, which will use the appropriate standard Kafka serde classes to deserialize a primitive key or value, e.g.

CREATE STREAM FOO (ROWKEY BIGINT KEY, ....) WITH (KEY_FORMAT='KAFKA', ...);

will handle the case where the keys of the messages are longs that have been serialized using Kafka's LongSerializer class. (Or it will once the rest of the associated work is also complete.)

The new KAFKA format supports INT, BIGINT, DOUBLE and STRING fields only, as that's the set of Kafka serde classes that matches up to our KSQL types.
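As a rough illustration of what those serde classes put on the wire, here is a sketch based on the documented behaviour of the standard Kafka serializers (not code from this PR): IntegerSerializer and LongSerializer write big-endian two's-complement bytes, DoubleSerializer writes big-endian IEEE-754, and StringSerializer writes UTF-8.

```python
import struct

# Sketch of the byte layouts produced by the standard Kafka serializers
# (assumption: based on their documented behaviour, not this PR's code).

def serialize_int(v: int) -> bytes:
    # org.apache.kafka.common.serialization.IntegerSerializer: 4 bytes, big-endian
    return struct.pack(">i", v)

def serialize_bigint(v: int) -> bytes:
    # LongSerializer: 8 bytes, big-endian
    return struct.pack(">q", v)

def serialize_double(v: float) -> bytes:
    # DoubleSerializer: 8 bytes, big-endian IEEE-754
    return struct.pack(">d", v)

def serialize_string(v: str) -> bytes:
    # StringSerializer: UTF-8 bytes (its default encoding)
    return v.encode("utf-8")
```

A BIGINT key of 42, for example, would arrive as the eight bytes `00 00 00 00 00 00 00 2a`, which is what the KAFKA format would need to deserialize.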

The format only supports single values, i.e. a single field, being primarily intended for use as a key format.

However, users can also use it as a value format. If they do, they can't use the source in a statement with a JOIN or GROUP BY clause. Such statements generally require repartition and changelog topics; these internal topics currently use the same value format as their source, i.e. they'd use the KAFKA value format, and they also currently copy ROWTIME and ROWKEY into the value schema. That gives the internal topics multiple fields in the value schema, which the KAFKA format cannot support.

So I have explicitly disabled JOIN and GROUP BY where the VALUE_FORMAT is KAFKA so that the user gets a more useful error message.

In the future we can fix this by either/both:

  • Use a standard serialization format for all internal topics, i.e. don't use the source's formats for internal topics.
  • Don't copy the two fields into the value of the internal topics, which also saves space.
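The restrictions described above (single field, supported types only, no JOIN or GROUP BY when the value format is KAFKA) could be sketched as a validation along these lines. This is a hypothetical sketch; the names and structure are illustrative, not the PR's actual code.

```python
# Hypothetical sketch of the KAFKA-format restrictions described above;
# names are illustrative, not taken from the KSQL codebase.
SUPPORTED_KAFKA_TYPES = {"INT", "BIGINT", "DOUBLE", "STRING"}

def validate_kafka_format(fields, used_in_join_or_group_by):
    # The KAFKA format can only represent a single primitive value.
    if len(fields) != 1:
        raise ValueError("The KAFKA format supports only a single field")
    name, ksql_type = fields[0]
    if ksql_type not in SUPPORTED_KAFKA_TYPES:
        raise ValueError(f"The KAFKA format does not support type {ksql_type}")
    # JOIN / GROUP BY would need multi-field internal topics, which the
    # KAFKA format cannot serialize, so fail early with a clear message.
    if used_in_join_or_group_by:
        raise ValueError(
            "A source with KAFKA value format cannot be used in JOIN or GROUP BY")
```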

This PR also enhances the validation of C* statements that use the DELIMITED value format. Previously, a statement such as the one below would succeed:

CREATE STREAM FOO (V0 ARRAY<INT>, V1 MAP<STRING, BIGINT>, V2 STRUCT<F0 INT>) WITH(VALUE_FORMAT='DELIMITED',...);

This succeeded even though DELIMITED does not support such complex types; any C*AS statement built on FOO would then fail with a cryptic error message. I picked this up while testing such statements for the new format, so I fixed both.
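The tightened DELIMITED check amounts to rejecting complex types at CREATE time rather than letting a downstream statement fail later. A hypothetical sketch, with illustrative names:

```python
# Hypothetical sketch of the DELIMITED validation described above:
# reject complex types at CREATE time instead of failing later with a
# cryptic error in a downstream C*AS statement. Names are illustrative.
COMPLEX_TYPE_PREFIXES = ("ARRAY", "MAP", "STRUCT")

def validate_delimited_schema(fields):
    for name, ksql_type in fields:
        if ksql_type.startswith(COMPLEX_TYPE_PREFIXES):
            raise ValueError(
                f"The DELIMITED format does not support type {ksql_type} "
                f"for column {name}")
```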

Testing done

Lots of appropriate tests added.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

A new ``KAFKA`` format that supports ``INT``, ``BIGINT``, ``DOUBLE`` and ``STRING`` fields that have been serialized using the standard Kafka serializers,
  e.g. ``org.apache.kafka.common.serialization.LongSerializer``, or equivalent.

  The format only supports single values, i.e. a single field, being primarily intended for use as a key format.
@big-andy-coates big-andy-coates requested review from JimGalasyn and a team as code owners July 11, 2019 09:03
@rmoff
Member

rmoff commented Jul 12, 2019

Will this work with Kafka Connect?

Conflicting files
ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java
ksql-engine/src/main/java/io/confluent/ksql/serde/KsqlSerdeFactories.java
ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java
ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java
@big-andy-coates
Contributor Author

@rmoff

Will this work with Kafka Connect?

I don't think this will help Connect, which tends to use Avro/JSON keys, right? I'll be looking into Avro/JSON keys very soon.

The new KAFKA format just allows users to import data where the key is, for example, a long that's been serialized using Kafka's LongSerializer. How they then dump the data back out through Connect, where the key is not a string, I'm not sure yet. I need to see what Connect can handle, and also look into allowing the type of the key to change across statements, i.e. so users can switch to a string key in their output topic if needed.

Member

@JimGalasyn JimGalasyn left a comment


LGTM, with a few suggestions.

@big-andy-coates
Contributor Author

big-andy-coates commented Jul 12, 2019

Update on the Connect friendliness of the new KAFKA format, from the Connect team.

Connect requires a Converter, but AK 2.0 introduced a bunch of numeric converters, including https://github.com/apache/kafka/blob/2.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/converters/LongConverter.java, as part of KIP-305 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-305%3A+Add+Connect+primitive+number+converters).

In other words, yes, you can configure Connect to serialize something like a numeric user ID as a Long / BIGINT. Saaaaaaaaweeet. Who's doing the blog post????? eh? eh? cc @rmoff
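For example, a Connect worker or connector could be pointed at such a topic with a key-converter setting along these lines (an illustrative config fragment, not part of this PR; the converter class is the one KIP-305 added):

```properties
# Illustrative Connect config fragment: use KIP-305's LongConverter
# so BIGINT keys round-trip between Connect and the KAFKA format.
key.converter=org.apache.kafka.connect.converters.LongConverter
```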

Contributor

@rodesai rodesai left a comment


Thanks @big-andy-coates, this mostly looks good. One issue inline.

@big-andy-coates big-andy-coates requested review from a team and rodesai July 14, 2019 12:12
Contributor

@agavra agavra left a comment


LGTM - I agree with Rohan's comment, so if you feel strongly I think we should justify it. Maybe we can use @vcrfxia's benchmarks?

@agavra agavra requested a review from a team July 15, 2019 23:32
Conflicting files
ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java
ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactory.java
Conflicting files
ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java
Contributor

@rodesai rodesai left a comment


LGTM

@big-andy-coates big-andy-coates merged commit 2b5c3d1 into confluentinc:master Jul 16, 2019
@big-andy-coates big-andy-coates deleted the kafka_format branch July 16, 2019 21:25