Skip to content

Commit

Permalink
[#3892] feat(catalog-lakehouse-paimon): Support table operations for …
Browse files Browse the repository at this point in the history
…Paimon Catalog (#3939)

### What changes were proposed in this pull request?
Support table operations for Paimon Catalog.

### Why are the changes needed?

Fix: #3892

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UTs and ITs.

---------

Co-authored-by: caican <[email protected]>
  • Loading branch information
caican00 and caican committed Aug 1, 2024
1 parent 2393bfc commit a9950b2
Show file tree
Hide file tree
Showing 20 changed files with 1,824 additions and 54 deletions.
3 changes: 3 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@
./clients/client-java/src/main/java/com/datastrato/gravitino/client/OAuth2ClientUtil.java
./gradlew

Apache Paimon
./catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TypeUtils.java

Apache Hive
./catalogs/catalog-hive/src/test/resources/hive-schema-3.1.0.derby.sql

Expand Down
19 changes: 19 additions & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ plugins {
id("idea")
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark34.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val paimonVersion: String = libs.versions.paimon.get()

dependencies {
implementation(project(":api"))
implementation(project(":common"))
Expand All @@ -18,6 +23,7 @@ dependencies {
exclude("com.sun.jersey")
exclude("javax.servlet")
}
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.hadoop2.common) {
Expand All @@ -41,6 +47,19 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
exclude("org.apache.hadoop")
}
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude("org.apache.avro")
exclude("org.apache.hadoop")
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}
testImplementation("org.apache.paimon:paimon-spark-$sparkMajorVersion:$paimonVersion") {
exclude("org.apache.hadoop")
}
testImplementation(libs.slf4j.api)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mysql.driver)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.fromPaimonType;
import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.TypeUtils.toPaimonType;

import com.datastrato.gravitino.connector.BaseColumn;
import com.datastrato.gravitino.rel.Column;
import java.util.List;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

/** Implementation of {@link Column} that represents a column in the Paimon column. */
@EqualsAndHashCode(callSuper = true)
public class GravitinoPaimonColumn extends BaseColumn {

private GravitinoPaimonColumn() {}

/**
* Converts {@link GravitinoPaimonColumn} instance to inner column.
*
* @param id The id of inner column.
* @return The converted inner column.
*/
public static DataField toPaimonColumn(int id, Column gravitinoColumn) {
DataType paimonType = toPaimonType(gravitinoColumn.dataType());
DataType paimonTypeWithNullable =
gravitinoColumn.nullable() ? paimonType.nullable() : paimonType.notNull();
return new DataField(
id, gravitinoColumn.name(), paimonTypeWithNullable, gravitinoColumn.comment());
}

/**
* Creates new {@link GravitinoPaimonColumn} instance from Paimon columns.
*
* @param rowType The {@link RowType} instance of Paimon column.
* @return New {@link GravitinoPaimonColumn} instances.
*/
public static List<GravitinoPaimonColumn> fromPaimonRowType(RowType rowType) {
return rowType.getFields().stream()
.map(GravitinoPaimonColumn::fromPaimonColumn)
.collect(Collectors.toList());
}

/**
* Creates a new {@link GravitinoPaimonColumn} instance from inner column.
*
* @param dataField The {@link DataField} instance of inner column.
* @return A new {@link GravitinoPaimonColumn} instance.
*/
public static GravitinoPaimonColumn fromPaimonColumn(DataField dataField) {
return builder()
.withName(dataField.name())
.withType(fromPaimonType(dataField.type()))
.withComment(dataField.description())
.withNullable(dataField.type().isNullable())
.build();
}

/** A builder class for constructing {@link GravitinoPaimonColumn} instance. */
public static class Builder extends BaseColumnBuilder<Builder, GravitinoPaimonColumn> {

/** Creates a new instance of {@link Builder}. */
private Builder() {}

/**
* Internal method to build a {@link GravitinoPaimonColumn} instance using the provided values.
*
* @return A new {@link GravitinoPaimonColumn} instance with the configured values.
*/
@Override
protected GravitinoPaimonColumn internalBuild() {
GravitinoPaimonColumn paimonColumn = new GravitinoPaimonColumn();
paimonColumn.name = name;
paimonColumn.comment = comment;
paimonColumn.dataType = dataType;
paimonColumn.nullable = nullable;
paimonColumn.autoIncrement = autoIncrement;
paimonColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue;
return paimonColumn;
}
}

/**
* Creates a new instance of {@link Builder}.
*
* @return The new instance.
*/
public static Builder builder() {
return new Builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import static com.datastrato.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.fromPaimonRowType;
import static com.datastrato.gravitino.catalog.lakehouse.paimon.GravitinoPaimonColumn.toPaimonColumn;
import static com.datastrato.gravitino.meta.AuditInfo.EMPTY;

import com.datastrato.gravitino.connector.BaseTable;
import com.datastrato.gravitino.connector.TableOperations;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.ToString;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;

/** Implementation of {@link Table} that represents a Paimon Table entity in the Paimon table. */
@ToString
@Getter
public class GravitinoPaimonTable extends BaseTable {

private GravitinoPaimonTable() {}

@Override
protected TableOperations newOps() {
// TODO: Implement this interface when we have the Paimon table operations.
throw new UnsupportedOperationException("PaimonTable does not support TableOperations.");
}

/**
* Converts {@link GravitinoPaimonTable} instance to Paimon table.
*
* @return The converted Paimon table.
*/
public Schema toPaimonTableSchema() {
Schema.Builder builder = Schema.newBuilder().comment(comment).options(properties);
for (int index = 0; index < columns.length; index++) {
DataField dataField = toPaimonColumn(index, columns[index]);
builder.column(dataField.name(), dataField.type(), dataField.description());
}
return builder.build();
}

/**
* Creates a new {@link GravitinoPaimonTable} instance from Paimon table.
*
* @param table The {@link Table} instance of Paimon table.
* @return A new {@link GravitinoPaimonTable} instance.
*/
public static GravitinoPaimonTable fromPaimonTable(Table table) {
return builder()
.withName(table.name())
.withColumns(fromPaimonRowType(table.rowType()).toArray(new GravitinoPaimonColumn[0]))
.withComment(table.comment().orElse(null))
.withProperties(table.options())
.withAuditInfo(EMPTY)
.build();
}

/** A builder class for constructing {@link GravitinoPaimonTable} instance. */
public static class Builder extends BaseTableBuilder<Builder, GravitinoPaimonTable> {

/** Creates a new instance of {@link Builder}. */
private Builder() {}

/**
* Internal method to build a {@link GravitinoPaimonTable} instance using the provided values.
*
* @return A new {@link GravitinoPaimonTable} instance with the configured values.
*/
@Override
protected GravitinoPaimonTable internalBuild() {
GravitinoPaimonTable paimonTable = new GravitinoPaimonTable();
paimonTable.name = name;
paimonTable.comment = comment;
paimonTable.columns = columns;
paimonTable.properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
paimonTable.auditInfo = auditInfo;
return paimonTable;
}
}

/**
* Creates a new instance of {@link Builder}.
*
* @return The new instance.
*/
public static Builder builder() {
return new Builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class PaimonCatalog extends BaseCatalog<PaimonCatalog> {
static final PaimonSchemaPropertiesMetadata SCHEMA_PROPERTIES_META =
new PaimonSchemaPropertiesMetadata();

static final PaimonTablePropertiesMetadata TABLE_PROPERTIES_META =
new PaimonTablePropertiesMetadata();

/** @return The short name of the catalog. */
@Override
public String shortName() {
Expand All @@ -44,8 +47,7 @@ public Capability newCapability() {

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support table properties metadata");
return TABLE_PROPERTIES_META;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,14 @@
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.connector.capability.Capability;
import com.datastrato.gravitino.connector.capability.CapabilityResult;

public class PaimonCatalogCapability implements Capability {}
public class PaimonCatalogCapability implements Capability {

@Override
public CapabilityResult columnDefaultValue() {
// See https://github.com/apache/paimon/pull/1425/files
return CapabilityResult.unsupported(
"Paimon set column default value through table properties instead of column info.");
}
}
Loading

0 comments on commit a9950b2

Please sign in to comment.