Skip to content

Commit

Permalink
[#1732] feat(trino-connector): Support MySQL property for Trino (#1749)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Properties for support tables in the Trino connector for MySQL tables.
this PR contains the following changes

- Support MySQL table property 'enging', 'auto_increment_offset'
- Support MySQL column property 'auto_increment'
- Remove `ColumnDTO` in TrinoConnectorIT.

### Why are the changes needed?

It's a must-feature. 

Fix: #1732 

### Does this PR introduce _any_ user-facing change?

N/A. 

### How was this patch tested?

UTs
  • Loading branch information
yuqi1129 authored Feb 1, 2024
1 parent 3a97a4a commit fdf71b2
Show file tree
Hide file tree
Showing 24 changed files with 387 additions and 161 deletions.
1 change: 1 addition & 0 deletions catalogs/bundled-catalog/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation(project(":catalogs:catalog-lakehouse-iceberg"))
implementation(project(":catalogs:catalog-jdbc-mysql"))
implementation(project(":catalogs:catalog-jdbc-postgresql"))
implementation(project(":catalogs:catalog-jdbc-common"))
implementation(libs.slf4j.api)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.catalog.jdbc.JdbcTablePropertiesMetadata;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergSchemaPropertiesMetadata;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata;
import com.datastrato.gravitino.catalog.mysql.MysqlTablePropertiesMetadata;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -66,7 +68,8 @@ public class ClassProvider {
private static final Set<Class<?>> MYSQL_NEED_CLASS =
new HashSet<Class<?>>() {
{
// TODO
add(MysqlTablePropertiesMetadata.class);
add(JdbcTablePropertiesMetadata.class);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.catalog.hive.HiveClientPool;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.dto.rel.ColumnDTO;
import com.datastrato.gravitino.dto.rel.DistributionDTO;
import com.datastrato.gravitino.dto.rel.SortOrderDTO;
import com.datastrato.gravitino.dto.rel.expressions.FieldReferenceDTO;
Expand All @@ -22,6 +21,7 @@
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.util.GravitinoITUtils;
import com.datastrato.gravitino.integration.test.util.ITUtils;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.expressions.NamedReference;
Expand All @@ -34,6 +34,8 @@
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -522,84 +524,62 @@ private static boolean checkTrinoHasLoaded(String sql, long maxWaitTimeSec)
return false;
}

private ColumnDTO[] createHiveFullTypeColumns() {
ColumnDTO[] columnDTO = createFullTypeColumns();
private Column[] createHiveFullTypeColumns() {
Column[] columnDTO = createFullTypeColumns();
Set<String> unsupportedType = Sets.newHashSet("FixedType", "StringType", "TimeType");
// MySQL doesn't support timestamp time zone
return Arrays.stream(columnDTO)
.filter(c -> !unsupportedType.contains(c.name()))
.toArray(ColumnDTO[]::new);
.toArray(Column[]::new);
}

private ColumnDTO[] createMySQLFullTypeColumns() {
ColumnDTO[] columnDTO = createFullTypeColumns();
private Column[] createMySQLFullTypeColumns() {
Column[] columnDTO = createFullTypeColumns();
Set<String> unsupportedType =
Sets.newHashSet("FixedType", "StringType", "TimestampType", "BooleanType");
// MySQL doesn't support timestamp time zone
return Arrays.stream(columnDTO)
.filter(c -> !unsupportedType.contains(c.name()))
.toArray(ColumnDTO[]::new);
.toArray(Column[]::new);
}

private ColumnDTO[] createIcebergFullTypeColumns() {
ColumnDTO[] columnDTO = createFullTypeColumns();
private Column[] createIcebergFullTypeColumns() {
Column[] columnDTO = createFullTypeColumns();

Set<String> unsupportedType =
Sets.newHashSet("ByteType", "ShortType", "VarCharType", "FixedCharType");
return Arrays.stream(columnDTO)
.filter(c -> !unsupportedType.contains(c.name()))
.toArray(ColumnDTO[]::new);
.toArray(Column[]::new);
}

private ColumnDTO[] createFullTypeColumns() {
private Column[] createFullTypeColumns() {
// Generate all types of columns that in class Types
return new ColumnDTO[] {
new ColumnDTO.Builder<>()
.withName("BooleanType")
.withDataType(Types.BooleanType.get())
.build(),

return new Column[] {
// Int type
new ColumnDTO.Builder<>().withName("ByteType").withDataType(Types.ByteType.get()).build(),
new ColumnDTO.Builder<>().withName("ShortType").withDataType(Types.ShortType.get()).build(),
new ColumnDTO.Builder<>()
.withName("IntegerType")
.withDataType(Types.IntegerType.get())
.build(),
new ColumnDTO.Builder<>().withName("LongType").withDataType(Types.LongType.get()).build(),
Column.of("BooleanType", Types.BooleanType.get()),
Column.of("ByteType", Types.ByteType.get()),
Column.of("ShortType", Types.ShortType.get()),
Column.of("IntegerType", Types.IntegerType.get()),
Column.of("LongType", Types.LongType.get()),

// float type
new ColumnDTO.Builder<>().withName("FloatType").withDataType(Types.FloatType.get()).build(),
new ColumnDTO.Builder<>().withName("DoubleType").withDataType(Types.DoubleType.get()).build(),
new ColumnDTO.Builder<>()
.withName("DecimalType")
.withDataType(Types.DecimalType.of(10, 3))
.build(),
Column.of("FloatType", Types.FloatType.get()),
Column.of("DoubleType", Types.DoubleType.get()),
Column.of("DecimalType", Types.DecimalType.of(10, 3)),

// Date Type
new ColumnDTO.Builder<>().withName("DateType").withDataType(Types.DateType.get()).build(),
new ColumnDTO.Builder<>().withName("TimeType").withDataType(Types.TimeType.get()).build(),
new ColumnDTO.Builder<>()
.withName("TimestampType")
.withDataType(Types.TimestampType.withTimeZone())
.build(),
Column.of("DateType", Types.DateType.get()),
Column.of("TimeType", Types.TimeType.get()),
Column.of("TimestampType", Types.TimestampType.withTimeZone()),

// String Type
new ColumnDTO.Builder<>()
.withName("VarCharType")
.withDataType(Types.VarCharType.of(100))
.build(),
new ColumnDTO.Builder<>()
.withName("FixedCharType")
.withDataType(Types.FixedCharType.of(100))
.build(),
new ColumnDTO.Builder<>().withName("StringType").withDataType(Types.StringType.get()).build(),
new ColumnDTO.Builder<>()
.withName("FixedType")
.withDataType(Types.FixedType.of(1000))
.build(),
Column.of("VarCharType", Types.VarCharType.of(100)),
Column.of("FixedCharType", Types.FixedCharType.of(100)),
Column.of("FixedType", Types.FixedType.of(1000)),

// Binary Type
new ColumnDTO.Builder<>().withName("BinaryType").withDataType(Types.BinaryType.get()).build()
Column.of("BinaryType", Types.BinaryType.get())
// No Interval Type and complex type like map, struct, and list
};
}
Expand Down Expand Up @@ -1174,6 +1154,47 @@ void testMySQLTableCreatedByGravitino() throws InterruptedException {
if (!success) {
Assertions.fail("Trino fail to load table created by gravitino: " + sql);
}

// Create a table with primary key
Column[] columnDTOS = createMySQLFullTypeColumns();
columnDTOS =
Arrays.stream(columnDTOS)
.map(
c -> {
if ("IntegerType".equals(c.name())) {
return Column.of(c.name(), c.dataType(), "", false, true, null);
}
return c;
})
.toArray(Column[]::new);

tableName = GravitinoITUtils.genRandomName("mysql_table_with_primary").toLowerCase();
catalog
.asTableCatalog()
.createTable(
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName),
columnDTOS,
"Created by gravitino client",
ImmutableMap.<String, String>builder().build(),
new Transform[0],
Distributions.NONE,
new SortOrder[0],
new Index[] {
Indexes.createMysqlPrimaryKey(new String[][] {new String[] {"IntegerType"}})
});
sql =
String.format(
"show create table \"%s.%s\".%s.%s", metalakeName, catalogName, schemaName, tableName);

success = checkTrinoHasLoaded(sql, 30);
if (!success) {
Assertions.fail("Trino fail to load table created by gravitino: " + sql);
}
data = containerSuite.getTrinoContainer().executeQuerySQL(sql).get(0).get(0);

Assertions.assertTrue(data.contains("engine = 'InnoDB'"));
Assertions.assertTrue(
data.contains("integertype integer NOT NULL WITH ( auto_increment = true )"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ CREATE TABLE "test.gt_mysql".gt_db1.tb01 (
salary int
);

show create table "test.gt_mysql".gt_db1.tb01;

drop table "test.gt_mysql".gt_db1.tb01;

drop schema "test.gt_mysql".gt_db1;
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ CREATE SCHEMA

CREATE TABLE

"CREATE TABLE ""test.gt_mysql"".gt_db1.tb01 (
name varchar(200),
salary integer
)
COMMENT ''
WITH (
engine = 'InnoDB'
)"

DROP TABLE

DROP SCHEMA
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ DROP COLUMN
name varchar(200),
salary integer
)
COMMENT ''"
COMMENT ''
WITH (
engine = 'InnoDB'
)"

SET COLUMN TYPE

"CREATE TABLE ""test.gt_mysql"".gt_db1.tb01 (
name varchar(200),
salary bigint
)
COMMENT ''"
COMMENT ''
WITH (
engine = 'InnoDB'
)"

ADD COLUMN

Expand All @@ -33,7 +39,10 @@ ADD COLUMN
salary bigint,
city varchar(50) COMMENT 'aaa'
)
COMMENT ''"
COMMENT ''
WITH (
engine = 'InnoDB'
)"

ADD COLUMN

Expand All @@ -43,7 +52,10 @@ ADD COLUMN
city varchar(50) COMMENT 'aaa',
age integer NOT NULL COMMENT 'age of users'
)
COMMENT ''"
COMMENT ''
WITH (
engine = 'InnoDB'
)"

ADD COLUMN

Expand All @@ -54,7 +66,10 @@ ADD COLUMN
age integer NOT NULL COMMENT 'age of users',
address varchar(200) NOT NULL COMMENT 'address of users'
)
COMMENT ''"
COMMENT ''
WITH (
engine = 'InnoDB'
)"

DROP TABLE

Expand Down
1 change: 1 addition & 0 deletions trino-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation(libs.guava)
implementation(libs.httpclient5)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.trino.spi) {
exclude("org.apache.logging.log4j")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ public GravitinoTable createTable(ConnectorTableMetadata tableMetadata) {
dataTypeTransformer.getGravitinoType(column.getType()),
i,
column.getComment(),
column.isNullable()));
column.isNullable(),
false,
column.getProperties()));
}

return new GravitinoTable(schemaName, tableName, columns, comment, properties);
}

Expand All @@ -114,6 +117,7 @@ public ColumnMetadata getColumnMetadata(GravitinoColumn column) {
.setComment(Optional.ofNullable(column.getComment()))
.setNullable(column.isNullable())
.setHidden(column.isHidden())
.setProperties(column.getProperties())
.build();
}

Expand Down Expand Up @@ -172,6 +176,8 @@ public GravitinoColumn createColumn(ColumnMetadata column) {
dataTypeTransformer.getGravitinoType(column.getType()),
-1,
column.getComment(),
column.isNullable());
column.isNullable(),
false,
column.getProperties());
}
}
Loading

0 comments on commit fdf71b2

Please sign in to comment.