diff --git a/knowledgebase/kafka-clickhouse-json.md b/knowledgebase/kafka-clickhouse-json.md
new file mode 100644
index 00000000000..e7abb5deab3
--- /dev/null
+++ b/knowledgebase/kafka-clickhouse-json.md
@@ -0,0 +1,135 @@
+---
+date: 2024-11-06
+title: Kafka and the JSON data type
+---
+
+# Kafka and the JSON data type
+
+With the introduction of the new [`JSON`](/docs/en/sql-reference/data-types/newjson) data type, ClickHouse is now a good choice of [database for doing JSON analytics](https://clickhouse.com/engineering-resources/json-database).
+In this guide, we're going to learn how to load JSON messages from Apache Kafka directly into a single `JSON` column in ClickHouse.
+
+## Set up Kafka
+
+Let's start by running a Kafka broker on our machine. We'll map the container's port 9092 to port 9092 on the host operating system so that it's easier to interact with Kafka:
+
+```bash
+docker run --name broker -p 9092:9092 apache/kafka:3.8.1
+```
+
+## Ingest data into Kafka
+
+Once that's running, we need to ingest some data.
+The Wikimedia recent changes feed is a good source of streaming data, so let's ingest that into the `wiki_events` topic:
+
+```bash
+curl -N https://stream.wikimedia.org/v2/stream/recentchange 2>/dev/null |
+awk '/^data: /{gsub(/^data: /, ""); print}' |
+jq -cr --arg sep ø '[.meta.id, tostring] | join($sep)' |
+kcat -P -b localhost:9092 -t wiki_events -Kø
+```
+
+The `awk` step strips the `data: ` prefix from each server-sent event, and `jq` joins the event's `meta.id` and the full event with the `ø` character, which `-Kø` tells kcat to treat as the key/value separator. Each message is therefore keyed by `meta.id`.
+
+We can check that the data's being ingested by running the following command:
+
+```bash
+kcat -C -b localhost:9092 -t wiki_events
+```
+
+```text
+{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q130972321","request_id":"5c687ded-4721-4bfc-ae6c-58ca25f4a6ce","id":"0fbb0982-c43b-4e8b-989b-db7e78dbdc76","dt":"2024-11-06T11:59:57Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777205},"id":2338656448,"type":"edit","namespace":0,"title":"Q130972321","title_url":"https://www.wikidata.org/wiki/Q130972321","comment":"/* wbsetclaim-create:2||1 */ [[Property:P18]]: Mahdi Rrezaei Journalist.jpg","timestamp":1730894397,"user":"Wikimellatir","bot":false,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885254&oldid=2270870214&rcid=2338656448","minor":false,"patrolled":false,"length":{"old":4269,"new":4636},"revision":{"old":2270870214,"new":2270885254},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"Created claim: image (P18): Mahdi Rrezaei Journalist.jpg"}
+{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://www.wikidata.org/wiki/Q75756596","request_id":"eb116219-7372-4725-986f-790211708d36","id":"9e0d5299-5bd1-4c58-b796-9852afd8a84e","dt":"2024-11-06T11:59:54Z","domain":"www.wikidata.org","stream":"mediawiki.recentchange","topic":"codfw.mediawiki.recentchange","partition":0,"offset":1228777206},"id":2338656449,"type":"edit","namespace":0,"title":"Q75756596","title_url":"https://www.wikidata.org/wiki/Q75756596","comment":"/* wbeditentity-update-languages-and-other:0||55 */ mv labels and aliases matching [[Property:P528]] or [[Property:P3083]] to mul","timestamp":1730894394,"user":"Twofivesixbot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885237&oldid=2147709089&rcid=2338656449","minor":false,"patrolled":true,"length":{"old":30879,"new":27161},"revision":{"old":2147709089,"new":2270885237},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"Changed label, description and/or aliases in 55 languages, and other parts: mv labels and aliases matching catalog code (P528) or SIMBAD ID (P3083) to mul"}
+```
+
+So far, so good.
+
+## Ingest data into ClickHouse
+
+Next, we're going to ingest the data into ClickHouse.
+First, let's enable the JSON type, which is currently experimental, by setting the following property:
+
+```sql
+SET allow_experimental_json_type = 1;
+```
+
+Now, we'll create the `wiki_queue` table, which uses the [`Kafka` table engine](/docs/en/integrations/kafka/kafka-table-engine):
+
+```sql
+CREATE TABLE wiki_queue
+(
+    json JSON
+)
+ENGINE = Kafka(
+    'localhost:9092',
+    'wiki_events',
+    'clickhouse-consumer-group',
+    'JSONAsObject'
+);
+```
+
+Note that we're using the [`JSONAsObject`](https://clickhouse.com/docs/en/interfaces/formats#jsonasobject) format, which ensures that each incoming message is parsed as a single JSON object.
+This format can only be parsed into a table that has a single column with the `JSON` type.
+
+Next, we'll create the underlying table to store the Wiki data:
+
+```sql
+CREATE TABLE wiki
+(
+    json JSON,
+    id String MATERIALIZED getSubcolumn(json, 'meta.id')
+)
+ENGINE = MergeTree
+ORDER BY id;
+```
+
+The materialized `id` column is extracted from the `meta.id` path of each document, which gives the table a sorting key without us having to provide the value explicitly on insert.
+
+Finally, let's create a materialized view to populate the `wiki` table:
+
+```sql
+CREATE MATERIALIZED VIEW wiki_mv TO wiki AS
+SELECT json
+FROM wiki_queue;
+```
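+
+Before we start querying, it's worth checking that rows are actually flowing into `wiki`. The following is an optional sanity check, shown here as a sketch: it counts the rows and uses the `distinctJSONPaths` aggregate function to list the JSON paths that ClickHouse has inferred from the stored documents. The exact numbers and paths will depend on how long the pipeline has been running.
+
+```sql
+-- How many events have landed so far?
+SELECT count()
+FROM wiki;
+
+-- Which JSON paths has ClickHouse inferred from the documents?
+SELECT arrayJoin(paths) AS path
+FROM
+(
+    SELECT distinctJSONPaths(json) AS paths
+    FROM wiki
+)
+LIMIT 10;
+```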
mul","timestamp":1730894394,"user":"Twofivesixbot","bot":true,"notify_url":"https://www.wikidata.org/w/index.php?diff=2270885237&oldid=2147709089&rcid=2338656449","minor":false,"patrolled":true,"length":{"old":30879,"new":27161},"revision":{"old":2147709089,"new":2270885237},"server_url":"https://www.wikidata.org","server_name":"www.wikidata.org","server_script_path":"/w","wiki":"wikidatawiki","parsedcomment":"Changed label, description and/or aliases in 55 languages, and other parts: mv labels and aliases matching catalog code (P528) or SIMBAD ID (P3083) to mul"} +``` + +So far, so good. + +## Ingest data into ClickHouse + +Next, we're going to ingest the data into ClickHouse. +First, let's enable the JSON type (which is currently experimental), by setting the following property: + +```sql +SET allow_experimental_json_type = 1; +``` + +Now, we'll create the `wiki_queue` table, which uses the [`Kafka` table engine](/docs/en/integrations/kafka/kafka-table-engine). + +```sql +CREATE TABLE wiki_queue +( + json JSON +) +ENGINE = Kafka( + 'localhost:9092', + 'wiki_events', + 'clickhouse-consumer-group', + 'JSONAsObject' +); +``` + +Note that we're using the [`JSONAsObject`](https://clickhouse.com/docs/en/interfaces/formats#jsonasobject) format, which will ensure that incoming messages are made available as a JSON object. +This format can only be parsed into a table that has a single column with the `JSON` type. + +Next, we'll create the underlying table to store the Wiki data: + +```sql +CREATE TABLE wiki +( + json JSON, + id String MATERIALIZED getSubcolumn(json, 'meta.id') +) +ENGINE = MergeTree +ORDER BY id; +``` + +Finally, let's create a materialized view to populate the `wiki` table: + +```sql +CREATE MATERIALIZED VIEW wiki_mv TO wiki AS +SELECT json +FROM wiki_queue; +``` + +## Querying JSON data in ClickHouse + +We can then write queries against the `wiki` table. +For example, we could count the number of bots that have committed changes: + +```sql +SELECT json.bot, count() +FROM wiki +GROUP BY ALL +``` + +```text + ┌─json.bot─┬─count()─┐ +1. │ true │ 2526 │ +2. │ false │ 4691 │ + └──────────┴─────────┘ +``` + +Or we could find out the users that make the most changes on `en.wikipedia.org`: + +```sql +SELECT + json.user, + count() +FROM wiki +WHERE json.server_name = 'en.wikipedia.org' +GROUP BY ALL +ORDER BY count() DESC +LIMIT 10 +``` + +```text + ┌─json.user──────────────────────────────┬─count()─┐ + 1. │ Monkbot │ 267 │ + 2. │ Onel5969 │ 107 │ + 3. │ Bangwiki │ 37 │ + 4. │ HHH Pedrigree │ 28 │ + 5. │ REDACTED403 │ 23 │ + 6. │ KylieTastic │ 22 │ + 7. │ Tinniesbison │ 21 │ + 8. │ XTheBedrockX │ 20 │ + 9. │ 2001:4455:1DB:4000:51F3:6A16:408E:69FC │ 19 │ +10. │ Wcquidditch │ 15 │ + └────────────────────────────────────────┴─────────┘ +``` \ No newline at end of file