From 506b89f59c2376d49d48845aaef28df1905fb6db Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 12 Jun 2023 02:34:07 -0700
Subject: [PATCH 1/2] feat(pinot-demo): add demo for sink to pinot via kafka

---
 integration_tests/pinot-sink/README.md        | 157 ++++++++++++++++++
 .../pinot-sink/config/orders_schema.json      |  42 +++++
 .../pinot-sink/config/orders_table.json       |  44 +++++
 integration_tests/pinot-sink/create-sink.sql  |   7 +
 integration_tests/pinot-sink/create-table.sql |  11 ++
 .../pinot-sink/docker-compose.yml             | 107 ++++++++++++
 integration_tests/pinot-sink/insert.sql       |   3 +
 integration_tests/pinot-sink/update.sql       |   1 +
 8 files changed, 372 insertions(+)
 create mode 100644 integration_tests/pinot-sink/README.md
 create mode 100644 integration_tests/pinot-sink/config/orders_schema.json
 create mode 100644 integration_tests/pinot-sink/config/orders_table.json
 create mode 100644 integration_tests/pinot-sink/create-sink.sql
 create mode 100644 integration_tests/pinot-sink/create-table.sql
 create mode 100644 integration_tests/pinot-sink/docker-compose.yml
 create mode 100644 integration_tests/pinot-sink/insert.sql
 create mode 100644 integration_tests/pinot-sink/update.sql

diff --git a/integration_tests/pinot-sink/README.md b/integration_tests/pinot-sink/README.md
new file mode 100644
index 0000000000000..36bf010a5f4f8
--- /dev/null
+++ b/integration_tests/pinot-sink/README.md
@@ -0,0 +1,157 @@
# Sink Changes from RisingWave Tables to Apache Pinot

This demo is adapted from the `pinot-upsert` project of https://github.com/dunithd/edu-samples.

## Run the demo

1. Start the cluster with the `docker compose up -d` command.
   The command starts a RisingWave cluster together with a Pinot cluster consisting of one controller, one broker, and one server.
2. Create a Kafka topic named `orders.upsert.log` for the sink to write to.
```shell
docker compose exec kafka \
kafka-topics --create --topic orders.upsert.log --bootstrap-server localhost:9092
```
3. Connect to the RisingWave frontend via the psql client, then create a RisingWave table and sink.
```shell
psql -h localhost -p 4566 -d dev -U root

# Within the psql client
dev=> CREATE TABLE IF NOT EXISTS orders
(
    id INT PRIMARY KEY,
    user_id BIGINT,
    product_id BIGINT,
    status VARCHAR,
    quantity INT,
    total FLOAT,
    created_at BIGINT,
    updated_at BIGINT
);
CREATE_TABLE
dev=> CREATE SINK orders_sink FROM orders WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'kafka:9092',
    topic = 'orders.upsert.log',
    type = 'upsert',
    primary_key = 'id'
);
CREATE_SINK
```
4. Create a Pinot table named `orders` that ingests data from the Kafka topic.
```shell
docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec
```
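Optionally, sanity-check that the table has been registered by listing the tables through the Pinot controller's REST API, which `docker-compose.yml` maps to host port 9000. The response should include `orders`.
```shell
# List all tables registered with the Pinot controller;
# the newly added `orders` table should appear in the output.
curl http://localhost:9000/tables
```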
5. Connect to the RisingWave frontend and insert test data.
```shell
psql -h localhost -p 4566 -d dev -U root

# Within the psql client
insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
flush;
```
After inserting the data, query the Pinot table with the Pinot CLI.
```shell
docker compose exec pinot-controller \
/opt/pinot/bin/pinot-admin.sh PostQuery -brokerHost \
pinot-broker -brokerPort 8099 -query "SELECT * FROM orders"

# Result looks like:
{
  "rows": [
    [
      1,
      100,
      1,
      "INIT",
      1.0,
      1685421033000,
      10
    ],
    [
      2,
      100,
      1,
      "INIT",
      1.0,
      1685421033000,
      10
    ],
    [
      3,
      100,
      1,
      "INIT",
      1.0,
      1685421033000,
      10
    ]
  ]
}
```
Note that `created_at` does not appear in the query result because it is not defined in the Pinot schema (`config/orders_schema.json`).
6. Update the `status` of the order with `id` 1 to `PROCESSING`.
```shell
psql -h localhost -p 4566 -d dev -U root

# Within the psql client
update orders set status = 'PROCESSING' where id = 1;
flush;
```
After updating the data, query the Pinot table with the Pinot CLI.
```shell
docker compose exec pinot-controller \
/opt/pinot/bin/pinot-admin.sh PostQuery -brokerHost \
pinot-broker -brokerPort 8099 -query "SELECT * FROM orders"

# Result looks like:
{
  "rows": [
    [
      2,
      100,
      1,
      "INIT",
      1.0,
      1685421033000,
      10
    ],
    [
      3,
      100,
      1,
      "INIT",
      1.0,
      1685421033000,
      10
    ],
    [
      1,
      100,
      1,
      "PROCESSING",
      1.0,
      1685421033000,
      10
    ]
  ]
}
```
From the query result, we can see that the update to the RisingWave table has been reflected in the Pinot table.

This concludes the demo.
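As an optional check, you can also inspect the raw change events in the Kafka topic with the console consumer that ships in the `kafka` container (press `Ctrl-C` to exit).
```shell
# Replay every upsert event that the sink has produced so far
docker compose exec kafka \
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders.upsert.log --from-beginning
```
The output should match the payloads listed in the next section.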
## Kafka Payload Format

In the demo, there will be four upsert events in the Kafka topic.
The payloads are as follows:
```json
{"created_at":1685421033000,"id":1,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":2,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":3,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":1,"product_id":100,"quantity":1,"status":"PROCESSING","total":1.0,"updated_at":1685421033000,"user_id":10}
```
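Although four events were written to the topic, `SELECT * FROM orders` returns only three rows: the table is created with `upsertConfig.mode` set to `FULL`, so Pinot serves only the latest record for each primary key. A simple aggregation makes this visible; after the update in step 6, it should report two `INIT` orders and one `PROCESSING` order.
```shell
# Count visible records per status; because of upsert mode, the
# superseded INIT record for order 1 is excluded from the result.
docker compose exec pinot-controller \
/opt/pinot/bin/pinot-admin.sh PostQuery -brokerHost \
pinot-broker -brokerPort 8099 \
-query "SELECT status, COUNT(*) FROM orders GROUP BY status"
```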
+ } + }, + "metadata": { + "customConfigs": {} + }, + "upsertConfig": { + "mode": "FULL" + }, + "routing": { + "instanceSelectorType": "strictReplicaGroup" + } +} \ No newline at end of file diff --git a/integration_tests/pinot-sink/create-sink.sql b/integration_tests/pinot-sink/create-sink.sql new file mode 100644 index 0000000000000..966a1114724a9 --- /dev/null +++ b/integration_tests/pinot-sink/create-sink.sql @@ -0,0 +1,7 @@ +create SINK orders_sink FROM orders WITH ( + connector = 'kafka', + properties.bootstrap.server = 'kafka:9092', + topic = 'orders.upsert.log', + type = 'upsert', + primary_key = 'id' +); \ No newline at end of file diff --git a/integration_tests/pinot-sink/create-table.sql b/integration_tests/pinot-sink/create-table.sql new file mode 100644 index 0000000000000..2d3bcaa000b6b --- /dev/null +++ b/integration_tests/pinot-sink/create-table.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS orders +( + id INT PRIMARY KEY, + user_id BIGINT, + product_id BIGINT, + status VARCHAR, + quantity INT, + total FLOAT, + created_at BIGINT, + updated_at BIGINT +); \ No newline at end of file diff --git a/integration_tests/pinot-sink/docker-compose.yml b/integration_tests/pinot-sink/docker-compose.yml new file mode 100644 index 0000000000000..57cafc1ef0ed7 --- /dev/null +++ b/integration_tests/pinot-sink/docker-compose.yml @@ -0,0 +1,107 @@ +--- +version: "3" +services: + compactor-0: + extends: + file: ../../docker/docker-compose.yml + service: compactor-0 + compute-node-0: + extends: + file: ../../docker/docker-compose.yml + service: compute-node-0 + etcd-0: + extends: + file: ../../docker/docker-compose.yml + service: etcd-0 + frontend-node-0: + extends: + file: ../../docker/docker-compose.yml + service: frontend-node-0 + grafana-0: + extends: + file: ../../docker/docker-compose.yml + service: grafana-0 + meta-node-0: + extends: + file: ../../docker/docker-compose.yml + service: meta-node-0 + minio-0: + extends: + file: ../../docker/docker-compose.yml + service: minio-0 + prometheus-0: + extends: + file: ../../docker/docker-compose.yml + service: prometheus-0 + kafka: + image: confluentinc/cp-kafka:7.1.0 + platform: linux/amd64 + hostname: kafka + container_name: kafka + ports: + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR + depends_on: + [ zookeeper ] + healthcheck: { test: nc -z localhost 9092, interval: 1s, start_period: 120s } + pinot-controller: + image: apachepinot/pinot:0.12.0 + platform: linux/amd64 + command: "StartController -zkAddress zookeeper:2181" + container_name: "pinot-controller" + volumes: + - ./config:/config + restart: unless-stopped + ports: + - "9000:9000" + depends_on: + - zookeeper + pinot-broker: + image: apachepinot/pinot:0.12.0 + platform: linux/amd64 + command: "StartBroker -zkAddress zookeeper:2181" + restart: unless-stopped + container_name: "pinot-broker" + ports: + - "8099:8099" + depends_on: + - pinot-controller + pinot-server: + image: apachepinot/pinot:0.12.0 + platform: linux/amd64 + container_name: "pinot-server" + command: "StartServer -zkAddress zookeeper:2181" + restart: unless-stopped + depends_on: + - pinot-broker + zookeeper: + image: confluentinc/cp-zookeeper:7.1.0 + platform: linux/amd64 + 
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
  message_queue:
    external: false
name: risingwave-compose
diff --git a/integration_tests/pinot-sink/insert.sql b/integration_tests/pinot-sink/insert.sql
new file mode 100644
index 0000000000000..3627cef256d48
--- /dev/null
+++ b/integration_tests/pinot-sink/insert.sql
@@ -0,0 +1,3 @@
insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
\ No newline at end of file
diff --git a/integration_tests/pinot-sink/update.sql b/integration_tests/pinot-sink/update.sql
new file mode 100644
index 0000000000000..e01810edf4f94
--- /dev/null
+++ b/integration_tests/pinot-sink/update.sql
@@ -0,0 +1 @@
update orders set status = 'PROCESSING' where id = 1;
\ No newline at end of file

From e43c9fe7cde2afdc4c870a68e105329f44b35cf5 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 12 Jun 2023 16:28:39 -0700
Subject: [PATCH 2/2] fix typo

---
 integration_tests/pinot-sink/README.md                | 2 +-
 integration_tests/pinot-sink/config/orders_table.json | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/integration_tests/pinot-sink/README.md b/integration_tests/pinot-sink/README.md
index 36bf010a5f4f8..dff67418fbf88 100644
--- a/integration_tests/pinot-sink/README.md
+++ b/integration_tests/pinot-sink/README.md
@@ -97,7 +97,7 @@ pinot-broker -brokerPort 8099 -query "SELECT * FROM orders"
 psql -h localhost -p 4566 -d dev -U root
 
 # Within the psql client
-update orders set status = 'PROCESSING' where id = 1;
+UPDATE orders SET status = 'PROCESSING' WHERE id = 1;
 flush;
 ```
 After updating the data, query the Pinot table with the Pinot CLI.
diff --git a/integration_tests/pinot-sink/config/orders_table.json b/integration_tests/pinot-sink/config/orders_table.json
index 7d4ce24dc8dca..e789be9a24f63 100644
--- a/integration_tests/pinot-sink/config/orders_table.json
+++ b/integration_tests/pinot-sink/config/orders_table.json
@@ -29,7 +29,7 @@
   },
   "ingestionConfig" : {
     "complexTypeConfig": {
-        "delimeter": "."
+        "delimiter": "."
     }
   },
   "metadata": {