forked from ibis-project/ibis
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(risingwave): init impl for Risingwave (ibis-project#7954)
This is the initial PR for ibis to support risingwave. [RisingWave](https://github.com/risingwavelabs/risingwave) is a distributed SQL streaming database engineered to provide the simplest and most cost-efficient approach for processing and managing streaming data with utmost reliability. After a few weeks of investigation, here's my phasic results. 1. As Risingwave is largely compatible with Postgres, Ibis can be easily extended to support Risingwave. The main work is to add a new dialect for Risingwave, which is similar to the `postgres` dialect. I've almost finished this part in this PR. With this PR, Ibis can be used to connect to Risingwave and run some basic queries. I've manually tested some queries which works well and add some tests imitating Postgres Backend. I would appreciate if you can help test more queries. 2. As ibis relies on SQLalchemy to support Postgres, we follow its implementation to support Risingwave. However, there are also some differences in semantics between Risingwave and Postgres, which require some modification in either ibis or SQLalchemy. `sqlalchemy-risingwave` is designed to reduce this mismatch. So in this PR, I introduce the new dependencies `sqlalchemy-risingwave` to ibis. 3. Ibis has no support for Materialized View natively. However Materialized View is a core concept in RW, people use RW because of its convenient auto-updated Materialized View. Now, if a user wants to create a new MV, he needs to use a raw SQL. Adding DDLs like `CreateMaterializedView`, `CreateSource` and `CreateSink` in the [base ddl file](https://github.com/ibis-project/ibis/blob/main/ibis/backends/base/sql/ddl.py) may help. We would appreciate it if you can help offer some suggestions. Besides this, I also met some obstacle that may need your help. 1. Risingwave hasn't supported `TEMPORARY VIEW` yet, so I changed some implementations relying on `TEMPORARY VIEW` to a normal view. For example, for the `_metadata()` functions, RW backend's implementation is `con.exec_driver_sql(f"CREATE VIEW IF NOT EXISTS {name} AS {query}")`. While in PG it's `con.exec_driver_sql(f"CREATE TEMPORARY VIEW {name} AS {query}")`. Do you have any suggestions for other ways to work around this? Besides, I didn't quite get what `_metadata()` is doing here. I would appreciate it if you could explain it a bit. 2. There's some mismatch between the `postgres` dialect and `risingwave` dialect, which are still not fully tested in this PR. We'll continue to work on it. 3. ~~This PR requires some new features of Risingwave v1.6.0 and sqlalchemy-risingwave v1.0.0 which are not released yet. They'll be released soon.~~ Done. BTW, how should I indicate this backend is only for risingwave > 1.6? 4. I don't quite understand the test pipeline of Ibis. I copied the test cases from the `postgres` dialect and modified them to fit the `risingwave` dialect, and some of them are commented temporarily due to the lack of support. I also added an SQL script to help set up the test environment, which creates tables and loads data. But I don't know how to run it in the test pipeline. Any suggestions or guidance are welcomed. I suppose the test pipeline would require a docker image of Risingwave. We can provide one if needed. 5. I'm a newbie in the ibis community, this PR may not be perfect considering others. Any comments are welcomed and I sincerely appreciate your time and patience. closes ibis-project#8038
- Loading branch information
1 parent
42050d1
commit 55c545d
Showing
46 changed files
with
4,183 additions
and
1,127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
SET RW_IMPLICIT_FLUSH=true; | ||
|
||
DROP TABLE IF EXISTS diamonds CASCADE; | ||
|
||
CREATE TABLE diamonds ( | ||
carat FLOAT, | ||
cut TEXT, | ||
color TEXT, | ||
clarity TEXT, | ||
depth FLOAT, | ||
"table" FLOAT, | ||
price BIGINT, | ||
x FLOAT, | ||
y FLOAT, | ||
z FLOAT | ||
) WITH ( | ||
connector = 'posix_fs', | ||
match_pattern = 'diamonds.csv', | ||
posix_fs.root = '/data', | ||
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' ); | ||
|
||
DROP TABLE IF EXISTS astronauts CASCADE; | ||
|
||
CREATE TABLE astronauts ( | ||
"id" BIGINT, | ||
"number" BIGINT, | ||
"nationwide_number" BIGINT, | ||
"name" VARCHAR, | ||
"original_name" VARCHAR, | ||
"sex" VARCHAR, | ||
"year_of_birth" BIGINT, | ||
"nationality" VARCHAR, | ||
"military_civilian" VARCHAR, | ||
"selection" VARCHAR, | ||
"year_of_selection" BIGINT, | ||
"mission_number" BIGINT, | ||
"total_number_of_missions" BIGINT, | ||
"occupation" VARCHAR, | ||
"year_of_mission" BIGINT, | ||
"mission_title" VARCHAR, | ||
"ascend_shuttle" VARCHAR, | ||
"in_orbit" VARCHAR, | ||
"descend_shuttle" VARCHAR, | ||
"hours_mission" DOUBLE PRECISION, | ||
"total_hrs_sum" DOUBLE PRECISION, | ||
"field21" BIGINT, | ||
"eva_hrs_mission" DOUBLE PRECISION, | ||
"total_eva_hrs" DOUBLE PRECISION | ||
) WITH ( | ||
connector = 'posix_fs', | ||
match_pattern = 'astronauts.csv', | ||
posix_fs.root = '/data', | ||
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' ); | ||
|
||
DROP TABLE IF EXISTS batting CASCADE; | ||
|
||
CREATE TABLE batting ( | ||
"playerID" TEXT, | ||
"yearID" BIGINT, | ||
stint BIGINT, | ||
"teamID" TEXT, | ||
"lgID" TEXT, | ||
"G" BIGINT, | ||
"AB" BIGINT, | ||
"R" BIGINT, | ||
"H" BIGINT, | ||
"X2B" BIGINT, | ||
"X3B" BIGINT, | ||
"HR" BIGINT, | ||
"RBI" BIGINT, | ||
"SB" BIGINT, | ||
"CS" BIGINT, | ||
"BB" BIGINT, | ||
"SO" BIGINT, | ||
"IBB" BIGINT, | ||
"HBP" BIGINT, | ||
"SH" BIGINT, | ||
"SF" BIGINT, | ||
"GIDP" BIGINT | ||
) WITH ( | ||
connector = 'posix_fs', | ||
match_pattern = 'batting.csv', | ||
posix_fs.root = '/data', | ||
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' ); | ||
|
||
DROP TABLE IF EXISTS awards_players CASCADE; | ||
|
||
CREATE TABLE awards_players ( | ||
"playerID" TEXT, | ||
"awardID" TEXT, | ||
"yearID" BIGINT, | ||
"lgID" TEXT, | ||
tie TEXT, | ||
notes TEXT | ||
) WITH ( | ||
connector = 'posix_fs', | ||
match_pattern = 'awards_players.csv', | ||
posix_fs.root = '/data', | ||
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' ); | ||
|
||
DROP TABLE IF EXISTS functional_alltypes CASCADE; | ||
|
||
CREATE TABLE functional_alltypes ( | ||
id INTEGER, | ||
bool_col BOOLEAN, | ||
tinyint_col SMALLINT, | ||
smallint_col SMALLINT, | ||
int_col INTEGER, | ||
bigint_col BIGINT, | ||
float_col REAL, | ||
double_col DOUBLE PRECISION, | ||
date_string_col TEXT, | ||
string_col TEXT, | ||
timestamp_col TIMESTAMP WITHOUT TIME ZONE, | ||
year INTEGER, | ||
month INTEGER | ||
) WITH ( | ||
connector = 'posix_fs', | ||
match_pattern = 'functional_alltypes.csv', | ||
posix_fs.root = '/data', | ||
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' ); | ||
|
||
DROP TABLE IF EXISTS tzone CASCADE; | ||
|
||
CREATE TABLE tzone ( | ||
ts TIMESTAMP WITH TIME ZONE, | ||
key TEXT, | ||
value DOUBLE PRECISION | ||
); | ||
|
||
INSERT INTO tzone | ||
SELECT | ||
CAST('2017-05-28 11:01:31.000400' AS TIMESTAMP WITH TIME ZONE) + | ||
t * INTERVAL '1 day 1 second' AS ts, | ||
CHR(97 + t) AS key, | ||
t + t / 10.0 AS value | ||
FROM generate_series(0, 9) AS t; | ||
|
||
DROP TABLE IF EXISTS array_types CASCADE; | ||
|
||
CREATE TABLE IF NOT EXISTS array_types ( | ||
x BIGINT[], | ||
y TEXT[], | ||
z DOUBLE PRECISION[], | ||
grouper TEXT, | ||
scalar_column DOUBLE PRECISION, | ||
multi_dim BIGINT[][] | ||
); | ||
|
||
INSERT INTO array_types VALUES | ||
(ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], ARRAY[1.0, 2.0, 3.0], 'a', 1.0, ARRAY[ARRAY[NULL::BIGINT, NULL, NULL], ARRAY[1, 2, 3]]), | ||
(ARRAY[4, 5], ARRAY['d', 'e'], ARRAY[4.0, 5.0], 'a', 2.0, ARRAY[]::BIGINT[][]), | ||
(ARRAY[6, NULL], ARRAY['f', NULL], ARRAY[6.0, NULL], 'a', 3.0, ARRAY[NULL, ARRAY[]::BIGINT[], NULL]), | ||
(ARRAY[NULL, 1, NULL], ARRAY[NULL, 'a', NULL], ARRAY[]::DOUBLE PRECISION[], 'b', 4.0, ARRAY[ARRAY[1], ARRAY[2], ARRAY[NULL::BIGINT], ARRAY[3]]), | ||
(ARRAY[2, NULL, 3], ARRAY['b', NULL, 'c'], NULL, 'b', 5.0, NULL), | ||
(ARRAY[4, NULL, NULL, 5], ARRAY['d', NULL, NULL, 'e'], ARRAY[4.0, NULL, NULL, 5.0], 'c', 6.0, ARRAY[ARRAY[1, 2, 3]]); | ||
|
||
DROP TABLE IF EXISTS json_t CASCADE; | ||
|
||
CREATE TABLE IF NOT EXISTS json_t (js JSONB); | ||
|
||
INSERT INTO json_t VALUES | ||
('{"a": [1,2,3,4], "b": 1}'), | ||
('{"a":null,"b":2}'), | ||
('{"a":"foo", "c":null}'), | ||
('null'), | ||
('[42,47,55]'), | ||
('[]'); | ||
|
||
DROP TABLE IF EXISTS win CASCADE; | ||
CREATE TABLE win (g TEXT, x BIGINT, y BIGINT); | ||
INSERT INTO win VALUES | ||
('a', 0, 3), | ||
('a', 1, 2), | ||
('a', 2, 0), | ||
('a', 3, 1), | ||
('a', 4, 1); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# RisingWave config file to be mounted into the Docker containers. | ||
# See https://github.com/risingwavelabs/risingwave/blob/main/src/config/example.toml for example |
Oops, something went wrong.