docs: migrate partitioning topic to markdown (DOCS-2776) #3743

Merged
merged 1 commit into from Nov 4, 2019
232 changes: 164 additions & 68 deletions docs-md/developer-guide/partition-data.md
---
layout: page
title: Partition Data to Enable Joins
tagline: Correct partitioning for joins
description: Learn how correctly partitioned topics enable join queries
keywords: ksqldb, join, partition, key, schema
---

Partition Data to Enable Joins
==============================

When you use KSQL to join streaming data, you must ensure that your
streams and tables are *co-partitioned*, which means that input records
on both sides of the join have the same configuration settings for
partitions.

To join two data sources, streams or tables, KSQL needs to compare their
records based on the joining column. To ensure that records with the same
join column are co-located on the same stream task, the join column must
coincide with the column that the sources are partitioned by.

A *primary key*, when present, defines the partitioning column. Tables are
always partitioned by their primary key, and KSQL doesn't allow repartitioning
of tables, so you can only use a table's primary key as a join column.

Streams, on the other hand, may not have a defined key or may have a key that
differs from the join column. In these cases, KSQL repartitions the stream,
which implicitly defines a primary key for it. The primary keys for streams
and tables are of data type `VARCHAR`.

For primary keys to match, they must have the same serialization format. For
example, you can't join a `VARCHAR` key encoded as JSON with one encoded as AVRO.

!!! note
    KSQL requires that keys are encoded as UTF-8 strings.

Because you can only use the primary key of a table as a joining column, it's
important to understand how keys are defined. For both streams and tables, the
column that represents the primary key has the name `ROWKEY`.

When you create a table by using a CREATE TABLE statement, the key of the
table is the same as that of the records in the underlying Kafka topic.

When you create a table by using a CREATE TABLE AS SELECT statement, the key of
the resulting table is determined as follows:

- If the FROM clause is a single source, and the source is a stream, the
  statement must have a GROUP BY clause, where the grouping columns determine
  the key of the resulting table.
- If the single source is a table, the key is copied over from the key of the
  table in the FROM clause.
- If the FROM clause is a join, the primary key of the resulting table is the
  joining column, since joins are allowed only on keys.
- If the statement contains a GROUP BY, the key of the resulting table
  comprises the grouping columns.

When the primary key consists of multiple columns, like when it's created as
the result of a GROUP BY clause with multiple grouping columns, you must use
ROWKEY as the joining column. Even when the primary key consists of a single
column, we recommend using ROWKEY as the joining column to avoid confusion.
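
For example, the following sketch shows how a GROUP BY on two columns
produces a composite primary key (the `pageviews` stream and its columns
are hypothetical):

```sql
-- Hypothetical stream of pageview events.
CREATE STREAM pageviews (userId STRING, pageId STRING)
    WITH (kafka_topic='pageviews', value_format='json');

-- Grouping by two columns creates a composite primary key;
-- KSQL exposes it as a single VARCHAR in the ROWKEY column.
CREATE TABLE views_by_user_page AS
    SELECT userId, pageId, COUNT(*) AS total
    FROM pageviews
    GROUP BY userId, pageId;
```

Any join against `views_by_user_page` must use its `ROWKEY`, because the
primary key spans both grouping columns.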

The following example shows a `users` table joined with a `clicks` stream
on the `userId` column. The `users` table has the correct primary key
`userId` that coincides with the joining column. But the `clicks` stream
doesn't have a defined key, and KSQL must repartition it on the joining
column (`userId`) and assign the primary key before performing the join.

```sql
-- clicks stream, with an unknown key.
-- the schema of stream clicks is: ROWTIME | ROWKEY | USERID | URL
CREATE STREAM clicks (userId STRING, url STRING) WITH(kafka_topic='clickstream', value_format='json');

-- the primary key of table users is userId, which is also the key of the records in the topic:
-- the schema of table users is: ROWTIME | ROWKEY | USERID | FULLNAME
CREATE TABLE users (userId STRING, fullName STRING) WITH(kafka_topic='users', value_format='json', key='userId');

-- join using primary key of table users with newly assigned key of stream clicks
-- join will automatically repartition clicks stream:
SELECT clicks.url, users.fullName FROM clicks JOIN users ON clicks.ROWKEY = users.ROWKEY;
```

Co-partitioning Requirements
----------------------------

When you use KSQL to join streaming data, you must ensure that your streams
and tables are *co-partitioned*, which means that input records on both sides
of the join have the same configuration settings for partitions.

- The input records for the join must have the
[same keying scheme](#records-have-the-same-keying-scheme).
- The input records must have the
[same number of partitions](#records-have-the-same-number-of-partitions)
on both sides.
- Both sides of the join must have the
[same partitioning strategy](#records-have-the-same-partitioning-strategy).

When your inputs are co-partitioned, records with the same key, from
both sides of the join, are delivered to the same stream task during
processing.

### Records Have the Same Keying Scheme

For a join to work, the keys from both sides must have the same serialized
binary data.

For example, you can join a stream of user clicks that's keyed on a `VARCHAR`
user id with a table of user profiles that's also keyed on a `VARCHAR` user id.
Records with the exact same user id on both sides will be joined.

KSQL requires that keys are UTF-8 encoded strings.

!!! note
    A join depends on the key's underlying serialization format. For example,
    a `VARCHAR` column encoded as JSON won't join with a `VARCHAR` column
    encoded as AVRO, even when the string values are equal.

Tables created on top of existing Kafka topics, for example those created with
a `CREATE TABLE` statement, are keyed on the data held in the key of the records
in the Kafka topic. KSQL presents this data in the `ROWKEY` column and expects
the data to be a `VARCHAR`.

Tables created inside KSQL from other sources, for example those created with
a `CREATE TABLE AS SELECT` statement, will copy the key from their source(s)
unless there is an explicit `GROUP BY` clause, which can change what the table
is keyed on.
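
A minimal sketch of the difference, reusing the `clicks` stream from the
earlier example (the `profiles` and derived table names are hypothetical):

```sql
-- Keyed on the data in the Kafka record key, surfaced as ROWKEY:
CREATE TABLE profiles (userId STRING, fullName STRING)
    WITH (kafka_topic='profiles', value_format='json', key='userId');

-- A derived table copies the key from its source...
CREATE TABLE profiles_copy AS SELECT * FROM profiles;

-- ...unless a GROUP BY re-keys the result, here on userId:
CREATE TABLE clicks_per_user AS
    SELECT userId, COUNT(*) AS total
    FROM clicks
    GROUP BY userId;
```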

!!! note
    KSQL automatically repartitions a stream if a join requires it, but KSQL
    rejects any join on a table's column that is *not* the key. This is
    because KSQL doesn't support joins on foreign keys, and repartitioning a
    table's topic has the potential to reorder events and misinterpret
    tombstones, which can lead to unintended or undesired side effects.
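
As a sketch of what this note implies, using the `clicks` and `users`
sources defined earlier:

```sql
-- Allowed: clicks is a stream, so KSQL repartitions it on userId
-- behind the scenes before performing the join.
SELECT c.url, u.fullName FROM clicks c JOIN users u ON c.userId = u.ROWKEY;

-- Rejected: fullName is a value column, not the key, of the users
-- table, and KSQL won't repartition a table.
SELECT c.url, u.fullName FROM clicks c JOIN users u ON c.userId = u.fullName;
```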

If you are using the same sources in more than one join that requires the data
to be repartitioned, you may prefer to repartition manually to avoid KSQL
repartitioning multiple times.

To repartition a stream, use the PARTITION BY clause. Be aware that Kafka
guarantees the relative order of any two messages from one source partition
only if they are also both in the same partition after the repartition.
Otherwise, Kafka is likely to interleave messages. Whether these ordering
guarantees are acceptable depends on your use case.

For example, if you need to re-partition a stream to be keyed by a `product_id`
field, and keys need to be distributed over 6 partitions to make a join work,
use the following KSQL statement:

```sql
CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY product_id;
```
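
If the same rekeyed stream feeds more than one join, creating it once this
way avoids repeated automatic repartitioning. A hedged sketch of reuse
follows (the `inventory` table is hypothetical, and its backing topic is
assumed to have 6 partitions as well):

```sql
-- Hypothetical table keyed by product_id, co-partitioned with
-- products_rekeyed (same key, same partition count).
CREATE TABLE inventory (product_id STRING, stock BIGINT)
    WITH (kafka_topic='inventory', value_format='json', key='product_id');

-- products_rekeyed is already keyed by product_id, so this join
-- triggers no further repartitioning.
SELECT p.ROWKEY AS product_id, i.stock
FROM products_rekeyed p
JOIN inventory i ON p.ROWKEY = i.ROWKEY;
```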

For more information, see
[Inspecting and Changing Topic Keys](https://www.confluent.io/stream-processing-cookbook/ksql-recipes/inspecting-changing-topic-keys)
in the [Stream Processing Cookbook](https://www.confluent.io/product/ksql/stream-processing-cookbook).

### Records Have the Same Number of Partitions

The input records for the join must have the same number of partitions on both
sides.

KSQL checks this part of the co-partitioning requirement and rejects any join
where the partition counts differ.

Use the `DESCRIBE EXTENDED <source name>` command in the CLI to determine the
Kafka topic under a source, and use the SHOW TOPICS command in the CLI to list
topics and their partition counts.
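
As a quick sketch using the `clicks` and `users` sources from the earlier
example, the following statements reveal the backing topics and partition
counts:

```sql
-- Show the Kafka topic (and other details) behind each source:
DESCRIBE EXTENDED clicks;
DESCRIBE EXTENDED users;

-- List all topics, including their partition counts:
SHOW TOPICS;
```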

If the sides of the join have different partition counts, you may want to change
the partition counts of the source topics, or repartition one side to match the
partition count of the other.

The following example creates a repartitioned stream, maintaining the existing
key, with the specified number of partitions.

```sql
CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY ROWKEY;
```

### Records Have the Same Partitioning Strategy

Records on both sides of the join must have the same partitioning
strategy. If you use the default partitioner settings across all
applications, and your producers don't specify an explicit partition,
you don't need to worry about the partitioning strategy.

But if the producer applications for your records have custom
partitioners specified in
[configuration](http://kafka.apache.org/documentation/#producerconfigs), the
same custom partitioner logic must be used for records on both sides of the
join. For example, in a stream-table join, if a `userId` key with the same
value is in Partition 1 for the stream and Partition 2 for the table, the
join won't match the records, even though both sides are keyed by `userId`.
KSQL can't verify whether the partitioning strategies are the same for
both join inputs, so you must ensure this.

The
[DefaultPartitioner class](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java)
implements the following partitioning strategy:

- If the producer specifies a partition in the record, use it.
- If the producer specifies a key instead of a partition, choose a
  partition based on a hash of the key.
- If the producer doesn't specify a partition or a key, choose a
  partition in a round-robin fashion.

Custom partitioner classes implement the
[Partitioner interface](https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/Partitioner.html)
and are assigned in the producer configuration property,
`partitioner.class`.

For example implementations of a custom partitioner, see
[Built for realtime: Big data messaging with Apache Kafka, Part 2](https://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html)
and [Apache Kafka Foundation Course - Custom Partitioner](https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/custom-partitioner/).
1 change: 1 addition & 0 deletions mkdocs.yml
nav:
- Create a ksqlDB Table: developer-guide/create-a-table.md
- Implement a User-defined Function (UDF and UDAF): developer-guide/implement-a-udf.md
- Join Event Streams with ksqlDB: developer-guide/join-streams-and-tables.md
- Partition Data to Enable Joins: developer-guide/partition-data.md
- Query With Arrays and Maps: developer-guide/query-with-arrays-and-maps.md
- Query With Structured Data: developer-guide/query-with-structured-data.md
- Transform a Stream With ksqlDB: developer-guide/transform-a-stream-with-ksql.md