From 165a96992333431f9b46fd4c5679c7439be7478a Mon Sep 17 00:00:00 2001 From: JennyChen Date: Thu, 12 Sep 2024 19:25:46 +0800 Subject: [PATCH] [ISSUE-154] Support Array[Tuple] clickhouse type (#155) * [ISSUE-154] Support Array[Tuple] clickhouse type * [ISSUE-154] Support Array[Tuple] clickhouse type * [ISSUE-154] Support Array[Tuple] clickhouse type for clickhousecatalog --------- Co-authored-by: jennychen --- .../converter/ClickHouseConverterUtils.java | 22 ++++++++++++++++++- .../clickhouse/util/DataTypeUtil.java | 8 +++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java index 0c79c00..7219c2e 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java @@ -21,13 +21,16 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; import com.clickhouse.data.value.UnsignedByte; import com.clickhouse.data.value.UnsignedInteger; @@ -46,7 +49,9 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -119,9 +124,18 @@ public static Object toExternal(Object value, LogicalType type) { toExternal(valueGetter.getElementOrNull(valueArrayData, i), valueType)); } return objectMap; - case MULTISET: case ROW: + List result = new ArrayList<>(); + for (int i = 0; i < ((RowData) value).getArity(); i++) { + result.add( + toExternal( + RowData.createFieldGetter(((RowType) type).getTypeAt(i), i) + .getFieldOrNull((RowData) value), + ((RowType) type).getTypeAt(i))); + } + return result; case RAW: + case MULTISET: default: throw new UnsupportedOperationException("Unsupported type:" + type); } @@ -209,6 +223,12 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept } return new GenericMapData(internalMap); case ROW: + List row = (List) value; + GenericRowData rowData = new GenericRowData(row.size()); + for (int i = 0; i < row.size(); i++) { + rowData.setField(i, toInternal(row.get(i), type.getChildren().get(i))); + } + return rowData; case MULTISET: case RAW: default: diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java index 57dc58d..1ba24b7 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.clickhouse.util; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.types.DataType; @@ -25,6 +26,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; @@ -110,6 +112,12 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) { toFlinkType(clickHouseColumnInfo.getKeyInfo()), toFlinkType(clickHouseColumnInfo.getValueInfo())); case Tuple: + return DataTypes.ROW( + clickHouseColumnInfo.getNestedColumns().stream() + .map((col) -> new Tuple2<>(col, toFlinkType(col))) + .map(tuple -> DataTypes.FIELD(tuple.f0.getColumnName(), tuple.f1)) + .collect(Collectors.toList())); + case Nested: case AggregateFunction: default: