diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 11ba2820f965f..8ca8e5a1768f4 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -28,6 +28,10 @@ download_java_binding "$profile" # TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var. export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk +# Change process number limit +echo "--- os limits" +ulimit -a + echo "--- Download connector node package" buildkite-agent artifact download risingwave-connector.tar.gz ./ mkdir ./connector-node @@ -110,6 +114,15 @@ else exit 1 fi +diff -u ./e2e_test/sink/remote/mysql_expected_result_2.tsv \ +<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_types ORDER BY id") +if [ $? -eq 0 ]; then + echo "mysql sink check 0 passed" +else + echo "The output is not as expected." + exit 1 +fi + echo "--- testing kafka sink" ./ci/scripts/e2e-kafka-sink-test.sh if [ $? -eq 0 ]; then diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index f65d965eb2f14..bfee9fa70f930 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -12,8 +12,16 @@ select * from t_remote_0 order by id; query II select * from t_remote_1 order by id; ---- -1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 2023-05-22 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value"} \xdeadbeef +1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 2023-05-22 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value"} \xdeadbeef 3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 2023-05-24 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value3"} \xcafebabe 4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 2023-05-25 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value4"} \xbabec0de 5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe 6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe + + +query III +select * from t_types order by id; +---- +1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} +2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} +3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} \ No newline at end of file diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index 5ed1468b1c685..df17e5aec2732 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -32,6 +32,27 @@ CREATE TABLE t_remote_1 ( v_bytea BYTEA ); +statement ok +CREATE TABLE rw_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column 
DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + interval_column INTERVAL, + jsonb_column JSONB, + array_column VARCHAR[], + array_column2 FLOAT[] +); + statement ok create materialized view mv_remote_0 as select * from t_remote_0; @@ -70,6 +91,22 @@ CREATE SINK s_mysql_1 FROM mv_remote_1 WITH ( type='upsert' ); +statement ok +CREATE SINK s2_postgres FROM rw_types WITH ( + connector='jdbc', + jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + table.name='t_types', + type='upsert' +); + +statement ok +CREATE SINK s2_mysql FROM rw_types WITH ( + connector='jdbc', + jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw', + table.name='t_types', + type='upsert' +); + statement ok INSERT INTO t_remote_0 VALUES (1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'), @@ -94,6 +131,12 @@ INSERT INTO t_remote_1 VALUES (5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '2023-05-26 12:34:56', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value5"}', E'\\xDEADBABE'), (6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '2023-05-27 23:45:01', '2 years 3 months 4 days 5 hours 6 minutes 7 seconds', '{"key": "value6"}', E'\\xDEADBABE'); +statement ok +INSERT INTO rw_types (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column, array_column, array_column2) VALUES + (1, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['Value 1', 'Value 2'], '{12.345,56.789}'), + (2, 'Varcharvalue2', 'Textvalue2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}', ARRAY['Value 3', 'Value 4'], '{43.21,65.432}'), + (3, 'Varcharvalue1', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}', ARRAY['How''re you?', '"hello\ \world"'], ARRAY[12.345,56.789]); + statement ok FLUSH; @@ -118,6 +161,12 @@ DROP SINK s_postgres_0; statement ok DROP SINK s_postgres_1; +statement ok +DROP SINK s2_postgres; + +statement ok +DROP SINK s2_mysql; + statement ok DROP SINK s_mysql_0; @@ -136,5 +185,8 @@ DROP TABLE t_remote_0; statement ok DROP TABLE t_remote_1; +statement ok +DROP TABLE rw_types; + statement ok FLUSH; diff --git a/e2e_test/sink/remote/mysql_create_table.sql b/e2e_test/sink/remote/mysql_create_table.sql index a2e83197da2dd..0cbe15f7dcb8e 100644 --- a/e2e_test/sink/remote/mysql_create_table.sql +++ b/e2e_test/sink/remote/mysql_create_table.sql @@ -29,3 +29,23 @@ CREATE TABLE t_remote_1 ( v_jsonb JSON, v_bytea BLOB ); + +CREATE TABLE t_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR(100), + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL(10,2), + real_column float, + double_column DOUBLE, + boolean_column TINYINT, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + interval_column VARCHAR(100), + jsonb_column JSON, + array_column LONGTEXT, + array_column2 LONGTEXT +); diff --git 
a/e2e_test/sink/remote/mysql_expected_result_2.tsv b/e2e_test/sink/remote/mysql_expected_result_2.tsv new file mode 100644 index 0000000000000..87ac3cb3bd123 --- /dev/null +++ b/e2e_test/sink/remote/mysql_expected_result_2.tsv @@ -0,0 +1,3 @@ +1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} Value 1,Value 2 12.345,56.789 +2 Varcharvalue2 Textvalue2 234 567 890 23.45 67.89 1.23 0 2023-05-23 23:45:01 2023-05-23 23:45:01 P0Y0M2DT0H0M0S {"key": "value2"} Value 3,Value 4 43.21,65.432 +3 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 P0Y0M1DT0H0M0S {"key": "value"} How're you?,"hello\ \world" 12.345,56.789 diff --git a/e2e_test/sink/remote/pg_create_table.sql b/e2e_test/sink/remote/pg_create_table.sql index 13e0d4243ce4c..ef8cae74c207e 100644 --- a/e2e_test/sink/remote/pg_create_table.sql +++ b/e2e_test/sink/remote/pg_create_table.sql @@ -29,3 +29,24 @@ CREATE TABLE t_remote_1 ( v_jsonb JSONB, v_bytea BYTEA ); + + +CREATE TABLE t_types ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR(100), + text_column TEXT, + integer_column INTEGER, + smallint_column SMALLINT, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + timestamp_column TIMESTAMP, + interval_column INTERVAL, + jsonb_column JSONB, + array_column VARCHAR[], + array_column2 DECIMAL[] +); \ No newline at end of file diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/ColumnDesc.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/ColumnDesc.java new file mode 100644 index 0000000000000..842459cdb55f1 --- /dev/null +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/ColumnDesc.java @@ -0,0 +1,43 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector.api; + +import com.risingwave.proto.Data; + +public class ColumnDesc { + String name; + Data.DataType dataType; + + public ColumnDesc(String name, Data.DataType dataType) { + this.name = name; + this.dataType = dataType; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Data.DataType getDataType() { + return dataType; + } + + public void setDataType(Data.DataType dataType) { + this.dataType = dataType; + } +} diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java index 053ba1e329920..0e6a59fd87c35 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java @@ -14,10 +14,11 @@ package com.risingwave.connector.api; -import com.google.common.collect.Lists; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.proto.ConnectorServiceProto; +import com.risingwave.proto.Data; import com.risingwave.proto.Data.DataType.TypeName; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,18 +28,21 @@ public class TableSchema { private final List columnNames; private final Map columns; private final Map columnIndices; + private List columnDescs; private final List primaryKeys; public TableSchema( - List columnNames, List typeNames, List primaryKeys) { + List columnNames, List dataTypes, List primaryKeys) { this.columnNames = columnNames; this.primaryKeys = primaryKeys; this.columns = new HashMap<>(); this.columnIndices = new HashMap<>(); + this.columnDescs = new ArrayList<>(); for (int i = 0; i < columnNames.size(); i++) { - columns.put(columnNames.get(i), typeNames.get(i)); + columns.put(columnNames.get(i), dataTypes.get(i).getTypeName()); columnIndices.put(columnNames.get(i), i); + columnDescs.add(new ColumnDesc(columnNames.get(i), dataTypes.get(i))); } } @@ -54,6 +58,10 @@ public TypeName getColumnType(String columnName) { return columns.get(columnName); } + public ColumnDesc getColumnDesc(int index) { + return columnDescs.get(index); + } + public Map getColumnTypes() { return new HashMap<>(columns); } @@ -62,27 +70,8 @@ public String[] getColumnNames() { return columnNames.toArray(new String[0]); } - public static TableSchema getMockTableSchema() { - return new TableSchema( - Lists.newArrayList("id", "name"), - Lists.newArrayList(TypeName.INT32, TypeName.VARCHAR), - Lists.newArrayList("id")); - } - - public static ConnectorServiceProto.TableSchema getMockTableProto() { - return ConnectorServiceProto.TableSchema.newBuilder() - .addColumns( - ConnectorServiceProto.TableSchema.Column.newBuilder() - .setName("id") - .setDataType(TypeName.INT32) - .build()) - .addColumns( - ConnectorServiceProto.TableSchema.Column.newBuilder() - .setName("name") - .setDataType(TypeName.VARCHAR) - .build()) - .addAllPkIndices(List.of(1)) - .build(); + public List getColumnDescs() { + return columnDescs; } public Object getFromRow(String columnName, SinkRow row) { diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py index 94ceb4e78933e..cc28fef9f036c 100644 --- a/java/connector-node/python-client/integration_tests.py +++ b/java/connector-node/python-client/integration_tests.py @@ 
-19,6 +19,7 @@ import grpc import connector_service_pb2_grpc import connector_service_pb2 +import data_pb2 import psycopg2 @@ -26,8 +27,8 @@ def make_mock_schema(): # todo schema = connector_service_pb2.TableSchema( columns=[ - connector_service_pb2.TableSchema.Column(name="id", data_type=2), - connector_service_pb2.TableSchema.Column(name="name", data_type=7) + connector_service_pb2.TableSchema.Column(name="id", data_type=data_pb2.DataType(type_name=2)), + connector_service_pb2.TableSchema.Column(name="name", data_type=data_pb2.DataType(type_name=7)) ], pk_indices=[0] ) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 641a5b2ea309d..588dee5536dbf 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -224,6 +224,13 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj } byte[] bytes = Base64.getDecoder().decode((String) value); return new ByteArrayInputStream(bytes); + case LIST: + if (!(value instanceof java.util.ArrayList)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Expected list, got " + value.getClass()) + .asRuntimeException(); + } + return value; default: throw io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java index 3047a48f53e28..9d9ee6a89200e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java @@ -67,7 +67,6 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) { } sinkId = sinkTask.getStart().getSinkId(); bindSink(sinkTask.getStart().getSinkConfig(), sinkTask.getStart().getFormat()); - LOG.debug("Sink initialized"); responseObserver.onNext( ConnectorServiceProto.SinkResponse.newBuilder() .setStart(StartResponse.newBuilder().build()) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java index 701e8dc0f3160..300a0c38d4bf5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java @@ -42,8 +42,8 @@ static ValueGetter[] buildValueGetter(TableSchema tableSchema) { ValueGetter[] ret = new ValueGetter[colNames.length]; for (int i = 0; i < colNames.length; i++) { int index = i; - Data.DataType.TypeName typeName = tableSchema.getColumnType(colNames[i]); - switch (typeName) { + var columnDesc = tableSchema.getColumnDesc(index); + switch (columnDesc.getDataType().getTypeName()) { case INT16: ret[i] = row -> { @@ -174,9 +174,60 @@ static ValueGetter[] 
buildValueGetter(TableSchema tableSchema) { return row.getBytea(index); }; break; + case LIST: + var fieldType = columnDesc.getDataType().getFieldType(0); + switch (fieldType.getTypeName()) { + case INT16: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case VARCHAR: + break; + default: + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription( + "stream_chunk: unsupported array with field type " + + fieldType.getTypeName()) + .asRuntimeException(); + } + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + Object[] objArray = null; + switch (fieldType.getTypeName()) { + case INT16: + objArray = row.getArray(index, Short.class); + break; + case INT32: + objArray = row.getArray(index, Integer.class); + break; + case INT64: + objArray = row.getArray(index, Long.class); + break; + case FLOAT: + objArray = row.getArray(index, Float.class); + break; + case DOUBLE: + objArray = row.getArray(index, Double.class); + break; + case VARCHAR: + objArray = row.getArray(index, String.class); + break; + default: + break; + } + + return objArray; + }; + break; default: throw io.grpc.Status.INVALID_ARGUMENT - .withDescription("unsupported type " + typeName) + .withDescription( + "stream_chunk: unsupported data type " + + columnDesc.getDataType().getTypeName()) .asRuntimeException(); } } diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/TestUtils.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/TestUtils.java new file mode 100644 index 0000000000000..e3e906d68fd79 --- /dev/null +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/TestUtils.java @@ -0,0 +1,59 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.risingwave.connector; + +import com.google.common.collect.Lists; +import com.risingwave.connector.api.TableSchema; +import com.risingwave.proto.ConnectorServiceProto; +import com.risingwave.proto.Data; +import java.util.List; + +public class TestUtils { + + public static TableSchema getMockTableSchema() { + return new TableSchema( + Lists.newArrayList("id", "name"), + Lists.newArrayList( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build(), + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.VARCHAR) + .build()), + Lists.newArrayList("id")); + } + + public static ConnectorServiceProto.TableSchema getMockTableProto() { + return ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("id") + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) + .build()) + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("name") + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.VARCHAR) + .build()) + .build()) + .addAllPkIndices(List.of(1)) + .build(); + } +} diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java similarity index 88% rename from java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java rename to java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java index 2ee86ff95ad76..25c969bc393f6 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/DeserializerTest.java @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package com.risingwave.connector; +package com.risingwave.connector.sink; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.JsonDeserializer; +import com.risingwave.connector.TestUtils; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; @@ -23,7 +24,7 @@ public class DeserializerTest extends TestCase { public void testJsonDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer(TableSchema.getMockTableSchema()); + JsonDeserializer deserializer = new JsonDeserializer(TestUtils.getMockTableSchema()); JsonPayload jsonPayload = JsonPayload.newBuilder() .addRowOps( diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/FileSinkTest.java similarity index 91% rename from java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java rename to java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/FileSinkTest.java index 0dacfe758a539..e68821f4db145 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/FileSinkTest.java @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.risingwave.connector; +package com.risingwave.connector.sink; import static com.risingwave.proto.Data.*; import static org.junit.Assert.*; import com.google.common.collect.Iterators; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.FileSink; +import com.risingwave.connector.FileSinkConfig; +import com.risingwave.connector.TestUtils; import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; @@ -36,7 +38,7 @@ public void testSync() throws IOException { } FileSinkConfig config = new FileSinkConfig(path); - FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); + FileSink sink = new FileSink(config, TestUtils.getMockTableSchema()); String filePath = sink.getSinkPath(); Path file = Paths.get(filePath); @@ -78,7 +80,7 @@ public void testWrite() throws IOException { Files.createDirectories(Paths.get(path)); } FileSinkConfig config = new FileSinkConfig(path); - FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); + FileSink sink = new FileSink(config, TestUtils.getMockTableSchema()); String filePath = sink.getSinkPath(); try { @@ -107,7 +109,7 @@ public void testDrop() throws IOException { Files.createDirectories(Paths.get(path)); } FileSinkConfig config = new FileSinkConfig(path); - FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); + FileSink sink = new FileSink(config, TestUtils.getMockTableSchema()); sink.drop(); diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java similarity index 98% rename from java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java rename to 
java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java index 739c906b3f1da..d8e85407c995f 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/SinkStreamObserverTest.java @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.risingwave.connector; +package com.risingwave.connector.sink; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.SinkStreamObserver; +import com.risingwave.connector.TestUtils; import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkConfig; import com.risingwave.proto.Data.Op; @@ -27,7 +28,7 @@ public class SinkStreamObserverTest { public SinkConfig fileSinkConfig = SinkConfig.newBuilder() - .setTableSchema(TableSchema.getMockTableProto()) + .setTableSchema(TestUtils.getMockTableProto()) .setConnectorType("file") .putAllProperties(Map.of("output.path", "/tmp/rw-connector")) .build(); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeLocalSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeLocalSinkTest.java index 1fe85d1b0ed51..9cedb8a308ac9 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeLocalSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeLocalSinkTest.java @@ -19,7 +19,7 @@ import com.google.common.collect.Iterators; import com.risingwave.connector.DeltaLakeSink; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.TestUtils; import com.risingwave.connector.api.sink.ArraySinkRow; import io.delta.standalone.DeltaLog; import java.io.IOException; @@ -43,7 +43,7 @@ private static DeltaLakeSink createMockSink(String location) { DeltaLakeSinkFactoryTest.createMockTable(location); Configuration conf = new Configuration(); DeltaLog log = DeltaLog.forTable(conf, location); - return new DeltaLakeSink(TableSchema.getMockTableSchema(), conf, log); + return new DeltaLakeSink(TestUtils.getMockTableSchema(), conf, log); } private void validateTableWithSpark(String location, List rows, StructType schema) { diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeSinkFactoryTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeSinkFactoryTest.java index f90edeedab96f..b7f6c0616cd0f 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeSinkFactoryTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/deltalake/DeltaLakeSinkFactoryTest.java @@ -15,7 +15,7 @@ package com.risingwave.connector.sink.deltalake; import com.risingwave.connector.DeltaLakeSinkFactory; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.TestUtils; import io.delta.standalone.DeltaLog; import io.delta.standalone.Operation; import io.delta.standalone.OptimisticTransaction; @@ -49,7 +49,7 @@ public static void 
createMockTable(String location) { Configuration conf = new Configuration(); DeltaLog log = DeltaLog.forTable(conf, location); - // should be synchronized with `TableSchema.getMockTableSchema()`; + // should be synchronized with `TestUtils.getMockTableSchema()`; StructType schema = new StructType( new StructField[] { @@ -73,7 +73,7 @@ public void testCreate() throws IOException { createMockTable(location); DeltaLakeSinkFactory sinkFactory = new DeltaLakeSinkFactory(); sinkFactory.create( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), new HashMap<>() { { put("location", String.format("file://%s", location)); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java index c7b36f0607065..9ec677af4a598 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkFactoryTest.java @@ -18,7 +18,7 @@ import com.risingwave.connector.IcebergSink; import com.risingwave.connector.IcebergSinkFactory; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.TestUtils; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -64,7 +64,7 @@ public void testCreate() throws IOException { IcebergSink sink = (IcebergSink) sinkFactory.create( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), Map.of( "type", sinkMode, diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java index 0e34426c539bb..0d84960a529d3 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkLocalTest.java @@ -21,7 +21,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import com.risingwave.connector.IcebergSink; -import com.risingwave.connector.api.TableSchema; +import com.risingwave.connector.TestUtils; import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; @@ -110,7 +110,7 @@ public void testSync() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); IcebergSink sink = new IcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); @@ -153,7 +153,7 @@ public void testWrite() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); IcebergSink sink = new IcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); @@ -189,7 +189,7 @@ public void testDrop() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); IcebergSink sink = new IcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, 
hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java index ade608779699c..5d954104d2531 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/IcebergSinkPartitionTest.java @@ -24,7 +24,6 @@ import com.risingwave.connector.IcebergSink; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.Data; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -62,9 +61,9 @@ public class IcebergSinkPartitionTest { new TableSchema( Lists.newArrayList("id", "name", "part"), Lists.newArrayList( - Data.DataType.TypeName.INT32, - Data.DataType.TypeName.VARCHAR, - Data.DataType.TypeName.VARCHAR), + DataType.newBuilder().setTypeName(DataType.TypeName.INT32).build(), + DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build(), + DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build()), Lists.newArrayList("id")); private void createMockTable() throws IOException { diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java index 3ee18f546c4fa..cbbbcc35a2b64 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkLocalTest.java @@ -20,7 +20,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; -import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; @@ -109,7 +108,7 @@ public void testSync() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); UpsertIcebergSink sink = new UpsertIcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); @@ -152,7 +151,7 @@ public void testWrite() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); UpsertIcebergSink sink = new UpsertIcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); @@ -198,7 +197,7 @@ public void testDrop() throws IOException { TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); UpsertIcebergSink sink = new UpsertIcebergSink( - TableSchema.getMockTableSchema(), + TestUtils.getMockTableSchema(), hadoopCatalog, hadoopCatalog.loadTable(tableIdentifier), FileFormat.PARQUET); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java 
b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java index 1be8a1bbaa16d..4e53d8107031f 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/iceberg/UpsertIcebergSinkPartitionTest.java @@ -24,7 +24,6 @@ import com.risingwave.connector.UpsertIcebergSink; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; -import com.risingwave.proto.Data; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -57,9 +56,9 @@ public class UpsertIcebergSinkPartitionTest { new TableSchema( Lists.newArrayList("id", "name", "part"), Lists.newArrayList( - Data.DataType.TypeName.INT32, - Data.DataType.TypeName.VARCHAR, - Data.DataType.TypeName.VARCHAR), + DataType.newBuilder().setTypeName(DataType.TypeName.INT32).build(), + DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build(), + DataType.newBuilder().setTypeName(DataType.TypeName.VARCHAR).build()), Lists.newArrayList("id")); private void createMockTable() throws IOException { diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java index 872be3ee53aa1..8901b977f6d9c 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java @@ -22,6 +22,7 @@ import com.risingwave.connector.JDBCSinkConfig; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; +import com.risingwave.proto.Data; import com.risingwave.proto.Data.DataType.TypeName; import com.risingwave.proto.Data.Op; import java.io.ByteArrayInputStream; @@ -62,13 +63,13 @@ static TableSchema getTestTableSchema() { Lists.newArrayList( "id", "v_varchar", "v_date", "v_time", "v_timestamp", "v_jsonb", "v_bytea"), Lists.newArrayList( - TypeName.INT32, - TypeName.VARCHAR, - TypeName.DATE, - TypeName.TIME, - TypeName.TIMESTAMP, - TypeName.JSONB, - TypeName.BYTEA), + Data.DataType.newBuilder().setTypeName(TypeName.INT32).build(), + Data.DataType.newBuilder().setTypeName(TypeName.VARCHAR).build(), + Data.DataType.newBuilder().setTypeName(TypeName.DATE).build(), + Data.DataType.newBuilder().setTypeName(TypeName.TIME).build(), + Data.DataType.newBuilder().setTypeName(TypeName.TIMESTAMP).build(), + Data.DataType.newBuilder().setTypeName(TypeName.JSONB).build(), + Data.DataType.newBuilder().setTypeName(TypeName.BYTEA).build()), Lists.newArrayList("id")); } diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java index 02e5abeceabd9..ab48ab105b254 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java @@ -165,12 +165,18 @@ public void testPermissionCheck() throws SQLException { 
.addColumns( ConnectorServiceProto.TableSchema.Column.newBuilder() .setName("o_key") - .setDataType(Data.DataType.TypeName.INT64) + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT64) + .build()) .build()) .addColumns( ConnectorServiceProto.TableSchema.Column.newBuilder() .setName("o_val") - .setDataType(Data.DataType.TypeName.INT32) + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) .build()) .addPkIndices(0) .build(); diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index ffd07ffc471b2..71ebf0c61b097 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -180,12 +180,18 @@ public void testPermissionCheck() throws SQLException { .addColumns( ConnectorServiceProto.TableSchema.Column.newBuilder() .setName("o_key") - .setDataType(Data.DataType.TypeName.INT64) + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT64) + .build()) .build()) .addColumns( ConnectorServiceProto.TableSchema.Column.newBuilder() .setName("o_val") - .setDataType(Data.DataType.TypeName.INT32) + .setDataType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) .build()) .addPkIndices(0) .build(); diff --git a/java/connector-node/risingwave-sink-jdbc/pom.xml b/java/connector-node/risingwave-sink-jdbc/pom.xml index b61ca752a8951..2284d81b1aa02 100644 --- a/java/connector-node/risingwave-sink-jdbc/pom.xml +++ b/java/connector-node/risingwave-sink-jdbc/pom.xml @@ -34,7 +34,10 @@ org.apache.logging.log4j log4j-core - + + org.apache.commons + commons-text + com.fasterxml.jackson.core jackson-databind diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 9a74935564872..1207c72e99617 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.lang3.StringUtils; import org.postgresql.util.PGInterval; import org.postgresql.util.PGobject; import org.slf4j.Logger; @@ -93,7 +94,7 @@ private static List getPkColumnNames(Connection conn, String tableName) String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage())) .asRuntimeException(); } - LOG.info("detected pk {}", pkColumnNames); + LOG.info("detected pk column {}", pkColumnNames); return pkColumnNames; } @@ -109,6 +110,7 @@ private PreparedStatement prepareStatement(SinkRow row) { String insertStmt = String.format( INSERT_TEMPLATE, config.getTableName(), columnsRepr, valuesRepr); + try { return generatePreparedStatement(insertStmt, row, null); } catch (SQLException e) { @@ -219,10 +221,14 @@ private PreparedStatement prepareStatement(SinkRow row) { private PreparedStatement generatePreparedStatement( String inputStmt, SinkRow row, Object[] updateDeleteValueBuffer) throws SQLException { 
PreparedStatement stmt = conn.prepareStatement(inputStmt, Statement.RETURN_GENERATED_KEYS); - var columnNames = getTableSchema().getColumnNames(); + var columnDescs = getTableSchema().getColumnDescs(); int placeholderIdx = 1; for (int i = 0; i < row.size(); i++) { - switch (getTableSchema().getColumnType(columnNames[i])) { + var column = columnDescs.get(i); + switch (column.getDataType().getTypeName()) { + case DECIMAL: + stmt.setBigDecimal(placeholderIdx++, (java.math.BigDecimal) row.get(i)); + break; case INTERVAL: if (targetDbType == DatabaseType.POSTGRES) { stmt.setObject(placeholderIdx++, new PGInterval((String) row.get(i))); @@ -244,6 +250,28 @@ private PreparedStatement generatePreparedStatement( case BYTEA: stmt.setBinaryStream(placeholderIdx++, (InputStream) row.get(i)); break; + case LIST: + var val = row.get(i); + // JSON payload returns a List, but JDBC expects an array + if (val instanceof java.util.List) { + val = ((java.util.List) val).toArray(); + } + assert (val instanceof Object[]); + Object[] objArray = (Object[]) val; + if (targetDbType == DatabaseType.POSTGRES) { + var fieldType = column.getDataType().getFieldType(0); + var sqlType = + JdbcUtils.getDbSqlType( + targetDbType, fieldType.getTypeName(), fieldType); + stmt.setArray(i + 1, conn.createArrayOf(sqlType, objArray)); + } else { + // convert Array type to a string for other database + // reference: + // https://dev.mysql.com/doc/workbench/en/wb-migration-database-postgresql-typemapping.html + var arrayString = StringUtils.join(objArray, ","); + stmt.setString(placeholderIdx++, arrayString); + } + break; default: stmt.setObject(placeholderIdx++, row.get(i)); break; @@ -266,7 +294,7 @@ public void write(Iterator rows) { continue; } if (stmt != null) { - try { + try (stmt) { LOG.debug("Executing statement: {}", stmt); stmt.executeUpdate(); } catch (SQLException e) { diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 885549e5b07c2..5562c35b973fd 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -72,6 +72,7 @@ public void validate( jdbcPk.add(pkResultSet.getString("COLUMN_NAME")); } } catch (SQLException e) { + LOG.error("failed to connect to target database. jdbcUrl: {}", jdbcUrl, e); throw Status.INVALID_ARGUMENT .withDescription("failed to connect to target database: " + e.getSQLState()) .asRuntimeException(); diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java new file mode 100644 index 0000000000000..3587d08045d58 --- /dev/null +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JdbcUtils.java @@ -0,0 +1,84 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector; + +import com.risingwave.proto.Data; + +public abstract class JdbcUtils { + + public static String getDbSqlType( + DatabaseType db, Data.DataType.TypeName typeName, Object value) { + String sqlType; + switch (db) { + case MYSQL: + sqlType = getMySqlSqlType(typeName, value); + break; + case POSTGRES: + sqlType = getPostgresSqlType(typeName, value); + break; + default: + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Unsupported database type: " + db) + .asRuntimeException(); + } + return sqlType; + } + + // https://dev.mysql.com/doc/workbench/en/wb-migration-database-postgresql-typemapping.html + private static String getMySqlSqlType(Data.DataType.TypeName typeName, Object value) { + // convert to mysql data type name from postgres data type + switch (typeName) { + case INT32: + return "INT"; + case INT64: + return "BIGINT"; + case FLOAT: + return "FLOAT"; + case DOUBLE: + return "DOUBLE"; + case BOOLEAN: + return "TINYINT"; + case VARCHAR: + if (!(value instanceof String)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Expected string, got " + value.getClass()) + .asRuntimeException(); + } + String string = (String) value; + if (string.length() > 65535) { + return "LONGTEXT"; + } else { + return "VARCHAR"; + } + case BYTEA: + return "LONGBLOB"; + case DATE: + return "DATE"; + case TIME: + case INTERVAL: + return "TIME"; + case TIMESTAMP: + return "TIMESTAMP"; + case DECIMAL: + return "DECIMAL"; + default: + return null; + } + } + + private static String getPostgresSqlType(Data.DataType.TypeName typeName, Object value) { + return typeName.name(); + } +} diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java index 154c038dd9fed..349fc452d0316 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -87,6 +87,18 @@ public ByteArrayInputStream getBytea(int index) { return Binding.rowGetByteaValue(pointer, index); } + /** + * Only supports one-dimensional array right now + * + * @return an Object[] which will be used in java.sql.Connection#createArrayOf(String typeName, + * Object[] elements) + */ + public Object[] getArray(int index, Class clazz) { + var val = Binding.rowGetArrayValue(pointer, index, clazz); + assert (val instanceof Object[]); + return (Object[]) val; + } + @Override public void close() { if (!isClosed) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index d3bfe8359a432..1d7ec11be8254 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -69,6 +69,9 @@ public class Binding { static native ByteArrayInputStream rowGetByteaValue(long pointer, int index); + // TODO: object or object array? 
+ static native Object rowGetArrayValue(long pointer, int index, Class clazz); + // Since the underlying rust does not have garbage collection, we will have to manually call // close on the row to release the row instance pointed by the pointer. static native void rowClose(long pointer); diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 6eeced329b8f6..ddaf3920871c7 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -11,7 +11,7 @@ option java_package = "com.risingwave.proto"; message TableSchema { message Column { string name = 1; - data.DataType.TypeName data_type = 2; + data.DataType data_type = 2; } repeated Column columns = 1; repeated uint32 pk_indices = 2; diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index bafa9878b6614..e91fccac23a19 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -139,7 +139,7 @@ impl RemoteSink { .iter() .map(|c| Column { name: c.name.clone(), - data_type: c.data_type().to_protobuf().type_name, + data_type: Some(c.data_type().to_protobuf()), }) .collect(), pk_indices: pk_indices.iter().map(|i| *i as u32).collect(), @@ -210,14 +210,15 @@ impl RemoteSink { | DataType::Interval | DataType::Jsonb | DataType::Bytea + | DataType::List(_) ) { Ok( Column { name: column.column_desc.name.clone(), - data_type: column.column_desc.data_type.to_protobuf().type_name, + data_type: Some(column.column_desc.data_type.to_protobuf()), }) } else { Err(SinkError::Remote(format!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea and Varchar, got {:?}: {:?}", + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, List, Bytea and Varchar, got {:?}: {:?}", column.column_desc.name, column.column_desc.data_type ))) diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 4b9fc59f188c1..4c9e425271778 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -32,7 +32,9 @@ use jni::objects::{ AutoArray, GlobalRef, JClass, JMethodID, JObject, JStaticMethodID, JString, JValue, ReleaseMode, }; use jni::signature::ReturnType; -use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort, jvalue}; +use jni::sys::{ + jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jobject, jshort, jsize, jvalue, +}; use jni::JNIEnv; use once_cell::sync::OnceCell; use prost::{DecodeError, Message}; @@ -712,6 +714,97 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetByteaValue }) } +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetArrayValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, + class: JClass<'a>, +) -> JObject<'a> { + execute_and_catch(env, move || { + let elems = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_list() + .iter(); + + // convert the Rust elements to a Java object array (Object[]) + let jarray = env.new_object_array(elems.len() as jsize, class, JObject::null())?; + + for (i, ele) in elems.enumerate() { + let index = i as jsize; + match ele { + None => env.set_object_array_element(jarray, i as jsize, JObject::null())?, + Some(val) => match val { + ScalarRefImpl::Int16(v) => { + let obj = env.call_static_method( + class, + "valueOf", + "(S)Ljava.lang.Short;", + &[JValue::from(v as jshort)], + )?; + if let 
JValue::Object(o) = obj { + env.set_object_array_element(jarray, index, o)? + } + } + ScalarRefImpl::Int32(v) => { + let obj = env.call_static_method( + class, + "valueOf", + "(I)Ljava.lang.Integer;", + &[JValue::from(v as jint)], + )?; + if let JValue::Object(o) = obj { + env.set_object_array_element(jarray, index, o)? + } + } + ScalarRefImpl::Int64(v) => { + let obj = env.call_static_method( + class, + "valueOf", + "(J)Ljava.lang.Long;", + &[JValue::from(v as jlong)], + )?; + if let JValue::Object(o) = obj { + env.set_object_array_element(jarray, index, o)? + } + } + ScalarRefImpl::Float32(v) => { + let obj = env.call_static_method( + class, + "valueOf", + "(F)Ljava/lang/Float;", + &[JValue::from(v.into_inner() as jfloat)], + )?; + if let JValue::Object(o) = obj { + env.set_object_array_element(jarray, index, o)? + } + } + ScalarRefImpl::Float64(v) => { + let obj = env.call_static_method( + class, + "valueOf", + "(D)Ljava/lang/Double;", + &[JValue::from(v.into_inner() as jdouble)], + )?; + if let JValue::Object(o) = obj { + env.set_object_array_element(jarray, index, o)? + } + } + ScalarRefImpl::Utf8(v) => { + let obj = env.new_string(v)?; + env.set_object_array_element(jarray, index, obj)? + } + _ => env.set_object_array_element(jarray, index, JObject::null())?, + }, + } + } + let output = unsafe { JObject::from_raw(jarray as jobject) }; + Ok(output) + }) +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( _env: EnvParam<'a>, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 5a5b0d146a0a3..fcee8c3ebea80 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -180,7 +180,7 @@ impl ConnectorSourceWorker { .flat_map(|col| &col.column_desc) .map(|col| Column { name: col.name.clone(), - data_type: col.column_type.as_ref().unwrap().type_name, + data_type: col.column_type.clone(), }) .collect(), pk_indices,
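
For reference, a minimal end-to-end sketch (not part of the patch itself) of how the reworked schema plumbing is meant to be consumed: connector_service.proto columns now carry a full data.DataType message instead of a bare TypeName, so a LIST column can describe its element type, and array values surface as Object[] suitable for java.sql.Connection#createArrayOf. Class and method names below follow the patch (TableSchema, ColumnDesc, getFieldType, PreparedStatement binding); addFieldType is assumed to be the generated protobuf builder for the repeated field_type field that getFieldType(0) reads, and conn, stmt, placeholderIdx, and arrayValue are placeholder inputs.

// Sketch only, under the assumptions stated above.
import com.google.common.collect.Lists;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;

public class ArraySchemaSketch {
    // Build a schema whose second column is a VARCHAR[] (LIST of VARCHAR).
    static TableSchema listSchema() {
        Data.DataType idType =
                Data.DataType.newBuilder().setTypeName(TypeName.INT64).build();
        Data.DataType tagsType =
                Data.DataType.newBuilder()
                        .setTypeName(TypeName.LIST)
                        // element type of the array column, read back via getFieldType(0)
                        .addFieldType(
                                Data.DataType.newBuilder()
                                        .setTypeName(TypeName.VARCHAR)
                                        .build())
                        .build();
        return new TableSchema(
                Lists.newArrayList("id", "tags"),
                Lists.newArrayList(idType, tagsType),
                Lists.newArrayList("id"));
    }

    // Bind an array value for the "tags" column (index 1) on the Postgres path.
    static void bindArrayColumn(
            java.sql.Connection conn,
            java.sql.PreparedStatement stmt,
            int placeholderIdx,
            TableSchema schema,
            Object[] arrayValue)
            throws java.sql.SQLException {
        var elementType = schema.getColumnDesc(1).getDataType().getFieldType(0);
        // Mirrors JdbcUtils.getPostgresSqlType, which returns the TypeName's name.
        String sqlType = elementType.getTypeName().name();
        stmt.setArray(placeholderIdx, conn.createArrayOf(sqlType, arrayValue));
    }
}

On the MySQL path the patch instead joins the array elements into a single comma-separated string (the t_types array columns are LONGTEXT, and the expected TSV shows values such as "Value 1,Value 2"), since MySQL has no native array type.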