Skip to content

Commit

Permalink
[#3368] feat(jdbc-doris): support pre-assign partition for Doris table (
Browse files Browse the repository at this point in the history
#3887)

### What changes were proposed in this pull request?

Support pre-assigning partitions when creating a Doris table.

### Why are the changes needed?

Fix: #3368 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests (UT) and integration tests (IT).

---------

Co-authored-by: zhanghan18 <[email protected]>
  • Loading branch information
xiaozcy and zhanghan18 authored Jul 30, 2024
1 parent a9e503d commit 4acdc49
Show file tree
Hide file tree
Showing 7 changed files with 493 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.doris.operation;

import static org.apache.gravitino.catalog.doris.utils.DorisUtils.generatePartitionSqlFragment;
import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -54,16 +55,18 @@
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.gravitino.rel.partitions.ListPartition;
import org.apache.gravitino.rel.partitions.RangePartition;

/** Table operations for Apache Doris. */
public class DorisTableOperations extends JdbcTableOperations {
private static final String BACK_QUOTE = "`";
private static final String DORIS_AUTO_INCREMENT = "AUTO_INCREMENT";

private static final String NEW_LINE = "\n";

@Override
Expand Down Expand Up @@ -194,7 +197,6 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]
}

private static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {

if (indexes.length == 0) {
return;
}
Expand Down Expand Up @@ -224,47 +226,91 @@ private static void appendPartitionSql(
Preconditions.checkArgument(
partitioning.length == 1, "Composite partition type is not supported");

StringBuilder partitionSqlBuilder = new StringBuilder();
StringBuilder partitionSqlBuilder;
Set<String> columnNames =
Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet());

if (partitioning[0] instanceof Transforms.RangeTransform) {
partitionSqlBuilder.append(NEW_LINE).append(" PARTITION BY RANGE(");
// TODO support multi-column range partitioning in doris
// We do not support multi-column range partitioning in doris for now
Transforms.RangeTransform rangePartition = (Transforms.RangeTransform) partitioning[0];
partitionSqlBuilder = generateRangePartitionSql(rangePartition, columnNames);
} else if (partitioning[0] instanceof Transforms.ListTransform) {
Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0];
partitionSqlBuilder = generateListPartitionSql(listPartition, columnNames);
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
}

sqlBuilder.append(partitionSqlBuilder);
}

private static StringBuilder generateRangePartitionSql(
Transforms.RangeTransform rangePartition, Set<String> columnNames) {
Preconditions.checkArgument(
rangePartition.fieldName().length == 1, "Doris partition does not support nested field");
Preconditions.checkArgument(
columnNames.contains(rangePartition.fieldName()[0]),
"The partition field must be one of the columns");

StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
String partitionDefinition =
String.format(" PARTITION BY RANGE(`%s`)", rangePartition.fieldName()[0]);
partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

// Assign range partitions
RangePartition[] assignments = rangePartition.assignments();
if (!ArrayUtils.isEmpty(assignments)) {
String partitionSqlFragments =
Arrays.stream(assignments)
.map(DorisUtils::generatePartitionSqlFragment)
.collect(Collectors.joining("," + NEW_LINE));
partitionSqlBuilder.append(NEW_LINE).append(partitionSqlFragments);
}

partitionSqlBuilder.append(NEW_LINE).append(")");
return partitionSqlBuilder;
}

private static StringBuilder generateListPartitionSql(
Transforms.ListTransform listPartition, Set<String> columnNames) {
ImmutableList.Builder<String> partitionColumnsBuilder = ImmutableList.builder();
String[][] filedNames = listPartition.fieldNames();
for (String[] filedName : filedNames) {
Preconditions.checkArgument(
rangePartition.fieldName().length == 1, "Doris partition does not support nested field");
filedName.length == 1, "Doris partition does not support nested field");
Preconditions.checkArgument(
columnNames.contains(rangePartition.fieldName()[0]),
"The partition field must be one of the columns");
columnNames.contains(filedName[0]), "The partition field must be one of the columns");

String partitionColumn = BACK_QUOTE + rangePartition.fieldName()[0] + BACK_QUOTE;
// TODO we currently do not support pre-assign partition when creating range partitioning
partitionSqlBuilder.append(partitionColumn).append(") () ");
} else if (partitioning[0] instanceof Transforms.ListTransform) {
Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0];
partitionSqlBuilder.append(" PARTITION BY LIST(");
partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE);
}
String partitionColumns =
partitionColumnsBuilder.build().stream().collect(Collectors.joining(","));

StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
String partitionDefinition = String.format(" PARTITION BY LIST(%s)", partitionColumns);
partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

ImmutableList.Builder<String> partitionColumnsBuilder = ImmutableList.builder();
String[][] filedNames = listPartition.fieldNames();
for (String[] filedName : filedNames) {
// Assign list partitions
ListPartition[] assignments = listPartition.assignments();
if (!ArrayUtils.isEmpty(assignments)) {
ImmutableList.Builder<String> partitions = ImmutableList.builder();
for (ListPartition part : assignments) {
Literal<?>[][] lists = part.lists();
Preconditions.checkArgument(
filedName.length == 1, "Doris partition does not support nested field");
lists.length > 0, "The number of values in list partition must be greater than 0");
Preconditions.checkArgument(
columnNames.contains(filedName[0]), "The partition field must be one of the columns");
Arrays.stream(lists).allMatch(p -> p.length == filedNames.length),
"The number of partitioning columns must be consistent");

partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE);
partitions.add(generatePartitionSqlFragment(part));
}
String partitionColumns =
partitionColumnsBuilder.build().stream().collect(Collectors.joining(","));
// TODO we currently do not support pre-assign partition when creating list partitioning table
partitionSqlBuilder.append(partitionColumns).append(") () ");
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
partitionSqlBuilder
.append(NEW_LINE)
.append(partitions.build().stream().collect(Collectors.joining("," + NEW_LINE)));
}

sqlBuilder.append(partitionSqlBuilder);
partitionSqlBuilder.append(NEW_LINE).append(")");
return partitionSqlBuilder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VALUES_RANGE;
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION;
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION_TIME;
import static org.apache.gravitino.catalog.doris.utils.DorisUtils.generatePartitionSqlFragment;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -43,7 +44,6 @@
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
Expand Down Expand Up @@ -145,8 +145,8 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
try (Connection connection = getConnection(loadedTable.databaseName())) {
Transform partitionInfo = loadedTable.partitioning()[0];

String addPartitionSqlFormat = "ALTER TABLE `%s` ADD PARTITION `%s` VALUES %s";
String partitionValues;
String addPartitionSqlFormat = "ALTER TABLE `%s` ADD %s";
String partitionSqlFragment;
Partition added;

if (partition instanceof RangePartition) {
Expand All @@ -156,7 +156,7 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
loadedTable.name());

RangePartition rangePartition = (RangePartition) partition;
partitionValues = buildRangePartitionValues(rangePartition);
partitionSqlFragment = generatePartitionSqlFragment(rangePartition);

// The partition properties actually cannot be passed into Doris, we just return an empty
// map instead.
Expand All @@ -173,9 +173,18 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
loadedTable.name());

ListPartition listPartition = (ListPartition) partition;
partitionValues =
buildListPartitionValues(
listPartition, ((Transforms.ListTransform) partitionInfo).fieldNames().length);
Literal<?>[][] lists = listPartition.lists();
Preconditions.checkArgument(
lists.length > 0, "The number of values in list partition must be greater than 0");
Preconditions.checkArgument(
Arrays.stream(lists)
.allMatch(
part ->
part.length
== ((Transforms.ListTransform) partitionInfo).fieldNames().length),
"The number of partitioning columns must be consistent");

partitionSqlFragment = generatePartitionSqlFragment(listPartition);

added =
Partitions.list(listPartition.name(), listPartition.lists(), Collections.emptyMap());
Expand All @@ -185,8 +194,7 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists

try (Statement statement = connection.createStatement()) {
statement.executeUpdate(
String.format(
addPartitionSqlFormat, loadedTable.name(), partition.name(), partitionValues));
String.format(addPartitionSqlFormat, loadedTable.name(), partitionSqlFragment));
return added;
}
} catch (SQLException e) {
Expand Down Expand Up @@ -287,48 +295,4 @@ private Map<String, Type> getColumnType(Connection connection) throws SQLExcepti
return columnTypes.build();
}
}

private String buildRangePartitionValues(RangePartition rangePartition) {
Literal<?> upper = rangePartition.upper();
Literal<?> lower = rangePartition.lower();
String partitionValues;
if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN MAXVALUE";
} else if (Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN (\"" + upper.value() + "\")";
} else if (Literals.NULL.equals(upper)) {
partitionValues = "[(\"" + lower.value() + "\"), (MAXVALUE))";
} else {
partitionValues = "[(\"" + lower.value() + "\"), (\"" + upper.value() + "\"))";
}
return partitionValues;
}

private String buildListPartitionValues(ListPartition listPartition, int partitionedFieldNums) {
Literal<?>[][] lists = listPartition.lists();
Preconditions.checkArgument(
lists.length > 0, "The number of values in list partition must be greater than 0");

ImmutableList.Builder<String> listValues = ImmutableList.builder();
for (Literal<?>[] part : lists) {
Preconditions.checkArgument(
part.length == partitionedFieldNums,
"The number of partitioning columns must be consistent");

StringBuilder values = new StringBuilder();
if (part.length > 1) {
values
.append("(")
.append(
Arrays.stream(part)
.map(p -> "\"" + p.value() + "\"")
.collect(Collectors.joining(",")))
.append(")");
} else {
values.append("\"").append(part[0].value()).append("\"");
}
listValues.add(values.toString());
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
*/
package org.apache.gravitino.catalog.doris.utils;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.partitions.ListPartition;
import org.apache.gravitino.rel.partitions.Partition;
import org.apache.gravitino.rel.partitions.RangePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,4 +111,69 @@ public static Optional<Transform> extractPartitionInfoFromSql(String createTable
return Optional.empty();
}
}

/**
* Generate sql fragment that create partition in Apache Doris.
*
* <p>The sql fragment looks like "PARTITION {partitionName} VALUES {values}", for example:
*
* <pre>PARTITION `p20240724` VALUES LESS THAN ("2024-07-24")</pre>
*
* <pre>PARTITION `p20240724_v1` VALUES IN ("2024-07-24", "v1")</pre>
*
* @param partition The partition to be created.
* @return The partition sql fragment.
*/
public static String generatePartitionSqlFragment(Partition partition) {
String partitionSqlFragment = "PARTITION `%s` VALUES %s";
if (partition instanceof RangePartition) {
return String.format(
partitionSqlFragment,
partition.name(),
generateRangePartitionValues((RangePartition) partition));
} else if (partition instanceof ListPartition) {
return String.format(
partitionSqlFragment,
partition.name(),
generateListPartitionSqlValues((ListPartition) partition));
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
}
}

private static String generateRangePartitionValues(RangePartition rangePartition) {
Literal<?> upper = rangePartition.upper();
Literal<?> lower = rangePartition.lower();
String partitionValues;
if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN MAXVALUE";
} else if (Literals.NULL.equals(lower)) {
partitionValues = String.format("LESS THAN (\"%s\")", upper.value());
} else if (Literals.NULL.equals(upper)) {
partitionValues = String.format("[(\"%s\"), (MAXVALUE))", lower.value());
} else {
partitionValues = String.format("[(\"%s\"), (\"%s\"))", lower.value(), upper.value());
}
return partitionValues;
}

private static String generateListPartitionSqlValues(ListPartition listPartition) {
Literal<?>[][] lists = listPartition.lists();
ImmutableList.Builder<String> listValues = ImmutableList.builder();
for (Literal<?>[] part : lists) {
String values;
if (part.length > 1) {
values =
String.format(
"(%s)",
Arrays.stream(part)
.map(p -> "\"" + p.value() + "\"")
.collect(Collectors.joining(",")));
} else {
values = String.format("\"%s\"", part[0].value());
}
listValues.add(values);
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}
}
Loading

0 comments on commit 4acdc49

Please sign in to comment.