From a3fae281308e2e3b1cd5296c1d42f74c87e0a768 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Fri, 14 Feb 2020 15:00:31 +0000 Subject: [PATCH] feat: enhance `PRINT TOPIC`'s format detection (#4551) * feat: enhance `PRINT TOPIC`'s format detection fixes: https://github.com/confluentinc/ksql/issues/4258 With this change `PRINT TOPIC` has enhanced detection for the key and value formats of a topic. The command starts with a list of known formats for both the key and the value and refines this list as it sees more data. As the list of possible formats is refined over time, the command outputs the reduced list. For example, you may see output such as: ``` ksql> PRINT some_topic FROM BEGINNING; Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}} rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}} Key format: KAFKA_STRING ... ``` In the last line of the above output the command has narrowed down the key format as it has processed more data. The command has also been updated to only detect valid UTF-8 encoded text as type `JSON` or `KAFKA_STRING`. This is in line with how KSQL would later deserialize the data. If no known format can successfully deserialize the data, it is printed as a combination of ASCII characters and hex-encoded bytes.
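The narrowing behaviour described above is small enough to sketch in isolation. The snippet below is a minimal, hypothetical illustration of the idea only, and not the patch's actual `RecordFormatter`: the class and method names (`CandidateFormats`, `possibleFormats`) and the two example parsers are invented for the example. Each candidate format is modelled as a parser that throws on bytes it cannot handle; a candidate that fails for any record is removed and never re-added, so the reported list of possible formats only ever shrinks.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the narrowing idea; not the patch's RecordFormatter API.
public final class CandidateFormats {

  // Candidate format name -> parser that throws on bytes it cannot handle.
  private final Map<String, Function<byte[], Object>> candidates = new LinkedHashMap<>();

  public CandidateFormats(final Map<String, Function<byte[], Object>> initial) {
    candidates.putAll(initial);
  }

  /** Try every remaining candidate against a record's bytes; drop the ones that fail. */
  public void update(final byte[] data) {
    candidates.entrySet().removeIf(entry -> {
      try {
        entry.getValue().apply(data);
        return false; // still a possible format
      } catch (final RuntimeException e) {
        return true;  // ruled out; never re-added, so the list only shrinks
      }
    });
  }

  /** Formats that have deserialized everything seen so far, e.g. "JSON or KAFKA_STRING". */
  public String possibleFormats() {
    return candidates.isEmpty()
        ? "UNRECOGNISED_BYTES"
        : String.join(" or ", candidates.keySet());
  }

  public static void main(final String[] args) {
    final Map<String, Function<byte[], Object>> initial = new LinkedHashMap<>();
    initial.put("KAFKA_BIGINT", bytes -> {
      if (bytes.length != 8) {
        throw new IllegalArgumentException("a BIGINT is exactly 8 bytes");
      }
      return ByteBuffer.wrap(bytes).getLong();
    });
    initial.put("KAFKA_STRING", bytes -> new String(bytes, StandardCharsets.UTF_8));

    final CandidateFormats keyFormats = new CandidateFormats(initial);
    keyFormats.update("not eight".getBytes(StandardCharsets.UTF_8)); // 9 bytes: rules out KAFKA_BIGINT
    System.out.println("Key format: " + keyFormats.possibleFormats()); // prints: Key format: KAFKA_STRING
  }
}
```

In the real change the same elimination is applied separately to keys and values, and the CLI re-prints the `Key format:` / `Value format:` lines only when the remaining set changes.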
--- config/ksql-server.properties | 2 +- docs-md/concepts/schemas.md | 182 ++- docs-md/developer-guide/create-a-stream.md | 11 +- docs-md/developer-guide/create-a-table.md | 2 +- .../developer-guide/joins/partition-data.md | 8 +- .../developer-guide/ksqldb-reference/print.md | 18 +- .../ksqldb-reference/select-push-query.md | 4 +- .../query-with-structured-data.md | 4 +- docs-md/tutorials/basics-docker.md | 13 +- docs-md/tutorials/basics-local.md | 13 +- docs/includes/ksql-includes.rst | 6 +- .../java/io/confluent/ksql/cli/CliTest.java | 10 +- .../resources/streaming/PrintPublisher.java | 1 - .../resources/streaming/RecordFormatter.java | 352 ++++++ .../resources/streaming/TopicStream.java | 385 ------- .../streaming/TopicStreamWriter.java | 85 +- .../streaming/PrintSubscriptionTest.java | 1 - .../streaming/RecordFormatterTest.java | 1001 +++++++++++++++++ .../resources/streaming/TopicStreamTest.java | 587 ---------- .../streaming/TopicStreamWriterTest.java | 28 +- .../ksql/serde/kafka/KafkaSerdeFactory.java | 9 +- 21 files changed, 1673 insertions(+), 1049 deletions(-) create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java delete mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java delete mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java diff --git a/config/ksql-server.properties b/config/ksql-server.properties index ab323599654f..30b72de5dafb 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -62,4 +62,4 @@ bootstrap.servers=localhost:9092 # ksql.connect.worker.config=config/connect.properties # Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: -# ksql.schema.registry.url=? +# ksql.schema.registry.url=http://localhost:8081 diff --git a/docs-md/concepts/schemas.md b/docs-md/concepts/schemas.md index 9e2d181d2ac2..2c95810c7820 100644 --- a/docs-md/concepts/schemas.md +++ b/docs-md/concepts/schemas.md @@ -1,17 +1,189 @@ --- layout: page title: Schemas in ksqlDB -tagline: Use schemas in your queries +tagline: Defining the structure of your data description: Learn how schemas work with ksqlDB keywords: ksqldb, schema, evolution, avro --- +Data sources like streams and tables have an associated schema. This schema defines the columns +available in the data, just like the columns in a traditional SQL database table. + +## Key vs Value columns + +KsqlDB supports both key and value columns. These map to the data held in the keys and values of the +underlying {{ site.ak }} topic. + +A column is defined by a combination of its [name](#valid-identifiers), its [SQL data type](#sql-data-types), +and possibly a namespace. + +Key columns have a `KEY` namespace suffix. Key columns have the following restrictions: + * There can only be a single key column, currently. + * The key column must be named `ROWKEY` in the KSQL schema. + +Value columns have no namespace suffix. There can be one or more value columns and the value columns +can have any name. + +For example, the following declares a schema with a single `INT` key column and several value +columns: + +```sql +ROWKEY INT KEY, ID BIGINT, NAME STRING, ADDRESS ADDRESS_TYPE +``` + +## Valid Identifiers + +Column and field names must be valid identifiers.
+ +Unquoted identifiers will be treated as upper-case, for example `col0` is equivalent to `COL0`, and +must contain only alpha-numeric and underscore characters. + +Identifiers containing invalid characters, or where case needs to be preserved, can be quoted using +back-tick quotes, for example ``col0``. + +## SQL data types + +The following SQL types are supported by ksqlDB: + + * [Primitive types](#primitive-types) + * [Decimal type](#decimal-type) + * [Array type](#array-type) + * [Map type](#map-type) + * [Struct type](#struct-type) + * [Custom types](#custom-types) + +### Primitive types + +Supported primitive types are: + + * `BOOLEAN`: a binary value + * `INT`: 32-bit signed integer + * `BIGINT`: 64-bit signed integer + * `DOUBLE`: double precision (64-bit) IEEE 754 floating-point number + * `STRING`: a unicode character sequence (UTF8) + +### Decimal type + +The `DECIMAL` type can store numbers with a very large number of digits and perform calculations exactly. +It is recommended for storing monetary amounts and other quantities where exactness is required. +However, arithmetic on decimals is slow compared to integer and floating point types. + +`DECIMAL` types have a _precision_ and _scale_. +The scale is the number of digits in the fractional part, to the right of the decimal point. +The precision is the total number of significant digits in the whole number, that is, +the number of digits on both sides of the decimal point. +For example, the number `765.937500` has a precision of 9 and a scale of 6. + +To declare a column of type `DECIMAL`, use the syntax: + +```sql +DECIMAL(precision, scale) +``` + +The precision must be positive, the scale zero or positive. + +### Array type + +The `ARRAY` type defines a variable-length array of elements. All elements in the array must be of +the same type. + +To declare an `ARRAY`, use the syntax: + +``` +ARRAY<element-type> +``` + +The _element-type_ of the array can be any other [SQL data type](#sql-data-types). + +For example, the following creates an array of `STRING`s: + +```sql +ARRAY<STRING> +``` + +Instances of an array can be created using the syntax: + +``` +ARRAY[value [, value]*] +``` + +For example, the following creates an array with three `INT` elements: + +```sql +ARRAY[2, 4, 6] +``` + +### Map type + +The `MAP` type defines a variable-length collection of key-value pairs. All keys in the map must be +of the same type. All values in the map must be of the same type. + +To declare a `MAP`, use the syntax: + +``` +MAP<key-type, value-type> +``` + +The _key-type_ must currently be `STRING` while the _value-type_ can be any other [SQL data type](#sql-data-types). + +For example, the following creates a map with `STRING` keys and values: + +```sql +MAP<STRING, STRING> +``` + +Instances of a map can be created using the syntax: + +``` +MAP(key := value [, key := value]*) +``` + +For example, the following creates a map with three key-value pairs: + +```sql +MAP('a' := 1, 'b' := 2, 'c' := 3) +``` + +### Struct type + +The `STRUCT` type defines a list of named fields, where each field can have any [SQL data type](#sql-data-types). + +To declare a `STRUCT`, use the syntax: + +``` +STRUCT<field-name field-type [, field-name field-type]*> +``` + +The _field-name_ can be any [valid identifier](#valid-identifiers). The _field-type_ can be any +valid [SQL data type](#sql-data-types).
+ +For example, the following creates a struct with an `INT` field called `FOO` and a `BOOLEAN` field +called `BAR`: + +```sql +STRUCT<FOO INT, BAR BOOLEAN> +``` + +Instances of a struct can be created using the syntax: + +``` +STRUCT(field-name := field-value [, field-name := field-value]*) +``` + +For example, the following creates a struct with fields called `FOO` and `BAR` and sets their values +to `10` and `true`, respectively: + +```sql +STRUCT('FOO' := 10, 'BAR' := true) +``` + +### Custom types + +KsqlDB supports custom types using the `CREATE TYPE` statement. +See the [`CREATE TYPE` docs](../developer-guide/ksqldb-reference/create-type) for more information. + TODO: -- overview of how schemas work in ksqlDB -- overview of data types -- overview of serialization - schema evolution with ksqlDB and Avro - Page last revised on: {{ git_revision_date }} diff --git a/docs-md/developer-guide/create-a-stream.md b/docs-md/developer-guide/create-a-stream.md index ae3b7ad733e6..b37ac7061473 100644 --- a/docs-md/developer-guide/create-a-stream.md +++ b/docs-md/developer-guide/create-a-stream.md @@ -282,8 +282,8 @@ PRINT pageviews_intro; Your output should resemble: ``` -Key format: KAFKA (BIGINT or DOUBLE) -Value format: KAFKA (STRING) +Key format: KAFKA_BIGINT or KAFKA_DOUBLE +Value format: KAFKA_STRING rowtime: 10/30/18 10:15:51 PM GMT, key: 294851, value: 1540937751186,User_8,Page_12 rowtime: 10/30/18 10:15:55 PM GMT, key: 295051, value: 1540937755255,User_1,Page_15 rowtime: 10/30/18 10:15:57 PM GMT, key: 295111, value: 1540937757265,User_8,Page_10 @@ -298,6 +298,13 @@ Press Ctrl+C to stop printing the stream. !!! note The query continues to run after you stop printing the stream. +!!! note + KsqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`. + KsqlDB has not narrowed it further because it is not possible to rule out + either format just by inspecting the key's serialized bytes. In this case we know the key is + a `BIGINT`. For other cases you may know the key type or you may need to speak to the author + of the data. + Use the SHOW QUERIES statement to view the query that ksqlDB created for the `pageviews_intro` stream: diff --git a/docs-md/developer-guide/create-a-table.md b/docs-md/developer-guide/create-a-table.md index fbdf9e5123a3..faf113f939db 100644 --- a/docs-md/developer-guide/create-a-table.md +++ b/docs-md/developer-guide/create-a-table.md @@ -218,7 +218,7 @@ PRINT users_female; Your output should resemble: ``` -Key format: KAFKA (STRING) +Key format: KAFKA_STRING Value format: JSON rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"} rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"} diff --git a/docs-md/developer-guide/joins/partition-data.md b/docs-md/developer-guide/joins/partition-data.md index 4ec8238df45f..a0b842f538ea 100644 --- a/docs-md/developer-guide/joins/partition-data.md +++ b/docs-md/developer-guide/joins/partition-data.md @@ -48,7 +48,7 @@ the resulting table is determined as follows: - When grouping by a single column or expression, the type of `ROWKEY` in the resulting stream matches the type of the column or expression. - When grouping by multiple columns or expressions, the type of `ROWKEY` in the - resulting stream is a `STRING`. + resulting stream is an [SQL `STRING`](../../concepts/schemas).
- If the FROM clause contains only tables and no GROUP BY clause, the key is copied over from the key of the table(s) in the FROM clause. - If the FROM clause contains only tables and has a GROUP BY clause, the @@ -56,7 +56,7 @@ the resulting table is determined as follows: - When grouping by a single column or expression, the type of `ROWKEY` in the resulting stream matches the type of the column or expression. - When grouping by multiple columns or expressions, the type of `ROWKEY` in the - resulting stream is a `STRING`. + resulting stream is an [SQL `STRING`](../../concepts/schemas). The following example shows a `users` table joined with a `clicks` stream on the `userId` column. The `users` table has the correct primary key @@ -105,8 +105,8 @@ processing. For a join to work, the keys from both sides must have the same SQL type. -For example, you can join a stream of user clicks that's keyed on a `VARCHAR` -user id with a table of user profiles that's also keyed on a `VARCHAR` user id. +For example, you can join a stream of user clicks that's keyed on a `STRING` +user id with a table of user profiles that's also keyed on a `STRING` user id. Records with the exact same user id on both sides will be joined. If the schema of the columns you wish to join on don't match, it may be possible diff --git a/docs-md/developer-guide/ksqldb-reference/print.md b/docs-md/developer-guide/ksqldb-reference/print.md index feda31a59a52..2664903d0cb5 100644 --- a/docs-md/developer-guide/ksqldb-reference/print.md +++ b/docs-md/developer-guide/ksqldb-reference/print.md @@ -21,6 +21,9 @@ Description Print the contents of Kafka topics to the ksqlDB CLI. +The _topicName_ is case sensitive. Quote the name if it contains invalid characters. +See [Valid Identifiers](../../concepts/schemas#valid-identifiers) for more information. + The PRINT statement supports the following properties: | Property | Description | @@ -33,10 +36,11 @@ Example ------- The following statement shows how to print all of the records in a topic named -`ksql__commands`. +`_confluent-ksql-default__command_topic`, (the default name for the topic ksqlDB stores your submitted command in). +Note, the topic name has been quoted as it contains the invalid dash character. ```sql -PRINT ksql__commands FROM BEGINNING; +PRINT '_confluent-ksql-default__command_topic' FROM BEGINNING; ``` ksqlDB attempts to determine the format of the data in the topic and outputs what it thinks are @@ -51,13 +55,19 @@ the key and value formats at the top of the output. Your output should resemble: ``` - Key format: KAFKA (INTEGER) - Value format: JSON + Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING + Value format: JSON or KAFKA_STRING rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}} rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}} ^CTopic printing ceased ``` +The key format for this topic is `KAFKA_STRING`. However, the `PRINT` command does not know this and +has attempted to determine the format of the key by inspecting the data. 
It has determined that the +format may be `KAFKA_STRING`, but it could also be `JSON` or a windowed `KAFKA_STRING`. +The value format for this topic is `JSON`. However, the `PRINT` command has also determined it could +be `KAFKA_STRING`. This is because `JSON` is serialized as text. Hence you could choose to deserialize +this value data as a `KAFKA_STRING` if you wanted to. However, `JSON` is likely the better option. Page last revised on: {{ git_revision_date }} diff --git a/docs-md/developer-guide/ksqldb-reference/select-push-query.md b/docs-md/developer-guide/ksqldb-reference/select-push-query.md index 388eca5ef6ab..ab5f4d2d533a 100644 --- a/docs-md/developer-guide/ksqldb-reference/select-push-query.md +++ b/docs-md/developer-guide/ksqldb-reference/select-push-query.md @@ -48,7 +48,7 @@ In the previous statements, `from_item` is one of the following: - `from_item LEFT JOIN from_item ON join_condition` The WHERE clause can refer to any column defined for a stream or table, -including the system columns `ROWTIME` and `ROWKEY`. +including the `ROWTIME` and `ROWKEY` system columns. Example ------- @@ -114,7 +114,7 @@ or joins. Windows are tracked per record key. Windowing adds two additional system columns to the data, which provide the window bounds: `WINDOWSTART` and `WINDOWEND`. -KsqlDB supports the following WINDOW types. +KsqlDB supports the following WINDOW types: **TUMBLING**: Tumbling windows group input records into fixed-sized, non-overlapping windows based on the records' timestamps. You must diff --git a/docs-md/developer-guide/query-with-structured-data.md b/docs-md/developer-guide/query-with-structured-data.md index 45eb0c888c56..b1e61475f557 100644 --- a/docs-md/developer-guide/query-with-structured-data.md +++ b/docs-md/developer-guide/query-with-structured-data.md @@ -144,8 +144,8 @@ PRINT 'raw-topic' FROM BEGINNING; Your output should resemble: ```json - Key format: KAFKA (STRING) - Value format: JSON + Key format: KAFKA_STRING + Value format: JSON OR KAFKA_STRING rowtime: 12/21/18 23:58:42 PM PSD, key: 1, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}} rowtime: 12/21/18 23:58:42 PM PSD, key: 2, value: {"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}} rowtime: 12/21/18 23:58:42 PM PSD, key: 3, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}} diff --git a/docs-md/tutorials/basics-docker.md b/docs-md/tutorials/basics-docker.md index 1145c46736d4..ea4dd00cbfa9 100644 --- a/docs-md/tutorials/basics-docker.md +++ b/docs-md/tutorials/basics-docker.md @@ -184,7 +184,7 @@ PRINT users; Your output should resemble: ```json - Key format: KAFKA (STRING) + Key format: KAFKA_STRING Value format: AVRO rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"} rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"} @@ -204,8 +204,8 @@ PRINT pageviews; Your output should resemble: ``` - Key format: KAFKA (INTEGER) - Format: KAFKA (STRING) + Key format: KAFKA_BIGINT or KAFKA_DOUBLE + Format: KAFKA_STRING rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20 rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47 rowtime: 10/23/18 12:24:03 AM PSD, key: 
1540254243888, value: 1540254243888,User_4,Page_27 @@ -215,6 +215,13 @@ Your output should resemble: Press Ctrl+C to stop printing messages. +!!! note + KsqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`. + KsqlDB has not narrowed it further because it is not possible to rule out + either format just by inspecting the key's serialized bytes. In this case we know the key is + a `BIGINT`. For other cases you may know the key type or you may need to speak to the author + of the data. + For more information, see [ksqlDB Syntax Reference](../developer-guide/syntax-reference.md). Create a Stream and Table diff --git a/docs-md/tutorials/basics-local.md b/docs-md/tutorials/basics-local.md index 18fe92df354a..5b2b52a93f54 100644 --- a/docs-md/tutorials/basics-local.md +++ b/docs-md/tutorials/basics-local.md @@ -162,7 +162,7 @@ PRINT users; Your output should resemble: ```json -Key format: KAFKA (STRING) +Key format: KAFKA_STRING Value format: AVRO rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"} rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"} @@ -182,8 +182,8 @@ PRINT pageviews; Your output should resemble: ``` -Key format: KAFKA (INTEGER) -Format: KAFKA (STRING) +Key format: KAFKA_BIGINT or KAFKA_DOUBLE +Format: KAFKA_STRING rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20 rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47 rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27 @@ -193,6 +193,13 @@ Topic printing ceased Press Ctrl+C to stop printing messages. +!!! note + KsqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`. + KsqlDB has not narrowed it further because it is not possible to rule out + either format just by inspecting the key's serialized bytes. In this case we know the key is + a `BIGINT`. For other cases you may know the key type or you may need to speak to the author + of the data. + For more information, see [KSQL Syntax Reference](../developer-guide/syntax-reference.md). 
Create a Stream and Table diff --git a/docs/includes/ksql-includes.rst b/docs/includes/ksql-includes.rst index ca27a3abf927..d5b3f715ba21 100644 --- a/docs/includes/ksql-includes.rst +++ b/docs/includes/ksql-includes.rst @@ -144,7 +144,7 @@ Your output should resemble: :: - Key format: KAFKA (STRING) + Key format: KAFKA_STRING Value format: AVRO rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"} rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"} @@ -164,8 +164,8 @@ Your output should resemble: :: - Key format: KAFKA (INTEGER) - Format: KAFKA (STRING) + Key format: KAFKA_INTEGER + Format: KAFKA_STRING rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20 rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47 rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27 diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 5dc2aa18d8d1..da1690a1f566 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -403,13 +403,13 @@ public void shouldPrintTopicWithJsonValue() { // Then: assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: JSON")); - assertThat(terminal.getOutputString(), containsString("Key format: KAFKA (BIGINT or DOUBLE)")); + assertThat(terminal.getOutputString(), containsString("Key format: KAFKA_BIGINT or KAFKA_DOUBLE")); assertThat(terminal.getOutputString(), containsString("," + " key: 1, " + "value: {" + "\"ORDERTIME\":1," - + "\"ORDERID\":" + "\"ORDER_1\"," - + "\"ITEMID\":" + "\"ITEM_1\"," + + "\"ORDERID\":\"ORDER_1\"," + + "\"ITEMID\":\"ITEM_1\"," + "\"ORDERUNITS\":10.0," + "\"TIMESTAMP\":\"2018-01-01\"," + "\"PRICEARRAY\":[100.0,110.99,90.0]," @@ -423,8 +423,8 @@ public void shouldPrintTopicWithDelimitedValue() { run("print " + DELIMITED_TOPIC + " FROM BEGINNING INTERVAL 1 LIMIT 2;", localCli); // Then: - assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: KAFKA (STRING)")); - assertThat(terminal.getOutputString(), containsString("Key format: KAFKA (STRING)")); + assertThatEventually(() -> terminal.getOutputString(), containsString("Value format: KAFKA_STRING")); + assertThat(terminal.getOutputString(), containsString("Key format: KAFKA_STRING")); assertThat(terminal.getOutputString(), containsString(", key: , value: ")); assertThat(terminal.getOutputString(), containsString(", key: ITEM_1, value: ITEM_1,home cinema")); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PrintPublisher.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PrintPublisher.java index bfb07c1666a0..68118256ca47 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PrintPublisher.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PrintPublisher.java @@ -25,7 +25,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber; -import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter; import 
io.confluent.ksql.services.ServiceContext; import java.math.RoundingMode; import java.time.Duration; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java new file mode 100644 index 000000000000..8ee4fc923443 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatter.java @@ -0,0 +1,352 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.resources.streaming; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Streams; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.ksql.json.JsonMapper; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.Windowed; + +/** + * Formats records as strings. + * + *

An instance will attempt to determine the key and value formats of the topic records it is + * asked to process. It does this by maintaining lists of possible formats for the keys and values + * and removes any formats that fail to deserialize any of the data seen. In this way, the list of + * possibilities will reduce over time. + * + *

The list of starting formats is defined in the {@link Format} enum. The list of key formats + * also includes windowed variants, as defined in the {@link WindowSchema} enum. + * + *

Where multiple formats are still possible, the current record is formatted using the first. + * Hence the order of formats in the list affects which format is used to format the output. + * + *

If all known formats fail the output is formatted using {@link Bytes#toString()}, which can + * handle arbitrary bytes. + */ +public final class RecordFormatter { + + private final DateFormat dateFormat; + private final Deserializers keyDeserializers; + private final Deserializers valueDeserializers; + + public RecordFormatter( + final SchemaRegistryClient schemaRegistryClient, + final String topicName + ) { + this( + SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()), + new Deserializers(topicName, schemaRegistryClient, true), + new Deserializers(topicName, schemaRegistryClient, false) + ); + } + + @VisibleForTesting + RecordFormatter( + final DateFormat dateFormat, + final Deserializers keyDeserializers, + final Deserializers valueDeserializers + ) { + this.dateFormat = requireNonNull(dateFormat, "dateFormat"); + this.keyDeserializers = requireNonNull(keyDeserializers, "keyDeserializers"); + this.valueDeserializers = requireNonNull(valueDeserializers, "valueDeserializers"); + } + + public List> format(final Iterable> records) { + return StreamSupport.stream(records.spliterator(), false) + .map(this::delayedFormat) + .collect(Collectors.toList()); + } + + /** + * Returns the list of key formats that are capable of deserializing all the keys seen without + * error. + * + *

As more records are passed to {@link #format} the list is refined. + * + * @return the list of compatible key formats + */ + public List getPossibleKeyFormats() { + return keyDeserializers.getPossibleFormats(); + } + + /** + * Returns the list of value formats that are capable of deserializing all the values seen without + * error. + * + *

As more records are passed to {@link #format} the list is refined. + * + * @return the list of compatible value formats + */ + public List getPossibleValueFormats() { + return valueDeserializers.getPossibleFormats(); + } + + private Supplier delayedFormat(final ConsumerRecord record) { + return () -> "rowtime: " + formatRowTime(record.timestamp()) + + ", " + "key: " + keyDeserializers.format(record.key()) + + ", value: " + valueDeserializers.format(record.value()); + } + + private String formatRowTime(final long timestamp) { + return timestamp == ConsumerRecord.NO_TIMESTAMP + ? "N/A" + : dateFormat.format(new Date(timestamp)); + } + + private static Deserializer newJsonDeserializer() { + final String replacement = UTF_8.newDecoder().replacement(); + + return (Deserializer) (topic, data) -> { + if (data.length == 0) { + throw new DeserializationException("Empty data"); + } + + final String text = new String(data, UTF_8); + if (text.contains(replacement)) { + throw new DeserializationException("String contains replacement char"); + } + + try { + // test it parses: + JsonMapper.INSTANCE.mapper.readTree(text); + + // but return actual text: + return text; + } catch (final IOException e) { + throw new DeserializationException("Failed to deserialize as JSON", e); + } + }; + } + + private static Deserializer newStringDeserializer() { + final StringDeserializer deserializer = new StringDeserializer(); + final String replacement = UTF_8.newDecoder().replacement(); + + return (Deserializer) (topic, data) -> { + if (data.length == 0) { + throw new DeserializationException("Empty data"); + } + + final String text = deserializer.deserialize("", data); + if (text.contains(replacement)) { + throw new DeserializationException("String contains replacement char"); + } + + return text; + }; + } + + @VisibleForTesting + static final class Deserializers { + + private final String topicName; + private final List deserializers; + + @SuppressWarnings("UnstableApiUsage") + Deserializers( + final String topicName, + final SchemaRegistryClient schemaRegistryClient, + final boolean incWindowed + ) { + this.topicName = requireNonNull(topicName, "topicName"); + + final List deserializers = Arrays.stream(Format.values()) + .map(format -> format.getDeserializer(schemaRegistryClient)) + .collect(Collectors.toList()); + + if (!incWindowed) { + this.deserializers = deserializers; + } else { + this.deserializers = deserializers.stream() + .flatMap(deserializer -> + deserializer.doNotWrap + ? Stream.of(deserializer) + : Streams.concat( + Arrays.stream(WindowSchema.values()).map(ws -> ws.wrap(deserializer)), + Stream.of(deserializer) + )) + .collect(Collectors.toList()); + } + } + + List getPossibleFormats() { + return deserializers.stream() + .map(NamedDeserializer::toString) + .filter(name -> !name.equals(Format.UNRECOGNISED_BYTES.toString())) + .collect(Collectors.toList()); + } + + String format(final Bytes bytes) { + if (bytes == null || bytes.get() == null) { + return ""; + } + + String firstResult = null; + final Iterator it = deserializers.iterator(); + while (it.hasNext()) { + final Optional possibleResult = tryDeserializer(bytes, it.next()); + if (possibleResult.isPresent() && firstResult == null) { + firstResult = possibleResult.get(); + } + + if (!possibleResult.isPresent()) { + it.remove(); + } + } + + return firstResult == null + ? 
"" + : firstResult; + } + + private Optional tryDeserializer( + final Bytes bytes, + final NamedDeserializer deserializer + ) { + try { + final Object result = deserializer.deserializer.deserialize(topicName, bytes.get()); + return Optional.of(result == null ? "" : result.toString()); + } catch (final Exception e) { + return Optional.empty(); + } + } + } + + enum WindowSchema { + + SESSION(SessionWindowedDeserializer::new), + HOPPING(WindowSchema::newTimeWindowedDeserializer), + TUMBLING(WindowSchema::newTimeWindowedDeserializer); + + private final Function, Deserializer> mapper; + + WindowSchema(final Function, Deserializer> mapper) { + this.mapper = requireNonNull(mapper, "mapper"); + } + + public NamedDeserializer wrap(final NamedDeserializer inner) { + + final String name = name() + "(" + inner.name + ")"; + + final Deserializer deserializer = mapper.apply(inner.deserializer); + + return new NamedDeserializer(name, inner.doNotWrap, deserializer); + } + + private static Deserializer newTimeWindowedDeserializer(final Deserializer inner) { + final TimeWindowedDeserializer windowedDeser = new TimeWindowedDeserializer<>(inner); + + return (topic, data) -> { + final Windowed windowed = windowedDeser.deserialize(topic, data); + + // Exclude window end time for time-windowed as the end time is not in the serialized data: + return "[" + windowed.key() + "@" + windowed.window().start() + "/-]"; + }; + } + } + + enum Format { + AVRO(0, KafkaAvroDeserializer::new), + JSON(RecordFormatter::newJsonDeserializer), + KAFKA_INT(IntegerDeserializer::new), + KAFKA_BIGINT(LongDeserializer::new), + KAFKA_DOUBLE(DoubleDeserializer::new), + KAFKA_STRING(RecordFormatter::newStringDeserializer), + UNRECOGNISED_BYTES(BytesDeserializer::new); + + private final Function> deserializerFactory; + + Format(final Supplier> deserializerFactory) { + this(1, srClient -> deserializerFactory.get()); + } + + @SuppressWarnings("unused") + Format( + final int usedOnlyToDifferentiateWhichConstructorIsCalled, + final Function> deserializerFactory + ) { + this.deserializerFactory = requireNonNull(deserializerFactory, "deserializerFactory"); + } + + NamedDeserializer getDeserializer(final SchemaRegistryClient srClient) { + final Deserializer deserializer = deserializerFactory.apply(srClient); + return new NamedDeserializer(name(), this == UNRECOGNISED_BYTES, deserializer); + } + } + + private static final class NamedDeserializer { + + final String name; + final boolean doNotWrap; + final Deserializer deserializer; + + private NamedDeserializer( + final String name, + final boolean doNotWrap, + final Deserializer deserializer + ) { + this.name = requireNonNull(name, "name"); + this.doNotWrap = doNotWrap; + this.deserializer = requireNonNull(deserializer, "deserializer"); + } + + @Override + public String toString() { + return name; + } + } + + private static final class DeserializationException extends RuntimeException { + + DeserializationException(final String msg) { + super(msg); + } + + DeserializationException(final String msg, final Throwable cause) { + super(msg, cause); + } + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java deleted file mode 100644 index 15804186cfe6..000000000000 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStream.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. 
- * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.rest.server.resources.streaming; - -import static java.util.Objects.requireNonNull; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.ksql.json.JsonMapper; -import io.confluent.ksql.schema.ksql.SqlBaseType; -import io.confluent.ksql.serde.kafka.KafkaSerdeFactory; -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class TopicStream { - - private TopicStream() { - } - - public static class RecordFormatter { - - private static final Logger log = LoggerFactory.getLogger(RecordFormatter.class); - - private final KafkaAvroDeserializer avroDeserializer; - private final String topicName; - private final DateFormat dateFormat; - - private Optional keyFormatter = Optional.empty(); - private Optional valueFormatter = Optional.empty(); - - public RecordFormatter( - final SchemaRegistryClient schemaRegistryClient, - final String topicName - ) { - this( - schemaRegistryClient, - topicName, - SimpleDateFormat.getDateTimeInstance(3, 1, Locale.getDefault()) - ); - } - - @VisibleForTesting - RecordFormatter( - final SchemaRegistryClient schemaRegistryClient, - final String topicName, - final DateFormat dateFormat - ) { - this.topicName = requireNonNull(topicName, "topicName"); - this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient); - this.dateFormat = requireNonNull(dateFormat, "dateFormat"); - } - - public List> format(final Iterable> records) { - if (!keyFormatter.isPresent()) { - keyFormatter = getKeyFormatter(records); - } - - if (!valueFormatter.isPresent()) { - valueFormatter = getValueFormatter(records); - } - - return StreamSupport.stream(records.spliterator(), false) - .map(this::delayedFormat) - .collect(Collectors.toList()); - } - - @SuppressWarnings("OptionalGetWithoutIsPresent") // will not be empty if needed - private Supplier delayedFormat(final ConsumerRecord record) { - return () -> { - try { - final String rowTime = 
record.timestamp() == ConsumerRecord.NO_TIMESTAMP - ? "N/A" - : dateFormat.format(new Date(record.timestamp())); - - final String rowKey = record.key() == null || record.key().get() == null - ? "" - : keyFormatter.get().print(record.key()); - - final String value = record.value() == null || record.value().get() == null - ? "" - : valueFormatter.get().print(record.value()); - - return "rowtime: " + rowTime - + ", " + "key: " + rowKey - + ", value: " + value; - } catch (IOException e) { - log.warn("Exception formatting record", e); - return "Failed to parse row"; - } - }; - } - - public String getKeyFormat() { - return keyFormatter - .map(Formatter::getFormat) - .orElse(Format.UNDEFINED.toString()); - } - - public String getValueFormat() { - return valueFormatter - .map(Formatter::getFormat) - .orElse(Format.UNDEFINED.toString()); - } - - private Optional getKeyFormatter( - final Iterable> records - ) { - if (Iterables.isEmpty(records)) { - return Optional.empty(); - } - - final Stream valueStream = StreamSupport - .stream(records.spliterator(), false) - .map(ConsumerRecord::key); - - return findFormatter(valueStream); - } - - private Optional getValueFormatter( - final Iterable> records - ) { - if (Iterables.isEmpty(records)) { - return Optional.empty(); - } - - final Stream valueStream = StreamSupport - .stream(records.spliterator(), false) - .map(ConsumerRecord::value); - - return findFormatter(valueStream); - } - - private Optional findFormatter(final Stream dataStream) { - final List formatters = dataStream - .filter(Objects::nonNull) - .filter(d -> d.get() != null) - .map(this::findFormatter) - .collect(Collectors.toList()); - - final Set formats = formatters.stream() - .map(Formatter::getFormat) - .collect(Collectors.toSet()); - - switch (formats.size()) { - case 0: - // No viable records (will try again with next batch): - return Optional.empty(); - - case 1: - // Single format: - return Optional.of(formatters.get(0)); - - default: - // Mixed format topic: - return Format.MIXED.maybeGetFormatter(topicName, null, avroDeserializer); - } - } - - private Formatter findFormatter(final Bytes data) { - return Arrays.stream(Format.values()) - .map(f -> f.maybeGetFormatter(topicName, data, avroDeserializer)) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst() - .orElseThrow(() -> new IllegalStateException("Unexpected")); - } - } - - interface Formatter { - - String print(Bytes data) throws IOException; - - String getFormat(); - } - - enum Format { - UNDEFINED { - @Override - public Optional maybeGetFormatter( - final String topicName, - final Bytes data, - final KafkaAvroDeserializer avroDeserializer - ) { - return Optional.empty(); - } - }, - AVRO { - @Override - public Optional maybeGetFormatter( - final String topicName, - final Bytes data, - final KafkaAvroDeserializer avroDeserializer - ) { - try { - avroDeserializer.deserialize(topicName, data.get()); - return Optional.of(createFormatter(topicName, avroDeserializer)); - } catch (final Exception t) { - return Optional.empty(); - } - } - - private Formatter createFormatter( - final String topicName, - final KafkaAvroDeserializer avroDeserializer - ) { - return new Formatter() { - @Override - public String print(final Bytes data) { - return avroDeserializer.deserialize(topicName, data.get()) - .toString(); - } - - @Override - public String getFormat() { - return AVRO.toString(); - } - }; - } - }, - JSON { - @Override - public Optional maybeGetFormatter( - final String topicName, - final Bytes data, - final 
KafkaAvroDeserializer avroDeserializer - ) { - try { - final JsonNode jsonNode = JsonMapper.INSTANCE.mapper.readTree(data.toString()); - - if (!(jsonNode instanceof ObjectNode) && !(jsonNode instanceof ArrayNode)) { - // Other valid JSON types, e.g. NumericNode, BooleanNode, etc - // are indistinguishable from single column delimited format: - return Optional.empty(); - } - - return Optional.of(createFormatter()); - } catch (final Exception t) { - return Optional.empty(); - } - } - - private Formatter createFormatter() { - return new Formatter() { - @Override - public String print(final Bytes data) throws IOException { - // Ensure deserializes to validate JSON: - JsonMapper.INSTANCE.mapper.readTree(data.get()); - - // Return data as string: - return data.toString(); - } - - @Override - public String getFormat() { - return JSON.toString(); - } - }; - } - }, - KAFKA { - @Override - public Optional maybeGetFormatter( - final String topicName, - final Bytes data, - final KafkaAvroDeserializer avroDeserializer - ) { - return KafkaSerdeFactory.SQL_SERDE.entrySet().stream() - .map(e -> trySerde(e.getKey(), e.getValue(), topicName, data)) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst(); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private Optional trySerde( - final SqlBaseType type, - final Serde serde, - final String topicName, - final Bytes data - ) { - try { - final Deserializer deserializer = (Deserializer) serde.deserializer(); - deserializer.deserialize(topicName, data.get()); - - return Optional.of(createFormatter(deserializer, topicName, type)); - - } catch (final Exception e) { - return Optional.empty(); - } - } - - private Formatter createFormatter( - final Deserializer deserializer, - final String topicName, - final SqlBaseType type - ) { - final String subType = type == SqlBaseType.BIGINT || type == SqlBaseType.DOUBLE - ? "BIGINT or DOUBLE" // Not possible to tell between them from bytes only. 
- : type.toString(); - - return new Formatter() { - @Override - public String print(final Bytes data) { - return deserializer.deserialize(topicName, data.get()).toString(); - } - - @Override - public String getFormat() { - return KAFKA + " (" + subType + ")"; - } - }; - } - }, - MIXED { - @Override - public Optional maybeGetFormatter( - final String topicName, - final Bytes data, - final KafkaAvroDeserializer avroDeserializer - ) { - // Mixed mode defaults to string values: - return Optional.of(createStringFormatter(MIXED.toString())); - } - }; - - abstract Optional maybeGetFormatter( - String topicName, - Bytes data, - KafkaAvroDeserializer avroDeserializer - ); - - private static Formatter createStringFormatter(final String format) { - final StringDeserializer deserializer = new StringDeserializer(); - return new Formatter() { - - @Override - public String print(final Bytes data) { - return deserializer.deserialize("", data.get()); - } - - @Override - public String getFormat() { - return format; - } - }; - } - } -} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter.java index 21f541fcc732..a99811bf71f5 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriter.java @@ -20,15 +20,16 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.parser.tree.PrintTopic; -import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter; import io.confluent.ksql.services.ServiceContext; -import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; +import java.io.PrintStream; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.OptionalInt; +import java.util.function.Predicate; import java.util.function.Supplier; import javax.ws.rs.core.StreamingOutput; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -45,7 +46,7 @@ public class TopicStreamWriter implements StreamingOutput { private final KafkaConsumer topicConsumer; private final SchemaRegistryClient schemaRegistryClient; private final String topicName; - private final OptionalInt limit; + private final Predicate limitReached; private long messagesWritten; private long messagesPolled; @@ -80,7 +81,9 @@ public static TopicStreamWriter create( this.schemaRegistryClient = requireNonNull(schemaRegistryClient, "schemaRegistryClient"); this.topicName = requireNonNull(topicName, "topicName"); this.interval = interval; - this.limit = requireNonNull(limit, "limit"); + this.limitReached = requireNonNull(limit, "limit").isPresent() + ? 
written -> written >= limit.getAsInt() + : written -> false; this.disconnectCheckInterval = requireNonNull(disconnectCheckInterval, "disconnectCheckInterval"); this.messagesWritten = 0; @@ -94,39 +97,38 @@ public static TopicStreamWriter create( @Override public void write(final OutputStream out) { try { + final PrintStream print = new PrintStream(out, true, "UTF8"); final RecordFormatter formatter = new RecordFormatter(schemaRegistryClient, topicName); - boolean printFormat = true; - while (true) { + final FormatsTracker formatsTracker = new FormatsTracker(print); + while (!print.checkError() && !limitReached.test(messagesWritten)) { final ConsumerRecords records = topicConsumer.poll(disconnectCheckInterval); if (records.isEmpty()) { - out.write("\n".getBytes(UTF_8)); - out.flush(); + print.println(); continue; } final List> values = formatter.format(records.records(topicName)); - for (final Supplier value : values) { - if (printFormat) { - printFormat = false; - out.write(("Key format: " + formatter.getKeyFormat() + "\n").getBytes(UTF_8)); - out.write(("Value format: " + formatter.getValueFormat() + "\n").getBytes(UTF_8)); - } + if (values.isEmpty()) { + continue; + } + final List toOutput = new ArrayList<>(); + for (final Supplier value : values) { if (messagesPolled++ % interval == 0) { messagesWritten++; - out.write(value.get().getBytes(UTF_8)); - out.write(System.lineSeparator().getBytes(UTF_8)); - out.flush(); + toOutput.add(value.get()); } - if (limit.isPresent() && messagesWritten >= limit.getAsInt()) { - return; + if (limitReached.test(messagesWritten)) { + break; } } + + formatsTracker.update(formatter); + + toOutput.forEach(print::println); } - } catch (final EOFException exception) { - // Connection terminated, we can stop writing } catch (final Exception exception) { log.error("Exception encountered while writing to output stream", exception); outputException(out, exception); @@ -144,4 +146,45 @@ private static void outputException(final OutputStream out, final Exception exce log.debug("Client disconnected while attempting to write an error message"); } } + + private static final class FormatsTracker { + + private final PrintStream out; + private final List keyFormats = new ArrayList<>(); + private final List valueFormats = new ArrayList<>(); + + FormatsTracker(final PrintStream out) { + this.out = requireNonNull(out, "out"); + this.keyFormats.add("add an entry to force output for formats on the first loop"); + this.valueFormats.add("add an entry to force output for formats on the first loop"); + } + + public void update(final RecordFormatter formatter) { + update(out, keyFormats, formatter.getPossibleKeyFormats(), "Key format: "); + update(out, valueFormats, formatter.getPossibleValueFormats(), "Value format: "); + } + + private static void update( + final PrintStream out, + final List previous, + final List current, + final String prefix + ) { + if (previous.equals(current)) { + return; + } + + previous.clear(); + previous.addAll(current); + + out.print(prefix); + + if (current.isEmpty()) { + out.println(" does not match any supported format. 
" + + "It may be a STRING with encoding other than UTF8, or some other format."); + } else { + out.println(String.join(" or ", current)); + } + } + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PrintSubscriptionTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PrintSubscriptionTest.java index 9480439f06d1..4c4f3f057320 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PrintSubscriptionTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/PrintSubscriptionTest.java @@ -13,7 +13,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.rest.server.resources.streaming.PrintPublisher.PrintSubscription; import io.confluent.ksql.rest.server.resources.streaming.StreamingTestUtils.TestSubscriber; -import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter; import java.time.Duration; import java.util.Collection; import java.util.Iterator; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java new file mode 100644 index 000000000000..2cbf66f8dc19 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/RecordFormatterTest.java @@ -0,0 +1,1001 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.rest.server.resources.streaming; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.ksql.rest.server.resources.streaming.RecordFormatter.Deserializers; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.SessionWindowedSerializer; +import org.apache.kafka.streams.kstream.TimeWindowedSerializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.internals.SessionKeySchema; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(Enclosed.class) +public class RecordFormatterTest { + + private static final String TOPIC_NAME = "some-topic"; + + @RunWith(MockitoJUnitRunner.class) + public static class MainClassTest { + + private static final Bytes KEY_BYTES = Bytes.wrap("the key".getBytes(UTF_8)); + private static final Bytes VALUE_BYTES = Bytes.wrap("the value".getBytes(UTF_8)); + + @Mock + private SchemaRegistryClient schemaRegistryClient; + @Mock + private Deserializers keyDeserializers; + @Mock + private Deserializers valueDeserializers; + + private RecordFormatter formatter; + 
private long timestamp = 1581366404000L; + + @Before + public void setUp() { + formatter = new RecordFormatter( + new SimpleDateFormat("MM/dd/yyyy HH:mm:ss +0000"), + keyDeserializers, + valueDeserializers + ); + } + + @Test + public void shouldStartWithAllKeyFormats() { + // Given: + formatter = new RecordFormatter(schemaRegistryClient, TOPIC_NAME); + + // Then: + assertThat(formatter.getPossibleKeyFormats(), containsInAnyOrder( + "AVRO", "SESSION(AVRO)", "TUMBLING(AVRO)", "HOPPING(AVRO)", + "JSON", "SESSION(JSON)", "TUMBLING(JSON)", "HOPPING(JSON)", + "KAFKA_INT", "SESSION(KAFKA_INT)", "TUMBLING(KAFKA_INT)", "HOPPING(KAFKA_INT)", + "KAFKA_BIGINT", "SESSION(KAFKA_BIGINT)", "TUMBLING(KAFKA_BIGINT)", + "HOPPING(KAFKA_BIGINT)", + "KAFKA_DOUBLE", "SESSION(KAFKA_DOUBLE)", "TUMBLING(KAFKA_DOUBLE)", + "HOPPING(KAFKA_DOUBLE)", + "KAFKA_STRING", "SESSION(KAFKA_STRING)", "TUMBLING(KAFKA_STRING)", "HOPPING(KAFKA_STRING)" + )); + } + + @Test + public void shouldStartWithAllValueFormats() { + // Given: + formatter = new RecordFormatter(schemaRegistryClient, TOPIC_NAME); + + // Then: + assertThat(formatter.getPossibleValueFormats(), containsInAnyOrder( + "AVRO", + "JSON", + "KAFKA_INT", + "KAFKA_BIGINT", + "KAFKA_DOUBLE", + "KAFKA_STRING" + )); + } + + @Test + public void shouldDelayFormatting() { + // When: + formatter.format(consumerRecords(KEY_BYTES, VALUE_BYTES)); + + // Then: + verify(keyDeserializers, never()).format(any()); + verify(valueDeserializers, never()).format(any()); + } + + @Test + public void shouldFormatOnDemand() { + // Given: + final List> result = formatter + .format(consumerRecords(KEY_BYTES, VALUE_BYTES)); + + // When: + result.get(0).get(); + + // Then: + verify(keyDeserializers).format(KEY_BYTES); + verify(valueDeserializers).format(VALUE_BYTES); + } + + @Test + public void shouldFormatNulls() { + // When: + formatSingle(consumerRecord(null, null)); + + // Then: + verify(keyDeserializers).format(null); + verify(valueDeserializers).format(null); + } + + @Test + public void shouldFormatNullsBytes() { + // Given: + final Bytes nullBytes = new Bytes(null); + + // When: + formatSingle(consumerRecord(nullBytes, nullBytes)); + + // Then: + verify(keyDeserializers).format(nullBytes); + verify(valueDeserializers).format(nullBytes); + } + + @Test + public void shouldFormatRowTime() { + // When: + final String formatted = formatSingle(consumerRecord(null, null)); + + // Then: + assertThat(formatted, containsString("rowtime: 02/10/2020 20:26:44 +0000, ")); + } + + @Test + public void shouldFormatNoRowTime() { + // Given: + timestamp = ConsumerRecord.NO_TIMESTAMP; + + // When: + final String formatted = formatSingle(consumerRecord(null, null)); + + // Then: + assertThat(formatted, containsString("rowtime: N/A, ")); + } + + @Test + public void shouldReturnPossibleKeyFormats() { + // Given: + final ImmutableList expected = ImmutableList.of("f0", "f1"); + when(keyDeserializers.getPossibleFormats()).thenReturn(expected); + + // Then: + assertThat(formatter.getPossibleKeyFormats(), is(expected)); + } + + @Test + public void shouldReturnPossibleValueFormats() { + // Given: + final ImmutableList expected = ImmutableList.of("f0", "f1"); + when(valueDeserializers.getPossibleFormats()).thenReturn(expected); + + // Then: + assertThat(formatter.getPossibleValueFormats(), is(expected)); + } + + private String formatSingle(final ConsumerRecord consumerRecord) { + final List> result = formatter.format(ImmutableList.of(consumerRecord)); + assertThat(result, hasSize(1)); + return result.get(0).get(); + } + 
+ @SuppressWarnings("SameParameterValue") + private Iterable> consumerRecords( + final Bytes keyBytes, + final Bytes valueBytes + ) { + return ImmutableList.of(consumerRecord(keyBytes, valueBytes)); + } + + private ConsumerRecord consumerRecord( + final Bytes keyBytes, + final Bytes valueBytes + ) { + return new ConsumerRecord<>( + TOPIC_NAME, + 1, + 1, + timestamp, + TimestampType.CREATE_TIME, + 123, + 1, + 1, + keyBytes, + valueBytes + ); + } + } + + @RunWith(MockitoJUnitRunner.class) + public static class DeserializersTest { + + private static final Random RNG = new Random(); + + private static final Schema AVRO_SCHEMA = parseAvroSchema("{" + + " \"type\": \"record\"," + + " \"name\": \"myrecord\"," + + " \"fields\": [" + + " { \"name\": \"str1\", \"type\": \"string\" }" + + " ]" + + "}"); + + private static final String JSON_OBJECT = "{\"a\":1}"; + private static final String JSON_ARRAY = "[10,22,44]"; + private static final String JSON_TEXT = "\"Yes this is valid json\""; + private static final String JSON_NUMBER = "24.987"; + private static final String JSON_BOOLEAN = "true"; + private static final String JSON_NULL = "null"; + private static final List VALID_JSON = ImmutableList.of( + JSON_OBJECT, + JSON_ARRAY, + JSON_TEXT, + JSON_NUMBER, + JSON_BOOLEAN, + JSON_NULL + ); + + private static final List NULL_VARIANTS; + + static { + final List nullVariants = new ArrayList<>(); + nullVariants.add(new Bytes(null)); + nullVariants.add(null); + NULL_VARIANTS = Collections.unmodifiableList(nullVariants); + } + + private static final int SERIALIZED_INT_SIZE = 4; + private static final int SERIALIZED_BIGINT_SIZE = 8; + private static final int SERIALIZED_DOUBLE_SIZE = 8; + + private static final Window TIME_WINDOW = new TimeWindow(1234567890123L, 1234567899999L); + private static final Window SESSION_WINDOW = new SessionWindow(1534567890123L, 1534567899999L); + + private static final GenericRecord AVRO_RECORD = avroRecord(); + private static final Bytes SERIALIZED_AVRO_RECORD = serialize(AVRO_RECORD, avroSerializer()); + private static final Bytes SERIALIZED_TIME_WINDOWED_AVRO_RECORD = serialize( + new Windowed<>(AVRO_RECORD, TIME_WINDOW), + new TimeWindowedSerializer<>(avroSerializer()) + ); + private static final Bytes SERIALIZED_SESSION_WINDOWED_AVRO_RECORD = serialize( + new Windowed<>(AVRO_RECORD, SESSION_WINDOW), + new SessionWindowedSerializer<>(avroSerializer()) + ); + + private static final int KAFKA_INT = 24; + private static final Bytes SERIALIZED_KAFKA_INT = serialize(KAFKA_INT, new IntegerSerializer()); + private static final Bytes SERIALIZED_TIME_WINDOWED_KAFKA_INT = serialize( + new Windowed<>(KAFKA_INT, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(Integer.class).serializer() + ); + private static final Bytes SERIALIZED_SESSION_WINDOWED_KAFKA_INT = serialize( + new Windowed<>(KAFKA_INT, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(Integer.class).serializer() + ); + + private static final long KAFKA_BIGINT = 199L; + private static final Bytes SERIALIZED_KAFKA_BIGINT = serialize(KAFKA_BIGINT, + new LongSerializer() + ); + private static final Bytes SERIALIZED_TIME_WINDOWED_KAFKA_BIGINT = serialize( + new Windowed<>(KAFKA_BIGINT, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(Long.class).serializer() + ); + private static final Bytes SERIALIZED_SESSION_WINDOWED_KAFKA_BIGINT = serialize( + new Windowed<>(KAFKA_BIGINT, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(Long.class).serializer() + ); + + private static final double KAFKA_DOUBLE = 
24.199d; + private static final Bytes SERIALIZED_KAFKA_DOUBLE = serialize( + KAFKA_DOUBLE, + new DoubleSerializer() + ); + private static final Bytes SERIALIZED_TIME_WINDOWED_KAFKA_DOUBLE = serialize( + new Windowed<>(KAFKA_DOUBLE, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(Double.class).serializer() + ); + private static final Bytes SERIALIZED_SESSION_WINDOWED_KAFKA_DOUBLE = serialize( + new Windowed<>(KAFKA_DOUBLE, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(Double.class).serializer() + ); + + private static final String KAFKA_STRING = "κόσμε"; + private static final Bytes SERIALIZED_INVALID_UTF8 = Bytes.wrap( + new byte[]{-1, -1, 'i', 's', ' ', 'i', 'n', 'v', 'a', 'l', 'i', 'd', -1, -1} + ); + private static final Bytes SERIALIZED_KAFKA_STRING = serialize( + KAFKA_STRING, + new StringSerializer() + ); + private static final Bytes SERIALIZED_TIME_WINDOWED_KAFKA_STRING = serialize( + new Windowed<>(KAFKA_STRING, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(String.class).serializer() + ); + private static final Bytes SERIALIZED_SESSION_WINDOWED_KAFKA_STRING = serialize( + new Windowed<>(KAFKA_STRING, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(String.class).serializer() + ); + + @Mock + private SchemaRegistryClient schemaRegistryClient; + + private Deserializers deserializers; + + + @Before + public void setUp() { + deserializers = new Deserializers(TOPIC_NAME, schemaRegistryClient, true); + } + + @Test + public void shouldNotExcludeAnyThingOnNulls() { + // Given: + final List expected = ImmutableList.copyOf(deserializers.getPossibleFormats()); + + NULL_VARIANTS.forEach(nullVariant -> { + // When: + deserializers.format(nullVariant); + + // Then: + assertThat(deserializers.getPossibleFormats(), is(expected)); + }); + } + + @Test + public void shouldExcludeFixedSizeFormatsWhereSizeDoesNotMatch() { + // When: + deserializers.format(getBytes(2)); + + // Then: + assertThat(deserializers.getPossibleFormats(), notHasItems( + "KAFKA_INT", "SESSION(KAFKA_INT)", "TUMBLING(KAFKA_INT)", "HOPPING(KAFKA_INT)", + "KAFKA_BIGINT", "SESSION(KAFKA_BIGINT)", "TUMBLING(KAFKA_BIGINT)", + "HOPPING(KAFKA_BIGINT)", + "KAFKA_DOUBLE", "SESSION(KAFKA_DOUBLE)", "TUMBLING(KAFKA_DOUBLE)", "HOPPING(KAFKA_DOUBLE)" + )); + } + + @Test + public void shouldExcludeSessionWindowedOnInvalidWindows() { + // Given: + final Bytes invalidAsEndTimeBeforeStart = SessionKeySchema + .toBinary(getBytes(4), 100, 99); + + // When: + deserializers.format(invalidAsEndTimeBeforeStart); + + // Then: + assertThat(deserializers.getPossibleFormats(), notHasItems( + "SESSION(AVRO)", + "SESSION(JSON)", + "SESSION(KAFKA_INT)", + "SESSION(KAFKA_BIGINT)", + "SESSION(KAFKA_DOUBLE)" + )); + } + + @Test + public void shouldExcludeAvroNoSchema() { + // Given: + // not: givenAvroSchemaRegistered(); + + // When: + deserializers.format(SERIALIZED_AVRO_RECORD); + + // Then: + assertThat(deserializers.getPossibleFormats(), not(hasItem("AVRO"))); + } + + @Test + public void shouldNotExcludeAvroOnValidAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + deserializers.format(SERIALIZED_AVRO_RECORD); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "AVRO" + )); + } + + @Test + public void shouldFormatValidAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + final String formatted = deserializers.format(SERIALIZED_AVRO_RECORD); + + // Then: + assertThat(formatted, is("{\"str1\": \"My first string\"}")); + } + + @Test + public void 
shouldNotExcludeTimeWindowedAvroOnValidTimeWindowedAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + deserializers.format(SERIALIZED_TIME_WINDOWED_AVRO_RECORD); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "TUMBLING(AVRO)", "HOPPING(AVRO)" + )); + } + + @Test + public void shouldFormatValidTimeWindowedAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + final String formatted = deserializers.format(SERIALIZED_TIME_WINDOWED_AVRO_RECORD); + + assertThat(formatted, is("[{\"str1\": \"My first string\"}@1234567890123/-]")); + } + + @Test + public void shouldNotExcludeSessionWindowedAvroOnValidTimeWindowedAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + deserializers.format(SERIALIZED_SESSION_WINDOWED_AVRO_RECORD); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "SESSION(AVRO)" + )); + } + + @Test + public void shouldFormatValidSessionWindowedAvro() { + // Given: + givenAvroSchemaRegistered(); + + // When: + final String formatted = deserializers.format(SERIALIZED_SESSION_WINDOWED_AVRO_RECORD); + + // Then: + assertThat(formatted, is("[{\"str1\": \"My first string\"}@1534567890123/1534567899999]")); + } + + @Test + public void shouldExcludeJsonOnBadJson() { + // Given: + final String notJson = "{" + + "BAD DATA" + + "\"name\": \"myrecord\"," + + " \"type\": \"record\"" + + "}"; + + // When: + deserializers.format(Bytes.wrap(notJson.getBytes(UTF_8))); + + // Then: + assertThat(deserializers.getPossibleFormats(), not(hasItems("JSON"))); + } + + @Test + public void shouldNotExcludeJsonOnValidJson() { + VALID_JSON.forEach(json -> { + // Given: + final Bytes serialized = Bytes.wrap(json.getBytes(UTF_8)); + + // When: + deserializers.format(serialized); + + // Then: + assertThat(json, deserializers.getPossibleFormats(), hasItems("JSON")); + }); + } + + @Test + public void shouldFormatValidJson() { + VALID_JSON.forEach(json -> { + // Given: + final Bytes serialized = Bytes.wrap(json.getBytes(UTF_8)); + + // When: + final String formatted = deserializers.format(serialized); + + // Then: + assertThat(formatted, is(json)); + }); + } + + @Test + public void shouldNotExcludeTimeWindowedJsonOnTimeWindowedJson() { + VALID_JSON.forEach(json -> { + // Given: + final Bytes serialized = serialize( + new Windowed<>(json, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(String.class).serializer() + ); + + // When: + deserializers.format(serialized); + + // Then: + assertThat(json, deserializers.getPossibleFormats(), hasItems( + "TUMBLING(JSON)", "HOPPING(JSON)" + )); + }); + } + + @Test + public void shouldFormatValidTimeWindowedJson() { + VALID_JSON.forEach(json -> { + // Given: + final Bytes serialized = serialize( + new Windowed<>(json, TIME_WINDOW), + WindowedSerdes.timeWindowedSerdeFrom(String.class).serializer() + ); + + // When: + final String formatted = deserializers.format(serialized); + + // Then: + assertThat(formatted, is("[" + json + "@1234567890123/-]")); + }); + } + + @Test + public void shouldNotExcludeSessionWindowedJsonOnTimeWindowedJson() { + VALID_JSON.forEach(json -> { + // Given: + final Bytes serialized = serialize( + new Windowed<>(json, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(String.class).serializer() + ); + + // When: + deserializers.format(serialized); + + // Then: + assertThat(json, deserializers.getPossibleFormats(), hasItems( + "SESSION(JSON)" + )); + }); + } + + @Test + public void shouldFormatValidSessionWindowedJson() { + VALID_JSON.forEach(json -> { + // 
Given: + final Bytes serialized = serialize( + new Windowed<>(json, SESSION_WINDOW), + WindowedSerdes.sessionWindowedSerdeFrom(String.class).serializer() + ); + + // When: + final String formatted = deserializers.format(serialized); + + // Then: + assertThat(formatted, is("[" + json + "@1534567890123/1534567899999]")); + }); + } + + @Test + public void shouldExcludeKafkaIntIfWrongSize() { + // When: + deserializers.format(getBytes(SERIALIZED_INT_SIZE + 1)); + + // Then: + assertThat(deserializers.getPossibleFormats(), not(hasItem("KAFKA_INT"))); + } + + @Test + public void shouldNotExcludeKafkaIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_KAFKA_INT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItem("KAFKA_INT")); + } + + @Test + public void shouldFormatValidKafkaInt() { + // When: + final String formatted = deserializers.format(SERIALIZED_KAFKA_INT); + + // Then: + assertThat(formatted, is(KAFKA_INT + "")); + } + + @Test + public void shouldNotExcludeTimeWindowedKafkaIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_INT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "HOPPING(KAFKA_INT)", "TUMBLING(KAFKA_INT)" + )); + } + + @Test + public void shouldFormatValidTimeWindowedKafkaInt() { + // When: + final String formatted = deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_INT); + + // Then: + assertThat(formatted, is("[" + KAFKA_INT + "@1234567890123/-]")); + } + + @Test + public void shouldNotExcludeSessionWindowedKafkaIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_INT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "SESSION(KAFKA_INT)" + )); + } + + @Test + public void shouldFormatValidSessionWindowedKafkaInt() { + // When: + final String formatted = deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_INT); + + // Then: + assertThat(formatted, is("[" + KAFKA_INT + "@1534567890123/1534567899999]")); + } + + @Test + public void shouldExcludeKafkaBigIntIfNotWriteSize() { + // When: + deserializers.format(getBytes(SERIALIZED_BIGINT_SIZE - 1)); + + // Then: + assertThat(deserializers.getPossibleFormats(), not(hasItem("KAFKA_BIGINT"))); + } + + @Test + public void shouldNotExcludeKafkaBigIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_KAFKA_BIGINT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItem("KAFKA_BIGINT")); + } + + @Test + public void shouldFormatValidKafkaBigInt() { + // When: + final String formatted = deserializers.format(SERIALIZED_KAFKA_BIGINT); + + // Then: + assertThat(formatted, is(KAFKA_BIGINT + "")); + } + + @Test + public void shouldNotExcludeTimeWindowedKafkaBigIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_BIGINT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "HOPPING(KAFKA_BIGINT)", "TUMBLING(KAFKA_BIGINT)" + )); + } + + @Test + public void shouldFormatValidTimeWindowedKafkaBigInt() { + // When: + final String formatted = deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_BIGINT); + + // Then: + assertThat(formatted, is("[" + KAFKA_BIGINT + "@1234567890123/-]")); + } + + @Test + public void shouldNotExcludeSessionWindowedKafkaBigIntIfWriteSize() { + // When: + deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_BIGINT); + + // Then: + assertThat(deserializers.getPossibleFormats(), hasItems( + "SESSION(KAFKA_BIGINT)" + )); + } + + @Test + public void shouldFormatValidSessionWindowedKafkaBigInt() { 
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_BIGINT);
+
+      // Then:
+      assertThat(formatted, is("[" + KAFKA_BIGINT + "@1534567890123/1534567899999]"));
+    }
+
+    @Test
+    public void shouldExcludeKafkaDoubleIfNotWriteSize() {
+      // When:
+      deserializers.format(getBytes(SERIALIZED_DOUBLE_SIZE - 1));
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), not(hasItem("KAFKA_DOUBLE")));
+    }
+
+    @Test
+    public void shouldNotExcludeKafkaDoubleIfWriteSize() {
+      // When:
+      deserializers.format(SERIALIZED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItem("KAFKA_DOUBLE"));
+    }
+
+    @Test
+    public void shouldFormatValidKafkaDoubleAsBigInt() {
+      // Note: Long and double are both 8 bytes and always valid.
+      // Long is more popular, so ksql leans towards long
+
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(formatted, is(longEquiv(KAFKA_DOUBLE) + ""));
+    }
+
+    @Test
+    public void shouldNotExcludeTimeWindowedKafkaDoubleIfWriteSize() {
+      // When:
+      deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItems(
+          "HOPPING(KAFKA_DOUBLE)", "TUMBLING(KAFKA_DOUBLE)"
+      ));
+    }
+
+    @Test
+    public void shouldFormatValidTimeWindowedKafkaDouble() {
+      // Note: Long and double are both 8 bytes and always valid.
+      // Long is more popular, so ksql leans towards long
+
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(formatted,
+          is("[" + longEquiv(KAFKA_DOUBLE) + "@1234567890123/-]"));
+    }
+
+    @Test
+    public void shouldNotExcludeSessionWindowedKafkaDoubleIfWriteSize() {
+      // When:
+      deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItems(
+          "SESSION(KAFKA_DOUBLE)"
+      ));
+    }
+
+    @Test
+    public void shouldFormatValidSessionWindowedKafkaDouble() {
+      // Note: Long and double are both 8 bytes and always valid.
+      // Long is more popular, so ksql leans towards long
+
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_DOUBLE);
+
+      // Then:
+      assertThat(formatted, is("[" + longEquiv(KAFKA_DOUBLE) + "@1534567890123/1534567899999]"));
+    }
+
+    @Test
+    public void shouldExcludeKafkaStringIfNotUtf8Text() {
+      // When:
+      deserializers.format(SERIALIZED_INVALID_UTF8);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), not(hasItem("KAFKA_STRING")));
+    }
+
+    @Test
+    public void shouldNotExcludeKafkaStringIfValidUtf8() {
+      // When:
+      deserializers.format(SERIALIZED_KAFKA_STRING);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItem("KAFKA_STRING"));
+    }
+
+    @Test
+    public void shouldFormatValidKafkaString() {
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_KAFKA_STRING);
+
+      // Then:
+      assertThat(formatted, is(KAFKA_STRING));
+    }
+
+    @Test
+    public void shouldNotExcludeTimeWindowedKafkaStringIfValidUtf8() {
+      // When:
+      deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_STRING);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItems(
+          "HOPPING(KAFKA_STRING)", "TUMBLING(KAFKA_STRING)"
+      ));
+    }
+
+    @Test
+    public void shouldFormatValidTimeWindowedKafkaString() {
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_TIME_WINDOWED_KAFKA_STRING);
+
+      // Then:
+      assertThat(formatted, is("[" + KAFKA_STRING + "@1234567890123/-]"));
+    }
+
+    @Test
+    public void shouldNotExcludeSessionWindowedKafkaStringIfValidUtf8() {
+      // When:
+      deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_STRING);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), hasItems(
+          "SESSION(KAFKA_STRING)"
+      ));
+    }
+
+    @Test
+    public void shouldFormatValidSessionWindowedKafkaString() {
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_SESSION_WINDOWED_KAFKA_STRING);
+
+      // Then:
+      assertThat(formatted, is("[" + KAFKA_STRING + "@1534567890123/1534567899999]"));
+    }
+
+    @Test
+    public void shouldFallBackToUnrecognisedBytesIfEverythingElseFails() {
+      // When:
+      deserializers.format(SERIALIZED_INVALID_UTF8);
+
+      // Then:
+      assertThat(deserializers.getPossibleFormats(), is(empty()));
+    }
+
+    @Test
+    public void shouldFormatUnrecognisedBytes() {
+      // When:
+      final String formatted = deserializers.format(SERIALIZED_INVALID_UTF8);
+
+      // Then:
+      assertThat(formatted, is(SERIALIZED_INVALID_UTF8.toString()));
+    }
+
+    @Test
+    public void shouldFormatNull() {
+      NULL_VARIANTS.forEach(nullVariant ->
+          assertThat(deserializers.format(nullVariant), is("<null>"))
+      );
+    }
+
+    private void givenAvroSchemaRegistered() {
+      try {
+        final AvroSchema avroSchema = new AvroSchema(AVRO_SCHEMA);
+        when(schemaRegistryClient.getSchemaById(anyInt())).thenReturn(avroSchema);
+      } catch (final Exception e) {
+        fail("invalid test:" + e.getMessage());
+      }
+    }
+
+    private static Bytes getBytes(final int size) {
+      final byte[] bytes = new byte[size];
+      RNG.nextBytes(bytes);
+      return Bytes.wrap(bytes);
+    }
+
+    /**
+     * No way to tell between a double and a long once serialized. KSQL defaults to long.
So doubles + * are output as longs: + */ + @SuppressWarnings("SameParameterValue") + private static long longEquiv(final double kafkaDouble) { + final byte[] bytes = new DoubleSerializer().serialize("foo", kafkaDouble); + return new LongDeserializer().deserialize("foo", bytes); + } + + private static Bytes serialize(final T value, final Serializer serializer) { + return Bytes.wrap(serializer.serialize(TOPIC_NAME, value)); + } + + @SuppressWarnings("SameParameterValue") + private static Schema parseAvroSchema(final String avroSchema) { + final Schema.Parser parser = new Schema.Parser(); + return parser.parse(avroSchema); + } + + private static Record avroRecord() { + final Record avroRecord = new Record(AVRO_SCHEMA); + avroRecord.put("str1", "My first string"); + return avroRecord; + } + + private static Serializer avroSerializer() { + final Map props = new HashMap<>(); + props.put("schema.registry.url", "localhost:9092"); + + final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class); + return new KafkaAvroSerializer(schemaRegistryClient, props); + } + } + + @SafeVarargs + @SuppressWarnings("varargs") + private static Matcher> notHasItems(final T... items) { + return allOf(Arrays.stream(items) + .map(Matchers::hasItem) + .map(Matchers::not) + .collect(Collectors.toList())); + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java deleted file mode 100644 index cc278d92df60..000000000000 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamTest.java +++ /dev/null @@ -1,587 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.rest.server.resources.streaming; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.endsWith; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Streams; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.ksql.rest.server.resources.streaming.TopicStream.Format; -import io.confluent.ksql.rest.server.resources.streaming.TopicStream.RecordFormatter; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.serialization.DoubleSerializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.Bytes; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@SuppressWarnings({"UnstableApiUsage", "unchecked"}) -@RunWith(MockitoJUnitRunner.class) -public class TopicStreamTest { - - private static final byte[] NULL = "null-marker".getBytes(UTF_8); - - private static final String TOPIC_NAME = "some-topic"; - - private static final String JSON_OBJECT = "{\"a\":1}"; - private static final String JSON_ARRAY = "[10,22,44]"; - private static final Schema AVRO_SCHEMA = parseAvroSchema("{" + - " \"type\": \"record\"," + - " \"name\": \"myrecord\"," + - " \"fields\": [" + - " { \"name\": \"str1\", \"type\": \"string\" }" + - " ]" + - "}"); - private static final byte[] SERIALIZED_AVRO_RECORD = serializedAvroRecord(); - private static final int KAFKA_INT = 24; - private static final byte[] SERIZALIZED_KAFKA_INT = serialize(KAFKA_INT, new IntegerSerializer()); - private static final long KAFKA_BIGINT = 199L; - private static final byte[] SERIZALIZED_KAFKA_BIGINT = serialize(KAFKA_BIGINT, new LongSerializer()); - private static final double KAFKA_DOUBLE = 24.199d; - private static final byte[] SERIZALIZED_KAFKA_DOUBLE = serialize(KAFKA_DOUBLE, new DoubleSerializer()); - - private static final String DELIMITED_VALUE = "De,lim,it,ed"; - private static final byte[] RANDOM_BYTES = new byte[]{23, 45, 63, 23, 1, 0, 1, 99, 101}; - private static final List NULL_VARIANTS; - - private static 
final DateFormat UTC_FORMAT = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss +0000"); - - static { - final List nullVariants = new ArrayList<>(); - nullVariants.add(new Bytes(null)); - nullVariants.add(null); - NULL_VARIANTS = Collections.unmodifiableList(nullVariants); - } - - @Mock - private SchemaRegistryClient schemaRegistryClient; - - private RecordFormatter formatter; - private long timestamp = 1581366404000L; - - @Before - public void setUp() { - formatter = new RecordFormatter(schemaRegistryClient, TOPIC_NAME, UTC_FORMAT); - } - - @Test - public void shouldDetectAvroKey() { - // Given: - givenAvroSchemaRegistered(); - - // When: - formatSingle(SERIALIZED_AVRO_RECORD, NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is(Format.AVRO.toString())); - } - - @Test - public void shouldDetectJsonObjectKey() { - // When: - formatSingle(JSON_OBJECT.getBytes(UTF_8), SERIALIZED_AVRO_RECORD); - - // Then: - assertThat(formatter.getKeyFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldDetectJsonObjectValue() { - // When: - formatSingle(SERIALIZED_AVRO_RECORD, JSON_OBJECT.getBytes(UTF_8)); - - // Then: - assertThat(formatter.getValueFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldDetectJsonArrayKey() { - // When: - formatSingle(JSON_ARRAY.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldDetectJsonArrayValue() { - // When: - formatSingle(NULL, JSON_ARRAY.getBytes(UTF_8)); - - // Then: - assertThat(formatter.getValueFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldDetectBadJsonAsString() { - // Given: - final String notJson = "{" - + "BAD DATA" - + "\"name\": \"myrecord\"," + - " \"type\": \"record\"" + - "}"; - - // When: - formatSingle(SERIALIZED_AVRO_RECORD, notJson.getBytes(UTF_8)); - - // Then: - assertThat(formatter.getValueFormat(), is("KAFKA (STRING)")); - } - - @Test - public void shouldDetectDelimitedAsStringKey() { - // When: - formatSingle(DELIMITED_VALUE.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is("KAFKA (STRING)")); - } - - @Test - public void shouldDetectDelimitedAsStringValue() { - // When: - formatSingle(NULL, DELIMITED_VALUE.getBytes(UTF_8)); - - // Then: - assertThat(formatter.getValueFormat(), is("KAFKA (STRING)")); - } - - @Test - public void shouldDetectRandomBytesAsStringKey() { - // When: - formatSingle(RANDOM_BYTES, NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is("KAFKA (STRING)")); - } - - @Test - public void shouldDetectRandomBytesAsStringValue() { - // When: - formatSingle(NULL, RANDOM_BYTES); - - // Then: - assertThat(formatter.getValueFormat(), is("KAFKA (STRING)")); - } - - @Test - public void shouldDetectKafkaIntKey() { - // When: - formatSingle(SERIZALIZED_KAFKA_INT, NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is("KAFKA (INTEGER)")); - } - - @Test - public void shouldDetectKafkaBigIntKey() { - // When: - formatSingle(SERIZALIZED_KAFKA_BIGINT, NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is("KAFKA (BIGINT or DOUBLE)")); - } - - @Test - public void shouldDetectKafkaDoubleKey() { - // When: - formatSingle(SERIZALIZED_KAFKA_DOUBLE, NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is("KAFKA (BIGINT or DOUBLE)")); - } - - @Test - public void shouldDetectMixedModeKey() { - // When: - formatKeys( - JSON_OBJECT.getBytes(UTF_8), - DELIMITED_VALUE.getBytes(UTF_8), - SERIALIZED_AVRO_RECORD - ); - - // Then: - 
assertThat(formatter.getKeyFormat(), is(Format.MIXED.toString())); - } - - @Test - public void shouldDetectMixedModeValue() { - // When: - formatValues( - JSON_OBJECT.getBytes(UTF_8), - DELIMITED_VALUE.getBytes(UTF_8), - SERIALIZED_AVRO_RECORD - ); - - // Then: - assertThat(formatter.getValueFormat(), is(Format.MIXED.toString())); - } - - @Test - public void shouldDeferFormatDetectionOnNulls() { - // When: - format(NULL_VARIANTS, NULL_VARIANTS); - - // Then: - assertThat(formatter.getKeyFormat(), is(Format.UNDEFINED.toString())); - assertThat(formatter.getValueFormat(), is(Format.UNDEFINED.toString())); - } - - @Test - public void shouldDetermineKeyFormatsOnSecondCallIfNoViableRecordsInFirst() { - // Given: - formatSingle(NULL, NULL); - - // When: - formatKeys(NULL, JSON_OBJECT.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatter.getKeyFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldDetermineValueFormatsOnSecondCallIfNoViableRecordsInFirst() { - // Given: - formatSingle(NULL, NULL); - - // When: - formatValues(NULL, JSON_OBJECT.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatter.getValueFormat(), is(Format.JSON.toString())); - } - - @Test - public void shouldOutputRowTime() { - // When: - final String formatted = formatSingle(NULL, NULL); - - // Then: - assertThat(formatted, containsString("rowtime: 02/10/2020 20:26:44 +0000, ")); - } - - @Test - public void shouldOutputRowTimeAsNaIfNa() { - // Given: - timestamp = ConsumerRecord.NO_TIMESTAMP; - - // When: - final String formatted = formatSingle(NULL, NULL); - - // Then: - assertThat(formatted, containsString("rowtime: N/A, ")); - } - - @Test - public void shouldFormatAvroKey() { - // Given: - givenAvroSchemaRegistered(); - - // When: - final String formatted = formatSingle(SERIALIZED_AVRO_RECORD, NULL); - - // Then: - assertThat(formatted, containsString(", key: {\"str1\": \"My first string\"}, ")); - } - - @Test - public void shouldFormatAvroValue() { - // Given: - givenAvroSchemaRegistered(); - - // When: - final String formatted = formatSingle(NULL, SERIALIZED_AVRO_RECORD); - - // Then: - assertThat(formatted, endsWith(", value: {\"str1\": \"My first string\"}")); - } - - @Test - public void shouldFormatJsonObjectKey() { - // When: - final String formatted = formatSingle(JSON_OBJECT.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatted, containsString(", key: " + JSON_OBJECT + ", ")); - } - - @Test - public void shouldFormatJsonObjectValue() { - // When: - final String formatted = formatSingle(NULL, JSON_OBJECT.getBytes(UTF_8)); - - // Then: - assertThat(formatted, containsString(", value: " + JSON_OBJECT)); - } - - @Test - public void shouldFormatJsonArrayKey() { - // When: - final String formatted = formatSingle(JSON_ARRAY.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatted, containsString(", key: " + JSON_ARRAY + ", ")); - } - - @Test - public void shouldFormatJsonArrayValue() { - // When: - final String formatted = formatSingle(NULL, JSON_ARRAY.getBytes(UTF_8)); - - // Then: - assertThat(formatted, containsString(", value: " + JSON_ARRAY)); - } - - @Test - public void shouldFormatDelimitedAsStringKey() { - // When: - final String formatted = formatSingle(DELIMITED_VALUE.getBytes(UTF_8), NULL); - - // Then: - assertThat(formatted, containsString(", key: " + DELIMITED_VALUE + ", ")); - } - - @Test - public void shouldFormatDelimitedAsStringValue() { - // When: - final String formatted = formatSingle(NULL, DELIMITED_VALUE.getBytes(UTF_8)); - - // Then: - assertThat(formatted, 
containsString(", value: " + DELIMITED_VALUE)); - } - - @Test - public void shouldFormatRandomBytesAsStringKey() { - // When: - final String formatted = formatSingle(RANDOM_BYTES, NULL); - - // Then: - assertThat(formatted, containsString(", key: " + new String(RANDOM_BYTES, UTF_8) + ", ")); - } - - @Test - public void shouldFormatRandomBytesAsStringValue() { - // When: - final String formatted = formatSingle(NULL, RANDOM_BYTES); - - // Then: - assertThat(formatted, containsString(", value: " + new String(RANDOM_BYTES, UTF_8))); - } - - @Test - public void shouldFormatKafkaIntKey() { - // When: - final String formatted = formatSingle(SERIZALIZED_KAFKA_INT, NULL); - - // Then: - assertThat(formatted, containsString(", key: " + KAFKA_INT + ", ")); - } - - @Test - public void shouldFormatKafkaBigIntKey() { - // When: - final String formatted = formatSingle(SERIZALIZED_KAFKA_BIGINT, NULL); - - // Then: - assertThat(formatted, containsString(", key: " + KAFKA_BIGINT + ", ")); - } - - @Test - public void shouldFormatKafkaDoubleKey() { - // When: - final String formatted = formatSingle(SERIZALIZED_KAFKA_DOUBLE, NULL); - - // Then: - assertThat(formatted, containsString(", key: " + longEquiv(KAFKA_DOUBLE) + ", ")); - } - - @Test - public void shouldDefaultToStringFormattingInMixedMode() { - // When: - final List results = formatValues( - JSON_OBJECT.getBytes(UTF_8), - DELIMITED_VALUE.getBytes(UTF_8), - SERIALIZED_AVRO_RECORD - ); - - // Then: - assertThat(results, contains( - containsString(", value: " + JSON_OBJECT), - containsString(", value: " + DELIMITED_VALUE), - containsString(", value: " + new String(SERIALIZED_AVRO_RECORD, UTF_8)) - )); - } - - @Test - public void shouldFormatNulls() { - // When: - final List formatted = format(NULL_VARIANTS, NULL_VARIANTS); - - // Then: - assertThat(formatted, contains( - containsString(", value: "), - containsString(", value: ") - )); - } - - @Test - public void shouldFormatNullJsonRecord() { - // Given: - formatSingle(NULL, JSON_OBJECT.getBytes(UTF_8)); - - // When: - final String formatted = formatSingle(NULL, "null".getBytes(UTF_8)); - - // Then: - assertThat(formatted, containsString(", value: null")); - } - - private String formatSingle(final byte[] key, final byte[] value) { - final List formatted = format( - Collections.singletonList(toBytes(key)), - Collections.singletonList(toBytes(value)) - ); - assertThat("Only expect one line", formatted, hasSize(1)); - - return formatted.get(0); - } - - private List formatKeys(final byte[] first, final byte[]... others) { - - final List keys = Streams - .concat(Stream.of(first), Arrays.stream(others)) - .map(TopicStreamTest::toBytes) - .collect(Collectors.toList()); - - final List values = IntStream.range(0, keys.size()) - .mapToObj(idx -> (Bytes)null) - .collect(Collectors.toList()); - - return format(keys, values); - } - - private List formatValues(final byte[] first, final byte[]... 
others) { - - final List values = Streams - .concat(Stream.of(first), Arrays.stream(others)) - .map(TopicStreamTest::toBytes) - .collect(Collectors.toList()); - - final List keys = IntStream.range(0, values.size()) - .mapToObj(idx -> (Bytes)null) - .collect(Collectors.toList()); - - return format(keys, values); - } - - private List format(final List keys, final List values) { - assertThat("invalid test", keys, hasSize(values.size())); - - final List> recs = IntStream.range(0, keys.size()) - .mapToObj(idx -> new ConsumerRecord<>(TOPIC_NAME, 1, 1, timestamp, - TimestampType.CREATE_TIME, 123, 1, 1, - keys.get(idx), values.get(idx))) - .collect(Collectors.toList()); - - final ConsumerRecords records = - new ConsumerRecords<>(ImmutableMap.of(new TopicPartition(TOPIC_NAME, 1), recs)); - - return formatter.format(records).stream() - .map(Supplier::get) - .collect(Collectors.toList()); - } - - private void givenAvroSchemaRegistered() { - try { - when(schemaRegistryClient.getSchemaById(anyInt())) - .thenReturn(new AvroSchema(AVRO_SCHEMA)); - } catch (final Exception e) { - fail("invalid test"); - } - } - - /* - No way to tell between a double and a long once serialized. - KSQL defaults to long. So doubles are output as longs: - */ - private static long longEquiv(final double kafkaDouble) { - final byte[] bytes = new DoubleSerializer().serialize("foo", kafkaDouble); - return new LongDeserializer().deserialize("foo", bytes); - } - - private static Bytes toBytes(final byte[] key) { - return key == NULL ? null : Bytes.wrap(key); - } - - private static byte[] serialize(final T value, final Serializer serializer) { - return serializer.serialize("topic", value); - } - - @SuppressWarnings("SameParameterValue") - private static Schema parseAvroSchema(final String avroSchema) { - final Schema.Parser parser = new Schema.Parser(); - return parser.parse(avroSchema); - } - - private static byte[] serializedAvroRecord() { - final GenericData.Record avroRecord = new GenericData.Record(AVRO_SCHEMA); - avroRecord.put("str1", "My first string"); - - final Map props = new HashMap<>(); - props.put("schema.registry.url", "localhost:9092"); - - final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class); - final KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, props); - - return avroSerializer.serialize("topic", avroRecord); - } -} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriterTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriterTest.java index 52fd4f349fd7..79b98b6feb44 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriterTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/TopicStreamWriterTest.java @@ -78,8 +78,12 @@ public void shouldIntervalOneAndLimitTwo() { // Then: final List expected = ImmutableList.of( - "Key format: KAFKA (STRING)", - "Value format: KAFKA (STRING)", + "Key format: ", + "KAFKA_STRING", + System.lineSeparator(), + "Value format: ", + "KAFKA_STRING", + System.lineSeparator(), "rowtime: N/A, key: key-0, value: value-0", System.lineSeparator(), "rowtime: N/A, key: key-1, value: value-1", @@ -106,8 +110,12 @@ public void shouldIntervalTwoAndLimitTwo() { // Then: final List expected = ImmutableList.of( - "Key format: KAFKA (STRING)", - "Value format: KAFKA (STRING)", + "Key format: ", + "KAFKA_STRING", + System.lineSeparator(), + "Value 
format: ", + "KAFKA_STRING", + System.lineSeparator(), "rowtime: N/A, key: key-0, value: value-0", System.lineSeparator(), "rowtime: N/A, key: key-2, value: value-2", @@ -118,7 +126,7 @@ public void shouldIntervalTwoAndLimitTwo() { private static class ValidatingOutputStream extends OutputStream { - private final List recordedWrites; + private final List recordedWrites; ValidatingOutputStream() { this.recordedWrites = new ArrayList<>(); @@ -127,21 +135,19 @@ private static class ValidatingOutputStream extends OutputStream { @Override public void write(final int b) { /* not called*/ } @Override - public void write(final byte[] bytes) { - recordedWrites.add(bytes); + public void write(final byte[] bytes, int off, int len) { + recordedWrites.add(new String(bytes, off, len, Charsets.UTF_8)); } void assertWrites(final List expected) { for (int i = 0; i < recordedWrites.size(); i++) { - final byte[] bytes = recordedWrites.get(i); + final String actual = recordedWrites.get(i); if (expected.size() <= i) { break; } - assertThat( - new String(bytes, Charsets.UTF_8), - containsString(expected.get(i))); + assertThat(actual, containsString(expected.get(i))); } assertThat(recordedWrites, hasSize(expected.size())); diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java index 83e462014332..4ca72c1854a9 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/kafka/KafkaSerdeFactory.java @@ -22,7 +22,6 @@ import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option; import io.confluent.ksql.schema.ksql.PersistenceSchema; -import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.KsqlSerdeFactory; import io.confluent.ksql.util.DecimalUtil; @@ -44,6 +43,7 @@ @Immutable public class KafkaSerdeFactory implements KsqlSerdeFactory { + // Note: If supporting new types here, add new type of PRINT TOPIC support too private static final ImmutableMap> SERDE = ImmutableMap.of( Type.INT32, Serdes.Integer(), Type.INT64, Serdes.Long(), @@ -51,13 +51,6 @@ public class KafkaSerdeFactory implements KsqlSerdeFactory { Type.STRING, Serdes.String() ); - public static final ImmutableMap> SQL_SERDE = ImmutableMap.of( - SqlBaseType.INTEGER, Serdes.Integer(), - SqlBaseType.BIGINT, Serdes.Long(), - SqlBaseType.DOUBLE, Serdes.Double(), - SqlBaseType.STRING, Serdes.String() - ); - @Override public void validate(final PersistenceSchema schema) { getPrimitiveSerde(schema.serializedSchema());