
Commit f3e1f0d
split transform to partition and bucket
FANNG1 committed Mar 21, 2024
1 parent e851488 commit f3e1f0d
Showing 3 changed files with 47 additions and 48 deletions.

File 1 of 3
@@ -37,51 +37,56 @@
  */
 public class SparkTransformConverter {
 
-  public static class PartitionAndBucketInfo {
-    private List<Transform> partitions;
+  public static class DistributionAndSortOrdersInfo {
     @Getter private Distribution distribution;
     @Getter private SortOrder[] sortOrders;
 
-    public Transform[] getPartitions() {
-      if (partitions == null) {
-        return null;
-      }
-      return partitions.toArray(new Transform[0]);
+    private void setDistribution(Distribution distributionInfo) {
+      Preconditions.checkState(distribution == null, "Should only set distribution once");
+      this.distribution = distributionInfo;
     }
 
-    private void addPartition(Transform partition) {
-      if (partitions == null) {
-        this.partitions = new ArrayList<>();
-      }
-      partitions.add(partition);
+    private void setSortOrders(SortOrder[] sortOrdersInfo) {
+      Preconditions.checkState(sortOrders == null, "Should only set sort orders once");
+      this.sortOrders = sortOrdersInfo;
     }
+  }
 
-    private void setDistribution(Distribution distribution) {
-      Preconditions.checkState(this.distribution == null, "Should only set distribution once");
-      this.distribution = distribution;
+  public static Transform[] toGravitinoPartitions(
+      org.apache.spark.sql.connector.expressions.Transform[] transforms) {
+    if (ArrayUtils.isEmpty(transforms)) {
+      return null;
     }
 
-    private void setSortOrders(SortOrder[] sortOrders) {
-      Preconditions.checkState(this.sortOrders == null, "Should only set sort orders once");
-      this.sortOrders = sortOrders;
-    }
+    return Arrays.stream(transforms)
+        .map(
+            transform -> {
+              if (transform instanceof IdentityTransform) {
+                IdentityTransform identityTransform = (IdentityTransform) transform;
+                return Transforms.identity(identityTransform.reference().fieldNames());
+              } else if (transform instanceof BucketTransform
+                  || transform instanceof SortedBucketTransform) {
+                return null;
+              } else {
+                throw new NotSupportedException(
+                    "Doesn't support Spark transform: " + transform.name());
+              }
+            })
+        .filter(transform -> transform != null)
+        .toArray(Transform[]::new);
   }
 
-  public static PartitionAndBucketInfo toGravitinoTransform(
+  public static DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders(
       org.apache.spark.sql.connector.expressions.Transform[] transforms) {
-    PartitionAndBucketInfo bundles = new PartitionAndBucketInfo();
+    DistributionAndSortOrdersInfo bundles = new DistributionAndSortOrdersInfo();
     if (ArrayUtils.isEmpty(transforms)) {
       return bundles;
     }
 
     Arrays.stream(transforms)
         .forEach(
             transform -> {
-              if (transform instanceof IdentityTransform) {
-                IdentityTransform identityTransform = (IdentityTransform) transform;
-                bundles.addPartition(
-                    Transforms.identity(identityTransform.reference().fieldNames()));
-              } else if (transform instanceof SortedBucketTransform) {
+              if (transform instanceof SortedBucketTransform) {
                 Pair<Distribution, SortOrder[]> pair =
                     toGravitinoDistributionAndSortOrders((SortedBucketTransform) transform);
                 bundles.setDistribution(pair.getLeft());
@@ -90,9 +95,6 @@ public static PartitionAndBucketInfo toGravitinoTransform(
                 BucketTransform bucketTransform = (BucketTransform) transform;
                 Distribution distribution = toGravitinoDistribution(bucketTransform);
                 bundles.setDistribution(distribution);
-              } else {
-                throw new NotSupportedException(
-                    "Doesn't support Spark transform: " + transform.name());
              }
            });
 
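For orientation, here is a minimal usage sketch of the split API introduced above (illustrative only, not part of this commit; the wrapper class and method below are hypothetical). It shows how a caller now derives partitions and distribution/sort orders from the same Spark transforms, which is exactly how createTable is rewired in the next file.

import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;

// Hypothetical caller, used only to illustrate the two converters from the diff above.
public class TransformSplitUsageSketch {

  public static void convert(
      org.apache.spark.sql.connector.expressions.Transform[] sparkTransforms) {
    // Identity transforms become Gravitino partition transforms; bucket transforms are skipped here.
    Transform[] partitions = SparkTransformConverter.toGravitinoPartitions(sparkTransforms);

    // Bucket and sorted-bucket transforms become a distribution plus optional sort orders instead.
    DistributionAndSortOrdersInfo info =
        SparkTransformConverter.toGravitinoDistributionAndSortOrders(sparkTransforms);

    // The two results are then passed separately to Gravitino's createTable, e.g.
    // createTable(..., partitions, info.getDistribution(), info.getSortOrders()).
    System.out.println(
        "partitions=" + (partitions == null ? 0 : partitions.length)
            + ", distribution=" + info.getDistribution()
            + ", sortOrders=" + (info.getSortOrders() == null ? 0 : info.getSortOrders().length));
  }
}

Splitting the conversion this way removes the partition list from the bundle class, so partition handling and bucket-derived distribution/sort-order handling no longer share one result object.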

File 2 of 3
@@ -18,7 +18,7 @@
 import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptorFactory;
 import com.datastrato.gravitino.spark.connector.PropertiesConverter;
 import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
-import com.datastrato.gravitino.spark.connector.SparkTransformConverter.PartitionAndBucketInfo;
+import com.datastrato.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;
 import com.datastrato.gravitino.spark.connector.SparkTypeConverter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -115,7 +115,7 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException
 
   @Override
   public Table createTable(
-      Identifier ident, Column[] columns, Transform[] partitions, Map<String, String> properties)
+      Identifier ident, Column[] columns, Transform[] transforms, Map<String, String> properties)
       throws TableAlreadyExistsException, NoSuchNamespaceException {
     NameIdentifier gravitinoIdentifier =
         NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name());
@@ -129,8 +129,10 @@ public Table createTable(
     // Spark store comment in properties, we should retrieve it and pass to Gravitino explicitly.
     String comment = gravitinoProperties.remove(ConnectorConstants.COMMENT);
 
-    PartitionAndBucketInfo gravitinoTransformContext =
-        SparkTransformConverter.toGravitinoTransform(partitions);
+    DistributionAndSortOrdersInfo distributionAndSortOrdersInfo =
+        SparkTransformConverter.toGravitinoDistributionAndSortOrders(transforms);
+    com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitions =
+        SparkTransformConverter.toGravitinoPartitions(transforms);
 
     try {
       com.datastrato.gravitino.rel.Table table =
@@ -141,9 +143,9 @@ public Table createTable(
               gravitinoColumns,
               comment,
               gravitinoProperties,
-              gravitinoTransformContext.getPartitions(),
-              gravitinoTransformContext.getDistribution(),
-              gravitinoTransformContext.getSortOrders());
+              partitions,
+              distributionAndSortOrdersInfo.getDistribution(),
+              distributionAndSortOrdersInfo.getSortOrders());
       return gravitinoAdaptor.createSparkTable(ident, table, sparkCatalog, propertiesConverter);
     } catch (NoSuchSchemaException e) {
       throw new NoSuchNamespaceException(ident.namespace());

File 3 of 3
@@ -14,7 +14,7 @@
 import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
 import com.datastrato.gravitino.rel.expressions.transforms.Transform;
 import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
-import com.datastrato.gravitino.spark.connector.SparkTransformConverter.PartitionAndBucketInfo;
+import com.datastrato.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,12 +49,9 @@ void init() {
   void testPartition() {
     sparkToGravitinoPartitionTransformMaps.forEach(
         (sparkTransform, gravitinoTransform) -> {
-          PartitionAndBucketInfo bundles =
-              SparkTransformConverter.toGravitinoTransform(
+          Transform[] gravitinoPartitions =
+              SparkTransformConverter.toGravitinoPartitions(
                  new org.apache.spark.sql.connector.expressions.Transform[] {sparkTransform});
-          Assertions.assertNull(bundles.getDistribution());
-          Assertions.assertNull(bundles.getSortOrders());
-          Transform[] gravitinoPartitions = bundles.getPartitions();
           Assertions.assertTrue(gravitinoPartitions != null && gravitinoPartitions.length == 1);
           Assertions.assertEquals(gravitinoTransform, gravitinoPartitions[0]);
         });
@@ -110,12 +107,11 @@ void testSparkToGravitinoDistributionWithoutSortOrder() {
 
     org.apache.spark.sql.connector.expressions.Transform sparkBucket =
         Expressions.bucket(bucketNum, sparkFieldReferences);
-    PartitionAndBucketInfo bundles =
-        SparkTransformConverter.toGravitinoTransform(
+    DistributionAndSortOrdersInfo bundles =
+        SparkTransformConverter.toGravitinoDistributionAndSortOrders(
            new org.apache.spark.sql.connector.expressions.Transform[] {sparkBucket});
 
     Assertions.assertNull(bundles.getSortOrders());
-    Assertions.assertNull(bundles.getPartitions());
 
     Distribution distribution = bundles.getDistribution();
     String[][] gravitinoFieldReferences = createGravitinoFieldReferenceNames(sparkFieldReferences);
@@ -134,10 +130,9 @@ void testSparkToGravitinoSortOrder() {
             createSparkFieldReference(bucketColumnNames),
             createSparkFieldReference(sortColumnNames));
 
-    PartitionAndBucketInfo bundles =
-        SparkTransformConverter.toGravitinoTransform(
+    DistributionAndSortOrdersInfo bundles =
+        SparkTransformConverter.toGravitinoDistributionAndSortOrders(
            new org.apache.spark.sql.connector.expressions.Transform[] {sortedBucketTransform});
-    Assertions.assertNull(bundles.getPartitions());
     Assertions.assertTrue(
         bundles.getDistribution().equals(createHashDistribution(bucketNum, bucketColumnNames)));
 
