Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): add date, time, interval, bytea and jsonb data types for PG/MySQL sink of stream_chunk/json payload #9957

Merged
merged 31 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0b1ffcc
add date, time data types for jdbc sink(pg)
StrikeW May 22, 2023
aa2977b
clean
StrikeW May 23, 2023
393b9fb
add support for interval and jsonb
StrikeW May 23, 2023
01b28fb
check downstream database type
StrikeW May 24, 2023
4c44ab5
add date, time, jsonb for jdbc sink json format
WillyKidd May 25, 2023
5b7e046
fix jsonb for mysql
WillyKidd May 25, 2023
7ca3efb
fix fallthrough in switch
WillyKidd May 25, 2023
7247041
bytea support for stream_chunk on connector side
WillyKidd May 26, 2023
10d2e4a
bytea support for json format
WillyKidd May 26, 2023
4b7b84f
fix date
WillyKidd May 26, 2023
2725dea
"upgrade" sink e2e test
WillyKidd May 26, 2023
265015f
Revert ""upgrade" sink e2e test"
WillyKidd May 26, 2023
77f1f32
fix unit test
WillyKidd May 26, 2023
b81e060
add sink e2e test for pg and mysql
WillyKidd May 26, 2023
dcbeb08
fix jdbc sink updateinsert
WillyKidd May 29, 2023
a6f3023
fix 😅
WillyKidd May 29, 2023
6551611
fix
WillyKidd May 29, 2023
af21dc1
try fix e2e
WillyKidd May 29, 2023
b6f4352
fix e2e
WillyKidd May 29, 2023
6fc4d8a
fix e2e
WillyKidd May 29, 2023
fb2018f
fix e2e
WillyKidd May 29, 2023
56120de
fix
WillyKidd May 29, 2023
c60353a
fix
WillyKidd May 29, 2023
b452f4e
revert upsert/append_only.result
WillyKidd May 29, 2023
f84116e
fix misc
WillyKidd May 30, 2023
f97c11c
Merge branch 'main' into siyuan/jdbc-sink-data-types
WillyKidd May 30, 2023
dd07a95
use base64 for bytea in json encoding
WillyKidd May 30, 2023
d176fde
"upgrade" jdbc sink test
WillyKidd May 30, 2023
000a208
Merge branch 'main' into siyuan/jdbc-sink-data-types
WillyKidd May 30, 2023
009c1cf
fix format
WillyKidd May 30, 2023
2035ae8
minor
StrikeW May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ci/scripts/e2e-sink-test.sh
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;"
# grant access to `test` for ci test user
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
# create a table named t_remote
# create two tables named t_remote_0 and t_remote_1
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql

echo "--- preparing postgresql"
Expand Down Expand Up @@ -92,10 +92,19 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

# check sink destination mysql using shell
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
diff -u ./e2e_test/sink/remote/mysql_expected_result_0.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote_0 ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check passed"
echo "mysql sink check 0 passed"
else
echo "The output is not as expected."
exit 1
fi

diff -u ./e2e_test/sink/remote/mysql_expected_result_1.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT id, v_varchar, v_text, v_integer, v_smallint, v_bigint, v_decimal, v_real, v_double, v_boolean, v_date, v_time, v_timestamp, v_jsonb, TO_BASE64(v_bytea) FROM test.t_remote_1 ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check 1 passed"
else
echo "The output is not as expected."
exit 1
Expand Down
11 changes: 10 additions & 1 deletion e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# the check is run on sink destination postgres database

query I
select * from t_remote order by id;
select * from t_remote_0 order by id;
----
1 Alex 28208 281620391 4986480304337356659 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104216 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581863 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825611 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063326 123358.7627 58.76 123358.7627 2023-03-21 10:18:32

query II
select * from t_remote_1 order by id;
----
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} \xdeadbeef
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} \xcafebabe
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} \xbabec0de
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} \xdeadbabe
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} \xdeadbabe
96 changes: 82 additions & 14 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table t_remote (
create table t_remote_0 (
id integer primary key,
v_varchar varchar,
v_smallint smallint,
Expand All @@ -12,59 +12,127 @@ create table t_remote (
);

statement ok
create materialized view mv_remote as select * from t_remote;
CREATE TABLE t_remote_1 (
id BIGINT PRIMARY KEY,
v_varchar VARCHAR,
v_text TEXT,
v_integer INTEGER,
v_smallint SMALLINT,
v_bigint BIGINT,
v_decimal DECIMAL,
v_real REAL,
v_double DOUBLE PRECISION,
v_boolean BOOLEAN,
v_date DATE,
v_time TIME,
v_timestamp TIMESTAMP,
v_jsonb JSONB,
v_bytea BYTEA
);

statement ok
create materialized view mv_remote_0 as select * from t_remote_0;

statement ok
create materialized view mv_remote_1 as select * from t_remote_1;

statement ok
CREATE SINK s_postgres FROM mv_remote WITH (
CREATE SINK s_postgres_0 FROM mv_remote_0 WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote',
table.name='t_remote_0',
type='upsert'
);

statement ok
CREATE SINK s_postgres_1 FROM mv_remote_1 WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote_1',
type='upsert'
);

statement ok
CREATE SINK s_mysql_0 FROM mv_remote_0 WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote_0',
type='upsert'
);

statement ok
CREATE SINK s_mysql FROM mv_remote WITH (
CREATE SINK s_mysql_1 FROM mv_remote_1 WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote',
table.name='t_remote_1',
type='upsert'
);

statement ok
INSERT INTO t_remote VALUES
INSERT INTO t_remote_0 VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');

statement ok
INSERT INTO t_remote VALUES
INSERT INTO t_remote_0 VALUES
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');

statement ok
INSERT INTO t_remote_1 VALUES
(1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '{"key": "value"}', E'\\xDEADBEEF'),
(2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '{"key": "value2"}', E'\\xFEEDBEEF'),
(3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '{"key": "value3"}', E'\\xCAFEBABE');

statement ok
INSERT INTO t_remote_1 VALUES
(4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '{"key": "value4"}', E'\\xBABEC0DE'),
(5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '{"key": "value5"}', E'\\xDEADBABE'),
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '{"key": "value6"}', E'\\xDEADBABE');

statement ok
FLUSH;

statement ok
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;
UPDATE t_remote_0 SET v_varchar = 'Alex' WHERE id = 1;

statement ok
UPDATE t_remote_1 SET v_varchar = 'Alex' WHERE id = 1;

statement ok
DELETE FROM t_remote_0 WHERE id = 2;

statement ok
DELETE FROM t_remote WHERE id = 2;
DELETE FROM t_remote_1 WHERE id = 2;

statement ok
FLUSH;

statement ok
DROP SINK s_postgres;
DROP SINK s_postgres_0;

statement ok
DROP SINK s_postgres_1;

statement ok
DROP SINK s_mysql_0;

statement ok
DROP SINK s_mysql_1;

statement ok
DROP MATERIALIZED VIEW mv_remote_0;

statement ok
DROP SINK s_mysql
DROP MATERIALIZED VIEW mv_remote_1;

statement ok
DROP MATERIALIZED VIEW mv_remote;
DROP TABLE t_remote_0;

statement ok
DROP TABLE t_remote;
DROP TABLE t_remote_1;

statement ok
FLUSH;
22 changes: 20 additions & 2 deletions e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE t_remote (
CREATE TABLE t_remote_0 (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
Expand All @@ -8,4 +8,22 @@ CREATE TABLE t_remote (
v_float float,
v_double double,
v_timestamp timestamp
);
);

-- MySQL-side destination table with one column per data type covered by the
-- sink test (text, integer, decimal, floating point, boolean, date/time,
-- JSON and binary).
CREATE TABLE t_remote_1 (
    id          BIGINT,
    -- character data
    v_varchar   VARCHAR(255),
    v_text      TEXT,
    -- integer family
    v_integer   INT,
    v_smallint  SMALLINT,
    v_bigint    BIGINT,
    -- exact and approximate numerics
    v_decimal   DECIMAL(10,2),
    v_real      FLOAT,
    v_double    DOUBLE,
    v_boolean   BOOLEAN,
    -- temporal types
    v_date      DATE,
    v_time      TIME,
    v_timestamp TIMESTAMP,
    -- semi-structured and binary payloads
    v_jsonb     JSON,
    v_bytea     BLOB,
    PRIMARY KEY (id)
);
5 changes: 5 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result_1.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} 3q2+7w==
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 1 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} yv66vg==
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 0 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} ur7A3g==
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 1 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} 3q26vg==
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 0 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} 3q26vg==
22 changes: 20 additions & 2 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE t_remote (
CREATE TABLE t_remote_0 (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
Expand All @@ -8,4 +8,22 @@ CREATE TABLE t_remote (
v_float real,
v_double double precision,
v_timestamp timestamp
);
);

-- PostgreSQL-side destination table with one column per data type covered by
-- the sink test (text, integer, decimal, floating point, boolean, date/time,
-- JSONB and BYTEA).
CREATE TABLE t_remote_1 (
    id          BIGINT,
    -- character data
    v_varchar   VARCHAR(255),
    v_text      TEXT,
    -- integer family
    v_integer   INTEGER,
    v_smallint  SMALLINT,
    v_bigint    BIGINT,
    -- exact and approximate numerics
    v_decimal   DECIMAL(10,2),
    v_real      REAL,
    v_double    DOUBLE PRECISION,
    v_boolean   BOOLEAN,
    -- temporal types
    v_date      DATE,
    v_time      TIME,
    v_timestamp TIMESTAMP,
    -- semi-structured and binary payloads
    v_jsonb     JSONB,
    v_bytea     BYTEA,
    PRIMARY KEY (id)
);
4 changes: 2 additions & 2 deletions integration_tests/postgres-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ services:
# Use this command to connect to the DB from outside the container:
# docker exec postgres psql --username=myuser --dbname=mydb
postgres:
image: postgres
image: postgres:14-alpine
environment:
- POSTGRES_USER=myuser
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=mydb
ports:
- 5432:5432
- 8432:5432
healthcheck:
test: [ "CMD-SHELL", "pg_isready --username=myuser --dbname=mydb" ]
interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
import com.risingwave.proto.Data;
import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Map;

public class JsonDeserializer implements Deserializer {
Expand Down Expand Up @@ -130,6 +134,28 @@ private static BigDecimal castDecimal(Object value) {
}
}

/**
 * Converts a decoded JSON value into a {@link Time}.
 *
 * <p>The value is narrowed with {@code castLong} and handed to {@link Time#Time(long)}, which
 * treats it as a millisecond count.
 *
 * @param value decoded JSON value for a TIME column
 * @return the corresponding {@code java.sql.Time}
 * @throws io.grpc.StatusRuntimeException with status {@code INVALID_ARGUMENT} when the value
 *     cannot be converted; the underlying failure is attached as the cause
 */
private static Time castTime(Object value) {
    try {
        return new Time(castLong(value));
    } catch (RuntimeException e) {
        // Null-safe: building the message must not throw and hide the real failure.
        String actualType = (value == null) ? "null" : String.valueOf(value.getClass());
        throw io.grpc.Status.INVALID_ARGUMENT
                .withDescription("unable to cast into time from " + actualType)
                .withCause(e) // keep the original exception for diagnostics
                .asRuntimeException();
    }
}

/**
 * Converts a decoded JSON value into a {@link Date}.
 *
 * <p>The value is narrowed with {@code castLong} and read as a 1-based day number counted from
 * 0001-01-01, i.e. an input of 1 yields 0001-01-01.
 * NOTE(review): presumably this matches the day encoding used by the upstream serializer;
 * confirm against the producer.
 *
 * @param value decoded JSON value for a DATE column
 * @return the corresponding {@code java.sql.Date}
 * @throws io.grpc.StatusRuntimeException with status {@code INVALID_ARGUMENT} when the value
 *     cannot be converted; the underlying failure is attached as the cause
 */
private static Date castDate(Object value) {
    try {
        // Shift to a 0-based offset so that day 1 lands on 0001-01-01 itself.
        long dayOffset = castLong(value) - 1;
        return Date.valueOf(LocalDate.of(1, 1, 1).plusDays(dayOffset));
    } catch (RuntimeException e) {
        // Null-safe: building the message must not throw and hide the real failure.
        String actualType = (value == null) ? "null" : String.valueOf(value.getClass());
        throw io.grpc.Status.INVALID_ARGUMENT
                .withDescription("unable to cast into date from " + actualType)
                .withCause(e) // keep the original exception for diagnostics
                .asRuntimeException();
    }
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
switch (typeName) {
case INT16:
Expand Down Expand Up @@ -167,6 +193,40 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
.asRuntimeException();
}
return Timestamp.valueOf((String) value);
case TIME:
return castTime(value);
case DATE:
return castDate(value);
case INTERVAL:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected interval, got " + value.getClass())
.asRuntimeException();
}
return value;
case JSONB:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected jsonb, got " + value.getClass())
.asRuntimeException();
}
return value;
case BYTEA:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected jsonb, got " + value.getClass())
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
.asRuntimeException();
}
String s = (String) value;
int len = s.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
WillyKidd marked this conversation as resolved.
Show resolved Hide resolved
bytes[i / 2] =
(byte)
((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i + 1), 16));
}
return new ByteArrayInputStream(bytes);
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
Expand Down
Loading