[apache#2587] feat(spark-connector): Support iceberg metadataColumns
caican00 committed Mar 28, 2024
2 parents 2bd021c + 69b440e commit 682b942
Showing 5 changed files with 418 additions and 11 deletions.
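For context on the commit's headline feature: Iceberg reserves metadata columns (_spec_id, _partition, _file, _pos, _deleted) that a connector can surface through Spark's SupportsMetadataColumns interface. Below is a minimal sketch of what that enables from the Spark side; the session setup and the iceberg.db.sample_table name are illustrative assumptions, not taken from this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataColumnsExample {
  public static void main(String[] args) {
    // Hypothetical session; point it at a real Iceberg catalog in practice.
    SparkSession spark = SparkSession.builder().appName("metadata-columns").getOrCreate();
    // Metadata columns never appear in SELECT *; they must be selected explicitly.
    Dataset<Row> rows =
        spark.sql("SELECT _spec_id, _partition, _file, id, name FROM iceberg.db.sample_table");
    rows.show(false);
  }
}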
@@ -5,13 +5,21 @@
package com.datastrato.gravitino.integration.test.spark.iceberg;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.iceberg.SparkIcebergTable;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.analysis.ResolvedTable;
import org.apache.spark.sql.catalyst.plans.logical.CommandResult;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -20,6 +28,19 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkIcebergCatalogIT extends SparkCommonIT {

protected List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() {
return Arrays.asList(
SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"),
SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""),
SparkTableInfo.SparkColumnInfo.of("ts", DataTypes.TimestampType, null));
}

private String getCreateIcebergSimpleTableString(String tableName) {
return String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', ts TIMESTAMP)",
tableName);
}

@Override
protected String getCatalogName() {
return "iceberg";
@@ -37,7 +58,181 @@ protected boolean supportsSparkSQLClusteredBy() {

@Override
protected boolean supportsPartition() {
- return false;
+ return true;
}

@Test
void testCreateIcebergBucketPartitionTable() {
String tableName = "iceberg_bucket_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (bucket(16, id));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withBucket(16, Collections.singletonList("id"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "id_bucket=4";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}
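A note on the expected id_bucket=4 directory above: the Iceberg spec defines bucketing as a 32-bit Murmur3 hash (x86 variant, seed 0; int values are widened and hashed as little-endian longs), with the bucket taken as the positive hash modulo the bucket count. Below is a sketch of that arithmetic using Guava's Murmur3 implementation, assuming a Guava dependency; it illustrates the documented scheme and is not code from this commit.

import com.google.common.hash.Hashing;

public class IcebergBucketSketch {
  public static void main(String[] args) {
    int numBuckets = 16;
    // Iceberg hashes an int value as a little-endian 64-bit long with Murmur3_32, seed 0.
    int hash = Hashing.murmur3_32_fixed().hashLong(2L).asInt();
    // Keep the non-negative part of the hash, then take it modulo the bucket count.
    int bucket = (hash & Integer.MAX_VALUE) % numBuckets;
    System.out.println(bucket); // should print 4, matching id_bucket=4 asserted above
  }
}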

@Test
void testCreateIcebergHourPartitionTable() {
String tableName = "iceberg_hour_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (hours(ts));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withHour(Collections.singletonList("ts"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "ts_hour=2024-01-01-12";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}

@Test
void testCreateIcebergDayPartitionTable() {
String tableName = "iceberg_day_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (days(ts));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withDay(Collections.singletonList("ts"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "ts_day=2024-01-01";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}

@Test
void testCreateIcebergMonthPartitionTable() {
String tableName = "iceberg_month_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (months(ts));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withMonth(Collections.singletonList("ts"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "ts_month=2024-01";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}

@Test
void testCreateIcebergYearPartitionTable() {
String tableName = "iceberg_year_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (years(ts));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withYear(Collections.singletonList("ts"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "ts_year=2024";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}

@Test
void testCreateIcebergTruncatePartitionTable() {
String tableName = "iceberg_truncate_partition_table";
dropTableIfExists(tableName);
String createTableSQL = getCreateIcebergSimpleTableString(tableName);
createTableSQL = createTableSQL + " PARTITIONED BY (truncate(1, name));";
sql(createTableSQL);
SparkTableInfo tableInfo = getTableInfo(tableName);
SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withColumns(getIcebergSimpleTableColumn())
.withTruncate(1, Collections.singletonList("name"));
checker.check(tableInfo);

String insertData =
String.format(
"INSERT INTO %s VALUES (2, 'a', CAST('2024-01-01 12:00:00.000' AS TIMESTAMP));",
tableName);
sql(insertData);
List<String> queryResult = getTableData(tableName);
Assertions.assertEquals(1, queryResult.size());
Assertions.assertEquals("2,a,2024-01-01 12:00:00.000", queryResult.get(0));
String location = tableInfo.getTableLocation() + File.separator + "data";
String partitionExpression = "name_trunc=a";
Path partitionPath = new Path(location, partitionExpression);
checkDirExists(partitionPath);
}
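The Catalyst imports added at the top of this file (Dataset, CommandResult, DescribeRelation, ResolvedTable, Table, SparkIcebergTable) belong to the metadata-column tests that are collapsed out of this view. Roughly, such a test can unwrap the connector's Table from the plan of a DESC command. The sketch below is hypothetical: getSparkSession() and the plan accessors are assumptions about the test harness and Spark 3.x internals, not the commit's actual test.

@Test
void testLoadSparkIcebergTable() {
  String tableName = "iceberg_metadata_columns_table";
  dropTableIfExists(tableName);
  sql(getCreateIcebergSimpleTableString(tableName));

  // DESC runs as a V2 command; the analyzed plan wraps a DescribeRelation whose
  // child resolves to the connector's Table implementation.
  Dataset<?> ds = getSparkSession().sql("DESC " + tableName);
  CommandResult commandResult = (CommandResult) ds.logicalPlan();
  DescribeRelation describeRelation = (DescribeRelation) commandResult.commandLogicalPlan();
  ResolvedTable resolvedTable = (ResolvedTable) describeRelation.relation();
  Table table = resolvedTable.table();
  Assertions.assertTrue(table instanceof SparkIcebergTable);
}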

// TODO
@@ -17,10 +17,6 @@
import javax.ws.rs.NotSupportedException;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataType;
import org.junit.jupiter.api.Assertions;

@@ -34,6 +30,11 @@ public class SparkTableInfo {
private Map<String, String> tableProperties;
private List<String> unknownItems = new ArrayList<>();
private Transform bucket;
private Transform hour;
private Transform day;
private Transform month;
private Transform year;
private Transform truncate;
private List<Transform> partitions = new ArrayList<>();
private Set<String> partitionColumnNames = new HashSet<>();

@@ -65,6 +66,31 @@ void setBucket(Transform bucket) {
this.bucket = bucket;
}

void setHour(Transform hour) {
Assertions.assertNull(this.hour, "Should only have one hour transform");
this.hour = hour;
}

void setDay(Transform day) {
Assertions.assertNull(this.day, "Should only have one day transform");
this.day = day;
}

void setMonth(Transform month) {
Assertions.assertNull(this.month, "Should only have one month transform");
this.month = month;
}

void setYear(Transform year) {
Assertions.assertNull(this.year, "Should only have one year transform");
this.year = year;
}

void setTruncate(Transform truncate) {
Assertions.assertNull(this.truncate, "Should only have one truncate transform");
this.truncate = truncate;
}

void addPartition(Transform partition) {
if (partition instanceof IdentityTransform) {
partitionColumnNames.add(((IdentityTransform) partition).reference().fieldNames()[0]);
@@ -102,6 +128,17 @@ static SparkTableInfo create(SparkBaseTable baseTable) {
sparkTableInfo.setBucket(transform);
} else if (transform instanceof IdentityTransform) {
sparkTableInfo.addPartition(transform);
} else if (transform instanceof HoursTransform) {
sparkTableInfo.setHour(transform);
} else if (transform instanceof DaysTransform) {
sparkTableInfo.setDay(transform);
} else if (transform instanceof MonthsTransform) {
sparkTableInfo.setMonth(transform);
} else if (transform instanceof YearsTransform) {
sparkTableInfo.setYear(transform);
} else if (transform instanceof ApplyTransform
&& "truncate".equals(transform.name())) {
sparkTableInfo.setTruncate(transform);
} else {
throw new NotSupportedException(
"Doesn't support Spark transform: " + transform.name());
@@ -33,6 +33,11 @@ private enum CheckField {
COLUMN,
PARTITION,
BUCKET,
HOUR,
DAY,
MONTH,
YEAR,
TRUNCATE,
COMMENT,
}

@@ -76,6 +81,43 @@ public SparkTableInfoChecker withBucket(
return this;
}

public SparkTableInfoChecker withHour(List<String> partitionColumns) {
Transform hourTransform = Expressions.hours(partitionColumns.get(0));
this.expectedTableInfo.setHour(hourTransform);
this.checkFields.add(CheckField.HOUR);
return this;
}

public SparkTableInfoChecker withDay(List<String> partitionColumns) {
Transform dayTransform = Expressions.days(partitionColumns.get(0));
this.expectedTableInfo.setDay(dayTransform);
this.checkFields.add(CheckField.DAY);
return this;
}

public SparkTableInfoChecker withMonth(List<String> partitionColumns) {
Transform monthTransform = Expressions.months(partitionColumns.get(0));
this.expectedTableInfo.setMonth(monthTransform);
this.checkFields.add(CheckField.MONTH);
return this;
}

public SparkTableInfoChecker withYear(List<String> partitionColumns) {
Transform yearTransform = Expressions.years(partitionColumns.get(0));
this.expectedTableInfo.setYear(yearTransform);
this.checkFields.add(CheckField.YEAR);
return this;
}

public SparkTableInfoChecker withTruncate(int width, List<String> partitionColumns) {
Transform truncateTransform =
Expressions.apply(
"truncate", Expressions.literal(width), Expressions.column(partitionColumns.get(0)));
this.expectedTableInfo.setTruncate(truncateTransform);
this.checkFields.add(CheckField.TRUNCATE);
return this;
}

public SparkTableInfoChecker withComment(String comment) {
this.expectedTableInfo.setComment(comment);
this.checkFields.add(CheckField.COMMENT);
@@ -102,6 +144,22 @@ public void check(SparkTableInfo realTableInfo) {
case BUCKET:
Assertions.assertEquals(expectedTableInfo.getBucket(), realTableInfo.getBucket());
break;
case HOUR:
Assertions.assertEquals(expectedTableInfo.getHour(), realTableInfo.getHour());
break;
case DAY:
Assertions.assertEquals(expectedTableInfo.getDay(), realTableInfo.getDay());
break;
case MONTH:
Assertions.assertEquals(expectedTableInfo.getMonth(), realTableInfo.getMonth());
break;
case YEAR:
Assertions.assertEquals(expectedTableInfo.getYear(), realTableInfo.getYear());
break;
case TRUNCATE:
Assertions.assertEquals(
expectedTableInfo.getTruncate(), realTableInfo.getTruncate());
break;
case COMMENT:
Assertions.assertEquals(
expectedTableInfo.getComment(), realTableInfo.getComment());
