feat: enhance PRINT TOPIC's format detection (#4551)
* feat: enhance `PRINT TOPIC`'s format detection

fixes: #4258

With this change `PRINT TOPIC` has enhanced detection for the key and value formats of a topic.
The command starts with a list of known formats for both the key and the value and refines this
list as it sees more data.

As the list of possible formats is refined over time, the command outputs the reduced list.
For example, you may see output such as:

```
ksql> PRINT some_topic FROM BEGINNING;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {"statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window  TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
Key format: KAFKA_STRING
...
```

In the last line of the above output, the command has narrowed down the key format as it has processed more data.

The command has also been updated to detect only valid UTF-8 encoded text as type `JSON` or `KAFKA_STRING`.
This is in line with how KSQL would later deserialize the data.

If no known format can successfully deserialize the data, it is printed as a combination of ASCII characters and hex-encoded bytes.
big-andy-coates authored Feb 14, 2020
1 parent 77df772 commit a3fae28
Showing 21 changed files with 1,673 additions and 1,049 deletions.
2 changes: 1 addition & 1 deletion config/ksql-server.properties
Original file line number Diff line number Diff line change
@@ -62,4 +62,4 @@ bootstrap.servers=localhost:9092
# ksql.connect.worker.config=config/connect.properties

# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
# ksql.schema.registry.url=?
# ksql.schema.registry.url=http://localhost:8081
182 changes: 177 additions & 5 deletions docs-md/concepts/schemas.md
@@ -1,17 +1,189 @@
---
layout: page
title: Schemas in ksqlDB
tagline: Use schemas in your queries
tagline: Defining the structure of your data
description: Learn how schemas work with ksqlDB
keywords: ksqldb, schema, evolution, avro
---

Data sources like streams and tables have an associated schema. This schema defines the columns
available in the data, just like the columns in a traditional SQL database table.

## Key vs Value columns

KsqlDB supports both key and value columns. These map to the data held in the keys and values of the
underlying {{ site.ak }} topic.

A column is defined by a combination of its [name](#valid-identifiers), its [SQL data type](#sql-data-type),
and possibly a namespace.

Key columns have a `KEY` namespace suffix. Key columns have the following restrictions:
* There can currently be only a single key column.
* The key column must be named `ROWKEY` in the KSQL schema.

Value columns have no namespace suffix. There can be one or more value columns, and the value
columns can have any name.

For example, the following declares a schema with a single `INT` key column and several value
columns:

```sql
ROWKEY INT KEY, ID BIGINT, NAME STRING, ADDRESS ADDRESS_TYPE
```
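
Such a schema is declared as part of a `CREATE STREAM` or `CREATE TABLE` statement. A minimal
sketch, assuming a hypothetical `users` topic and an already registered `ADDRESS_TYPE` custom type:

```sql
-- The topic name and serialization format here are illustrative assumptions.
CREATE STREAM USERS (
    ROWKEY INT KEY,
    ID BIGINT,
    NAME STRING,
    ADDRESS ADDRESS_TYPE
  ) WITH (
    kafka_topic = 'users',
    value_format = 'JSON'
  );
```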

## Valid Identifiers

Column and field names must be valid identifiers.

Unquoted identifiers are treated as upper-case (for example, `col0` is equivalent to `COL0`) and
must contain only alphanumeric and underscore characters.

Identifiers containing invalid characters, or where case needs to be preserved, can be quoted using
back-ticks, for example `` `col0` ``.
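
For example, the following hypothetical declaration mixes quoted and unquoted identifiers (the
stream and topic names are illustrative):

```sql
-- `page-id` keeps its case and its dash thanks to the back-ticks;
-- the unquoted viewtime is treated as VIEWTIME.
CREATE STREAM `page-views` (ROWKEY STRING KEY, viewtime BIGINT, `page-id` STRING)
  WITH (kafka_topic = 'page-views', value_format = 'JSON');
```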

## SQL data types

The following SQL types are supported by ksqlDB:

* [Primitive types](#primitive-types)
* [Decimal type](#decimal-type)
* [Array type](#array-type)
* [Map type](#map-type)
* [Struct type](#struct-type)
* [Custom types](#custom-types)

### Primitive types

Supported primitive types are:

* `BOOLEAN`: a binary value
* `INT`: 32-bit signed integer
* `BIGINT`: 64-bit signed integer
* `DOUBLE`: double precision (64-bit) IEEE 754 floating-point number
* `STRING`: a Unicode character sequence (UTF-8)

### Decimal type

The `DECIMAL` type can store numbers with a very large number of digits and perform calculations exactly.
It is recommended for storing monetary amounts and other quantities where exactness is required.
However, arithmetic on decimals is slow compared to integer and floating point types.

`DECIMAL` types have a _precision_ and _scale_.
The scale is the number of digits in the fractional part, to the right of the decimal point.
The precision is the total number of significant digits in the whole number, that is,
the number of digits on both sides of the decimal point.
For example, the number `765.937500` has a precision of 9 and a scale of 6.

To declare a column of type `DECIMAL` use the syntax:

```sql
DECIMAL(precision, scale)
```

The precision must be positive; the scale must be zero or positive.
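
For example, the following sketch declares a column wide enough to hold the `765.937500` literal
from above (the stream and topic names are illustrative):

```sql
-- DECIMAL(9, 6): up to 3 digits before the decimal point and 6 after.
CREATE STREAM prices (ROWKEY STRING KEY, amount DECIMAL(9, 6))
  WITH (kafka_topic = 'prices', value_format = 'JSON');
```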

### Array type

The `ARRAY` type defines a variable-length array of elements. All elements in the array must be of
the same type.

To declare an `ARRAY` use the syntax:

```
ARRAY<element-type>
```

The _element-type_ can be any other [SQL data type](#sql-data-types).

For example, the following creates an array of `STRING`s:

```sql
ARRAY<STRING>
```

Instances of an array can be created using the syntax:

```
ARRAY[value [, value]*]
```

For example, the following creates an array with three `INT` elements:

```sql
ARRAY[2, 4, 6]
```

### Map type

The `MAP` type defines a variable-length collection of key-value pairs. All keys in the map must be
of the same type. All values in the map must be of the same type.

To declare a `MAP` use the syntax:

```
MAP<key-type, value-type>
```

The _key-type_ must currently be `STRING`, while the _value-type_ can be any other [SQL data type](#sql-data-types).

For example, the following creates a map with `STRING` keys and values:

```sql
MAP<STRING, STRING>
```

Instances of a map can be created using the syntax:

```
MAP(key := value [, key := value]*)
```

For example, the following creates a map with three key-value pairs:

```sql
MAP('a' := 1, 'b' := 2, 'c' := 3)
```

### Struct type

The `STRUCT` type defines a list of named fields, where each field can have any [SQL data type](#sql-data-types).

To declare a `STRUCT` use the syntax:

```
STRUCT<field-name field-type [, field-name field-type]*>
```

The _field-name_ can be any [valid identifier](#valid-identifiers). The _field-type_ can be any
valid [SQL data type](#sql-data-types).

For example, the following creates a struct with an `INT` field called `FOO` and a `BOOLEAN` field
called `BAR`:

```sql
STRUCT<FOO INT, BAR BOOLEAN>
```

Instances of a struct can be created using the syntax:

```
STRUCT(field-name := field-value [, field-name := field-value]*)
```

For example, the following creates a struct with fields called `FOO` and `BAR` and sets their values
to `10` and `true`, respectively:

```sql
STRUCT(FOO := 10, BAR := true)
```
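
Individual fields of a struct can be read back with the `->` dereference operator. A sketch,
assuming a hypothetical stream `S` with a struct column `ITEM` of the type above:

```sql
-- Selects only the FOO field of the ITEM struct column.
SELECT ITEM->FOO FROM S EMIT CHANGES;
```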

### Custom types

KsqlDB supports custom types using the `CREATE TYPE` statement.
See the [`CREATE TYPE` docs](../developer-guide/ksqldb-reference/create-type) for more information.
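
For example, the `ADDRESS_TYPE` used earlier on this page could be registered along these lines
(the field names are illustrative assumptions):

```sql
CREATE TYPE ADDRESS_TYPE AS STRUCT<street STRING, city STRING, zip STRING>;
```

Once registered, the type name can be used anywhere a [SQL data type](#sql-data-types) is expected.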

TODO:

- overview of how schemas work in ksqlDB
- overview of data types
- overview of serialization
- schema evolution with ksqlDB and Avro


Page last revised on: {{ git_revision_date }}
11 changes: 9 additions & 2 deletions docs-md/developer-guide/create-a-stream.md
@@ -282,8 +282,8 @@ PRINT pageviews_intro;
Your output should resemble:

```
Key format: KAFKA (BIGINT or DOUBLE)
Value format: KAFKA (STRING)
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
rowtime: 10/30/18 10:15:51 PM GMT, key: 294851, value: 1540937751186,User_8,Page_12
rowtime: 10/30/18 10:15:55 PM GMT, key: 295051, value: 1540937755255,User_1,Page_15
rowtime: 10/30/18 10:15:57 PM GMT, key: 295111, value: 1540937757265,User_8,Page_10
@@ -298,6 +298,13 @@ Press Ctrl+C to stop printing the stream.
!!! note
The query continues to run after you stop printing the stream.

!!! note
    ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
    It can't narrow the format down further, because it's not possible to rule out
    either format just by inspecting the key's serialized bytes. In this case, we know the key is
    a `BIGINT`. In other cases, you may know the key type already, or you may need to ask the
    author of the data.

Use the SHOW QUERIES statement to view the query that ksqlDB created for
the `pageviews_intro` stream:

2 changes: 1 addition & 1 deletion docs-md/developer-guide/create-a-table.md
@@ -218,7 +218,7 @@ PRINT users_female;
Your output should resemble:

```
Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: JSON
rowTime: 12/21/18 23:58:42 PM PSD, key: User_5, value: {"USERID":"User_5","GENDER":"FEMALE","REGIONID":"Region_4"}
rowTime: 12/21/18 23:58:42 PM PSD, key: User_2, value: {"USERID":"User_2","GENDER":"FEMALE","REGIONID":"Region_7"}
8 changes: 4 additions & 4 deletions docs-md/developer-guide/joins/partition-data.md
@@ -48,15 +48,15 @@ the resulting table is determined as follows:
- When grouping by a single column or expression, the type of `ROWKEY` in the
resulting stream matches the type of the column or expression.
- When grouping by multiple columns or expressions, the type of `ROWKEY` in the
resulting stream is a `STRING`.
resulting stream is an [SQL `STRING`](../../concepts/schemas).
- If the FROM clause contains only tables and no GROUP BY clause, the key is
copied over from the key of the table(s) in the FROM clause.
- If the FROM clause contains only tables and has a GROUP BY clause, the
grouping columns determine the key of the resulting table.
- When grouping by a single column or expression, the type of `ROWKEY` in the
resulting stream matches the type of the column or expression.
- When grouping by multiple columns or expressions, the type of `ROWKEY` in the
resulting stream is a `STRING`.
resulting stream is an [SQL `STRING`](../../concepts/schemas).

The following example shows a `users` table joined with a `clicks` stream
on the `userId` column. The `users` table has the correct primary key
@@ -105,8 +105,8 @@ processing.

For a join to work, the keys from both sides must have the same SQL type.

For example, you can join a stream of user clicks that's keyed on a `VARCHAR`
user id with a table of user profiles that's also keyed on a `VARCHAR` user id.
For example, you can join a stream of user clicks that's keyed on a `STRING`
user id with a table of user profiles that's also keyed on a `STRING` user id.
Records with the exact same user id on both sides will be joined.
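
A minimal sketch of such a join, assuming hypothetical `clicks` and `users` sources keyed on
compatible `STRING` user ids (all names here are illustrative):

```sql
-- Stream-table join on the user id.
SELECT c.userId, c.page, u.region
FROM clicks c
  JOIN users u ON c.userId = u.userId
EMIT CHANGES;
```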

If the schemas of the columns you wish to join on don't match, it may be possible
18 changes: 14 additions & 4 deletions docs-md/developer-guide/ksqldb-reference/print.md
@@ -21,6 +21,9 @@ Description

Print the contents of Kafka topics to the ksqlDB CLI.

The _topicName_ is case sensitive. Quote the name if it contains invalid characters.
See [Valid Identifiers](../../concepts/schemas#valid-identifiers) for more information.

The PRINT statement supports the following properties:

| Property | Description |
@@ -33,10 +36,11 @@ Example
-------

The following statement shows how to print all of the records in a topic named
`ksql__commands`.
`_confluent-ksql-default__command_topic` (the default name of the topic in which ksqlDB stores the commands you submit).
Note that the topic name is quoted because it contains dash characters, which are invalid in identifiers.

```sql
PRINT ksql__commands FROM BEGINNING;
PRINT '_confluent-ksql-default__command_topic' FROM BEGINNING;
```

ksqlDB attempts to determine the format of the data in the topic and outputs what it thinks are
@@ -51,13 +55,19 @@ the key and value formats at the top of the output.
Your output should resemble:

```
Key format: KAFKA (INTEGER)
Value format: JSON
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: stream/CLICKSTREAM/create, value: {"statement":"CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');","streamsProperties":{}}
rowtime: 12/21/18 23:58:42 PM PSD, key: table/EVENTS_PER_MIN/create, value: {"statement":"create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid EMIT CHANGES;","streamsProperties":{}}
^CTopic printing ceased
```

The key format for this topic is `KAFKA_STRING`. However, the `PRINT` command does not know this and
has attempted to determine the format of the key by inspecting the data. It has determined that the
format may be `KAFKA_STRING`, but it could also be `JSON` or a windowed `KAFKA_STRING`.

The value format for this topic is `JSON`. However, the `PRINT` command has also determined it could
be `KAFKA_STRING`, because `JSON` is serialized as text: you could choose to deserialize this value
data as `KAFKA_STRING` if you wanted to, though `JSON` is likely the better option.

Page last revised on: {{ git_revision_date }}
4 changes: 2 additions & 2 deletions docs-md/developer-guide/ksqldb-reference/select-push-query.md
@@ -48,7 +48,7 @@ In the previous statements, `from_item` is one of the following:
- `from_item LEFT JOIN from_item ON join_condition`

The WHERE clause can refer to any column defined for a stream or table,
including the system columns `ROWTIME` and `ROWKEY`.
including the `ROWTIME` and `ROWKEY` system columns.

Example
-------
@@ -114,7 +114,7 @@ or joins. Windows are tracked per record key.
Windowing adds two additional system columns to the data, which provide
the window bounds: `WINDOWSTART` and `WINDOWEND`.

KsqlDB supports the following WINDOW types.
KsqlDB supports the following WINDOW types:

**TUMBLING**: Tumbling windows group input records into fixed-sized,
non-overlapping windows based on the records' timestamps. You must
4 changes: 2 additions & 2 deletions docs-md/developer-guide/query-with-structured-data.md
@@ -144,8 +144,8 @@ PRINT 'raw-topic' FROM BEGINNING;
Your output should resemble:

```json
Key format: KAFKA (STRING)
Value format: JSON
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 12/21/18 23:58:42 PM PSD, key: 1, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
rowtime: 12/21/18 23:58:42 PM PSD, key: 2, value: {"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
rowtime: 12/21/18 23:58:42 PM PSD, key: 3, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
13 changes: 10 additions & 3 deletions docs-md/tutorials/basics-docker.md
@@ -184,7 +184,7 @@ PRINT users;
Your output should resemble:

```json
Key format: KAFKA (STRING)
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 10/30/18 10:15:51 PM GMT, key: User_1, value: {"registertime":1516754966866,"userid":"User_1","regionid":"Region_9","gender":"MALE"}
rowtime: 10/30/18 10:15:51 PM GMT, key: User_3, value: {"registertime":1491558386780,"userid":"User_3","regionid":"Region_2","gender":"MALE"}
@@ -204,8 +204,8 @@ PRINT pageviews;
Your output should resemble:

```
Key format: KAFKA (INTEGER)
Format: KAFKA (STRING)
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: KAFKA_STRING
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243183, value: 1540254243183,User_9,Page_20
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243617, value: 1540254243617,User_7,Page_47
rowtime: 10/23/18 12:24:03 AM PSD, key: 1540254243888, value: 1540254243888,User_4,Page_27
@@ -215,6 +215,13 @@ Your output should resemble:

Press Ctrl+C to stop printing messages.

!!! note
    ksqlDB has determined that the key format is either `KAFKA_BIGINT` or `KAFKA_DOUBLE`.
    It can't narrow the format down further, because it's not possible to rule out
    either format just by inspecting the key's serialized bytes. In this case, we know the key is
    a `BIGINT`. In other cases, you may know the key type already, or you may need to ask the
    author of the data.

For more information, see [ksqlDB Syntax Reference](../developer-guide/syntax-reference.md).

Create a Stream and Table