Skip to content

Commit

Permalink
add tpch test
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Nov 26, 2024
1 parent 950d065 commit a6e58df
Show file tree
Hide file tree
Showing 11 changed files with 2,732 additions and 5 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
hooks:
Expand Down
File renamed without changes.
8 changes: 8 additions & 0 deletions e2e_test/nexmark/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
DROP SOURCE person CASCADE;

statement ok
DROP SOURCE auction CASCADE;

statement ok
DROP SOURCE bid CASCADE;
6 changes: 4 additions & 2 deletions e2e_test/nexmark/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
control substitution on

system ok
rpk topic delete nexmark-events || true
rpk topic delete -r nexmark-* || true

system ok
rpk topic create nexmark-events -p 4
rpk topic create nexmark-auction -p 4 &&
rpk topic create nexmark-bid -p 4 &&
rpk topic create nexmark-person -p 4

include ./create_tables.slt.part

Expand Down
5 changes: 2 additions & 3 deletions e2e_test/source_inline/kafka/nexmark.slt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
include ../../nexmark/produce_kafka.slt.part
include ../../nexmark/create_sources_kafka.part
include ../../nexmark/create_sources_kafka.slt.part

control substitution off

include ../../streaming/nexmark/create_views.slt.part
include ../../streaming/nexmark/test_mv_result.slt.part

statement ok
drop source nexmark cascade;
include ../../nexmark/drop_sources_kafka.slt.part
30 changes: 30 additions & 0 deletions e2e_test/source_inline/kafka/tpch.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
include ../../tpch/produce_kafka.slt.part
include ../../tpch/create_sources_kafka.slt.part

control substitution off

include ../../streaming/tpch/create_views.slt.part
include ../../streaming/tpch/q1.slt.part
include ../../streaming/tpch/q2.slt.part
include ../../streaming/tpch/q3.slt.part
include ../../streaming/tpch/q4.slt.part
include ../../streaming/tpch/q5.slt.part
include ../../streaming/tpch/q6.slt.part
include ../../streaming/tpch/q7.slt.part
include ../../streaming/tpch/q8.slt.part
include ../../streaming/tpch/q9.slt.part
include ../../streaming/tpch/q10.slt.part
include ../../streaming/tpch/q11.slt.part
include ../../streaming/tpch/q12.slt.part
include ../../streaming/tpch/q13.slt.part
include ../../streaming/tpch/q14.slt.part
include ../../streaming/tpch/q15.slt.part
include ../../streaming/tpch/q16.slt.part
include ../../streaming/tpch/q17.slt.part
include ../../streaming/tpch/q18.slt.part
include ../../streaming/tpch/q19.slt.part
include ../../streaming/tpch/q20.slt.part
include ../../streaming/tpch/q21.slt.part
include ../../streaming/tpch/q22.slt.part

include ../../tpch/drop_sources_kafka.slt.part
118 changes: 118 additions & 0 deletions e2e_test/tpch/create_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
control substitution on

statement ok
CREATE SOURCE supplier (
s_suppkey INTEGER,
s_name VARCHAR,
s_address VARCHAR,
s_nationkey INTEGER,
s_phone VARCHAR,
s_acctbal NUMERIC,
s_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE part (
p_partkey INTEGER,
p_name VARCHAR,
p_mfgr VARCHAR,
p_brand VARCHAR,
p_type VARCHAR,
p_size INTEGER,
p_container VARCHAR,
p_retailprice NUMERIC,
p_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE partsupp (
ps_partkey INTEGER,
ps_suppkey INTEGER,
ps_availqty INTEGER,
ps_supplycost NUMERIC,
ps_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_address VARCHAR,
c_nationkey INTEGER,
c_phone VARCHAR,
c_acctbal NUMERIC,
c_mktsegment VARCHAR,
c_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_orderstatus VARCHAR,
o_totalprice NUMERIC,
o_orderdate DATE,
o_orderpriority VARCHAR,
o_clerk VARCHAR,
o_shippriority INTEGER,
o_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE lineitem (
l_orderkey BIGINT,
l_partkey INTEGER,
l_suppkey INTEGER,
l_linenumber INTEGER,
l_quantity NUMERIC,
l_extendedprice NUMERIC,
l_discount NUMERIC,
l_tax NUMERIC,
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE nation (
n_nationkey INTEGER,
n_name VARCHAR,
n_regionkey INTEGER,
n_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE SOURCE region (
r_regionkey INTEGER,
r_name VARCHAR,
r_comment VARCHAR
) WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON;
23 changes: 23 additions & 0 deletions e2e_test/tpch/drop_sources_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
statement ok
DROP SOURCE supplier CASCADE;

statement ok
DROP SOURCE region CASCADE;

statement ok
DROP SOURCE nation CASCADE;

statement ok
DROP SOURCE lineitem CASCADE;

statement ok
DROP SOURCE orders CASCADE;

statement ok
DROP SOURCE customer CASCADE;

statement ok
DROP SOURCE partsupp CASCADE;

statement ok
DROP SOURCE part CASCADE;
131 changes: 131 additions & 0 deletions e2e_test/tpch/produce_kafka.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
control substitution on

system ok
rpk topic delete -r tpch-* || true

system ok
rpk topic create tpch-supplier -p 4 &&
rpk topic create tpch-part -p 4 &&
rpk topic create tpch-partsupp -p 4 &&
rpk topic create tpch-customer -p 4 &&
rpk topic create tpch-orders -p 4 &&
rpk topic create tpch-lineitem -p 4 &&
rpk topic create tpch-nation -p 4 &&
rpk topic create tpch-region -p 4

include ./create_tables.slt.part

include ./insert_supplier.slt.part
include ./insert_part.slt.part
include ./insert_partsupp.slt.part
include ./insert_customer.slt.part
include ./insert_orders.slt.part
include ./insert_lineitem.slt.part
include ./insert_nation.slt.part
include ./insert_region.slt.part

statement ok
flush;

statement ok
create sink kafka_supplier FROM supplier
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-supplier'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_part FROM part
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-part'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_partsupp FROM partsupp
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-partsupp'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_customer FROM customer
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-customer'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

# note: In source, Date format is days_since_unix_epoch. In sink, it's num_days_from_ce.
# https://github.com/risingwavelabs/risingwave/issues/16467

statement ok
create sink kafka_orders AS select * except(o_orderdate), o_orderdate::varchar as o_orderdate FROM orders
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-orders'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_lineitem AS select * except(l_shipdate, l_commitdate, l_receiptdate), l_shipdate::varchar as l_shipdate, l_commitdate::varchar as l_commitdate, l_receiptdate::varchar as l_receiptdate FROM lineitem
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-lineitem'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_nation FROM nation
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-nation'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

statement ok
create sink kafka_region FROM region
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'tpch-region'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

sleep 5s

statement ok
DROP SINK kafka_supplier;

statement ok
DROP SINK kafka_part;

statement ok
DROP SINK kafka_partsupp;

statement ok
DROP SINK kafka_customer;

statement ok
DROP SINK kafka_orders;

statement ok
DROP SINK kafka_lineitem;

statement ok
DROP SINK kafka_nation;

statement ok
DROP SINK kafka_region;

include ./drop_tables.slt.part
Loading

0 comments on commit a6e58df

Please sign in to comment.