Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3368] feat(jdbc-doris): support pre-assign partition for Doris table #3887

Merged
merged 16 commits into from
Jul 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.doris.operation;

import static org.apache.gravitino.catalog.doris.utils.DorisUtils.generatePartitionSqlFragment;
import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -54,16 +55,18 @@
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.gravitino.rel.partitions.ListPartition;
import org.apache.gravitino.rel.partitions.RangePartition;

/** Table operations for Apache Doris. */
public class DorisTableOperations extends JdbcTableOperations {
private static final String BACK_QUOTE = "`";
private static final String DORIS_AUTO_INCREMENT = "AUTO_INCREMENT";

private static final String NEW_LINE = "\n";

@Override
Expand Down Expand Up @@ -194,7 +197,6 @@ private static void validateDistribution(Distribution distribution, JdbcColumn[]
}

private static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {

if (indexes.length == 0) {
return;
}
Expand Down Expand Up @@ -224,47 +226,91 @@ private static void appendPartitionSql(
Preconditions.checkArgument(
partitioning.length == 1, "Composite partition type is not supported");

StringBuilder partitionSqlBuilder = new StringBuilder();
StringBuilder partitionSqlBuilder;
Set<String> columnNames =
Arrays.stream(columns).map(JdbcColumn::name).collect(Collectors.toSet());

if (partitioning[0] instanceof Transforms.RangeTransform) {
partitionSqlBuilder.append(NEW_LINE).append(" PARTITION BY RANGE(");
// TODO support multi-column range partitioning in doris
// We do not support multi-column range partitioning in doris for now
Transforms.RangeTransform rangePartition = (Transforms.RangeTransform) partitioning[0];
partitionSqlBuilder = generateRangePartitionSql(rangePartition, columnNames);
} else if (partitioning[0] instanceof Transforms.ListTransform) {
Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0];
partitionSqlBuilder = generateListPartitionSql(listPartition, columnNames);
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
}

sqlBuilder.append(partitionSqlBuilder);
}

private static StringBuilder generateRangePartitionSql(
Transforms.RangeTransform rangePartition, Set<String> columnNames) {
Preconditions.checkArgument(
rangePartition.fieldName().length == 1, "Doris partition does not support nested field");
Preconditions.checkArgument(
columnNames.contains(rangePartition.fieldName()[0]),
"The partition field must be one of the columns");

StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
String partitionDefinition =
String.format(" PARTITION BY RANGE(`%s`)", rangePartition.fieldName()[0]);
partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

// Assign range partitions
RangePartition[] assignments = rangePartition.assignments();
if (!ArrayUtils.isEmpty(assignments)) {
String partitionSqlFragments =
Arrays.stream(assignments)
.map(DorisUtils::generatePartitionSqlFragment)
.collect(Collectors.joining("," + NEW_LINE));
partitionSqlBuilder.append(NEW_LINE).append(partitionSqlFragments);
}

partitionSqlBuilder.append(NEW_LINE).append(")");
return partitionSqlBuilder;
}

private static StringBuilder generateListPartitionSql(
Transforms.ListTransform listPartition, Set<String> columnNames) {
ImmutableList.Builder<String> partitionColumnsBuilder = ImmutableList.builder();
String[][] filedNames = listPartition.fieldNames();
for (String[] filedName : filedNames) {
Preconditions.checkArgument(
rangePartition.fieldName().length == 1, "Doris partition does not support nested field");
filedName.length == 1, "Doris partition does not support nested field");
Preconditions.checkArgument(
columnNames.contains(rangePartition.fieldName()[0]),
"The partition field must be one of the columns");
columnNames.contains(filedName[0]), "The partition field must be one of the columns");

String partitionColumn = BACK_QUOTE + rangePartition.fieldName()[0] + BACK_QUOTE;
// TODO we currently do not support pre-assign partition when creating range partitioning
partitionSqlBuilder.append(partitionColumn).append(") () ");
} else if (partitioning[0] instanceof Transforms.ListTransform) {
Transforms.ListTransform listPartition = (Transforms.ListTransform) partitioning[0];
partitionSqlBuilder.append(" PARTITION BY LIST(");
partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE);
}
String partitionColumns =
partitionColumnsBuilder.build().stream().collect(Collectors.joining(","));

StringBuilder partitionSqlBuilder = new StringBuilder(NEW_LINE);
String partitionDefinition = String.format(" PARTITION BY LIST(%s)", partitionColumns);
partitionSqlBuilder.append(partitionDefinition).append(NEW_LINE).append("(");

ImmutableList.Builder<String> partitionColumnsBuilder = ImmutableList.builder();
String[][] filedNames = listPartition.fieldNames();
for (String[] filedName : filedNames) {
// Assign list partitions
ListPartition[] assignments = listPartition.assignments();
if (!ArrayUtils.isEmpty(assignments)) {
ImmutableList.Builder<String> partitions = ImmutableList.builder();
for (ListPartition part : assignments) {
Literal<?>[][] lists = part.lists();
Preconditions.checkArgument(
filedName.length == 1, "Doris partition does not support nested field");
lists.length > 0, "The number of values in list partition must be greater than 0");
Preconditions.checkArgument(
columnNames.contains(filedName[0]), "The partition field must be one of the columns");
Arrays.stream(lists).allMatch(p -> p.length == filedNames.length),
"The number of partitioning columns must be consistent");

partitionColumnsBuilder.add(BACK_QUOTE + filedName[0] + BACK_QUOTE);
partitions.add(generatePartitionSqlFragment(part));
}
String partitionColumns =
partitionColumnsBuilder.build().stream().collect(Collectors.joining(","));
// TODO we currently do not support pre-assign partition when creating list partitioning table
partitionSqlBuilder.append(partitionColumns).append(") () ");
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
partitionSqlBuilder
.append(NEW_LINE)
.append(partitions.build().stream().collect(Collectors.joining("," + NEW_LINE)));
}

sqlBuilder.append(partitionSqlBuilder);
partitionSqlBuilder.append(NEW_LINE).append(")");
return partitionSqlBuilder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VALUES_RANGE;
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION;
import static org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION_TIME;
import static org.apache.gravitino.catalog.doris.utils.DorisUtils.generatePartitionSqlFragment;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -43,7 +44,6 @@
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
Expand Down Expand Up @@ -145,8 +145,8 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
try (Connection connection = getConnection(loadedTable.databaseName())) {
Transform partitionInfo = loadedTable.partitioning()[0];

String addPartitionSqlFormat = "ALTER TABLE `%s` ADD PARTITION `%s` VALUES %s";
String partitionValues;
String addPartitionSqlFormat = "ALTER TABLE `%s` ADD %s";
String partitionSqlFragment;
Partition added;

if (partition instanceof RangePartition) {
Expand All @@ -156,7 +156,7 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
loadedTable.name());

RangePartition rangePartition = (RangePartition) partition;
partitionValues = buildRangePartitionValues(rangePartition);
partitionSqlFragment = generatePartitionSqlFragment(rangePartition);

// The partition properties actually cannot be passed into Doris, we just return an empty
// map instead.
Expand All @@ -173,9 +173,18 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists
loadedTable.name());

ListPartition listPartition = (ListPartition) partition;
partitionValues =
buildListPartitionValues(
listPartition, ((Transforms.ListTransform) partitionInfo).fieldNames().length);
Literal<?>[][] lists = listPartition.lists();
Preconditions.checkArgument(
lists.length > 0, "The number of values in list partition must be greater than 0");
Preconditions.checkArgument(
Arrays.stream(lists)
.allMatch(
part ->
part.length
== ((Transforms.ListTransform) partitionInfo).fieldNames().length),
"The number of partitioning columns must be consistent");

partitionSqlFragment = generatePartitionSqlFragment(listPartition);

added =
Partitions.list(listPartition.name(), listPartition.lists(), Collections.emptyMap());
Expand All @@ -185,8 +194,7 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists

try (Statement statement = connection.createStatement()) {
statement.executeUpdate(
String.format(
addPartitionSqlFormat, loadedTable.name(), partition.name(), partitionValues));
String.format(addPartitionSqlFormat, loadedTable.name(), partitionSqlFragment));
return added;
}
} catch (SQLException e) {
Expand Down Expand Up @@ -287,48 +295,4 @@ private Map<String, Type> getColumnType(Connection connection) throws SQLExcepti
return columnTypes.build();
}
}

private String buildRangePartitionValues(RangePartition rangePartition) {
Literal<?> upper = rangePartition.upper();
Literal<?> lower = rangePartition.lower();
String partitionValues;
if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN MAXVALUE";
} else if (Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN (\"" + upper.value() + "\")";
} else if (Literals.NULL.equals(upper)) {
partitionValues = "[(\"" + lower.value() + "\"), (MAXVALUE))";
} else {
partitionValues = "[(\"" + lower.value() + "\"), (\"" + upper.value() + "\"))";
}
return partitionValues;
}

private String buildListPartitionValues(ListPartition listPartition, int partitionedFieldNums) {
Literal<?>[][] lists = listPartition.lists();
Preconditions.checkArgument(
lists.length > 0, "The number of values in list partition must be greater than 0");

ImmutableList.Builder<String> listValues = ImmutableList.builder();
for (Literal<?>[] part : lists) {
Preconditions.checkArgument(
part.length == partitionedFieldNums,
"The number of partitioning columns must be consistent");

StringBuilder values = new StringBuilder();
if (part.length > 1) {
values
.append("(")
.append(
Arrays.stream(part)
.map(p -> "\"" + p.value() + "\"")
.collect(Collectors.joining(",")))
.append(")");
} else {
values.append("\"").append(part[0].value()).append("\"");
}
listValues.add(values.toString());
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@
*/
package org.apache.gravitino.catalog.doris.utils;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.partitions.ListPartition;
import org.apache.gravitino.rel.partitions.Partition;
import org.apache.gravitino.rel.partitions.RangePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,4 +111,69 @@ public static Optional<Transform> extractPartitionInfoFromSql(String createTable
return Optional.empty();
}
}

/**
* Generate sql fragment that create partition in Apache Doris.
*
* <p>The sql fragment looks like "PARTITION {partitionName} VALUES {values}", for example:
*
* <pre>PARTITION `p20240724` VALUES LESS THAN ("2024-07-24")</pre>
*
* <pre>PARTITION `p20240724_v1` VALUES IN ("2024-07-24", "v1")</pre>
*
* @param partition The partition to be created.
* @return The partition sql fragment.
*/
public static String generatePartitionSqlFragment(Partition partition) {
String partitionSqlFragment = "PARTITION `%s` VALUES %s";
if (partition instanceof RangePartition) {
return String.format(
partitionSqlFragment,
partition.name(),
generateRangePartitionValues((RangePartition) partition));
} else if (partition instanceof ListPartition) {
return String.format(
partitionSqlFragment,
partition.name(),
generateListPartitionSqlValues((ListPartition) partition));
} else {
throw new IllegalArgumentException("Unsupported partition type of Doris");
}
}

private static String generateRangePartitionValues(RangePartition rangePartition) {
Literal<?> upper = rangePartition.upper();
Literal<?> lower = rangePartition.lower();
String partitionValues;
if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
partitionValues = "LESS THAN MAXVALUE";
} else if (Literals.NULL.equals(lower)) {
partitionValues = String.format("LESS THAN (\"%s\")", upper.value());
} else if (Literals.NULL.equals(upper)) {
partitionValues = String.format("[(\"%s\"), (MAXVALUE))", lower.value());
} else {
partitionValues = String.format("[(\"%s\"), (\"%s\"))", lower.value(), upper.value());
}
return partitionValues;
}

private static String generateListPartitionSqlValues(ListPartition listPartition) {
Literal<?>[][] lists = listPartition.lists();
ImmutableList.Builder<String> listValues = ImmutableList.builder();
for (Literal<?>[] part : lists) {
String values;
if (part.length > 1) {
values =
String.format(
"(%s)",
Arrays.stream(part)
.map(p -> "\"" + p.value() + "\"")
.collect(Collectors.joining(",")));
} else {
values = String.format("\"%s\"", part[0].value());
}
listValues.add(values);
}
return String.format("IN (%s)", listValues.build().stream().collect(Collectors.joining(",")));
}
}
Loading
Loading