Skip to content

Commit

Permalink
feat(sink): sink one-dimensional array data type to postgres and mysql (
Browse files Browse the repository at this point in the history
#10032)

Co-authored-by: weili <[email protected]>
  • Loading branch information
StrikeW and WillyKidd authored Jun 7, 2023
1 parent cf0940c commit 3d89b39
Show file tree
Hide file tree
Showing 36 changed files with 592 additions and 87 deletions.
13 changes: 13 additions & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ download_java_binding "$profile"
# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var.
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk

# Change process number limit
echo "--- os limits"
ulimit -a

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
Expand Down Expand Up @@ -110,6 +114,15 @@ else
exit 1
fi

diff -u ./e2e_test/sink/remote/mysql_expected_result_2.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_types ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check 0 passed"
else
echo "The output is not as expected."
exit 1
fi

echo "--- testing kafka sink"
./ci/scripts/e2e-kafka-sink-test.sh
if [ $? -eq 0 ]; then
Expand Down
10 changes: 9 additions & 1 deletion e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@ select * from t_remote_0 order by id;
query II
select * from t_remote_1 order by id;
----
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 2023-05-22 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value"} \xdeadbeef
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 2023-05-22 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value"} \xdeadbeef
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 2023-05-24 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value3"} \xcafebabe
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 2023-05-25 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value4"} \xbabec0de
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe


query III
select * from t_types order by id;
----
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789}
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432}
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789}
52 changes: 52 additions & 0 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ CREATE TABLE t_remote_1 (
v_bytea BYTEA
);

statement ok
CREATE TABLE rw_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR,
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL,
real_column REAL,
double_column DOUBLE PRECISION,
boolean_column BOOLEAN,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 FLOAT[]
);

statement ok
create materialized view mv_remote_0 as select * from t_remote_0;

Expand Down Expand Up @@ -70,6 +91,22 @@ CREATE SINK s_mysql_1 FROM mv_remote_1 WITH (
type='upsert'
);

statement ok
CREATE SINK s2_postgres FROM rw_types WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_types',
type='upsert'
);

statement ok
CREATE SINK s2_mysql FROM rw_types WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_types',
type='upsert'
);

statement ok
INSERT INTO t_remote_0 VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
Expand All @@ -94,6 +131,12 @@ INSERT INTO t_remote_1 VALUES
(5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '2023-05-26 12:34:56', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value5"}', E'\\xDEADBABE'),
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '2023-05-27 23:45:01', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value6"}', E'\\xDEADBABE');

statement ok
INSERT INTO rw_types (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2) VALUES
(1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}'),
(2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}'),
(3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789]);

statement ok
FLUSH;

Expand All @@ -118,6 +161,12 @@ DROP SINK s_postgres_0;
statement ok
DROP SINK s_postgres_1;

statement ok
DROP SINK s2_postgres;

statement ok
DROP SINK s2_mysql;

statement ok
DROP SINK s_mysql_0;

Expand All @@ -136,5 +185,8 @@ DROP TABLE t_remote_0;
statement ok
DROP TABLE t_remote_1;

statement ok
DROP TABLE rw_types;

statement ok
FLUSH;
20 changes: 20 additions & 0 deletions e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,23 @@ CREATE TABLE t_remote_1 (
v_jsonb JSON,
v_bytea BLOB
);

CREATE TABLE t_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR(100),
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL(10,2),
real_column float,
double_column DOUBLE,
boolean_column TINYINT,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column VARCHAR(100),
jsonb_column JSON,
array_column LONGTEXT,
array_column2 LONGTEXT
);
3 changes: 3 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result_2.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789
2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432
3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789
21 changes: 21 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,24 @@ CREATE TABLE t_remote_1 (
v_jsonb JSONB,
v_bytea BYTEA
);


CREATE TABLE t_types (
id BIGINT PRIMARY KEY,
varchar_column VARCHAR(100),
text_column TEXT,
integer_column INTEGER,
smallint_column SMALLINT,
bigint_column BIGINT,
decimal_column DECIMAL,
real_column REAL,
double_column DOUBLE PRECISION,
boolean_column BOOLEAN,
date_column DATE,
time_column TIME,
timestamp_column TIMESTAMP,
interval_column INTERVAL,
jsonb_column JSONB,
array_column VARCHAR[],
array_column2 DECIMAL[]
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.api;

import com.risingwave.proto.Data;

public class ColumnDesc {
String name;
Data.DataType dataType;

public ColumnDesc(String name, Data.DataType dataType) {
this.name = name;
this.dataType = dataType;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Data.DataType getDataType() {
return dataType;
}

public void setDataType(Data.DataType dataType) {
this.dataType = dataType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@

package com.risingwave.connector.api;

import com.google.common.collect.Lists;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -27,18 +28,21 @@ public class TableSchema {
private final List<String> columnNames;
private final Map<String, TypeName> columns;
private final Map<String, Integer> columnIndices;
private List<ColumnDesc> columnDescs;

private final List<String> primaryKeys;

public TableSchema(
List<String> columnNames, List<TypeName> typeNames, List<String> primaryKeys) {
List<String> columnNames, List<Data.DataType> dataTypes, List<String> primaryKeys) {
this.columnNames = columnNames;
this.primaryKeys = primaryKeys;
this.columns = new HashMap<>();
this.columnIndices = new HashMap<>();
this.columnDescs = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) {
columns.put(columnNames.get(i), typeNames.get(i));
columns.put(columnNames.get(i), dataTypes.get(i).getTypeName());
columnIndices.put(columnNames.get(i), i);
columnDescs.add(new ColumnDesc(columnNames.get(i), dataTypes.get(i)));
}
}

Expand All @@ -54,6 +58,10 @@ public TypeName getColumnType(String columnName) {
return columns.get(columnName);
}

public ColumnDesc getColumnDesc(int index) {
return columnDescs.get(index);
}

public Map<String, TypeName> getColumnTypes() {
return new HashMap<>(columns);
}
Expand All @@ -62,27 +70,8 @@ public String[] getColumnNames() {
return columnNames.toArray(new String[0]);
}

public static TableSchema getMockTableSchema() {
return new TableSchema(
Lists.newArrayList("id", "name"),
Lists.newArrayList(TypeName.INT32, TypeName.VARCHAR),
Lists.newArrayList("id"));
}

public static ConnectorServiceProto.TableSchema getMockTableProto() {
return ConnectorServiceProto.TableSchema.newBuilder()
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("id")
.setDataType(TypeName.INT32)
.build())
.addColumns(
ConnectorServiceProto.TableSchema.Column.newBuilder()
.setName("name")
.setDataType(TypeName.VARCHAR)
.build())
.addAllPkIndices(List.of(1))
.build();
public List<ColumnDesc> getColumnDescs() {
return columnDescs;
}

public Object getFromRow(String columnName, SinkRow row) {
Expand Down
5 changes: 3 additions & 2 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
import grpc
import connector_service_pb2_grpc
import connector_service_pb2
import data_pb2
import psycopg2


def make_mock_schema():
# todo
schema = connector_service_pb2.TableSchema(
columns=[
connector_service_pb2.TableSchema.Column(name="id", data_type=2),
connector_service_pb2.TableSchema.Column(name="name", data_type=7)
connector_service_pb2.TableSchema.Column(name="id", data_type=data_pb2.DataType(type_name=2)),
connector_service_pb2.TableSchema.Column(name="name", data_type=data_pb2.DataType(type_name=7))
],
pk_indices=[0]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
}
byte[] bytes = Base64.getDecoder().decode((String) value);
return new ByteArrayInputStream(bytes);
case LIST:
if (!(value instanceof java.util.ArrayList<?>)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected list, got " + value.getClass())
.asRuntimeException();
}
return value;
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) {
}
sinkId = sinkTask.getStart().getSinkId();
bindSink(sinkTask.getStart().getSinkConfig(), sinkTask.getStart().getFormat());
LOG.debug("Sink initialized");
responseObserver.onNext(
ConnectorServiceProto.SinkResponse.newBuilder()
.setStart(StartResponse.newBuilder().build())
Expand Down
Loading

0 comments on commit 3d89b39

Please sign in to comment.