feat(pinot-demo): add demo for sink to pinot via kafka #10294

Merged 2 commits on Jun 15, 2023
157 changes: 157 additions & 0 deletions integration_tests/pinot-sink/README.md
@@ -0,0 +1,157 @@
# Sink Changes from RisingWave Tables to Apache Pinot

This demo is adapted from the `pinot-upsert` project in https://github.com/dunithd/edu-samples.

## Run the demo

1. Start the cluster with the `docker compose up -d` command.
The command starts a RisingWave cluster together with a Pinot cluster consisting of 1 controller, 1 broker, and 1 server.
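To confirm that all services started successfully, you can check their status:
```shell
docker compose ps
```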
2. Create a Kafka topic named `orders.upsert.log` for the sink to write to.
```shell
docker compose exec kafka \
kafka-topics --create --topic orders.upsert.log --bootstrap-server localhost:9092
```
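Optionally, verify that the topic exists before moving on:
```shell
docker compose exec kafka \
kafka-topics --list --bootstrap-server localhost:9092
```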
3. Connect to the RisingWave frontend via the psql client and create the RisingWave table and sink.
```shell
psql -h localhost -p 4566 -d dev -U root

# within the psql client
dev=> CREATE TABLE IF NOT EXISTS orders
(
id INT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
status VARCHAR,
quantity INT,
total FLOAT,
created_at BIGINT,
updated_at BIGINT
);
CREATE_TABLE
dev=> CREATE SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
CREATE_SINK
```
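To double-check that the table and sink exist, you can list them from the same psql session (assuming your RisingWave version supports the `SHOW TABLES` and `SHOW SINKS` statements):
```shell
# Within the psql client
dev=> SHOW TABLES;
dev=> SHOW SINKS;
```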
4. Create a Pinot table named `orders` that ingests data from the Kafka topic.
```shell
docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec
```
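To confirm that the table has been registered, one option is to query the Pinot controller's REST API, which this compose file exposes on port 9000 (the response shape may differ slightly across Pinot versions):
```shell
curl http://localhost:9000/tables
# The returned table list should include "orders"
```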
5. Connect to the RisingWave frontend and insert test data.
```shell
psql -h localhost -p 4566 -d dev -U root

# Within the psql client
insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
flush;
```
After inserting the data, query the Pinot table with the Pinot CLI.
```shell
docker compose exec pinot-controller \
/opt/pinot/bin/pinot-admin.sh PostQuery -brokerHost \
pinot-broker -brokerPort 8099 -query "SELECT * FROM orders"

# Example result:
{
"rows": [
[
1,
100,
1,
"INIT",
1.0,
1685421033000,
10
],
[
2,
100,
1,
"INIT",
1.0,
1685421033000,
10
],
[
3,
100,
1,
"INIT",
1.0,
1685421033000,
10
]
]
}
```
6. Update the `status` of the order with `id` 1 to `PROCESSING`.
```shell
psql -h localhost -p 4566 -d dev -U root

# Within the psql client
UPDATE orders SET status = 'PROCESSING' WHERE id = 1;
flush;
```
After updating the data, query the Pinot table with the Pinot CLI.
```shell
docker compose exec pinot-controller \
/opt/pinot/bin/pinot-admin.sh PostQuery -brokerHost \
pinot-broker -brokerPort 8099 -query "SELECT * FROM orders"

# Example result:
{
"rows": [
[
2,
100,
1,
"INIT",
1.0,
1685421033000,
10
],
[
3,
100,
1,
"INIT",
1.0,
1685421033000,
10
],
[
1,
100,
1,
"PROCESSING",
1.0,
1685421033000,
10
]
]
}
```
From the query result, we can see that the update on the RisingWave table
has been reflected in the Pinot table.

This concludes the demo.
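When you are done, you can tear down the demo cluster and remove its volumes (note that this deletes all demo data):
```shell
docker compose down -v
```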

## Kafka Payload Format

During the demo, 4 upsert events are produced to the Kafka topic.
The payloads look like the following:
```json
{"created_at":1685421033000,"id":1,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":2,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":3,"product_id":100,"quantity":1,"status":"INIT","total":1.0,"updated_at":1685421033000,"user_id":10}
{"created_at":1685421033000,"id":1,"product_id":100,"quantity":1,"status":"PROCESSING","total":1.0,"updated_at":1685421033000,"user_id":10}
```
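To inspect these events yourself, one way is to consume the topic from inside the Kafka container, for example with the console consumer bundled in the Confluent image:
```shell
docker compose exec kafka \
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders.upsert.log --from-beginning --property print.key=true
```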
42 changes: 42 additions & 0 deletions integration_tests/pinot-sink/config/orders_schema.json
@@ -0,0 +1,42 @@
{
"schemaName": "orders",
"primaryKeyColumns": [
"id"
],
"dimensionFieldSpecs": [
{
"name": "id",
"dataType": "INT"
},
{
"name": "user_id",
"dataType": "INT"
},
{
"name": "product_id",
"dataType": "INT"
},
{
"name": "status",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "quantity",
"dataType": "INT"
},
{
"name": "total",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [
{
"name": "updated_at",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
44 changes: 44 additions & 0 deletions integration_tests/pinot-sink/config/orders_table.json
@@ -0,0 +1,44 @@
{
"tableName": "orders",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "updated_at",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "1",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "orders",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "orders.upsert.log",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "zookeeper:2181/kafka",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.size": 30,
"realtime.segment.flush.threshold.rows": 30
}
},
"ingestionConfig" : {
"complexTypeConfig": {
"delimiter": "."
}
},
"metadata": {
"customConfigs": {}
},
"upsertConfig": {
"mode": "FULL"
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
}
}
7 changes: 7 additions & 0 deletions integration_tests/pinot-sink/create-sink.sql
@@ -0,0 +1,7 @@
create SINK orders_sink FROM orders WITH (
connector = 'kafka',
properties.bootstrap.server = 'kafka:9092',
topic = 'orders.upsert.log',
type = 'upsert',
primary_key = 'id'
);
11 changes: 11 additions & 0 deletions integration_tests/pinot-sink/create-table.sql
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS orders
(
id INT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
status VARCHAR,
quantity INT,
total FLOAT,
created_at BIGINT,
updated_at BIGINT
);
107 changes: 107 additions & 0 deletions integration_tests/pinot-sink/docker-compose.yml
@@ -0,0 +1,107 @@
---
version: "3"
services:
compactor-0:
extends:
file: ../../docker/docker-compose.yml
service: compactor-0
compute-node-0:
extends:
file: ../../docker/docker-compose.yml
service: compute-node-0
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
frontend-node-0:
extends:
file: ../../docker/docker-compose.yml
service: frontend-node-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
meta-node-0:
extends:
file: ../../docker/docker-compose.yml
service: meta-node-0
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
kafka:
image: confluentinc/cp-kafka:7.1.0
platform: linux/amd64
hostname: kafka
container_name: kafka
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
depends_on:
[ zookeeper ]
healthcheck: { test: nc -z localhost 9092, interval: 1s, start_period: 120s }
pinot-controller:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
command: "StartController -zkAddress zookeeper:2181"
container_name: "pinot-controller"
volumes:
- ./config:/config
restart: unless-stopped
ports:
- "9000:9000"
depends_on:
- zookeeper
pinot-broker:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
command: "StartBroker -zkAddress zookeeper:2181"
restart: unless-stopped
container_name: "pinot-broker"
ports:
- "8099:8099"
depends_on:
- pinot-controller
pinot-server:
image: apachepinot/pinot:0.12.0
platform: linux/amd64
container_name: "pinot-server"
command: "StartServer -zkAddress zookeeper:2181"
restart: unless-stopped
depends_on:
- pinot-broker
zookeeper:
image: confluentinc/cp-zookeeper:7.1.0
platform: linux/amd64
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
compute-node-0:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
3 changes: 3 additions & 0 deletions integration_tests/pinot-sink/insert.sql
@@ -0,0 +1,3 @@
insert into orders values (1, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (2, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
insert into orders values (3, 10, 100, 'INIT', 1, 1.0, 1685421033000, 1685421033000);
1 change: 1 addition & 0 deletions integration_tests/pinot-sink/update.sql
@@ -0,0 +1 @@
update orders set status = 'PROCESSING' where id = 1;