update
caican00 committed May 5, 2024
1 parent 90b7be8 commit 65ef2a4
Showing 4 changed files with 159 additions and 7 deletions.
@@ -217,11 +217,7 @@ public Table createTable(
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      String database = getDatabase(ident);
-      com.datastrato.gravitino.rel.Table gravitinoTable =
-          gravitinoCatalogClient
-              .asTableCatalog()
-              .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
       Table sparkTable = sparkCatalog.loadTable(ident);
       // Will create a catalog specific table
       return createSparkTable(
@@ -236,6 +232,44 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
     }
   }
 
+  @Override
+  public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
+    try {
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
+      // load SparkTable with version
+      Table sparkTable = sparkCatalog.loadTable(ident, version);
+      // Will create a catalog specific table
+      return createSparkTable(
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
+    } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
+  @Override
+  public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
+    try {
+      com.datastrato.gravitino.rel.Table gravitinoTable = loadGravitinoTable(ident);
+      // load SparkTable with timestamp
+      Table sparkTable = sparkCatalog.loadTable(ident, timestamp);
+      // Will create a catalog specific table
+      return createSparkTable(
+          ident,
+          gravitinoTable,
+          sparkTable,
+          sparkCatalog,
+          propertiesConverter,
+          sparkTransformConverter);
+    } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public Table createTable(
@@ -520,4 +554,16 @@ private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transform
             "Unsupported table column position %s", columnPosition.getClass().getName()));
     }
   }
+
+  private com.datastrato.gravitino.rel.Table loadGravitinoTable(Identifier ident)
+      throws NoSuchTableException {
+    try {
+      String database = getDatabase(ident);
+      return gravitinoCatalogClient
+          .asTableCatalog()
+          .loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
+    } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
 }
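
The two new overloads are the entry points for Spark's DataSourceV2 time travel: Spark resolves VERSION AS OF to loadTable(ident, version) and TIMESTAMP AS OF to loadTable(ident, timestamp). A minimal usage sketch, assuming a configured Gravitino catalog; the table name and snapshot id below are hypothetical:

    // Hypothetical table name and snapshot id, for illustration only.
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Spark resolves this to loadTable(ident, version) on the catalog.
    spark.sql("SELECT * FROM catalog.db.events VERSION AS OF 8109744798576441135").show();

    // Spark resolves this to loadTable(ident, timestamp) on the catalog.
    spark.sql("SELECT * FROM catalog.db.events TIMESTAMP AS OF '2024-05-01 00:00:00'").show();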
SparkHiveTable.java
@@ -11,13 +11,19 @@
 import com.datastrato.gravitino.spark.connector.utils.SparkBaseTableHelper;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kyuubi.spark.connector.hive.HiveTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /** Keep consistent behavior with the SparkIcebergTable */
 public class SparkHiveTable extends HiveTable {
@@ -37,7 +43,11 @@ public SparkHiveTable(
         (HiveTableCatalog) sparkHiveCatalog);
     this.sparkBaseTableHelper =
         new SparkBaseTableHelper(
-            identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
+            identifier,
+            gravitinoTable,
+            sparkHiveTable,
+            propertiesConverter,
+            sparkTransformConverter);
   }
 
   @Override
@@ -61,6 +71,21 @@ public Transform[] partitioning() {
     return sparkBaseTableHelper.partitioning();
   }
 
+  @Override
+  public Set<TableCapability> capabilities() {
+    return sparkBaseTableHelper.capabilities();
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return sparkBaseTableHelper.newScanBuilder(options);
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return sparkBaseTableHelper.newWriteBuilder(info);
+  }
+
   @VisibleForTesting
   public SparkTransformConverter getSparkTransformConverter() {
     return sparkBaseTableHelper.getSparkTransformConverter();
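
With capabilities(), newScanBuilder(), and newWriteBuilder() forwarded to the wrapped table, Spark can plan reads and writes against the wrapper itself. A rough sketch of the calls Spark makes during planning; the catalog and ident variables are assumed to exist:

    // Sketch of the DataSourceV2 read path. Spark performs these casts itself,
    // which is why the wrapper must forward SupportsRead/SupportsWrite.
    Table table = catalog.loadTable(ident);
    if (table.capabilities().contains(TableCapability.BATCH_READ)) {
      ScanBuilder scanBuilder =
          ((SupportsRead) table).newScanBuilder(CaseInsensitiveStringMap.empty());
    }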
SparkIcebergTable.java
@@ -12,12 +12,25 @@
 import com.google.common.annotations.VisibleForTesting;
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Set;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations;
+import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.RowLevelOperationBuilder;
+import org.apache.spark.sql.connector.write.RowLevelOperationInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * For spark-connector in Iceberg, it explicitly uses SparkTable to identify whether it is an
@@ -37,7 +50,11 @@ public SparkIcebergTable(
     super(((SparkTable) sparkIcebergTable).table(), !isCacheEnabled(sparkIcebergCatalog));
     this.sparkBaseTableHelper =
         new SparkBaseTableHelper(
-            identifier, gravitinoTable, propertiesConverter, sparkTransformConverter);
+            identifier,
+            gravitinoTable,
+            sparkIcebergTable,
+            propertiesConverter,
+            sparkTransformConverter);
   }
 
   @Override
@@ -61,6 +78,42 @@ public Transform[] partitioning() {
     return sparkBaseTableHelper.partitioning();
   }
 
+  @Override
+  public Set<TableCapability> capabilities() {
+    return sparkBaseTableHelper.capabilities();
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return sparkBaseTableHelper.newScanBuilder(options);
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return sparkBaseTableHelper.newWriteBuilder(info);
+  }
+
+  @Override
+  public boolean canDeleteWhere(Filter[] filters) {
+    return ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).canDeleteWhere(filters);
+  }
+
+  @Override
+  public void deleteWhere(Filter[] filters) {
+    ((SupportsDelete) sparkBaseTableHelper.getSparkTable()).deleteWhere(filters);
+  }
+
+  @Override
+  public MetadataColumn[] metadataColumns() {
+    return ((SupportsMetadataColumns) sparkBaseTableHelper.getSparkTable()).metadataColumns();
+  }
+
+  @Override
+  public RowLevelOperationBuilder newRowLevelOperationBuilder(RowLevelOperationInfo info) {
+    return ((SupportsRowLevelOperations) sparkBaseTableHelper.getSparkTable())
+        .newRowLevelOperationBuilder(info);
+  }
+
   @VisibleForTesting
   public SparkTransformConverter getSparkTransformConverter() {
     return sparkBaseTableHelper.getSparkTransformConverter();
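
Forwarding SupportsDelete, SupportsMetadataColumns, and SupportsRowLevelOperations keeps Iceberg's richer SQL surface available through the wrapper. A hedged sketch of the statements this enables, against a hypothetical catalog.db.events table:

    // Hypothetical examples; they plan successfully only because the wrapper
    // still exposes the underlying Iceberg table's interfaces.
    spark.sql("SELECT _file, _pos, id FROM catalog.db.events").show();  // metadata columns
    spark.sql("DELETE FROM catalog.db.events WHERE id < 100");          // SupportsDelete
    spark.sql("UPDATE catalog.db.events SET flag = true WHERE id = 7"); // row-level operation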
SparkBaseTableHelper.java
@@ -16,14 +16,23 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /**
  * Provides schema info from Gravitino, IO from the internal spark table. The specific catalog table
@@ -34,16 +43,19 @@ public class SparkBaseTableHelper {
 
   private Identifier identifier;
   private com.datastrato.gravitino.rel.Table gravitinoTable;
+  private Table sparkTable;
   private PropertiesConverter propertiesConverter;
   private SparkTransformConverter sparkTransformConverter;
 
   public SparkBaseTableHelper(
       Identifier identifier,
       com.datastrato.gravitino.rel.Table gravitinoTable,
+      Table sparkTable,
       PropertiesConverter propertiesConverter,
       SparkTransformConverter sparkTransformConverter) {
     this.identifier = identifier;
     this.gravitinoTable = gravitinoTable;
+    this.sparkTable = sparkTable;
     this.propertiesConverter = propertiesConverter;
     this.sparkTransformConverter = sparkTransformConverter;
   }
@@ -99,6 +111,22 @@ public Transform[] partitioning() {
     return sparkTransformConverter.toSparkTransform(partitions, distribution, sortOrders);
   }
 
+  public Set<TableCapability> capabilities() {
+    return sparkTable.capabilities();
+  }
+
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SupportsRead) sparkTable).newScanBuilder(options);
+  }
+
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    return ((SupportsWrite) sparkTable).newWriteBuilder(info);
+  }
+
+  public Table getSparkTable() {
+    return sparkTable;
+  }
+
   public SparkTransformConverter getSparkTransformConverter() {
     return sparkTransformConverter;
   }
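
A note on the overall pattern, as suggested by the class Javadocs: SparkHiveTable and SparkIcebergTable keep extending the native table types (Kyuubi's HiveTable, Iceberg's SparkTable) because engine code identifies tables by concrete type, while the Gravitino-facing logic lives in SparkBaseTableHelper by composition. A minimal sketch of the kind of check this inheritance preserves; the method is hypothetical:

    // Hypothetical illustration: checks like this in Iceberg's Spark integration
    // still succeed because SparkIcebergTable is a SparkTable subclass.
    static boolean isIcebergTable(org.apache.spark.sql.connector.catalog.Table t) {
      return t instanceof org.apache.iceberg.spark.source.SparkTable;
    }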
