[apache#2979] feat(spark-connector): Implement FunctionCatalog in GravitinoIcebergCatalog (apache#2983)

### What changes were proposed in this pull request?
Implement `FunctionCatalog` in `GravitinoIcebergCatalog`

### Why are the changes needed?

Implementing `FunctionCatalog` in `GravitinoIcebergCatalog` lets the Spark connector resolve
Iceberg partition functions such as `bucket` and `truncate` (see the sketch below).

Fix: apache#2979

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New ITs.
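
For illustration only (not part of this change): with `FunctionCatalog` implemented, the Iceberg partition functions can be invoked from Spark SQL through the Gravitino-managed catalog, mirroring what the new ITs verify. A minimal sketch, assuming an already-configured Gravitino Spark connector and a placeholder catalog name `iceberg_catalog`:

```java
// Sketch only: the catalog name "iceberg_catalog" and the session setup are assumptions.
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergPartitionFunctionExample {
  public static void main(String[] args) {
    // Assumes the Gravitino Spark connector and its Iceberg catalog are already configured.
    SparkSession spark =
        SparkSession.builder().appName("iceberg-partition-functions").getOrCreate();

    // Partition transform functions are resolved through the catalog's "system" namespace.
    Row bucket = spark.sql("SELECT iceberg_catalog.system.bucket(2, 100)").first();
    System.out.println(bucket.get(0)); // the new IT expects 0

    Row truncated = spark.sql("SELECT iceberg_catalog.system.truncate(2, 'abcdef')").first();
    System.out.println(truncated.get(0)); // the new IT expects "ab"
  }
}
```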
caican00 authored Apr 17, 2024
1 parent b04efd0 commit 0e7fc9a
Showing 3 changed files with 91 additions and 2 deletions.
@@ -6,7 +6,15 @@

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -55,4 +63,67 @@ void testIcebergFileLevelDeleteOperation() {
List<String> queryResult2 = getTableData(tableName);
Assertions.assertEquals(0, queryResult2.size());
}

@Test
void testIcebergListAndLoadFunctions() throws NoSuchNamespaceException, NoSuchFunctionException {
String[] empty_namespace = new String[] {};
String[] system_namespace = new String[] {"system"};
String[] default_namespace = new String[] {getDefaultDatabase()};
String[] non_exists_namespace = new String[] {"non_existent"};
List<String> functions =
Arrays.asList("iceberg_version", "years", "months", "days", "hours", "bucket", "truncate");

CatalogPlugin catalogPlugin =
getSparkSession().sessionState().catalogManager().catalog(getCatalogName());
Assertions.assertInstanceOf(FunctionCatalog.class, catalogPlugin);
FunctionCatalog functionCatalog = (FunctionCatalog) catalogPlugin;

for (String[] namespace : ImmutableList.of(empty_namespace, system_namespace)) {
Arrays.stream(functionCatalog.listFunctions(namespace))
.map(Identifier::name)
.forEach(function -> Assertions.assertTrue(functions.contains(function)));
}
Arrays.stream(functionCatalog.listFunctions(default_namespace))
.map(Identifier::name)
.forEach(function -> Assertions.assertFalse(functions.contains(function)));
Assertions.assertThrows(
NoSuchNamespaceException.class, () -> functionCatalog.listFunctions(non_exists_namespace));

for (String[] namespace : ImmutableList.of(empty_namespace, system_namespace)) {
for (String function : functions) {
Identifier identifier = Identifier.of(namespace, function);
UnboundFunction func = functionCatalog.loadFunction(identifier);
Assertions.assertEquals(function, func.name());
}
}
functions.forEach(
function -> {
Identifier identifier = Identifier.of(new String[] {getDefaultDatabase()}, function);
Assertions.assertThrows(
NoSuchFunctionException.class, () -> functionCatalog.loadFunction(identifier));
});
}

@Test
void testIcebergFunction() {
String[] catalogAndNamespaces = new String[] {getCatalogName() + ".system", getCatalogName()};
Arrays.stream(catalogAndNamespaces)
.forEach(
catalogAndNamespace -> {
List<String> bucket =
getQueryData(String.format("SELECT %s.bucket(2, 100)", catalogAndNamespace));
Assertions.assertEquals(1, bucket.size());
Assertions.assertEquals("0", bucket.get(0));
});

Arrays.stream(catalogAndNamespaces)
.forEach(
catalogAndNamespace -> {
List<String> bucket =
getQueryData(
String.format("SELECT %s.truncate(2, 'abcdef')", catalogAndNamespace));
Assertions.assertEquals(1, bucket.size());
Assertions.assertEquals("ab", bucket.get(0));
});
}
}
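
For context (not part of the diff above), the new `testIcebergListAndLoadFunctions` IT drives Spark's `FunctionCatalog` API directly. A minimal standalone sketch of the same list-and-load flow, again assuming a configured session and the placeholder catalog name `iceberg_catalog`:

```java
// Sketch only: catalog name and session setup are assumptions, not part of this commit.
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

public class ListAndLoadFunctionsExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Resolve the connector-registered catalog; it now also implements FunctionCatalog.
    CatalogPlugin plugin = spark.sessionState().catalogManager().catalog("iceberg_catalog");
    FunctionCatalog functionCatalog = (FunctionCatalog) plugin;

    // Iceberg's built-in functions (bucket, truncate, years, ...) are exposed under the
    // "system" namespace as well as the empty (top-level) namespace.
    for (Identifier ident : functionCatalog.listFunctions(new String[] {"system"})) {
      System.out.println(ident);
    }

    // Load a single function; its name should match the requested identifier.
    UnboundFunction bucket =
        functionCatalog.loadFunction(Identifier.of(new String[] {"system"}, "bucket"));
    System.out.println(bucket.name());
  }
}
```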
@@ -86,7 +86,11 @@ protected List<Object[]> sql(String query) {

// columns data are joined by ','
protected List<String> getTableData(String tableName) {
return sql(getSelectAllSql(tableName)).stream()
return getQueryData(getSelectAllSql(tableName));
}

protected List<String> getQueryData(String querySql) {
return sql(querySql).stream()
.map(
line ->
Arrays.stream(line)
@@ -15,8 +15,12 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
@@ -26,7 +30,7 @@
* StagingTableCatalog and FunctionCatalog, allowing for advanced operations like table staging and
* function management tailored to the needs of Iceberg tables.
*/
public class GravitinoIcebergCatalog extends BaseCatalog {
public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCatalog {

@Override
protected TableCatalog createAndInitSparkCatalog(
@@ -74,6 +78,16 @@ protected PropertiesConverter getPropertiesConverter() {
return new IcebergPropertiesConverter();
}

@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
}

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
return ((SparkCatalog) sparkCatalog).loadFunction(ident);
}

private void initHiveProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
