Skip to content

Commit

Permalink
refactor(risingwave): port to sqlglot (#8171)
Browse files Browse the repository at this point in the history
Co-authored-by: Kexiang Wang <[email protected]>
Co-authored-by: Phillip Cloud <[email protected]>
Co-authored-by: Jim Crist-Harif <[email protected]>
  • Loading branch information
4 people authored Feb 1, 2024
1 parent 432d151 commit 18c8cb7
Show file tree
Hide file tree
Showing 64 changed files with 3,063 additions and 1,108 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ jobs:
- postgres
sys-deps:
- libgeos-dev
- name: risingwave
title: Risingwave
services:
- risingwave
extras:
- risingwave
- name: impala
title: Impala
serial: true
Expand Down Expand Up @@ -211,6 +217,14 @@ jobs:
- postgres
sys-deps:
- libgeos-dev
- os: windows-latest
backend:
name: risingwave
title: Risingwave
services:
- risingwave
extras:
- risingwave
- os: windows-latest
backend:
name: postgres
Expand Down
177 changes: 177 additions & 0 deletions ci/schema/risingwave.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
SET RW_IMPLICIT_FLUSH=true;

DROP TABLE IF EXISTS "diamonds" CASCADE;

CREATE TABLE "diamonds" (
"carat" FLOAT,
"cut" TEXT,
"color" TEXT,
"clarity" TEXT,
"depth" FLOAT,
"table" FLOAT,
"price" BIGINT,
"x" FLOAT,
"y" FLOAT,
"z" FLOAT
) WITH (
connector = 'posix_fs',
match_pattern = 'diamonds.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS "astronauts" CASCADE;

CREATE TABLE "astronauts" (
"id" BIGINT,
"number" BIGINT,
"nationwide_number" BIGINT,
"name" VARCHAR,
"original_name" VARCHAR,
"sex" VARCHAR,
"year_of_birth" BIGINT,
"nationality" VARCHAR,
"military_civilian" VARCHAR,
"selection" VARCHAR,
"year_of_selection" BIGINT,
"mission_number" BIGINT,
"total_number_of_missions" BIGINT,
"occupation" VARCHAR,
"year_of_mission" BIGINT,
"mission_title" VARCHAR,
"ascend_shuttle" VARCHAR,
"in_orbit" VARCHAR,
"descend_shuttle" VARCHAR,
"hours_mission" DOUBLE PRECISION,
"total_hrs_sum" DOUBLE PRECISION,
"field21" BIGINT,
"eva_hrs_mission" DOUBLE PRECISION,
"total_eva_hrs" DOUBLE PRECISION
) WITH (
connector = 'posix_fs',
match_pattern = 'astronauts.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS "batting" CASCADE;

CREATE TABLE "batting" (
"playerID" TEXT,
"yearID" BIGINT,
"stint" BIGINT,
"teamID" TEXT,
"lgID" TEXT,
"G" BIGINT,
"AB" BIGINT,
"R" BIGINT,
"H" BIGINT,
"X2B" BIGINT,
"X3B" BIGINT,
"HR" BIGINT,
"RBI" BIGINT,
"SB" BIGINT,
"CS" BIGINT,
"BB" BIGINT,
"SO" BIGINT,
"IBB" BIGINT,
"HBP" BIGINT,
"SH" BIGINT,
"SF" BIGINT,
"GIDP" BIGINT
) WITH (
connector = 'posix_fs',
match_pattern = 'batting.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS "awards_players" CASCADE;

CREATE TABLE "awards_players" (
"playerID" TEXT,
"awardID" TEXT,
"yearID" BIGINT,
"lgID" TEXT,
"tie" TEXT,
"notes" TEXT
) WITH (
connector = 'posix_fs',
match_pattern = 'awards_players.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS "functional_alltypes" CASCADE;

CREATE TABLE "functional_alltypes" (
"id" INTEGER,
"bool_col" BOOLEAN,
"tinyint_col" SMALLINT,
"smallint_col" SMALLINT,
"int_col" INTEGER,
"bigint_col" BIGINT,
"float_col" REAL,
"double_col" DOUBLE PRECISION,
"date_string_col" TEXT,
"string_col" TEXT,
"timestamp_col" TIMESTAMP WITHOUT TIME ZONE,
"year" INTEGER,
"month" INTEGER
) WITH (
connector = 'posix_fs',
match_pattern = 'functional_alltypes.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS "tzone" CASCADE;

CREATE TABLE "tzone" (
"ts" TIMESTAMP WITH TIME ZONE,
"key" TEXT,
"value" DOUBLE PRECISION
);

INSERT INTO "tzone"
SELECT
CAST('2017-05-28 11:01:31.000400' AS TIMESTAMP WITH TIME ZONE) +
t * INTERVAL '1 day 1 second' AS "ts",
CHR(97 + t) AS "key",
t + t / 10.0 AS "value"
FROM generate_series(0, 9) AS "t";

DROP TABLE IF EXISTS "array_types" CASCADE;

CREATE TABLE IF NOT EXISTS "array_types" (
"x" BIGINT[],
"y" TEXT[],
"z" DOUBLE PRECISION[],
"grouper" TEXT,
"scalar_column" DOUBLE PRECISION,
"multi_dim" BIGINT[][]
);

INSERT INTO "array_types" VALUES
(ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], ARRAY[1.0, 2.0, 3.0], 'a', 1.0, ARRAY[ARRAY[NULL::BIGINT, NULL, NULL], ARRAY[1, 2, 3]]),
(ARRAY[4, 5], ARRAY['d', 'e'], ARRAY[4.0, 5.0], 'a', 2.0, ARRAY[]::BIGINT[][]),
(ARRAY[6, NULL], ARRAY['f', NULL], ARRAY[6.0, NULL], 'a', 3.0, ARRAY[NULL, ARRAY[]::BIGINT[], NULL]),
(ARRAY[NULL, 1, NULL], ARRAY[NULL, 'a', NULL], ARRAY[]::DOUBLE PRECISION[], 'b', 4.0, ARRAY[ARRAY[1], ARRAY[2], ARRAY[NULL::BIGINT], ARRAY[3]]),
(ARRAY[2, NULL, 3], ARRAY['b', NULL, 'c'], NULL, 'b', 5.0, NULL),
(ARRAY[4, NULL, NULL, 5], ARRAY['d', NULL, NULL, 'e'], ARRAY[4.0, NULL, NULL, 5.0], 'c', 6.0, ARRAY[ARRAY[1, 2, 3]]);

DROP TABLE IF EXISTS "json_t" CASCADE;

CREATE TABLE IF NOT EXISTS "json_t" ("js" JSONB);

INSERT INTO "json_t" VALUES
('{"a": [1,2,3,4], "b": 1}'),
('{"a":null,"b":2}'),
('{"a":"foo", "c":null}'),
('null'),
('[42,47,55]'),
('[]');

DROP TABLE IF EXISTS "win" CASCADE;
CREATE TABLE "win" ("g" TEXT, "x" BIGINT, "y" BIGINT);
INSERT INTO "win" VALUES
('a', 0, 3),
('a', 1, 2),
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);
10 changes: 5 additions & 5 deletions ci/schema/trino.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ CREATE TABLE hive.default.diamonds (
"y" DOUBLE,
"z" DOUBLE
) WITH (
external_location = 's3a://warehouse/diamonds',
external_location = 's3a://trino/diamonds',
format = 'PARQUET'
);

Expand Down Expand Up @@ -45,7 +45,7 @@ CREATE TABLE hive.default.astronauts (
"eva_hrs_mission" REAL,
"total_eva_hrs" REAL
) WITH (
external_location = 's3a://warehouse/astronauts',
external_location = 's3a://trino/astronauts',
format = 'PARQUET'
);

Expand Down Expand Up @@ -77,7 +77,7 @@ CREATE TABLE hive.default.batting (
"SF" BIGINT,
"GIDP" BIGINT
) WITH (
external_location = 's3a://warehouse/batting',
external_location = 's3a://trino/batting',
format = 'PARQUET'
);

Expand All @@ -93,7 +93,7 @@ CREATE TABLE hive.default.awards_players (
"tie" VARCHAR,
"notes" VARCHAR
) WITH (
external_location = 's3a://warehouse/awards-players',
external_location = 's3a://trino/awards-players',
format = 'PARQUET'
);

Expand All @@ -116,7 +116,7 @@ CREATE TABLE hive.default.functional_alltypes (
"year" INTEGER,
"month" INTEGER
) WITH (
external_location = 's3a://warehouse/functional-alltypes',
external_location = 's3a://trino/functional-alltypes',
format = 'PARQUET'
);
CREATE OR REPLACE VIEW memory.default.functional_alltypes AS
Expand Down
46 changes: 40 additions & 6 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,21 @@ services:
- trino

minio:
image: bitnami/minio:2024.1.16
image: bitnami/minio:2024.1.18
environment:
MINIO_ROOT_USER: accesskey
MINIO_ROOT_PASSWORD: secretkey
MINIO_SKIP_CLIENT: yes
MINIO_SKIP_CLIENT: "yes"
healthcheck:
interval: 1s
retries: 20
test:
- CMD-SHELL
- mc ping --count 1 trino
- mc ready data && mc mb --ignore-existing data/trino data/risingwave
networks:
- trino
- risingwave
volumes:
- minio:/data
- $PWD/docker/minio/config.json:/.mc/config.json:ro

hive-metastore:
Expand All @@ -119,7 +119,7 @@ services:
HIVE_METASTORE_JDBC_URL: jdbc:postgresql://hive-metastore-db:23456/metastore
HIVE_METASTORE_USER: admin
HIVE_METASTORE_PASSWORD: admin
HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
HIVE_METASTORE_WAREHOUSE_DIR: s3://trino/
HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: accesskey
Expand Down Expand Up @@ -538,6 +538,39 @@ services:
networks:
- impala

risingwave:
image: ghcr.io/risingwavelabs/risingwave:nightly-20240122
command: "standalone --meta-opts=\" \
--advertise-addr 0.0.0.0:5690 \
--backend mem \
--state-store hummock+minio://accesskey:secretkey@minio:9000/risingwave \
--data-directory hummock_001\" \
--compute-opts=\"--advertise-addr 0.0.0.0:5688 --role both\" \
--frontend-opts=\"--listen-addr 0.0.0.0:4566 --advertise-addr 0.0.0.0:4566\" \
--compactor-opts=\"--advertise-addr 0.0.0.0:6660\""
ports:
- 4566:4566
depends_on:
minio:
condition: service_healthy
volumes:
- risingwave:/data
environment:
RUST_BACKTRACE: "1"
ENABLE_TELEMETRY: "false"
healthcheck:
test:
- CMD-SHELL
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/6660; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5688; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5690; exit $$?;'
interval: 1s
retries: 20
restart: on-failure
networks:
- risingwave

networks:
impala:
# docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
Expand All @@ -554,6 +587,7 @@ networks:
oracle:
exasol:
flink:
risingwave:

volumes:
broker_var:
Expand All @@ -569,6 +603,6 @@ volumes:
mysql:
oracle:
postgres:
minio:
exasol:
impala:
risingwave:
2 changes: 1 addition & 1 deletion docker/minio/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"version": "10",
"aliases": {
"trino": {
"data": {
"url": "http://minio:9000",
"accessKey": "accesskey",
"secretKey": "secretkey",
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"datafusion": "postgres",
# closest match see https://github.com/ibis-project/ibis/pull/7303#discussion_r1350223901
"exasol": "oracle",
"risingwave": "postgres",
}

_SQLALCHEMY_TO_SQLGLOT_DIALECT = {
Expand Down
18 changes: 18 additions & 0 deletions ibis/backends/base/sqlglot/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,24 @@ def _from_ibis_Map(cls, dtype: dt.Map) -> sge.DataType:
return sge.DataType(this=typecode.HSTORE)


class RisingWaveType(PostgresType):
dialect = "risingwave"

@classmethod
def _from_ibis_Timestamp(cls, dtype: dt.Timestamp) -> sge.DataType:
if dtype.timezone is not None:
return sge.DataType(this=typecode.TIMESTAMPTZ)
return sge.DataType(this=typecode.TIMESTAMP)

@classmethod
def _from_ibis_Decimal(cls, dtype: dt.Decimal) -> sge.DataType:
return sge.DataType(this=typecode.DECIMAL)

@classmethod
def _from_ibis_UUID(cls, dtype: dt.UUID) -> sge.DataType:
return sge.DataType(this=typecode.VARCHAR)


class DataFusionType(PostgresType):
unknown_type_strings = {
"utf8": dt.string,
Expand Down
Loading

0 comments on commit 18c8cb7

Please sign in to comment.