Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4988] fix(doris-catalog): Fix the missing distribution information when loading Doris tables #4991

Merged
merged 12 commits into from
Sep 29, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,23 @@ public enum Strategy {
* @return The distribution strategy.
*/
public static Strategy getByName(String name) {
for (Strategy strategy : Strategy.values()) {
if (strategy.name().equalsIgnoreCase(name)) {
return strategy;
}
String upperName = name.toUpperCase();
switch (upperName) {
case "NONE":
return NONE;
case "HASH":
return HASH;
case "RANGE":
return RANGE;
case "EVEN":
case "RANDOM":
return EVEN;
default:
throw new IllegalArgumentException(
"Invalid distribution strategy: "
+ name
+ ". Valid values are: "
+ Arrays.toString(Strategy.values()));
}
throw new IllegalArgumentException(
"Invalid distribution strategy: "
+ name
+ ". Valid values are: "
+ Arrays.toString(Strategy.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
.withAuditInfo(load.auditInfo())
.withComment(comment)
.withProperties(properties)
.withDistribution(load.distribution())
.withIndexes(load.index())
.withPartitioning(load.partitioning())
.withDatabaseName(databaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ protected JdbcTable internalBuild() {
jdbcTable.auditInfo = auditInfo;
jdbcTable.columns = columns;
jdbcTable.partitioning = partitioning;
jdbcTable.distribution = distribution;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
jdbcTable.proxyPlugin = proxyPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
Expand Down Expand Up @@ -204,11 +205,15 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
Transform[] tablePartitioning = getTablePartitioning(connection, databaseName, tableName);
jdbcTableBuilder.withPartitioning(tablePartitioning);

// 5.Get table properties
// 5.Get distribution information
Distribution distribution = getDistributionInfo(connection, databaseName, tableName);
mchades marked this conversation as resolved.
Show resolved Hide resolved
jdbcTableBuilder.withDistribution(distribution);

// 6.Get table properties
Map<String, String> tableProperties = getTableProperties(connection, tableName);
jdbcTableBuilder.withProperties(tableProperties);

// 6.Leave the information to the bottom layer to append the table
// 7.Leave the information to the bottom layer to append the table
correctJdbcTableFields(connection, databaseName, tableName, jdbcTableBuilder);

return jdbcTableBuilder.withTableOperation(this).build();
Expand Down Expand Up @@ -236,6 +241,20 @@ protected Transform[] getTablePartitioning(
return Transforms.EMPTY_TRANSFORM;
}

/**
 * Get the distribution information of the table, including the distribution type and the fields.
 *
 * <p>This default implementation always returns {@code Distributions.NONE}; it is protected so
 * that subclasses can override it.
 *
 * @param connection The JDBC connection (unused by this default implementation).
 * @param databaseName The name of the database (unused by this default implementation).
 * @param tableName The name of the table (unused by this default implementation).
 * @return The distribution of the table; {@code Distributions.NONE} in this implementation.
 * @throws SQLException Declared for overriding implementations; this default implementation
 *     does not throw it.
 */
protected Distribution getDistributionInfo(
Connection connection, String databaseName, String tableName) throws SQLException {
return Distributions.NONE;
}

/**
 * Reads the {@code IS_AUTOINCREMENT} column of the current row of the given result set as a
 * boolean.
 *
 * @param resultSet The result set positioned on the row to read.
 * @return The boolean value of the {@code IS_AUTOINCREMENT} column.
 * @throws SQLException If the column cannot be read from the result set.
 */
protected boolean getAutoIncrementInfo(ResultSet resultSet) throws SQLException {
return resultSet.getBoolean("IS_AUTOINCREMENT");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]

Preconditions.checkArgument(
Strategy.HASH == distribution.strategy() || Strategy.EVEN == distribution.strategy(),
"Doris only supports HASH or EVEN distribution strategy");
"Doris only supports HASH or EVEN(RANDOM) distribution strategy");
mchades marked this conversation as resolved.
Show resolved Hide resolved

if (distribution.strategy() == Strategy.HASH) {
// Check if the distribution column exists
Expand All @@ -235,6 +235,10 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]
"Distribution column "
+ expression
+ " does not exist in the table columns"));
} else if (distribution.strategy() == Strategy.EVEN) {
Preconditions.checkArgument(
distribution.expressions().length == 0,
"Doris does not support distribution column in EVEN distribution strategy");
}
}

Expand Down Expand Up @@ -806,4 +810,17 @@ static String deleteIndexDefinition(
}
return "DROP INDEX " + deleteIndex.getName();
}

/**
 * Loads the distribution of a Doris table by running {@code SHOW CREATE TABLE} and parsing the
 * returned DDL with {@code DorisUtils.extractDistributionInfoFromSql}.
 *
 * @param connection The JDBC connection used to run the statement.
 * @param databaseName The name of the database; not used in the statement, which references the
 *     table by its bare name.
 * @param tableName The name of the table.
 * @return The distribution parsed from the {@code Create Table} column of the result.
 * @throws SQLException If the statement fails or returns no row for the table.
 */
@Override
protected Distribution getDistributionInfo(
    Connection connection, String databaseName, String tableName) throws SQLException {

  String showCreateTableSql = String.format("SHOW CREATE TABLE `%s`", tableName);
  try (Statement statement = connection.createStatement();
      ResultSet result = statement.executeQuery(showCreateTableSql)) {
    // Fail with a clear message instead of a driver-specific cursor error when no row comes back.
    if (!result.next()) {
      throw new SQLException(
          String.format("SHOW CREATE TABLE returned no rows for table `%s`", tableName));
    }
    String createTableSyntax = result.getString("Create Table");
    return DorisUtils.extractDistributionInfoFromSql(createTableSyntax);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions.DistributionImpl;
import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
Expand All @@ -40,6 +45,11 @@ public final class DorisUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(DorisUtils.class);
private static final Pattern PARTITION_INFO_PATTERN =
Pattern.compile("PARTITION BY \\b(LIST|RANGE)\\b\\((.+)\\)");

// Matches the "DISTRIBUTED BY ..." clause of a CREATE TABLE statement.
//   group(1): distribution type, HASH or RANDOM
//   group(3): text between the parentheses (the column list), absent when there are none
//   group(5): numeric bucket count after BUCKETS, absent when no numeric count is given
private static final Pattern DISTRIBUTION_INFO_PATTERN =
Pattern.compile(
"DISTRIBUTED BY\\s+(HASH|RANDOM)\\s*(\\(([^)]+)\\))?\\s*(BUCKETS\\s+(\\d+))?");

private static final String LIST_PARTITION = "LIST";
private static final String RANGE_PARTITION = "RANGE";

Expand Down Expand Up @@ -176,4 +186,38 @@ private static String generateListPartitionSqlValues(ListPartition listPartition
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}

/**
 * Extracts the distribution information from a CREATE TABLE statement by matching its
 * {@code DISTRIBUTED BY} clause against {@code DISTRIBUTION_INFO_PATTERN}.
 *
 * @param createTableSql The CREATE TABLE statement to parse.
 * @return The distribution with the parsed strategy, bucket number and distribution columns. The
 *     bucket number defaults to 1 when the clause carries no numeric {@code BUCKETS} value, and
 *     the expressions are empty when the clause has no column list.
 * @throws RuntimeException If no {@code DISTRIBUTED BY} clause can be found in the statement.
 */
public static Distribution extractDistributionInfoFromSql(String createTableSql) {
  Matcher matcher = DISTRIBUTION_INFO_PATTERN.matcher(createTableSql.trim());
  if (!matcher.find()) {
    throw new RuntimeException("Failed to extract distribution info in sql:" + createTableSql);
  }

  String distributionType = matcher.group(1);

  // For Random distribution, no need to specify distribution columns, so group 3 may be null.
  String distributionColumns = matcher.group(3);
  NamedReference[] expressions =
      distributionColumns == null
          ? new NamedReference[0]
          : Arrays.stream(distributionColumns.split(","))
              .map(String::trim)
              // Strip the surrounding backticks only when both are present, so that unquoted
              // or empty tokens are neither truncated nor cause an index error.
              .map(
                  col ->
                      col.length() >= 2 && col.startsWith("`") && col.endsWith("`")
                          ? col.substring(1, col.length() - 1)
                          : col)
              .map(col -> NamedReference.field(new String[] {col}))
              .toArray(NamedReference[]::new);

  // Default bucket number is 1. Group 5 is null when no numeric bucket count was matched;
  // test the group itself rather than calling Matcher.find(int), which resets and re-searches.
  String bucketCount = matcher.group(5);
  int bucketNum = bucketCount == null ? 1 : Integer.parseInt(bucketCount);

  return new DistributionImpl.Builder()
      .withStrategy(Strategy.getByName(distributionType))
      .withNumber(bucketNum)
      .withExpressions(expressions)
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
Expand All @@ -73,6 +74,7 @@
import org.apache.gravitino.utils.RandomNameUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -895,4 +897,45 @@ void testNonPartitionedTable() {
assertThrows(
UnsupportedOperationException.class, () -> tablePartitionOperations.dropPartition("p1"));
}

@Test
void testAllDistribution() {
  // EVEN and HASH distributions with several bucket numbers and one or two hash columns.
  Distribution[] candidates = {
    Distributions.even(1, Expression.EMPTY_EXPRESSION),
    Distributions.hash(1, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.even(10, Expression.EMPTY_EXPRESSION),
    Distributions.hash(0, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.hash(11, NamedReference.field(DORIS_COL_NAME1)),
    Distributions.hash(
        12, NamedReference.field(DORIS_COL_NAME1), NamedReference.field(DORIS_COL_NAME2))
  };

  for (int i = 0; i < candidates.length; i++) {
    Distribution expected = candidates[i];

    // Each distribution gets its own freshly named table.
    String name = GravitinoITUtils.genRandomName("test_distribution_table");
    NameIdentifier ident = NameIdentifier.of(schemaName, name);
    Column[] tableColumns = createColumns();
    Index[] noIndexes = Indexes.EMPTY_INDEXES;
    Map<String, String> tableProperties = createTableProperties();
    Transform[] noPartitioning = Transforms.EMPTY_TRANSFORM;
    TableCatalog tables = catalog.asTableCatalog();

    tables.createTable(
        ident,
        tableColumns,
        table_comment,
        tableProperties,
        noPartitioning,
        expected,
        null,
        noIndexes);

    // Reload the table and compare the distribution that comes back with the one requested.
    Table loaded = tables.loadTable(ident);
    Assertions.assertEquals(expected.strategy(), loaded.distribution().strategy());
    Assertions.assertArrayEquals(expected.expressions(), loaded.distribution().expressions());

    tables.dropTable(ident);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
Expand Down Expand Up @@ -94,6 +95,54 @@ private static Map<String, String> createProperties() {
return properties;
}

@Test
mchades marked this conversation as resolved.
Show resolved Hide resolved
void testAllDistribution() {
Distribution[] distributions =
new Distribution[] {
Distributions.even(DEFAULT_BUCKET_SIZE, Expression.EMPTY_EXPRESSION),
Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1")),
Distributions.even(10, Expression.EMPTY_EXPRESSION),
Distributions.hash(0, NamedReference.field("col_1")),
Distributions.hash(11, NamedReference.field("col_1")),
Distributions.hash(12, NamedReference.field("col_1"), NamedReference.field("col_2"))
};

for (Distribution distribution : distributions) {
String tableName = GravitinoITUtils.genRandomName("doris_basic_test_table");
String tableComment = "test_comment";
List<JdbcColumn> columns = new ArrayList<>();
JdbcColumn col_1 =
JdbcColumn.builder().withName("col_1").withType(INT).withComment("id").build();
columns.add(col_1);
JdbcColumn col_2 =
JdbcColumn.builder().withName("col_2").withType(VARCHAR_255).withComment("col_2").build();
columns.add(col_2);
JdbcColumn col_3 =
JdbcColumn.builder().withName("col_3").withType(VARCHAR_255).withComment("col_3").build();
columns.add(col_3);
Map<String, String> properties = new HashMap<>();
Index[] indexes = new Index[] {};

// create table
TABLE_OPERATIONS.create(
databaseName,
tableName,
columns.toArray(new JdbcColumn[0]),
tableComment,
createProperties(),
null,
distribution,
indexes);
JdbcTable load = TABLE_OPERATIONS.load(databaseName, tableName);
assertionsTableInfo(
tableName, tableComment, columns, properties, indexes, Transforms.EMPTY_TRANSFORM, load);

Assertions.assertEquals(distribution.strategy(), load.distribution().strategy());
Assertions.assertArrayEquals(distribution.expressions(), load.distribution().expressions());
TABLE_OPERATIONS.drop(databaseName, tableName);
}
}

@Test
public void testBasicTableOperation() {
String tableName = GravitinoITUtils.genRandomName("doris_basic_test_table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void validateFieldExistence(ColumnDTO[] columns, String[] fieldNam
.filter(c -> c.name().equalsIgnoreCase(fieldName[0]))
.collect(Collectors.toList());
Preconditions.checkArgument(
partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]);
partitionColumn.size() == 1, "Field '%s' not found in table", fieldName[0]);

// TODO: should validate nested fieldName after column type support namedStruct
}
Expand Down
9 changes: 9 additions & 0 deletions docs/jdbc-doris-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ Note that although Gravitino supports several partitioning strategies, Apache Do
The `fieldName` specified in the partitioning attributes must be the name of columns defined in the table.
:::

### Table distribution

Users can also specify the distribution strategy when creating tables in the Doris catalog. Currently, the Doris catalog supports the following distribution strategies:
- `HASH`
- `RANDOM`

Gravitino represents the Doris `RANDOM` distribution strategy as `EVEN`. More information about the distribution strategies defined in Gravitino can be found [here](./table-partitioning-bucketing-sort-order-indexes.md#table-distribution).
mchades marked this conversation as resolved.
Show resolved Hide resolved


### Table operations

Please refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#table-operations) for more details.
Expand Down
8 changes: 4 additions & 4 deletions docs/manage-relational-metadata-using-gravitino.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,14 @@ The following is the table property that Gravitino supports:
| `jdbc-postgresql` | [PostgreSQL table property](./jdbc-postgresql-catalog.md#table-properties) | [PostgreSQL type mapping](./jdbc-postgresql-catalog.md#table-column-types) |
| `doris` | [Doris table property](./jdbc-doris-catalog.md#table-properties) | [Doris type mapping](./jdbc-doris-catalog.md#table-column-types) |

#### Table partitioning, bucketing, sort ordering and indexes
#### Table partitioning, distribution, sort ordering and indexes

In addition to the basic settings, Gravitino supports the following features:

| Feature | Description | Java doc |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
| Feature | Description | Java doc |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|
| Table partitioning | Equal to `PARTITION BY` in Apache Hive, It is a partitioning strategy that is used to split a table into parts based on partition keys. Some table engine may not support this feature | [Partition](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/dto/rel/partitioning/Partitioning.html) |
| Table bucketing | Equal to `CLUSTERED BY` in Apache Hive, Bucketing a.k.a (Clustering) is a technique to split the data into more manageable files/parts, (By specifying the number of buckets to create). The value of the bucketing column will be hashed by a user-defined number into buckets. | [Distribution](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/distributions/Distribution.html) |
| Table distribution | Equal to `CLUSTERED BY` in Apache Hive, distribution a.k.a (Clustering) is a technique to split the data into more manageable files/parts, (By specifying the number of buckets to create). The value of the distribution column will be hashed by a user-defined number into buckets. | [Distribution](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/distributions/Distribution.html) |
| Table sort ordering | Equal to `SORTED BY` in Apache Hive, sort ordering is a method to sort the data in specific ways such as by a column or a function, and then store table data. it will highly improve the query performance under certain scenarios. | [SortOrder](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/expressions/sorts/SortOrder.html) |
| Table indexes | Equal to `KEY/INDEX` in MySQL , unique key enforces uniqueness of values in one or more columns within a table. It ensures that no two rows have identical values in specified columns, thereby facilitating data integrity and enabling efficient data retrieval and manipulation operations. | [Index](pathname:///docs/0.6.0-incubating/api/java/org/apache/gravitino/rel/indexes/Index.html) |

Expand Down
Loading
Loading