-
Notifications
You must be signed in to change notification settings - Fork 598
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
- Loading branch information
Showing
36 changed files
with
6,495 additions
and
216 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
control substitution on | ||
|
||
statement ok | ||
CREATE SOURCE person ( | ||
"id" BIGINT, | ||
"name" VARCHAR, | ||
"email_address" VARCHAR, | ||
"credit_card" VARCHAR, | ||
"city" VARCHAR, | ||
"state" VARCHAR, | ||
"date_time" TIMESTAMP, | ||
"extra" VARCHAR, | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-person' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
|
||
statement ok | ||
CREATE SOURCE auction ( | ||
"id" BIGINT, | ||
"item_name" VARCHAR, | ||
"description" VARCHAR, | ||
"initial_bid" BIGINT, | ||
"reserve" BIGINT, | ||
"date_time" TIMESTAMP, | ||
"expires" TIMESTAMP, | ||
"seller" BIGINT, | ||
"category" BIGINT, | ||
"extra" VARCHAR, | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-auction' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE bid ( | ||
"auction" BIGINT, | ||
"bidder" BIGINT, | ||
"price" BIGINT, | ||
"channel" VARCHAR, | ||
"url" VARCHAR, | ||
"date_time" TIMESTAMP, | ||
"extra" VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-bid' | ||
) FORMAT PLAIN ENCODE JSON; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
statement ok | ||
DROP SOURCE person CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE auction CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE bid CASCADE; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
control substitution on | ||
|
||
system ok | ||
rpk topic delete -r nexmark-* || true | ||
|
||
system ok | ||
rpk topic create nexmark-auction -p 4 && | ||
rpk topic create nexmark-bid -p 4 && | ||
rpk topic create nexmark-person -p 4 | ||
|
||
include ./create_tables.slt.part | ||
|
||
include ./insert_auction.slt.part | ||
include ./insert_bid.slt.part | ||
include ./insert_person.slt.part | ||
|
||
statement ok | ||
flush; | ||
|
||
statement ok | ||
create sink nexmark_auction FROM auction | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-auction' | ||
) FORMAT PLAIN ENCODE JSON ( | ||
force_append_only='true' | ||
); | ||
|
||
statement ok | ||
create sink nexmark_bid FROM bid | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-bid' | ||
) FORMAT PLAIN ENCODE JSON ( | ||
force_append_only='true' | ||
); | ||
|
||
statement ok | ||
create sink nexmark_person FROM person | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'nexmark-person' | ||
) FORMAT PLAIN ENCODE JSON ( | ||
force_append_only='true' | ||
); | ||
|
||
sleep 5s | ||
|
||
statement ok | ||
DROP SINK nexmark_auction; | ||
|
||
statement ok | ||
DROP SINK nexmark_bid; | ||
|
||
statement ok | ||
DROP SINK nexmark_person; | ||
|
||
include ./drop_tables.slt.part |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
control substitution on | ||
|
||
# Note: rw_fragments is not isolated by schema so we make the test serial. | ||
|
||
system ok | ||
rpk topic create test-topic-19563 -p 6 | ||
|
||
statement ok | ||
CREATE SOURCE kafkasource ( | ||
v1 timestamp with time zone | ||
) | ||
WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test-topic-19563', | ||
scan.startup.mode = 'earliest' | ||
) FORMAT PLAIN ENCODE JSON ( | ||
timestamptz.handling.mode = 'utc_without_suffix' | ||
); | ||
|
||
# Note that StreamSourceScan is in the StreamDynamicFilter fragment, which has 3 upstream fragments. | ||
query T | ||
explain create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000; | ||
---- | ||
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } | ||
└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true } | ||
├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] } | ||
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], cleaned_by_watermark: true } | ||
│ ├─StreamRowIdGen { row_id_index: 4 } | ||
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } | ||
│ └─StreamExchange { dist: Broadcast } | ||
│ └─StreamNow | ||
└─StreamExchange { dist: Broadcast } | ||
└─StreamProject { exprs: [AddWithTimeZone(now, '730000 days':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } | ||
└─StreamNow | ||
|
||
|
||
# The following test is adapted from `temporal_filter.slt`. | ||
|
||
# This statement should be correct for the next ~1000 years | ||
# We cannot have a variable interval for now, so we use 2000 year's worth of days as the upper bound. | ||
statement ok | ||
create materialized view mv1 as select v1 from kafkasource where v1 between now() and now() + interval '1 day' * 365 * 2000; | ||
|
||
query I | ||
select array_length(upstream_fragment_ids) from rw_fragments where array_contains(flags, Array['SOURCE_SCAN']); | ||
---- | ||
3 | ||
|
||
system ok | ||
cat <<EOF | rpk topic produce test-topic-19563 | ||
{"v1": "3031-01-01 19:00:00"} | ||
{"v1": "3031-01-01 20:00:00"} | ||
{"v1": "3031-01-01 21:00:00"} | ||
{"v1": "5031-01-01 21:00:00"} | ||
{"v1": "0001-01-01 21:00:00"} | ||
EOF | ||
|
||
sleep 3s | ||
|
||
# Below lower bound and above upper bound are not shown | ||
query I | ||
select * from mv1 order by v1; | ||
---- | ||
3031-01-01 19:00:00+00:00 | ||
3031-01-01 20:00:00+00:00 | ||
3031-01-01 21:00:00+00:00 | ||
|
||
|
||
statement ok | ||
DROP SOURCE kafkasource CASCADE; | ||
|
||
system ok | ||
rpk topic delete test-topic-18308 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
include ../../nexmark/produce_kafka.slt.part | ||
include ../../nexmark/create_sources_kafka.slt.part | ||
|
||
control substitution off | ||
|
||
include ../../streaming/nexmark/create_views.slt.part | ||
include ../../streaming/nexmark/test_mv_result.slt.part | ||
|
||
include ../../nexmark/drop_sources_kafka.slt.part |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
include ../../tpch/produce_kafka.slt.part | ||
include ../../tpch/create_sources_kafka.slt.part | ||
|
||
control substitution off | ||
|
||
include ../../streaming/tpch/create_views.slt.part | ||
include ../../streaming/tpch/q1.slt.part | ||
include ../../streaming/tpch/q2.slt.part | ||
include ../../streaming/tpch/q3.slt.part | ||
include ../../streaming/tpch/q4.slt.part | ||
include ../../streaming/tpch/q5.slt.part | ||
include ../../streaming/tpch/q6.slt.part | ||
include ../../streaming/tpch/q7.slt.part | ||
include ../../streaming/tpch/q8.slt.part | ||
include ../../streaming/tpch/q9.slt.part | ||
include ../../streaming/tpch/q10.slt.part | ||
include ../../streaming/tpch/q11.slt.part | ||
include ../../streaming/tpch/q12.slt.part | ||
include ../../streaming/tpch/q13.slt.part | ||
include ../../streaming/tpch/q14.slt.part | ||
include ../../streaming/tpch/q15.slt.part | ||
include ../../streaming/tpch/q16.slt.part | ||
include ../../streaming/tpch/q17.slt.part | ||
include ../../streaming/tpch/q18.slt.part | ||
include ../../streaming/tpch/q19.slt.part | ||
include ../../streaming/tpch/q20.slt.part | ||
include ../../streaming/tpch/q21.slt.part | ||
include ../../streaming/tpch/q22.slt.part | ||
|
||
include ../../tpch/drop_sources_kafka.slt.part |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
control substitution on | ||
|
||
statement ok | ||
CREATE SOURCE supplier ( | ||
s_suppkey INTEGER, | ||
s_name VARCHAR, | ||
s_address VARCHAR, | ||
s_nationkey INTEGER, | ||
s_phone VARCHAR, | ||
s_acctbal NUMERIC, | ||
s_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-supplier' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE part ( | ||
p_partkey INTEGER, | ||
p_name VARCHAR, | ||
p_mfgr VARCHAR, | ||
p_brand VARCHAR, | ||
p_type VARCHAR, | ||
p_size INTEGER, | ||
p_container VARCHAR, | ||
p_retailprice NUMERIC, | ||
p_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-part' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE partsupp ( | ||
ps_partkey INTEGER, | ||
ps_suppkey INTEGER, | ||
ps_availqty INTEGER, | ||
ps_supplycost NUMERIC, | ||
ps_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-partsupp' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE customer ( | ||
c_custkey INTEGER, | ||
c_name VARCHAR, | ||
c_address VARCHAR, | ||
c_nationkey INTEGER, | ||
c_phone VARCHAR, | ||
c_acctbal NUMERIC, | ||
c_mktsegment VARCHAR, | ||
c_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-customer' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE orders ( | ||
o_orderkey BIGINT, | ||
o_custkey INTEGER, | ||
o_orderstatus VARCHAR, | ||
o_totalprice NUMERIC, | ||
o_orderdate DATE, | ||
o_orderpriority VARCHAR, | ||
o_clerk VARCHAR, | ||
o_shippriority INTEGER, | ||
o_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-orders' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE lineitem ( | ||
l_orderkey BIGINT, | ||
l_partkey INTEGER, | ||
l_suppkey INTEGER, | ||
l_linenumber INTEGER, | ||
l_quantity NUMERIC, | ||
l_extendedprice NUMERIC, | ||
l_discount NUMERIC, | ||
l_tax NUMERIC, | ||
l_returnflag VARCHAR, | ||
l_linestatus VARCHAR, | ||
l_shipdate DATE, | ||
l_commitdate DATE, | ||
l_receiptdate DATE, | ||
l_shipinstruct VARCHAR, | ||
l_shipmode VARCHAR, | ||
l_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-lineitem' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE nation ( | ||
n_nationkey INTEGER, | ||
n_name VARCHAR, | ||
n_regionkey INTEGER, | ||
n_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-nation' | ||
) FORMAT PLAIN ENCODE JSON; | ||
|
||
statement ok | ||
CREATE SOURCE region ( | ||
r_regionkey INTEGER, | ||
r_name VARCHAR, | ||
r_comment VARCHAR | ||
) WITH ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'tpch-region' | ||
) FORMAT PLAIN ENCODE JSON; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
statement ok | ||
DROP SOURCE supplier CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE region CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE nation CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE lineitem CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE orders CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE customer CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE partsupp CASCADE; | ||
|
||
statement ok | ||
DROP SOURCE part CASCADE; |
Oops, something went wrong.