diff --git a/build.gradle.kts b/build.gradle.kts index 818e06a8a97..d87ec855de8 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -673,6 +673,7 @@ tasks { ":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig", ":catalogs:catalog-jdbc-mysql:copyLibAndConfig", ":catalogs:catalog-jdbc-postgresql:copyLibAndConfig", + ":catalogs:catalog-jdbc-doris:copyLibAndConfig", ":catalogs:catalog-hadoop:copyLibAndConfig", "catalogs:catalog-messaging-kafka:copyLibAndConfig" ) diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index 828ad8b1c21..a054e08a2f1 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -35,7 +35,6 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; -import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; @@ -370,9 +369,6 @@ public Table createTable( SortOrder[] sortOrders, Index[] indexes) throws NoSuchSchemaException, TableAlreadyExistsException { - Preconditions.checkArgument( - null == distribution || distribution == Distributions.NONE, - "jdbc-catalog does not support distribution"); Preconditions.checkArgument( null == sortOrders || sortOrders.length == 0, "jdbc-catalog does not support sort orders"); @@ -406,6 +402,7 @@ public Table createTable( StringIdentifier.addToComment(identifier, comment), resultProperties, partitioning, + distribution, indexes); return new JdbcTable.Builder() diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java index 9a1394f7639..420c744209b 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java @@ -18,6 +18,7 @@ import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.Expression; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.literals.Literals; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; @@ -79,13 +80,15 @@ public void create( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes) throws TableAlreadyExistsException { LOG.info("Attempting to create table {} in database {}", tableName, databaseName); try (Connection connection = getConnection(databaseName)) { JdbcConnectorUtils.executeUpdate( connection, - generateCreateTableSql(tableName, columns, comment, properties, partitioning, indexes)); + generateCreateTableSql( + tableName, columns, comment, properties, partitioning, distribution, indexes)); LOG.info("Created table {} in database {}", tableName, databaseName); } catch (final SQLException se) { throw this.exceptionMapper.toGravitinoException(se); @@ -172,7 +175,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE jdbcTableBuilder.withProperties(tableProperties); // 5.Leave the information to the bottom layer to append the table - correctJdbcTableFields(connection, tableName, jdbcTableBuilder); + correctJdbcTableFields(connection, databaseName, tableName, jdbcTableBuilder); return jdbcTableBuilder.build(); } catch (SQLException e) { throw exceptionMapper.toGravitinoException(e); @@ -272,7 +275,10 @@ protected ResultSet getColumns(Connection connection, String databaseName, Strin * @throws SQLException */ protected void correctJdbcTableFields( - Connection connection, String tableName, JdbcTable.Builder jdbcTableBuilder) + Connection connection, + String databaseName, + String tableName, + JdbcTable.Builder jdbcTableBuilder) throws SQLException { // nothing to do } @@ -357,6 +363,7 @@ protected abstract String generateCreateTableSql( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes); protected abstract String generateRenameTableSql(String oldTableName, String newTableName); diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java index e170223ad2b..e8a4fb36dbc 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java @@ -14,6 +14,7 @@ import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; import java.util.List; @@ -53,6 +54,7 @@ void create( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes) throws TableAlreadyExistsException; diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java index e71bd6a3a36..efc9cec5070 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java @@ -9,6 +9,7 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; import com.google.common.collect.Lists; @@ -27,6 +28,7 @@ protected String generateCreateTableSql( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes) { StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("CREATE TABLE ").append(tableName).append(" ("); diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java index 17ce131e33f..421c83c062b 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java @@ -134,7 +134,14 @@ public void testOperationTable() { Assertions.assertDoesNotThrow( () -> JDBC_TABLE_OPERATIONS.create( - DATABASE_NAME, table1, jdbcColumns, null, properties, null, Indexes.EMPTY_INDEXES)); + DATABASE_NAME, + table1, + jdbcColumns, + null, + properties, + null, + null, + Indexes.EMPTY_INDEXES)); // list table. List allTables = JDBC_TABLE_OPERATIONS.listTables(DATABASE_NAME); diff --git a/catalogs/catalog-jdbc-doris/build.gradle.kts b/catalogs/catalog-jdbc-doris/build.gradle.kts new file mode 100644 index 00000000000..e553ac6636e --- /dev/null +++ b/catalogs/catalog-jdbc-doris/build.gradle.kts @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +description = "catalog-jdbc-doris" + +plugins { + `maven-publish` + id("java") + id("idea") +} + +dependencies { + implementation(project(":api")) + implementation(project(":catalogs:catalog-jdbc-common")) + implementation(project(":common")) + implementation(project(":core")) + + implementation(libs.bundles.log4j) + implementation(libs.commons.collections4) + implementation(libs.commons.lang3) + implementation(libs.guava) + implementation(libs.jsqlparser) + implementation(libs.slf4j.api) + + testImplementation(project(":catalogs:catalog-jdbc-common", "testArtifacts")) + testImplementation(project(":clients:client-java")) + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) + testImplementation(project(":server-common")) + + testImplementation(libs.commons.lang3) + testImplementation(libs.guava) + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mysql.driver) + testImplementation(libs.testcontainers) + testImplementation(libs.testcontainers.mysql) + + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks { + val runtimeJars by registering(Copy::class) { + from(configurations.runtimeClasspath) + into("build/libs") + } + val copyCatalogLibs by registering(Copy::class) { + dependsOn("jar", "runtimeJars") + from("build/libs") + into("$rootDir/distribution/package/catalogs/jdbc-doris/libs") + } + + val copyCatalogConfig by registering(Copy::class) { + from("src/main/resources") + into("$rootDir/distribution/package/catalogs/jdbc-doris/conf") + + include("jdbc-doris.conf") + + exclude { details -> + details.file.isDirectory() + } + } + + register("copyLibAndConfig", Copy::class) { + dependsOn(copyCatalogLibs, copyCatalogConfig) + } +} + +tasks.test { + val skipUTs = project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + if (skipITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + doFirst { + environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE", "datastrato/gravitino-ci-doris:0.1.0") + } + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/DorisCatalog.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/DorisCatalog.java new file mode 100644 index 00000000000..69fa0b39cb2 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/DorisCatalog.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris; + +import com.datastrato.gravitino.catalog.doris.converter.DorisColumnDefaultValueConverter; +import com.datastrato.gravitino.catalog.doris.converter.DorisExceptionConverter; +import com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter; +import com.datastrato.gravitino.catalog.doris.operation.DorisDatabaseOperations; +import com.datastrato.gravitino.catalog.doris.operation.DorisTableOperations; +import com.datastrato.gravitino.catalog.jdbc.JdbcCatalog; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; + +/** Implementation of a Doris catalog in Gravitino. */ +public class DorisCatalog extends JdbcCatalog { + + @Override + public String shortName() { + return "jdbc-doris"; + } + + @Override + protected JdbcExceptionConverter createExceptionConverter() { + return new DorisExceptionConverter(); + } + + @Override + protected JdbcTypeConverter createJdbcTypeConverter() { + return new DorisTypeConverter(); + } + + @Override + protected JdbcDatabaseOperations createJdbcDatabaseOperations() { + return new DorisDatabaseOperations(); + } + + @Override + protected JdbcTableOperations createJdbcTableOperations() { + return new DorisTableOperations(); + } + + @Override + protected JdbcColumnDefaultValueConverter createJdbcColumnDefaultValueConverter() { + return new DorisColumnDefaultValueConverter(); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java new file mode 100644 index 00000000000..09a968245cd --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisColumnDefaultValueConverter.java @@ -0,0 +1,96 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.converter; + +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.BIGINT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.CHAR; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DATETIME; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DECIMAL; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.DOUBLE; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.FLOAT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.INT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.SMALLINT; +import static com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter.TINYINT; +import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; +import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcColumnDefaultValueConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.rel.expressions.Expression; +import com.datastrato.gravitino.rel.expressions.UnparsedExpression; +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.types.Decimal; +import com.datastrato.gravitino.rel.types.Types; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +public class DorisColumnDefaultValueConverter extends JdbcColumnDefaultValueConverter { + + @Override + public Expression toGravitino( + JdbcTypeConverter.JdbcTypeBean type, + String columnDefaultValue, + boolean isExpression, + boolean nullable) { + if (columnDefaultValue == null) { + return nullable ? Literals.NULL : DEFAULT_VALUE_NOT_SET; + } + + if (columnDefaultValue.equalsIgnoreCase(NULL)) { + return Literals.NULL; + } + + if (isExpression) { + if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) { + return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP; + } + // The parsing of Doris expressions is complex, so we are not currently undertaking the + // parsing. + return UnparsedExpression.of(columnDefaultValue); + } + + switch (type.getTypeName().toLowerCase()) { + case TINYINT: + return Literals.byteLiteral(Byte.valueOf(columnDefaultValue)); + case SMALLINT: + return Literals.shortLiteral(Short.valueOf(columnDefaultValue)); + case INT: + return Literals.integerLiteral(Integer.valueOf(columnDefaultValue)); + case BIGINT: + return Literals.longLiteral(Long.valueOf(columnDefaultValue)); + case FLOAT: + return Literals.floatLiteral(Float.valueOf(columnDefaultValue)); + case DOUBLE: + return Literals.doubleLiteral(Double.valueOf(columnDefaultValue)); + case DECIMAL: + return Literals.decimalLiteral( + Decimal.of( + columnDefaultValue, + Integer.parseInt(type.getColumnSize()), + Integer.parseInt(type.getScale()))); + case JdbcTypeConverter.DATE: + return Literals.dateLiteral(LocalDate.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.TIME: + return Literals.timeLiteral(LocalTime.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.TIMESTAMP: + case DATETIME: + return CURRENT_TIMESTAMP.equals(columnDefaultValue) + ? DEFAULT_VALUE_OF_CURRENT_TIMESTAMP + : Literals.timestampLiteral( + LocalDateTime.parse(columnDefaultValue, DATE_TIME_FORMATTER)); + case JdbcTypeConverter.VARCHAR: + return Literals.of( + columnDefaultValue, Types.VarCharType.of(Integer.parseInt(type.getColumnSize()))); + case CHAR: + return Literals.of( + columnDefaultValue, Types.FixedCharType.of(Integer.parseInt(type.getColumnSize()))); + case JdbcTypeConverter.TEXT: + return Literals.stringLiteral(columnDefaultValue); + default: + throw new IllegalArgumentException("Unknown data type for literal: " + type); + } + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java new file mode 100644 index 00000000000..685eabeb657 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java @@ -0,0 +1,75 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.converter; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.datastrato.gravitino.exceptions.NoSuchColumnException; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.exceptions.UnauthorizedException; +import com.google.common.annotations.VisibleForTesting; +import java.sql.SQLException; +import java.util.regex.Pattern; + +/** Exception converter to gravitino exception for Doris. */ +public class DorisExceptionConverter extends JdbcExceptionConverter { + + // see: https://doris.apache.org/docs/admin-manual/maint-monitor/doris-error-code/ + @VisibleForTesting static final int CODE_DATABASE_EXISTS = 1007; + + static final int CODE_TABLE_EXISTS = 1050; + static final int CODE_NO_SUCH_SCHEMA = 1049; + static final int CODE_DATABASE_NOT_EXISTS = 1008; + static final int CODE_NO_SUCH_TABLE = 1051; + static final int CODE_UNAUTHORIZED = 1045; + static final int CODE_NO_SUCH_COLUMN = 1054; + static final int CODE_OTHER = 1105; + + private static final String DATABASE_ALREADY_EXISTS_PATTERN_STRING = + ".*detailMessage = Can't create database '.*'; database exists"; + private static final Pattern DATABASE_ALREADY_EXISTS_PATTERN = + Pattern.compile(DATABASE_ALREADY_EXISTS_PATTERN_STRING); + + @SuppressWarnings("FormatStringAnnotation") + @Override + public GravitinoRuntimeException toGravitinoException(SQLException se) { + int errorCode = se.getErrorCode(); + if (errorCode == CODE_OTHER) { + errorCode = getErrorCodeFromMessage(se.getMessage()); + } + + switch (errorCode) { + case CODE_DATABASE_EXISTS: + return new SchemaAlreadyExistsException(se, se.getMessage()); + case CODE_TABLE_EXISTS: + return new TableAlreadyExistsException(se, se.getMessage()); + case CODE_DATABASE_NOT_EXISTS: + return new NoSuchSchemaException(se, se.getMessage()); + case CODE_NO_SUCH_TABLE: + return new NoSuchTableException(se, se.getMessage()); + case CODE_UNAUTHORIZED: + return new UnauthorizedException(se, se.getMessage()); + case CODE_NO_SUCH_COLUMN: + return new NoSuchColumnException(se, se.getMessage()); + default: + return new GravitinoRuntimeException(se, se.getMessage()); + } + } + + @VisibleForTesting + static int getErrorCodeFromMessage(String message) { + if (message.isEmpty()) { + return CODE_OTHER; + } + if (DATABASE_ALREADY_EXISTS_PATTERN.matcher(message).matches()) { + return CODE_DATABASE_EXISTS; + } + + return CODE_OTHER; + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java new file mode 100644 index 00000000000..68127a2b17f --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisTypeConverter.java @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.converter; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; + +/** Type converter for Doris. */ +public class DorisTypeConverter extends JdbcTypeConverter { + static final String BOOLEAN = "boolean"; + static final String TINYINT = "tinyint"; + static final String SMALLINT = "smallint"; + static final String INT = "int"; + static final String BIGINT = "bigint"; + static final String FLOAT = "float"; + static final String DOUBLE = "double"; + static final String DECIMAL = "decimal"; + static final String DATETIME = "datetime"; + static final String CHAR = "char"; + static final String STRING = "string"; + + @Override + public Type toGravitinoType(JdbcTypeBean typeBean) { + switch (typeBean.getTypeName().toLowerCase()) { + case BOOLEAN: + return Types.BooleanType.get(); + case TINYINT: + return Types.ByteType.get(); + case SMALLINT: + return Types.ShortType.get(); + case INT: + return Types.IntegerType.get(); + case BIGINT: + return Types.LongType.get(); + case FLOAT: + return Types.FloatType.get(); + case DOUBLE: + return Types.DoubleType.get(); + case DECIMAL: + return Types.DecimalType.of( + Integer.parseInt(typeBean.getColumnSize()), Integer.parseInt(typeBean.getScale())); + case DATE: + return Types.DateType.get(); + case DATETIME: + return Types.TimestampType.withTimeZone(); + case CHAR: + return Types.FixedCharType.of(Integer.parseInt(typeBean.getColumnSize())); + case VARCHAR: + return Types.VarCharType.of(Integer.parseInt(typeBean.getColumnSize())); + case STRING: + case TEXT: + return Types.StringType.get(); + default: + throw new IllegalArgumentException("Not a supported type: " + typeBean); + } + } + + @Override + public String fromGravitinoType(Type type) { + if (type instanceof Types.BooleanType) { + return BOOLEAN; + } else if (type instanceof Types.ByteType) { + return TINYINT; + } else if (type instanceof Types.ShortType) { + return SMALLINT; + } else if (type instanceof Types.IntegerType) { + return INT; + } else if (type instanceof Types.LongType) { + return BIGINT; + } else if (type instanceof Types.FloatType) { + return FLOAT; + } else if (type instanceof Types.DoubleType) { + return DOUBLE; + } else if (type instanceof Types.DecimalType) { + return DECIMAL + + "(" + + ((Types.DecimalType) type).precision() + + "," + + ((Types.DecimalType) type).scale() + + ")"; + } else if (type instanceof Types.DateType) { + return DATE; + } else if (type instanceof Types.TimestampType) { + return DATETIME; + } else if (type instanceof Types.VarCharType) { + return VARCHAR + "(" + ((Types.VarCharType) type).length() + ")"; + } else if (type instanceof Types.FixedCharType) { + int length = ((Types.FixedCharType) type).length(); + if (length < 1 || length > 255) { + throw new IllegalArgumentException( + String.format( + "Type %s is invalid, length should be between 1 and 255", type.toString())); + } + + return CHAR + "(" + ((Types.FixedCharType) type).length() + ")"; + } else if (type instanceof Types.StringType) { + return STRING; + } + throw new IllegalArgumentException( + String.format("Couldn't convert Gravitino type %s to Doris type", type.toString())); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisDatabaseOperations.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisDatabaseOperations.java new file mode 100644 index 00000000000..a66bee7d586 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisDatabaseOperations.java @@ -0,0 +1,125 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.operation; + +import com.datastrato.gravitino.catalog.doris.utils.DorisUtils; +import com.datastrato.gravitino.catalog.jdbc.JdbcSchema; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.meta.AuditInfo; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +/** Database operations for Doris. */ +public class DorisDatabaseOperations extends JdbcDatabaseOperations { + private static final String BACK_QUOTE = "`"; + + @Override + public String generateCreateDatabaseSql( + String databaseName, String comment, Map properties) { + if (StringUtils.isNotEmpty(comment)) { + LOG.warn( + "Ignoring comment option on database create. Doris does not support comment option on database create."); + } + StringBuilder sqlBuilder = new StringBuilder("CREATE DATABASE "); + + // Append database name + sqlBuilder.append(BACK_QUOTE).append(databaseName).append(BACK_QUOTE); + + // Append properties + sqlBuilder.append(DorisUtils.generatePropertiesSql(properties)); + + String result = sqlBuilder.toString(); + LOG.info("Generated create database:{} sql: {}", databaseName, result); + return result; + } + + @Override + public String generateDropDatabaseSql(String databaseName, boolean cascade) { + StringBuilder sqlBuilder = new StringBuilder("DROP DATABASE"); + sqlBuilder.append(BACK_QUOTE).append(databaseName).append(BACK_QUOTE); + if (cascade) { + sqlBuilder.append(" FORCE"); + return sqlBuilder.toString(); + } + + try (final Connection connection = this.dataSource.getConnection()) { + String query = "SHOW TABLES IN " + BACK_QUOTE + databaseName + BACK_QUOTE; + try (Statement statement = connection.createStatement()) { + // Execute the query and check if there exists any tables in the database + try (ResultSet resultSet = statement.executeQuery(query)) { + if (resultSet.next()) { + throw new IllegalStateException( + String.format( + "Database %s is not empty, the value of cascade should be true.", + databaseName)); + } + } + } + } catch (SQLException sqlException) { + throw this.exceptionMapper.toGravitinoException(sqlException); + } + + return sqlBuilder.toString(); + } + + @Override + public JdbcSchema load(String databaseName) throws NoSuchSchemaException { + try (final Connection connection = this.dataSource.getConnection()) { + String query = "SELECT * FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?"; + try (PreparedStatement preparedStatement = connection.prepareStatement(query)) { + preparedStatement.setString(1, databaseName); + + // Execute the query + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + throw new NoSuchSchemaException( + "Database %s could not be found in information_schema.SCHEMATA", databaseName); + } + String schemaName = resultSet.getString("SCHEMA_NAME"); + Map properties = getDatabaseProperties(connection, databaseName); + return new JdbcSchema.Builder() + .withName(schemaName) + .withProperties(properties) + .withAuditInfo(AuditInfo.EMPTY) + .build(); + } + } + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + protected Map getDatabaseProperties(Connection connection, String databaseName) + throws SQLException { + + String showCreateDatabaseSql = String.format("SHOW CREATE DATABASE `%s`", databaseName); + + StringBuilder createDatabaseSb = new StringBuilder(); + try (PreparedStatement statement = connection.prepareStatement(showCreateDatabaseSql)) { + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + createDatabaseSb.append(resultSet.getString("Create Database")); + } + } + } + + String createDatabaseSql = createDatabaseSb.toString(); + + if (StringUtils.isEmpty(createDatabaseSql)) { + throw new NoSuchTableException( + "Database %s does not exist in %s.", databaseName, connection.getCatalog()); + } + + return Collections.unmodifiableMap(DorisUtils.extractPropertiesFromSql(createDatabaseSql)); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java new file mode 100644 index 00000000000..8ab6e7da7c5 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/operation/DorisTableOperations.java @@ -0,0 +1,598 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.operation; + +import static com.datastrato.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; + +import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.catalog.doris.utils.DorisUtils; +import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; +import com.datastrato.gravitino.catalog.jdbc.JdbcTable; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import com.datastrato.gravitino.exceptions.NoSuchColumnException; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +/** Table operations for Doris. */ +public class DorisTableOperations extends JdbcTableOperations { + private static final String BACK_QUOTE = "`"; + private static final String DORIS_AUTO_INCREMENT = "AUTO_INCREMENT"; + + private static final String NEW_LINE = "\n"; + + @Override + public List listTables(String databaseName) throws NoSuchSchemaException { + try (Connection connection = getConnection(databaseName)) { + try (Statement statement = connection.createStatement()) { + String showTablesQuery = "SHOW TABLES"; + ResultSet resultSet = statement.executeQuery(showTablesQuery); + List names = new ArrayList<>(); + while (resultSet.next()) { + String tableName = resultSet.getString(1); + names.add(tableName); + } + LOG.info( + "Finished listing tables size {} for database name {} ", names.size(), databaseName); + return names; + } + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map properties, + Transform[] partitioning, + Distribution distribution, + com.datastrato.gravitino.rel.indexes.Index[] indexes) { + + validateIncrementCol(columns); + validateDistribution(distribution, columns); + + StringBuilder sqlBuilder = new StringBuilder(); + + sqlBuilder.append(String.format("CREATE TABLE `%s` (", tableName)).append(NEW_LINE); + + // Add columns + sqlBuilder.append( + Arrays.stream(columns) + .map( + column -> { + StringBuilder columnsSql = new StringBuilder(); + columnsSql + .append(SPACE) + .append(BACK_QUOTE) + .append(column.name()) + .append(BACK_QUOTE); + appendColumnDefinition(column, columnsSql); + return columnsSql.toString(); + }) + .collect(Collectors.joining(",\n"))); + + appendIndexesSql(indexes, sqlBuilder); + + sqlBuilder.append(NEW_LINE).append(")"); + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder.append(" COMMENT \"").append(comment).append("\""); + } + + // Add distribution info + if (distribution.strategy() == Strategy.HASH) { + sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY HASH("); + sqlBuilder.append( + Arrays.stream(distribution.expressions()) + .map(column -> BACK_QUOTE + column.toString() + BACK_QUOTE) + .collect(Collectors.joining(", "))); + sqlBuilder.append(")"); + } else if (distribution.strategy() == Strategy.EVEN) { + sqlBuilder.append(NEW_LINE).append(" DISTRIBUTED BY ").append("RANDOM"); + } + + if (distribution.number() != 0) { + sqlBuilder.append(" BUCKETS ").append(distribution.number()); + } + + // Add table properties + sqlBuilder.append(NEW_LINE).append(DorisUtils.generatePropertiesSql(properties)); + + // Add Partition Info + if (partitioning != null && partitioning.length > 0) { + // TODO: Add partitioning support + } + + // Return the generated SQL statement + String result = sqlBuilder.toString(); + + LOG.info("Generated create table:{} sql: {}", tableName, result); + return result; + } + + private static void validateIncrementCol(JdbcColumn[] columns) { + // Get all auto increment column + List autoIncrementCols = + Arrays.stream(columns).filter(Column::autoIncrement).collect(Collectors.toList()); + + // Doris does not support auto increment column before version 2.1.0 + Preconditions.checkArgument( + autoIncrementCols.isEmpty(), "Doris does not support auto-increment column"); + } + + private static void validateDistribution(Distribution distribution, JdbcColumn[] columns) { + Preconditions.checkArgument(null != distribution, "Doris must set distribution"); + + Preconditions.checkArgument( + Strategy.HASH == distribution.strategy() || Strategy.EVEN == distribution.strategy(), + "Doris only supports HASH or EVEN distribution strategy"); + + if (distribution.strategy() == Strategy.HASH) { + // Check if the distribution column exists + Arrays.stream(distribution.expressions()) + .forEach( + expression -> { + Preconditions.checkArgument( + Arrays.stream(columns) + .anyMatch(column -> column.name().equalsIgnoreCase(expression.toString())), + "Distribution column " + expression + " does not exist in the table columns"); + }); + } + } + + @VisibleForTesting + static void appendIndexesSql( + com.datastrato.gravitino.rel.indexes.Index[] indexes, StringBuilder sqlBuilder) { + + if (indexes.length == 0) { + return; + } + + // validate indexes + Arrays.stream(indexes) + .forEach( + index -> { + if (index.fieldNames().length > 1) { + throw new IllegalArgumentException("Index does not support multi fields in Doris"); + } + }); + + String indexSql = + Arrays.stream(indexes) + .map(index -> String.format("INDEX %s (%s)", index.name(), index.fieldNames()[0][0])) + .collect(Collectors.joining(",\n")); + + sqlBuilder.append(",").append(NEW_LINE).append(indexSql); + } + + @Override + protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException { + return "YES".equalsIgnoreCase(resultSet.getString("IS_AUTOINCREMENT")); + } + + @Override + protected Map getTableProperties(Connection connection, String tableName) + throws SQLException { + + String showCreateTableSQL = String.format("SHOW CREATE TABLE `%s`", tableName); + + StringBuilder createTableSqlSb = new StringBuilder(); + try (PreparedStatement statement = connection.prepareStatement(showCreateTableSQL)) { + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + createTableSqlSb.append(resultSet.getString("Create Table")); + } + } + } + + String createTableSql = createTableSqlSb.toString(); + + if (StringUtils.isEmpty(createTableSql)) { + throw new NoSuchTableException( + "Table %s does not exist in %s.", tableName, connection.getCatalog()); + } + + return Collections.unmodifiableMap(DorisUtils.extractPropertiesFromSql(createTableSql)); + } + + @Override + protected JdbcTable.Builder getBasicJdbcTableInfo(ResultSet table) throws SQLException { + return new JdbcTable.Builder() + .withName(table.getString("TABLE_NAME")) + .withComment(table.getString("REMARKS")) + .withAuditInfo(AuditInfo.EMPTY); + } + + @Override + protected List getIndexes(String databaseName, String tableName, DatabaseMetaData metaData) + throws SQLException { + // get Indexes from SQL + try { + Connection connection = metaData.getConnection(); + + String sql = String.format("SHOW INDEX FROM `%s` FROM `%s`", tableName, databaseName); + + PreparedStatement preparedStatement = connection.prepareStatement(sql); + ResultSet resultSet = preparedStatement.executeQuery(); + List indexes = new ArrayList<>(); + while (resultSet.next()) { + String indexName = resultSet.getString("Key_name"); + String columnName = resultSet.getString("Column_name"); + indexes.add( + Indexes.of(Index.IndexType.PRIMARY_KEY, indexName, new String[][] {{columnName}})); + } + return indexes; + } catch (SQLException e) { + throw exceptionMapper.toGravitinoException(e); + } + } + + @Override + protected void correctJdbcTableFields( + Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder) + throws SQLException { + + if (StringUtils.isEmpty(tableBuilder.comment())) { + // Doris Cannot get comment from JDBC 8.x, so we need to get comment from sql + try { + String sql = + "SELECT TABLE_COMMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"; + String comment = ""; + PreparedStatement preparedStatement = connection.prepareStatement(sql); + preparedStatement.setString(1, databaseName); + preparedStatement.setString(2, tableName); + + ResultSet resultSet = preparedStatement.executeQuery(); + while (resultSet.next()) { + comment += resultSet.getString("TABLE_COMMENT"); + } + + tableBuilder.withComment(comment); + } catch (SQLException e) { + throw exceptionMapper.toGravitinoException(e); + } + } + } + + @Override + protected String generateRenameTableSql(String oldTableName, String newTableName) { + return String.format("ALTER TABLE `%s` RENAME `%s`", oldTableName, newTableName); + } + + @Override + protected String generateDropTableSql(String tableName) { + return String.format("DROP TABLE `%s`", tableName); + } + + @Override + protected String generatePurgeTableSql(String tableName) { + return String.format("TRUNCATE TABLE `%s`", tableName); + } + + @Override + protected String generateAlterTableSql( + String databaseName, String tableName, TableChange... changes) { + // Not all operations require the original table information, so lazy loading is used here + JdbcTable lazyLoadTable = null; + TableChange.UpdateComment updateComment = null; + List setProperties = new ArrayList<>(); + List alterSql = new ArrayList<>(); + for (int i = 0; i < changes.length; i++) { + TableChange change = changes[i]; + if (change instanceof TableChange.UpdateComment) { + updateComment = (TableChange.UpdateComment) change; + } else if (change instanceof TableChange.SetProperty) { + // The set attribute needs to be added at the end. + setProperties.add(((TableChange.SetProperty) change)); + } else if (change instanceof TableChange.RemoveProperty) { + // Doris only support set properties, remove property is not supported yet + throw new IllegalArgumentException("Remove property is not supported yet"); + } else if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add(addColumnFieldDefinition(addColumn)); + } else if (change instanceof TableChange.RenameColumn) { + throw new IllegalArgumentException("Rename column is not supported yet"); + } else if (change instanceof TableChange.UpdateColumnType) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change; + alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, lazyLoadTable)); + } else if (change instanceof TableChange.UpdateColumnComment) { + TableChange.UpdateColumnComment updateColumnComment = + (TableChange.UpdateColumnComment) change; + alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment)); + } else if (change instanceof TableChange.UpdateColumnPosition) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + TableChange.UpdateColumnPosition updateColumnPosition = + (TableChange.UpdateColumnPosition) change; + alterSql.add(updateColumnPositionFieldDefinition(updateColumnPosition, lazyLoadTable)); + } else if (change instanceof TableChange.DeleteColumn) { + TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + String deleteColSql = deleteColumnFieldDefinition(deleteColumn, lazyLoadTable); + if (StringUtils.isNotEmpty(deleteColSql)) { + alterSql.add(deleteColSql); + } + } else if (change instanceof TableChange.UpdateColumnNullability) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add( + updateColumnNullabilityDefinition( + (TableChange.UpdateColumnNullability) change, lazyLoadTable)); + } else if (change instanceof TableChange.AddIndex) { + alterSql.add(addIndexDefinition((TableChange.AddIndex) change)); + } else if (change instanceof TableChange.DeleteIndex) { + lazyLoadTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + alterSql.add(deleteIndexDefinition(lazyLoadTable, (TableChange.DeleteIndex) change)); + } else { + throw new IllegalArgumentException( + "Unsupported table change type: " + change.getClass().getName()); + } + } + if (!setProperties.isEmpty()) { + alterSql.add(generateTableProperties(setProperties)); + } + + // Last modified comment + if (null != updateComment) { + String newComment = updateComment.getNewComment(); + if (null == StringIdentifier.fromComment(newComment)) { + // Detect and add gravitino id. + JdbcTable jdbcTable = getOrCreateTable(databaseName, tableName, lazyLoadTable); + StringIdentifier identifier = StringIdentifier.fromComment(jdbcTable.comment()); + if (null != identifier) { + newComment = StringIdentifier.addToComment(identifier, newComment); + } + } + alterSql.add("MODIFY COMMENT \"" + newComment + "\""); + } + + if (!setProperties.isEmpty()) { + alterSql.add(generateTableProperties(setProperties)); + } + + if (CollectionUtils.isEmpty(alterSql)) { + return ""; + } + // Return the generated SQL statement + String result = "ALTER TABLE `" + tableName + "`\n" + String.join(",\n", alterSql) + ";"; + LOG.info("Generated alter table:{} sql: {}", databaseName + "." + tableName, result); + return result; + } + + private String updateColumnNullabilityDefinition( + TableChange.UpdateColumnNullability change, JdbcTable table) { + validateUpdateColumnNullable(change, table); + String col = change.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(table, col); + JdbcColumn updateColumn = + new JdbcColumn.Builder() + .withName(col) + .withDefaultValue(column.defaultValue()) + .withNullable(change.nullable()) + .withType(column.dataType()) + .withComment(column.comment()) + .withAutoIncrement(column.autoIncrement()) + .build(); + return "MODIFY COLUMN " + + BACK_QUOTE + + col + + BACK_QUOTE + + appendColumnDefinition(updateColumn, new StringBuilder()); + } + + private String generateTableProperties(List setProperties) { + return setProperties.stream() + .map( + setProperty -> + String.format("\"%s\" = \"%s\"", setProperty.getProperty(), setProperty.getValue())) + .collect(Collectors.joining(",\n")); + } + + protected JdbcTable getOrCreateTable( + String databaseName, String tableName, JdbcTable lazyLoadCreateTable) { + return null != lazyLoadCreateTable ? lazyLoadCreateTable : load(databaseName, tableName); + } + + private String updateColumnCommentFieldDefinition( + TableChange.UpdateColumnComment updateColumnComment) { + String newComment = updateColumnComment.getNewComment(); + if (updateColumnComment.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnComment.fieldName()[0]; + + return String.format("MODIFY COLUMN `%s` COMMENT '%s'", col, newComment); + } + + private String addColumnFieldDefinition(TableChange.AddColumn addColumn) { + String dataType = (String) typeConverter.fromGravitinoType(addColumn.getDataType()); + if (addColumn.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = addColumn.fieldName()[0]; + + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition + .append("ADD COLUMN ") + .append(BACK_QUOTE) + .append(col) + .append(BACK_QUOTE) + .append(SPACE) + .append(dataType) + .append(SPACE); + + if (!addColumn.isNullable()) { + columnDefinition.append("NOT NULL "); + } + // Append comment if available + if (StringUtils.isNotEmpty(addColumn.getComment())) { + columnDefinition.append("COMMENT '").append(addColumn.getComment()).append("' "); + } + + // Append position if available + if (addColumn.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (addColumn.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = (TableChange.After) addColumn.getPosition(); + columnDefinition + .append("AFTER ") + .append(BACK_QUOTE) + .append(afterPosition.getColumn()) + .append(BACK_QUOTE); + } else if (addColumn.getPosition() instanceof TableChange.Default) { + // do nothing, follow the default behavior of doris + } else { + throw new IllegalArgumentException("Invalid column position."); + } + return columnDefinition.toString(); + } + + private String updateColumnPositionFieldDefinition( + TableChange.UpdateColumnPosition updateColumnPosition, JdbcTable jdbcTable) { + if (updateColumnPosition.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnPosition.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col); + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition.append("MODIFY COLUMN ").append(BACK_QUOTE).append(col).append(BACK_QUOTE); + appendColumnDefinition(column, columnDefinition); + if (updateColumnPosition.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (updateColumnPosition.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = (TableChange.After) updateColumnPosition.getPosition(); + columnDefinition + .append("AFTER ") + .append(BACK_QUOTE) + .append(afterPosition.getColumn()) + .append(BACK_QUOTE); + } else { + Arrays.stream(jdbcTable.columns()) + .reduce((column1, column2) -> column2) + .map(Column::name) + .ifPresent(s -> columnDefinition.append("AFTER ").append(s)); + } + return columnDefinition.toString(); + } + + private String deleteColumnFieldDefinition( + TableChange.DeleteColumn deleteColumn, JdbcTable jdbcTable) { + if (deleteColumn.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = deleteColumn.fieldName()[0]; + boolean colExists = true; + try { + getJdbcColumnFromTable(jdbcTable, col); + } catch (NoSuchColumnException noSuchColumnException) { + colExists = false; + } + if (!colExists) { + if (BooleanUtils.isTrue(deleteColumn.getIfExists())) { + return ""; + } else { + throw new IllegalArgumentException("Delete column does not exist: " + col); + } + } + return "DROP COLUMN " + BACK_QUOTE + col + BACK_QUOTE; + } + + private String updateColumnTypeFieldDefinition( + TableChange.UpdateColumnType updateColumnType, JdbcTable jdbcTable) { + if (updateColumnType.fieldName().length > 1) { + throw new UnsupportedOperationException("Doris does not support nested column names."); + } + String col = updateColumnType.fieldName()[0]; + JdbcColumn column = getJdbcColumnFromTable(jdbcTable, col); + StringBuilder sqlBuilder = new StringBuilder("MODIFY COLUMN " + BACK_QUOTE + col + BACK_QUOTE); + JdbcColumn newColumn = + new JdbcColumn.Builder() + .withName(col) + .withType(updateColumnType.getNewDataType()) + .withComment(column.comment()) + .withDefaultValue(DEFAULT_VALUE_NOT_SET) + .withNullable(column.nullable()) + .withAutoIncrement(column.autoIncrement()) + .build(); + return appendColumnDefinition(newColumn, sqlBuilder).toString(); + } + + private StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) { + // Add data type + sqlBuilder + .append(SPACE) + .append(typeConverter.fromGravitinoType(column.dataType())) + .append(SPACE); + + // Add NOT NULL if the column is marked as such + if (column.nullable()) { + sqlBuilder.append("NULL "); + } else { + sqlBuilder.append("NOT NULL "); + } + + // Add DEFAULT value if specified + if (!DEFAULT_VALUE_NOT_SET.equals(column.defaultValue())) { + sqlBuilder + .append("DEFAULT ") + .append(columnDefaultValueConverter.fromGravitino(column.defaultValue())) + .append(SPACE); + } + + // Add column auto_increment if specified + if (column.autoIncrement()) { + sqlBuilder.append(DORIS_AUTO_INCREMENT).append(" "); + } + + // Add column comment if specified + if (StringUtils.isNotEmpty(column.comment())) { + sqlBuilder.append("COMMENT '").append(column.comment()).append("' "); + } + return sqlBuilder; + } + + static String addIndexDefinition(TableChange.AddIndex addIndex) { + return String.format("ADD INDEX %s (%s)", addIndex.getName(), addIndex.getFieldNames()[0][0]); + } + + static String deleteIndexDefinition( + JdbcTable lazyLoadTable, TableChange.DeleteIndex deleteIndex) { + if (deleteIndex.isIfExists()) { + Preconditions.checkArgument( + Arrays.stream(lazyLoadTable.index()) + .anyMatch(index -> index.name().equals(deleteIndex.getName())), + "Index does not exist"); + } + return "DROP INDEX " + deleteIndex.getName(); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/utils/DorisUtils.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/utils/DorisUtils.java new file mode 100644 index 00000000000..23980689b58 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/utils/DorisUtils.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public final class DorisUtils { + private DorisUtils() {} + + // convert Map properties to SQL String + public static String generatePropertiesSql(Map properties) { + if (properties == null || properties.isEmpty()) { + return ""; + } + StringBuilder sqlBuilder = new StringBuilder(" PROPERTIES (\n"); + sqlBuilder.append( + properties.entrySet().stream() + .map(entry -> "\"" + entry.getKey() + "\"=\"" + entry.getValue() + "\"") + .collect(Collectors.joining(",\n"))); + sqlBuilder.append("\n)"); + return sqlBuilder.toString(); + } + + public static Map extractPropertiesFromSql(String createTableSql) { + Map properties = new HashMap<>(); + String[] lines = createTableSql.split("\n"); + + boolean isProperties = false; + final String sProperties = "\"(.*)\"\\s{0,}=\\s{0,}\"(.*)\",?"; + final Pattern patternProperties = Pattern.compile(sProperties); + + for (String line : lines) { + if (line.contains("PROPERTIES")) { + isProperties = true; + } + + if (isProperties) { + final Matcher matcherProperties = patternProperties.matcher(line); + if (matcherProperties.find()) { + final String key = matcherProperties.group(1).trim(); + String value = matcherProperties.group(2).trim(); + properties.put(key, value); + } + } + } + return properties; + } +} diff --git a/catalogs/catalog-jdbc-doris/src/main/resources/META-INF/services/com.datastrato.gravitino.CatalogProvider b/catalogs/catalog-jdbc-doris/src/main/resources/META-INF/services/com.datastrato.gravitino.CatalogProvider new file mode 100644 index 00000000000..94219d8a5d2 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/resources/META-INF/services/com.datastrato.gravitino.CatalogProvider @@ -0,0 +1,5 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. +# +com.datastrato.gravitino.catalog.doris.DorisCatalog \ No newline at end of file diff --git a/catalogs/catalog-jdbc-doris/src/main/resources/jdbc-doris.conf b/catalogs/catalog-jdbc-doris/src/main/resources/jdbc-doris.conf new file mode 100644 index 00000000000..a2fefc4c785 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/main/resources/jdbc-doris.conf @@ -0,0 +1,9 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. +# +# jdbc-url: jdbc:mysql://localhost:3306/ +# jdbc-user: strato +# jdbc-password: strato +# jdbc-driver: com.mysql.jdbc.Driver +# jdbc-driver: com.mysql.cj.jdbc.Driver \ No newline at end of file diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java new file mode 100644 index 00000000000..5f88a6f2f26 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.converter; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestDorisExceptionConverter { + @Test + public void testGetErrorCodeFromMessage() { + String msg = + "errCode = 2, detailMessage = Can't create database 'default_cluster:test_schema'; database exists"; + Assertions.assertEquals( + DorisExceptionConverter.CODE_DATABASE_EXISTS, + DorisExceptionConverter.getErrorCodeFromMessage(msg)); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java new file mode 100644 index 00000000000..39340347296 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java @@ -0,0 +1,406 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.integration.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.datastrato.gravitino.client.GravitinoMetaLake; +import com.datastrato.gravitino.dto.rel.ColumnDTO; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.DorisContainer; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.integration.test.util.ITUtils; +import com.datastrato.gravitino.integration.test.util.JdbcDriverDownloader; +import com.datastrato.gravitino.rel.Schema; +import com.datastrato.gravitino.rel.SupportsSchemas; +import com.datastrato.gravitino.rel.Table; +import com.datastrato.gravitino.rel.TableCatalog; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.indexes.Indexes; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-it") +@TestInstance(Lifecycle.PER_CLASS) +public class CatalogDorisIT extends AbstractIT { + public static final Logger LOG = LoggerFactory.getLogger(CatalogDorisIT.class); + + private static final String provider = "jdbc-doris"; + private static final String DOWNLOAD_JDBC_DRIVER_URL = + "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar"; + + private static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + + public String metalakeName = GravitinoITUtils.genRandomName("doris_it_metalake"); + public String catalogName = GravitinoITUtils.genRandomName("doris_it_catalog"); + public String schemaName = GravitinoITUtils.genRandomName("doris_it_schema"); + public String tableName = GravitinoITUtils.genRandomName("doris_it_table"); + + public String table_comment = "table_comment"; + + // Doris doesn't support schema comment + public String schema_comment = null; + public String DORIS_COL_NAME1 = "doris_col_name1"; + public String DORIS_COL_NAME2 = "doris_col_name2"; + public String DORIS_COL_NAME3 = "doris_col_name3"; + + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + private GravitinoMetaLake metalake; + + protected Catalog catalog; + + @BeforeAll + public void startup() throws IOException { + + if (!ITUtils.EMBEDDED_TEST_MODE.equals(AbstractIT.testMode)) { + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + Path tmpPath = Paths.get(gravitinoHome, "/catalogs/jdbc-doris/libs"); + JdbcDriverDownloader.downloadJdbcDriver(DOWNLOAD_JDBC_DRIVER_URL, tmpPath.toString()); + } + + containerSuite.startDorisContainer(); + + createMetalake(); + createCatalog(); + createSchema(); + } + + @AfterAll + public void stop() { + clearTableAndSchema(); + AbstractIT.client.dropMetalake(NameIdentifier.of(metalakeName)); + } + + @AfterEach + private void resetSchema() { + clearTableAndSchema(); + createSchema(); + } + + private static void waitForDorisOperation() { + // TODO: use a better way to wait for the operation to complete + // see: https://doris.apache.org/docs/1.2/advanced/alter-table/schema-change/ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // do nothing + } + } + + private void clearTableAndSchema() { + NameIdentifier[] nameIdentifiers = + catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); + for (NameIdentifier nameIdentifier : nameIdentifiers) { + catalog.asTableCatalog().dropTable(nameIdentifier); + } + catalog.asSchemas().dropSchema(NameIdentifier.of(metalakeName, catalogName, schemaName), true); + } + + private void createMetalake() { + GravitinoMetaLake[] gravitinoMetaLakes = AbstractIT.client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetaLakes.length); + + GravitinoMetaLake createdMetalake = + AbstractIT.client.createMetalake( + NameIdentifier.of(metalakeName), "comment", Collections.emptyMap()); + GravitinoMetaLake loadMetalake = + AbstractIT.client.loadMetalake(NameIdentifier.of(metalakeName)); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private void createCatalog() { + Map catalogProperties = Maps.newHashMap(); + + DorisContainer dorisContainer = containerSuite.getDorisContainer(); + + String jdbcUrl = + String.format( + "jdbc:mysql://%s:%d/", + dorisContainer.getContainerIpAddress(), DorisContainer.FE_MYSQL_PORT); + + catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl); + catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME); + catalogProperties.put(JdbcConfig.USERNAME.getKey(), DorisContainer.USER_NAME); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), DorisContainer.PASSWORD); + + Catalog createdCatalog = + metalake.createCatalog( + NameIdentifier.of(metalakeName, catalogName), + Catalog.Type.RELATIONAL, + provider, + "doris catalog comment", + catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName)); + Assertions.assertEquals(createdCatalog, loadCatalog); + + catalog = loadCatalog; + } + + private void createSchema() { + NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, schemaName); + String propKey = "key"; + String propValue = "value"; + Map prop = Maps.newHashMap(); + prop.put(propKey, propValue); + + Schema createdSchema = catalog.asSchemas().createSchema(ident, schema_comment, prop); + Schema loadSchema = catalog.asSchemas().loadSchema(ident); + Assertions.assertEquals(createdSchema.name(), loadSchema.name()); + + Assertions.assertEquals(createdSchema.properties().get(propKey), propValue); + } + + @Test + void testDropDorisSchema() { + String schemaName = GravitinoITUtils.genRandomName("doris_it_schema_dropped").toLowerCase(); + + catalog + .asSchemas() + .createSchema( + NameIdentifier.of(metalakeName, catalogName, schemaName), + null, + ImmutableMap.builder().build()); + + catalog + .asTableCatalog() + .createTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName), + createColumns(), + "Created by gravitino client", + createProperties(), + Transforms.EMPTY_TRANSFORM, + createDistribution(), + null); + + // Try to drop a database, and cascade equals to false, it should not be allowed. + catalog.asSchemas().dropSchema(NameIdentifier.of(metalakeName, catalogName, schemaName), false); + // Check the database still exists + catalog.asSchemas().loadSchema(NameIdentifier.of(metalakeName, catalogName, schemaName)); + + // Try to drop a database, and cascade equals to true, it should be allowed. + catalog.asSchemas().dropSchema(NameIdentifier.of(metalakeName, catalogName, schemaName), true); + // Check database has been dropped + SupportsSchemas schemas = catalog.asSchemas(); + NameIdentifier of = NameIdentifier.of(metalakeName, catalogName, schemaName); + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> { + schemas.loadSchema(of); + }); + } + + private ColumnDTO[] createColumns() { + ColumnDTO col1 = + new ColumnDTO.Builder() + .withName(DORIS_COL_NAME1) + .withDataType(Types.IntegerType.get()) + .withComment("col_1_comment") + .build(); + ColumnDTO col2 = + new ColumnDTO.Builder() + .withName(DORIS_COL_NAME2) + .withDataType(Types.DateType.get()) + .withComment("col_2_comment") + .build(); + ColumnDTO col3 = + new ColumnDTO.Builder() + .withName(DORIS_COL_NAME3) + .withDataType(Types.VarCharType.of(10)) + .withComment("col_3_comment") + .build(); + return new ColumnDTO[] {col1, col2, col3}; + } + + private Map createProperties() { + Map properties = Maps.newHashMap(); + properties.put("replication_allocation", "tag.location.default: 1"); + return properties; + } + + private Distribution createDistribution() { + return Distributions.hash(32, NamedReference.field(DORIS_COL_NAME1)); + } + + @Test + void testCreateAndLoadDorisSchema() { + SupportsSchemas schemas = catalog.asSchemas(); + Namespace namespace = Namespace.of(metalakeName, catalogName); + + // test list schemas + NameIdentifier[] nameIdentifiers = schemas.listSchemas(namespace); + Set schemaNames = + Arrays.stream(nameIdentifiers).map(NameIdentifier::name).collect(Collectors.toSet()); + Assertions.assertTrue(schemaNames.contains(schemaName)); + + // test create schema already exists + String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1"); + NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName); + schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()); + nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); + Map schemaMap = + Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); + Assertions.assertTrue(schemaMap.containsKey(testSchemaName)); + + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> { + schemas.createSchema(schemaIdent, schema_comment, Collections.emptyMap()); + }); + + // test drop schema + Assertions.assertTrue(schemas.dropSchema(schemaIdent, false)); + + // check schema is deleted + // 1. check by load schema + Assertions.assertThrows(NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent)); + + // 2. check by list schema + nameIdentifiers = schemas.listSchemas(Namespace.of(metalakeName, catalogName)); + schemaMap = + Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); + Assertions.assertFalse(schemaMap.containsKey(testSchemaName)); + + // test drop schema not exists + NameIdentifier notExistsSchemaIdent = NameIdentifier.of(metalakeName, catalogName, "no-exits"); + Assertions.assertFalse(schemas.dropSchema(notExistsSchemaIdent, false)); + } + + @Test + void testCreateAndLoadDorisTable() { + // create a table + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + ColumnDTO[] columns = createColumns(); + + Distribution distribution = createDistribution(); + + Index[] indexes = + new Index[] { + Indexes.of(Index.IndexType.PRIMARY_KEY, "k1_index", new String[][] {{DORIS_COL_NAME1}}) + }; + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + distribution, + null, + indexes); + Assertions.assertEquals(createdTable.name(), tableName); + Map resultProp = createdTable.properties(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + Assertions.assertEquals(createdTable.columns().length, columns.length); + + for (int i = 0; i < columns.length; i++) { + AbstractIT.assertColumn(columns[i], createdTable.columns()[i]); + } + + // test load table + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(table_comment, loadTable.comment()); + resultProp = loadTable.properties(); + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertTrue(resultProp.containsKey(entry.getKey())); + Assertions.assertEquals(entry.getValue(), resultProp.get(entry.getKey())); + } + Assertions.assertEquals(loadTable.columns().length, columns.length); + for (int i = 0; i < columns.length; i++) { + AbstractIT.assertColumn(columns[i], loadTable.columns()[i]); + } + } + + @Test + void testDorisIndex() { + String tableName = GravitinoITUtils.genRandomName("test_add_index"); + + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + ColumnDTO[] columns = createColumns(); + + Distribution distribution = createDistribution(); + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table createdTable = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + Transforms.EMPTY_TRANSFORM, + distribution, + null); + Assertions.assertEquals(createdTable.name(), tableName); + + // add index test. + tableCatalog.alterTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName), + TableChange.addIndex( + Index.IndexType.PRIMARY_KEY, "k1_index", new String[][] {{DORIS_COL_NAME1}})); + + waitForDorisOperation(); + Table table = + tableCatalog.loadTable(NameIdentifier.of(metalakeName, catalogName, schemaName, tableName)); + Index[] indexes = table.index(); + assertEquals(1, indexes.length); + + // delete index and add new column and index. + tableCatalog.alterTable( + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName), + TableChange.deleteIndex("k1_index", true), + TableChange.addIndex( + Index.IndexType.PRIMARY_KEY, "k2_index", new String[][] {{DORIS_COL_NAME2}})); + + waitForDorisOperation(); + + table = + tableCatalog.loadTable(NameIdentifier.of(metalakeName, catalogName, schemaName, tableName)); + indexes = table.index(); + assertEquals(1, indexes.length); + assertEquals("k2_index", indexes[0].name()); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java new file mode 100644 index 00000000000..eb35504bb3d --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java @@ -0,0 +1,290 @@ +/* + * Copyright 2023 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.integration.test; + +import com.datastrato.gravitino.catalog.doris.converter.DorisColumnDefaultValueConverter; +import com.datastrato.gravitino.catalog.doris.converter.DorisExceptionConverter; +import com.datastrato.gravitino.catalog.doris.converter.DorisTypeConverter; +import com.datastrato.gravitino.catalog.doris.operation.DorisDatabaseOperations; +import com.datastrato.gravitino.catalog.doris.operation.DorisTableOperations; +import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; +import com.datastrato.gravitino.catalog.jdbc.JdbcTable; +import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.integration.test.TestJdbcAbstractIT; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.DorisContainer; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-it") +public class DorisTableOperationsIT extends TestJdbcAbstractIT { + public static final Logger LOG = LoggerFactory.getLogger(DorisTableOperationsIT.class); + + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + + protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + protected static DataSource dataSource; + + protected static JdbcDatabaseOperations databaseOperations; + + protected static JdbcTableOperations tableOperations; + + protected static JdbcExceptionConverter jdbcExceptionConverter; + + private static final Type VARCHAR_255 = Types.VarCharType.of(255); + private static final Type VARCHAR_1024 = Types.VarCharType.of(1024); + + private static final String databaseName = GravitinoITUtils.genRandomName("doris_test_db"); + private static final String tableName = GravitinoITUtils.genRandomName("doris_test_table"); + + private static Type INT = Types.IntegerType.get(); + + @BeforeAll + public static void startup() { + containerSuite.startDorisContainer(); + + dataSource = DataSourceUtils.createDataSource(getCatalogProperties()); + + databaseOperations = new DorisDatabaseOperations(); + tableOperations = new DorisTableOperations(); + jdbcExceptionConverter = new DorisExceptionConverter(); + databaseOperations.initialize(dataSource, jdbcExceptionConverter, Collections.emptyMap()); + tableOperations.initialize( + dataSource, + jdbcExceptionConverter, + new DorisTypeConverter(), + new DorisColumnDefaultValueConverter(), + Collections.emptyMap()); + + createDatabase(); + } + + private static void createDatabase() { + databaseOperations.create(databaseName, "test_comment", null); + } + + private static Map getCatalogProperties() { + Map catalogProperties = Maps.newHashMap(); + + DorisContainer dorisContainer = containerSuite.getDorisContainer(); + + String jdbcUrl = + String.format( + "jdbc:mysql://%s:%d/", + dorisContainer.getContainerIpAddress(), DorisContainer.FE_MYSQL_PORT); + + catalogProperties.put(JdbcConfig.JDBC_URL.getKey(), jdbcUrl); + catalogProperties.put(JdbcConfig.JDBC_DRIVER.getKey(), DRIVER_CLASS_NAME); + catalogProperties.put(JdbcConfig.USERNAME.getKey(), DorisContainer.USER_NAME); + catalogProperties.put(JdbcConfig.PASSWORD.getKey(), DorisContainer.PASSWORD); + + return catalogProperties; + } + + private static Map createProperties() { + Map properties = Maps.newHashMap(); + properties.put("replication_allocation", "tag.location.default: 1"); + return properties; + } + + private static void waitForDorisOperation() { + // TODO: use a better way to wait for the operation to complete + // see: https://doris.apache.org/docs/1.2/advanced/alter-table/schema-change/ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // do nothing + } + } + + @Test + public void testAlterTable() { + + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + JdbcColumn col_1 = + new JdbcColumn.Builder().withName("col_1").withType(INT).withComment("id").build(); + columns.add(col_1); + JdbcColumn col_2 = + new JdbcColumn.Builder() + .withName("col_2") + .withType(VARCHAR_255) + .withComment("col_2") + .build(); + columns.add(col_2); + JdbcColumn col_3 = + new JdbcColumn.Builder() + .withName("col_3") + .withType(VARCHAR_255) + .withComment("col_3") + .build(); + columns.add(col_3); + Map properties = new HashMap<>(); + + Distribution distribution = Distributions.hash(32, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + + // create table + tableOperations.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + JdbcTable load = tableOperations.load(databaseName, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + tableOperations.alterTable( + databaseName, + tableName, + TableChange.updateColumnType(new String[] {col_3.name()}, VARCHAR_1024)); + + waitForDorisOperation(); + + load = tableOperations.load(databaseName, tableName); + + // After modifying the type, check it + columns.clear(); + col_3 = + new JdbcColumn.Builder() + .withName(col_3.name()) + .withType(VARCHAR_1024) + .withComment(col_3.comment()) + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + String colNewComment = "new_comment"; + // update column comment + + tableOperations.alterTable( + databaseName, + tableName, + TableChange.updateColumnComment(new String[] {col_2.name()}, colNewComment)); + load = tableOperations.load(databaseName, tableName); + + columns.clear(); + col_2 = + new JdbcColumn.Builder() + .withName(col_2.name()) + .withType(col_2.dataType()) + .withComment(colNewComment) + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + // add new column + tableOperations.alterTable( + databaseName, + tableName, + TableChange.addColumn(new String[] {"col_4"}, VARCHAR_255, "txt4", true)); + + waitForDorisOperation(); + load = tableOperations.load(databaseName, tableName); + + columns.clear(); + JdbcColumn col_4 = + new JdbcColumn.Builder() + .withName("col_4") + .withType(VARCHAR_255) + .withComment("txt4") + .build(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_3); + columns.add(col_4); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + + // change column position + tableOperations.alterTable( + databaseName, + tableName, + TableChange.updateColumnPosition( + new String[] {"col_3"}, TableChange.ColumnPosition.after("col_4"))); + waitForDorisOperation(); + load = tableOperations.load(databaseName, tableName); + + columns.clear(); + columns.add(col_1); + columns.add(col_2); + columns.add(col_4); + columns.add(col_3); + assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); + } + + @Test + public void testCreateAllTypeTable() { + String tableName = GravitinoITUtils.genRandomName("type_table"); + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + columns.add( + new JdbcColumn.Builder().withName("col_1").withType(Types.IntegerType.get()).build()); + columns.add( + new JdbcColumn.Builder().withName("col_2").withType(Types.BooleanType.get()).build()); + columns.add(new JdbcColumn.Builder().withName("col_3").withType(Types.ByteType.get()).build()); + columns.add(new JdbcColumn.Builder().withName("col_4").withType(Types.ShortType.get()).build()); + columns.add( + new JdbcColumn.Builder().withName("col_5").withType(Types.IntegerType.get()).build()); + columns.add(new JdbcColumn.Builder().withName("col_6").withType(Types.LongType.get()).build()); + columns.add(new JdbcColumn.Builder().withName("col_7").withType(Types.FloatType.get()).build()); + columns.add( + new JdbcColumn.Builder().withName("col_8").withType(Types.DoubleType.get()).build()); + columns.add( + new JdbcColumn.Builder().withName("col_9").withType(Types.DecimalType.of(10, 2)).build()); + columns.add(new JdbcColumn.Builder().withName("col_10").withType(Types.DateType.get()).build()); + columns.add(new JdbcColumn.Builder().withName("col_11").withType(Types.TimeType.get()).build()); + columns.add( + new JdbcColumn.Builder().withName("col_12").withType(Types.FixedCharType.of(10)).build()); + columns.add( + new JdbcColumn.Builder().withName("col_13").withType(Types.VarCharType.of(10)).build()); + columns.add( + new JdbcColumn.Builder().withName("col_14").withType(Types.StringType.get()).build()); + + Distribution distribution = Distributions.hash(32, NamedReference.field("col_1")); + Index[] indexes = new Index[] {}; + // create table + tableOperations.create( + databaseName, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + createProperties(), + null, + distribution, + indexes); + + JdbcTable load = tableOperations.load(databaseName, tableName); + assertionsTableInfo(tableName, tableComment, columns, Collections.emptyMap(), null, load); + } +} diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/utils/TestDorisUtils.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/utils/TestDorisUtils.java new file mode 100644 index 00000000000..200c1bec3a8 --- /dev/null +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/utils/TestDorisUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.doris.utils; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestDorisUtils { + @Test + public void generatePropertiesSql() { + // Test When properties is null + Map properties = null; + String result = DorisUtils.generatePropertiesSql(properties); + Assertions.assertEquals("", result); + + // Test When properties is empty + properties = Collections.emptyMap(); + result = DorisUtils.generatePropertiesSql(properties); + Assertions.assertEquals("", result); + + // Test When properties has single entry + properties = Collections.singletonMap("key", "value"); + result = DorisUtils.generatePropertiesSql(properties); + Assertions.assertEquals(" PROPERTIES (\n\"key\"=\"value\"\n)", result); + + // Test when properties has multiple entries + properties = new HashMap<>(); + properties.put("key1", "value1"); + properties.put("key2", "value2"); + + String expectedStr = " PROPERTIES (\n\"key1\"=\"value1\",\n\"key2\"=\"value2\"\n)"; + + result = DorisUtils.generatePropertiesSql(properties); + Assertions.assertEquals(expectedStr, result); + } + + @Test + public void testExtractTablePropertiesFromSql() { + // Test When properties is null + String createTableSql = + "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\""; + Map result = DorisUtils.extractPropertiesFromSql(createTableSql); + Assertions.assertTrue(result.isEmpty()); + + // Test When properties exist + createTableSql = + "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"\nPROPERTIES (\n\"test_property\"=\"test_value\"\n)"; + result = DorisUtils.extractPropertiesFromSql(createTableSql); + Assertions.assertEquals("test_value", result.get("test_property")); + + // Test When multiple properties exist + createTableSql = + "CREATE TABLE `testTable` (\n`testColumn` STRING NOT NULL COMMENT 'test comment'\n) ENGINE=OLAP\nCOMMENT \"test comment\"\nPROPERTIES (\n\"test_property1\"=\"test_value1\",\n\"test_property2\"=\"test_value2\"\n)"; + result = DorisUtils.extractPropertiesFromSql(createTableSql); + Assertions.assertEquals("test_value1", result.get("test_property1")); + Assertions.assertEquals("test_value2", result.get("test_property2")); + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java index e103828f291..2fdda0ca251 100644 --- a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java @@ -17,6 +17,7 @@ import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.indexes.Indexes; @@ -78,6 +79,7 @@ protected String generateCreateTableSql( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes) { if (ArrayUtils.isNotEmpty(partitioning)) { throw new UnsupportedOperationException("Currently we do not support Partitioning in mysql"); @@ -247,7 +249,8 @@ protected Map getTableProperties(Connection connection, String t @Override protected void correctJdbcTableFields( - Connection connection, String tableName, JdbcTable.Builder tableBuilder) throws SQLException { + Connection connection, String databaseName, String tableName, JdbcTable.Builder tableBuilder) + throws SQLException { if (StringUtils.isEmpty(tableBuilder.comment())) { // In Mysql version 5.7, the comment field value cannot be obtained in the driver API. LOG.warn("Not found comment in mysql driver api. Will try to get comment from sql"); diff --git a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java index 676de26339b..2b2fe25aa55 100644 --- a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java +++ b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java @@ -86,6 +86,7 @@ public void testOperationTable() { tableComment, properties, null, + null, indexes); // list table @@ -218,6 +219,7 @@ public void testAlterTable() { tableComment, properties, null, + null, indexes); JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); assertionsTableInfo(tableName, tableComment, columns, properties, indexes, load); @@ -460,6 +462,7 @@ public void testCreateAndLoadTable() { tableComment, properties, null, + null, indexes); JdbcTable loaded = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); @@ -558,6 +561,7 @@ public void testCreateAllTypeTable() { tableComment, Collections.emptyMap(), null, + null, Indexes.EMPTY_INDEXES); JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); @@ -601,6 +605,7 @@ public void testCreateNotSupportTypeTable() { tableComment, emptyMap, null, + null, Indexes.EMPTY_INDEXES); }); Assertions.assertTrue( @@ -630,6 +635,7 @@ public void testCreateMultipleTables() { "test_comment", null, null, + null, Indexes.EMPTY_INDEXES); String testDb = "test_db_2"; @@ -654,6 +660,7 @@ public void testCreateMultipleTables() { "test_comment", null, null, + null, Indexes.EMPTY_INDEXES); tables = TABLE_OPERATIONS.listTables(TEST_DB_NAME); @@ -677,6 +684,7 @@ public void testLoadTableDefaultProperties() { "test_comment", null, null, + null, Indexes.EMPTY_INDEXES); JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, test_table_1); Assertions.assertEquals("InnoDB", load.properties().get(MYSQL_ENGINE_KEY)); @@ -719,7 +727,8 @@ public void testAutoIncrement() { Indexes.createMysqlPrimaryKey(new String[][] {{"col_2"}}), Indexes.unique("uk_1", new String[][] {{"col_1"}}) }; - TABLE_OPERATIONS.create(TEST_DB_NAME, tableName, columns, comment, properties, null, indexes); + TABLE_OPERATIONS.create( + TEST_DB_NAME, tableName, columns, comment, properties, null, null, indexes); JdbcTable table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); assertionsTableInfo( @@ -737,7 +746,8 @@ public void testAutoIncrement() { Indexes.createMysqlPrimaryKey(new String[][] {{"col_1"}}), Indexes.unique("uk_2", new String[][] {{"col_2"}}) }; - TABLE_OPERATIONS.create(TEST_DB_NAME, tableName, columns, comment, properties, null, indexes); + TABLE_OPERATIONS.create( + TEST_DB_NAME, tableName, columns, comment, properties, null, null, indexes); table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); assertionsTableInfo( @@ -751,7 +761,8 @@ public void testAutoIncrement() { // Test create increment key for col_1 + col_3 uk. indexes = new Index[] {Indexes.unique("uk_2_3", new String[][] {{"col_1"}, {"col_3"}})}; - TABLE_OPERATIONS.create(TEST_DB_NAME, tableName, columns, comment, properties, null, indexes); + TABLE_OPERATIONS.create( + TEST_DB_NAME, tableName, columns, comment, properties, null, null, indexes); table = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); assertionsTableInfo( @@ -775,6 +786,7 @@ public void testAutoIncrement() { comment, properties, null, + null, Indexes.EMPTY_INDEXES)); Assertions.assertTrue( StringUtils.contains( @@ -802,7 +814,14 @@ public void testAutoIncrement() { IllegalArgumentException.class, () -> TABLE_OPERATIONS.create( - TEST_DB_NAME, tableName, newColumns, comment, properties, null, primaryIndex)); + TEST_DB_NAME, + tableName, + newColumns, + comment, + properties, + null, + null, + primaryIndex)); Assertions.assertTrue( StringUtils.contains( exception.getMessage(), diff --git a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java index 1845cdfc1ca..a92603615a3 100644 --- a/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java +++ b/catalogs/catalog-jdbc-postgresql/src/main/java/com/datastrato/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java @@ -16,6 +16,7 @@ import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; import com.datastrato.gravitino.exceptions.NoSuchColumnException; import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; import com.datastrato.gravitino.rel.types.Types; @@ -74,6 +75,7 @@ protected String generateCreateTableSql( String comment, Map properties, Transform[] partitioning, + Distribution distribution, Index[] indexes) { if (ArrayUtils.isNotEmpty(partitioning)) { throw new UnsupportedOperationException( diff --git a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java index 0d01ce930d2..921423f477f 100644 --- a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java +++ b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java @@ -80,6 +80,7 @@ public void testOperationTable() { tableComment, properties, null, + null, Indexes.EMPTY_INDEXES); // list table @@ -306,6 +307,7 @@ public void testCreateAllTypeTable() { tableComment, Collections.emptyMap(), null, + null, Indexes.EMPTY_INDEXES); JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); @@ -360,6 +362,7 @@ public void testCreateMultipleTable() throws SQLException { null, null, null, + null, Indexes.EMPTY_INDEXES); List tableNames = TABLE_OPERATIONS.listTables(TEST_DB_NAME); @@ -388,6 +391,7 @@ public void testCreateMultipleTable() throws SQLException { null, null, null, + null, Indexes.EMPTY_INDEXES); tableNames = postgreSqlTableOperations.listTables(TEST_DB_NAME); Assertions.assertFalse(tableNames.contains(table_2)); @@ -439,6 +443,7 @@ public void testCreateAutoIncrementTable() { tableComment, properties, null, + null, Indexes.EMPTY_INDEXES); // list table @@ -480,6 +485,7 @@ public void testCreateAutoIncrementTable() { tableComment, properties, null, + null, Indexes.EMPTY_INDEXES); }); @@ -538,6 +544,7 @@ public void testCreateIndexTable() { tableComment, properties, null, + null, indexes); JdbcTable load = TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); @@ -563,6 +570,7 @@ public void testCreateIndexTable() { tableComment, properties, null, + null, primaryIndex); }); Assertions.assertTrue( diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java index 331efae498e..b0fb3973c9e 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java @@ -97,6 +97,10 @@ protected void withLogConsumer(Consumer logConsumer) { container.withLogConsumer(logConsumer); } + protected void withStartupTimeout(Duration duration) { + container.withStartupTimeout(duration); + } + // This method is used to get the expose port number of the container. public Integer getMappedPort(int exposedPort) { return container.getMappedPort(exposedPort); diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java index c02095dfb5b..e2344ab0928 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/ContainerSuite.java @@ -36,6 +36,8 @@ public class ContainerSuite implements Closeable { private static TrinoContainer trinoContainer; private static TrinoITContainers trinoITContainers; + private static DorisContainer dorisContainer; + protected static final CloseableGroup closer = CloseableGroup.create(); private ContainerSuite() { @@ -123,6 +125,17 @@ public void startTrinoContainer( trinoContainer.start(); } + public void startDorisContainer() { + if (dorisContainer != null) { + return; + } + // Start Doris container + DorisContainer.Builder dorisBuilder = + DorisContainer.builder().withHostName("gravitino-ci-doris").withNetwork(network); + dorisContainer = closer.register(dorisBuilder.build()); + dorisContainer.start(); + } + public TrinoContainer getTrinoContainer() { return trinoContainer; } @@ -139,6 +152,10 @@ public HiveContainer getHiveContainer() { return hiveContainer; } + public DorisContainer getDorisContainer() { + return dorisContainer; + } + // Let containers assign addresses in a fixed subnet to avoid `mac-docker-connector` needing to // refresh the configuration private static Network createDockerNetwork() { diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java new file mode 100644 index 00000000000..7ded969c2cb --- /dev/null +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/container/DorisContainer.java @@ -0,0 +1,130 @@ +/* + * Copyright 2023 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.container; + +import static java.lang.String.format; + +import com.google.common.collect.ImmutableSet; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.rnorth.ducttape.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; + +public class DorisContainer extends BaseContainer { + public static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class); + + public static final String DEFAULT_IMAGE = System.getenv("GRAVITINO_CI_DORIS_DOCKER_IMAGE"); + public static final String HOST_NAME = "gravitino-ci-doris"; + public static final String USER_NAME = "root"; + public static final String PASSWORD = "root"; + public static final int FE_HTTP_PORT = 8030; + public static final int FE_MYSQL_PORT = 9030; + + public static Builder builder() { + return new Builder(); + } + + protected DorisContainer( + String image, + String hostName, + Set ports, + Map extraHosts, + Map filesToMount, + Map envVars, + Optional network) { + super(image, hostName, ports, extraHosts, filesToMount, envVars, network); + } + + @Override + protected void setupContainer() { + super.setupContainer(); + withLogConsumer(new PrintingContainerLog(format("%-14s| ", "DorisContainer"))); + withStartupTimeout(Duration.ofMinutes(10)); + } + + @Override + public void start() { + super.start(); + Preconditions.check("Doris container startup failed!", checkContainerStatus(5)); + changePassword(); + } + + @Override + protected boolean checkContainerStatus(int retryLimit) { + int nRetry = 0; + boolean isDorisContainerReady = false; + + String dorisMySQLUrl = format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT); + LOG.info("Doris url is " + dorisMySQLUrl); + + while (nRetry++ < retryLimit) { + try { + Connection connection = DriverManager.getConnection(dorisMySQLUrl, USER_NAME, ""); + + // execute `SHOW PROC '/backends';` to check if backends is ready + String query = "SHOW PROC '/backends';"; + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query); + while (resultSet.next()) { + String alive = resultSet.getString("Alive"); + if (alive.equalsIgnoreCase("true")) { + LOG.info("Doris container startup success!"); + isDorisContainerReady = true; + break; + } + } + + if (!isDorisContainerReady) { + LOG.info("Doris container is not ready yet!"); + Thread.sleep(5000); + } else { + break; + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + return isDorisContainerReady; + } + + private void changePassword() { + // change password for root user, Gravitino API must set password in catalog properties + try { + String dorisMySQLUrl = format("jdbc:mysql://%s:%d/", getContainerIpAddress(), FE_MYSQL_PORT); + Connection connection = DriverManager.getConnection(dorisMySQLUrl, USER_NAME, ""); + + String query = String.format("SET PASSWORD FOR '%s' = PASSWORD('%s');", USER_NAME, PASSWORD); + Statement statement = connection.createStatement(); + statement.execute(query); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + LOG.info("Doris container password has been changed"); + } + + public static class Builder + extends BaseContainer.Builder { + private Builder() { + this.image = DEFAULT_IMAGE; + this.hostName = HOST_NAME; + this.exposePorts = ImmutableSet.of(FE_HTTP_PORT, FE_MYSQL_PORT); + } + + @Override + public DorisContainer build() { + return new DorisContainer( + image, hostName, exposePorts, extraHosts, filesToMount, envVars, network); + } + } +} diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 0aacb8399ab..39ebc342076 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -140,6 +140,7 @@ tasks.test { } else { dependsOn(":trino-connector:jar") dependsOn(":catalogs:catalog-lakehouse-iceberg:jar", ":catalogs:catalog-lakehouse-iceberg:runtimeJars") + dependsOn(":catalogs:catalog-jdbc-doris:jar", ":catalogs:catalog-jdbc-doris:runtimeJars") dependsOn(":catalogs:catalog-jdbc-mysql:jar", ":catalogs:catalog-jdbc-mysql:runtimeJars") dependsOn(":catalogs:catalog-jdbc-postgresql:jar", ":catalogs:catalog-jdbc-postgresql:runtimeJars") dependsOn(":catalogs:catalog-hadoop:jar", ":catalogs:catalog-hadoop:runtimeJars") diff --git a/settings.gradle.kts b/settings.gradle.kts index 53066813eb6..ed715f68400 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -12,7 +12,7 @@ include("api", "common", "core", "meta", "server", "integration-test", "server-c include("catalogs:bundled-catalog") include("catalogs:catalog-hive") include("catalogs:catalog-lakehouse-iceberg") -include("catalogs:catalog-jdbc-common", "catalogs:catalog-jdbc-mysql", "catalogs:catalog-jdbc-postgresql") +include("catalogs:catalog-jdbc-common", "catalogs:catalog-jdbc-mysql", "catalogs:catalog-jdbc-postgresql", "catalogs:catalog-jdbc-doris") include("catalogs:catalog-hadoop") include("catalogs:catalog-messaging-kafka") include("clients:client-java", "clients:client-java-runtime")