---
date: 2024-11-06
title: Kafka and the JSON data type
---

# Kafka and the JSON data type

With the introduction of the new [`JSON`](/docs/en/sql-reference/data-types/newjson) data type, ClickHouse is now a good choice of [database for doing JSON analytics](https://clickhouse.com/engineering-resources/json-database).
In this guide, we're going to learn how to load JSON messages from Apache Kafka directly into a single `JSON` column in ClickHouse.

## Set up Kafka

Let's start by running a Kafka broker on our machine. We'll also map the broker's port 9092 to the same port on the host operating system so that it's easier to interact with Kafka:

```bash
docker run --name broker -p 9092:9092 apache/kafka:3.8.1
```

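Before producing any data, we can optionally check that the broker is reachable by asking it for its cluster metadata with `kcat`, the same tool we'll use to produce and consume messages below:

```bash
# Lists brokers, topics, and partitions; if this fails, the broker
# container isn't ready yet or the port mapping didn't work.
kcat -L -b localhost:9092
```
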
## Ingest data into Kafka

Once that's running, we need to ingest some data.
The Wikimedia recent changes feed is a good source of streaming data, so let's ingest it into the `wiki_events` topic.
The feed is delivered as server-sent events, so we strip the `data: ` prefix with `awk`, use `jq` to prepend each event's `meta.id` (separated by the `ø` character), and then produce the messages with `kcat`, using `ø` as the key delimiter so that `meta.id` becomes the Kafka message key:

```bash
curl -N https://stream.wikimedia.org/v2/stream/recentchange 2>/dev/null |
awk '/^data: /{gsub(/^data: /, ""); print}' |
jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t wiki_events -Kø
```

We can check that the data is being ingested by running the following command:

```bash
kcat -C -b localhost:9092 -t wiki_events
```

```text
{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q130972321","request_id":"5c687ded-4721-4bfc-ae6c-58ca25f4a6ce","id":"0fbb0982-c43b-4e8b-989b-db7e78dbdc76","dt":"2024-11-06T11:59:57Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777205},"id":2338656448,"type":"edit","namespace":0,"title":"Q130972321","title_url":"https://www.wikidata.org/wiki/Q130972321","comment":"/* wbsetclaim-create:2||1 */ [[Property:P18]]: Mahdi Rrezaei Journalist.jpg","timestamp":1730894397,"user":"Wikimellatir","bot":false,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885254&oldid=2270870214&rcid=2338656448","minor":false,"patrolled":false,"length":{"old":4269,"new":4636},"revision":{"old":2270870214,"new":2270885254},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Created claim: </span></span> <a href=\"/wiki/Property:P18\" title=\"image | image of relevant illustration of the subject; if available, also use more specific properties (sample: coat of arms image, locator map, flag image, signature image, logo image, collage image)\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">image</span> <span class=\"wb-itemlink-id\">(P18)</span></span></a>: Mahdi Rrezaei Journalist.jpg"} | ||
{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q75756596","request_id":"eb116219-7372-4725-986f-790211708d36","id":"9e0d5299-5bd1-4c58-b796-9852afd8a84e","dt":"2024-11-06T11:59:54Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777206},"id":2338656449,"type":"edit","namespace":0,"title":"Q75756596","title_url":"https://www.wikidata.org/wiki/Q75756596","comment":"/* wbeditentity-update-languages-and-other:0||55 */ mv labels and aliases matching [[Property:P528]] or [[Property:P3083]] to mul","timestamp":1730894394,"user":"Twofivesixbot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885237&oldid=2147709089&rcid=2338656449","minor":false,"patrolled":true,"length":{"old":30879,"new":27161},"revision":{"old":2147709089,"new":2270885237},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"<span dir=\"auto\"><span class=\"autocomment\">Changed label, description and/or aliases in 55 languages, and other parts: </span></span> mv labels and aliases matching <a href=\"/wiki/Property:P528\" title=\"catalog code | catalog name of an object, use with qualifier P972\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">catalog code</span> <span class=\"wb-itemlink-id\">(P528)</span></span></a> or <a href=\"/wiki/Property:P3083\" title=\"SIMBAD ID | identifier for an astronomical object, in the University of Strasbourg's SIMBAD database\"><span class=\"wb-itemlink\"><span class=\"wb-itemlink-label\" lang=\"en\" dir=\"ltr\">SIMBAD ID</span> <span class=\"wb-itemlink-id\">(P3083)</span></span></a> to mul"} | ||
```

So far, so good.

## Ingest data into ClickHouse

Next, we're going to ingest the data into ClickHouse.
First, let's enable the JSON type (which is currently experimental) by setting the following property:

```sql
SET allow_experimental_json_type = 1;
```

Now, we'll create the `wiki_queue` table, which uses the [`Kafka` table engine](/docs/en/integrations/kafka/kafka-table-engine):

```sql
CREATE TABLE wiki_queue
(
    json JSON
)
ENGINE = Kafka(
    'localhost:9092',
    'wiki_events',
    'clickhouse-consumer-group',
    'JSONAsObject'
);
```

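The positional arguments here are the broker list, the topic, the consumer group name, and the message format. If you prefer named settings, the Kafka engine also accepts a `SETTINGS` clause; the following sketch should be roughly equivalent to the definition above:

```sql
CREATE TABLE wiki_queue
(
    json JSON
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'wiki_events',
         kafka_group_name = 'clickhouse-consumer-group',
         kafka_format = 'JSONAsObject';
```
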
Note that we're using the [`JSONAsObject`](https://clickhouse.com/docs/en/interfaces/formats#jsonasobject) format, which ensures that each incoming message is made available as a single JSON object.
Data in this format can only be parsed into a table that has a single column of the `JSON` type.

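To get a feel for what the format does in isolation, we can parse an inline document with the `format` table function. The document below is made up for illustration, and the query assumes the experimental JSON setting from above is enabled in the session:

```sql
-- Parse a single (made-up) JSON document into one column of type JSON.
SELECT json, toTypeName(json)
FROM format(JSONAsObject, 'json JSON', '{"user": "alice", "bot": false}');
```
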
Next, we'll create the underlying table to store the Wiki data:

```sql
CREATE TABLE wiki
(
    json JSON,
    -- Materialize the meta.id path from the JSON column so it can be used as the sorting key
    id String MATERIALIZED getSubcolumn(json, 'meta.id')
)
ENGINE = MergeTree
ORDER BY id;
```

Finally, let's create a materialized view to populate the `wiki` table:

```sql
CREATE MATERIALIZED VIEW wiki_mv TO wiki AS
SELECT json
FROM wiki_queue;
```

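Once the materialized view is in place, rows should start flowing from the Kafka engine table into `wiki`. A quick sanity check (the count will keep growing as new messages arrive):

```sql
SELECT count()
FROM wiki;
```
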
## Querying JSON data in ClickHouse

We can then write queries against the `wiki` table.
For example, we could count how many changes were made by bots versus humans:

```sql
SELECT json.bot, count()
FROM wiki
GROUP BY ALL
```

```text
   ┌─json.bot─┬─count()─┐
1. │ true     │    2526 │
2. │ false    │    4691 │
   └──────────┴─────────┘
```

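The JSON paths we select come back as `Dynamic` values. If you want a path read with a specific type, the `JSON` data type's typed subcolumn syntax (`.:TypeName`, described in the `JSON` docs) can be used. Here's a sketch that computes the average change in page size per edit, assuming the `length` fields parse as integers:

```sql
-- Read the length fields as Int64 and compute the average size change per edit;
-- events without both length values are skipped by avg().
SELECT round(avg(json.length.new.:Int64 - json.length.old.:Int64), 2) AS avg_size_change
FROM wiki;
```
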
We could also find the users who make the most changes on `en.wikipedia.org`:

```sql
SELECT
    json.user,
    count()
FROM wiki
WHERE json.server_name = 'en.wikipedia.org'
GROUP BY ALL
ORDER BY count() DESC
LIMIT 10
```

```text
    ┌─json.user──────────────────────────────┬─count()─┐
 1. │ Monkbot                                │     267 │
 2. │ Onel5969                               │     107 │
 3. │ Bangwiki                               │      37 │
 4. │ HHH Pedrigree                          │      28 │
 5. │ REDACTED403                            │      23 │
 6. │ KylieTastic                            │      22 │
 7. │ Tinniesbison                           │      21 │
 8. │ XTheBedrockX                           │      20 │
 9. │ 2001:4455:1DB:4000:51F3:6A16:408E:69FC │      19 │
10. │ Wcquidditch                            │      15 │
    └────────────────────────────────────────┴─────────┘