Skip to content

Commit

Permalink
[apache#3885] feat(catalog-lakehouse-paimon): Add a basic code skelet…
Browse files Browse the repository at this point in the history
…on for Paimon catalog
  • Loading branch information
caican committed Jun 18, 2024
1 parent 2d13784 commit b904f10
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ tasks {
dependsOn(
":catalogs:catalog-hive:copyLibAndConfig",
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-lakehouse-paimon:copyLibAndConfig",
":catalogs:catalog-jdbc-doris:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig",
":catalogs:catalog-jdbc-postgresql:copyLibAndConfig",
Expand Down
111 changes: 111 additions & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-lakehouse-paimon"

plugins {
`maven-publish`
id("java")
id("idea")
}

dependencies {
implementation(project(":api"))
implementation(project(":common"))
implementation(project(":core"))
implementation(libs.bundles.paimon)
implementation(libs.guava)
implementation(libs.hadoop2.common) {
exclude("com.github.spotbugs")
}
implementation(libs.hadoop2.hdfs)
implementation(libs.hive2.exec) {
artifact {
classifier = "core"
}
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.protobuf")
exclude("org.apache.avro")
exclude("org.apache.calcite")
exclude("org.apache.calcite.avatica")
exclude("org.apache.curator")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
exclude("org.apache.logging.log4j")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.openjdk.jol")
exclude("org.pentaho")
exclude("org.slf4j")
}
implementation(libs.hive2.metastore) {
exclude("co.cask.tephra")
exclude("com.github.spotbugs")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.tdunning", "json")
exclude("javax.transaction", "transaction-api")
exclude("org.apache.avro", "avro")
exclude("org.apache.hbase")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("org.apache.hadoop", "hadoop-yarn-server-applicationhistoryservice")
exclude("org.apache.hadoop", "hadoop-yarn-server-common")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
exclude("org.apache.logging.log4j")
exclude("org.apache.parquet", "parquet-hadoop-bundle")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.pentaho") // missing dependency
exclude("org.slf4j", "slf4j-log4j12")
exclude("com.zaxxer", "HikariCP")
exclude("com.sun.jersey", "jersey-server")
}

annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks {
val runtimeJars by registering(Copy::class) {
from(configurations.runtimeClasspath)
into("build/libs")
}

val copyCatalogLibs by registering(Copy::class) {
dependsOn("jar", "runtimeJars")
from("build/libs")
into("$rootDir/distribution/package/catalogs/lakehouse-paimon/libs")
}

val copyCatalogConfig by registering(Copy::class) {
from("src/main/resources")
into("$rootDir/distribution/package/catalogs/lakehouse-paimon/conf")

include("lakehouse-paimon.conf")

rename { original ->
if (original.endsWith(".template")) {
original.replace(".template", "")
} else {
original
}
}

exclude { details ->
details.file.isDirectory()
}
}

register("copyLibAndConfig", Copy::class) {
dependsOn(copyCatalogLibs, copyCatalogConfig)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.connector.capability.Capability;
import java.util.Map;

/** Implementation of {@link Catalog} that represents a Paimon catalog in Gravitino. */
public class PaimonCatalog extends BaseCatalog<PaimonCatalog> {

/** @return The short name of the catalog. */
@Override
public String shortName() {
return "lakehouse-paimon";
}

/**
* Creates a new instance of {@link PaimonCatalogOperations} with the provided configuration.
*
* @param config The configuration map for the Paimon catalog operations.
* @return A new instance of {@link PaimonCatalogOperations}.
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
return new PaimonCatalogOperations();
}

@Override
public Capability newCapability() {
return new PaimonCatalogCapability();
}

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support table properties metadata");
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support catalog properties metadata");
}

@Override
public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support schema properties metadata");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.connector.capability.Capability;

public class PaimonCatalogCapability implements Capability {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.HasPropertyMetadata;
import com.datastrato.gravitino.connector.SupportsSchemas;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.io.IOException;
import java.util.Map;

/**
* Implementation of {@link CatalogOperations} that represents operations for interacting with the
* Paimon catalog in Gravitino.
*/
public class PaimonCatalogOperations implements CatalogOperations, SupportsSchemas, TableCatalog {

/**
* Initializes the Paimon catalog operations with the provided configuration.
*
* @param conf The configuration map for the Paimon catalog operations.
* @param info The catalog info associated with this operations instance.
* @throws RuntimeException if initialization fails.
*/
@Override
public void initialize(
Map<String, String> conf, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
throws RuntimeException {}

/**
* Lists the schemas under the specified namespace.
*
* @param namespace The namespace to list the schemas for.
* @return An array of {@link NameIdentifier} representing the schemas in the namespace.
* @throws NoSuchCatalogException If the provided namespace is invalid or does not exist.
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
throw new UnsupportedOperationException();
}

/**
* Creates a new schema with the provided identifier, comment, and metadata.
*
* @param identifier The identifier of the schema to create.
* @param comment The comment for the new schema.
* @param properties The properties for the new schema.
* @return The newly created {@link PaimonSchema} instance.
* @throws NoSuchCatalogException If the provided namespace is invalid or does not exist.
* @throws SchemaAlreadyExistsException If a schema with the same name already exists.
*/
@Override
public PaimonSchema createSchema(
NameIdentifier identifier, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
}

/**
* Loads the schema with the provided identifier.
*
* @param identifier The identifier of the schema to load.
* @return The loaded {@link PaimonSchema} representing the schema.
* @throws NoSuchSchemaException If the schema with the provided identifier does not exist.
*/
@Override
public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Alters the schema with the provided identifier according to the specified {@link SchemaChange}
* changes.
*
* @param identifier The identifier of the schema to alter.
* @param changes The changes to apply to the schema.
* @return The altered {@link PaimonSchema} instance.
* @throws NoSuchSchemaException If the schema with the provided identifier does not exist.
*/
@Override
public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Drops the schema with the provided identifier.
*
* @param identifier The identifier of the schema to drop.
* @param cascade If set to true, drops all the tables in the schema as well.
* @return true if the schema is dropped successfully, false otherwise.
* @throws NonEmptySchemaException If the schema is not empty and 'cascade' is set to false.
*/
@Override
public boolean dropSchema(NameIdentifier identifier, boolean cascade)
throws NonEmptySchemaException {
throw new UnsupportedOperationException();
}

/**
* Lists all the tables under the specified namespace.
*
* @param namespace The namespace to list tables for.
* @return An array of {@link NameIdentifier} representing the tables in the namespace.
* @throws NoSuchSchemaException If the schema with the provided namespace does not exist.
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Loads the table with the provided identifier.
*
* @param identifier The identifier of the table to load.
* @return The loaded {@link PaimonTable} instance representing the table.
* @throws NoSuchTableException If the table with the provided identifier does not exist.
*/
@Override
public PaimonTable loadTable(NameIdentifier identifier) throws NoSuchTableException {
throw new UnsupportedOperationException();
}

/**
* Creates a new table with the provided identifier, comment, and metadata.
*
* @param identifier The identifier of the table to create.
* @param columns The array of columns for the new table.
* @param comment The comment for the new table.
* @param properties The properties for the new table.
* @param partitioning The partitioning for the new table.
* @param indexes The indexes for the new table.
* @return The newly created {@link PaimonTable} instance.
* @throws NoSuchSchemaException If the schema with the provided namespace does not exist.
* @throws TableAlreadyExistsException If the table with the same identifier already exists.
*/
@Override
public PaimonTable createTable(
NameIdentifier identifier,
Column[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning,
Distribution distribution,
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
throw new UnsupportedOperationException();
}

/**
* Alters the table with the provided identifier according to the specified {@link TableChange}
* changes.
*
* @param identifier The identifier of the table to alter.
* @param changes The changes to apply to the table.
* @return The altered {@link PaimonTable} instance.
* @throws NoSuchTableException If the table with the provided identifier does not exist.
* @throws IllegalArgumentException This exception will not be thrown in this method.
*/
@Override
public PaimonTable alterTable(NameIdentifier identifier, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
throw new UnsupportedOperationException();
}

/**
* Drops the table with the provided identifier.
*
* @param identifier The identifier of the table to drop.
* @return true if the table is successfully dropped, false if the table does not exist.
*/
@Override
public boolean dropTable(NameIdentifier identifier) {
throw new UnsupportedOperationException();
}

/**
* Purges the table with the provided identifier.
*
* @param identifier The identifier of the table to purge.
* @return true if the table is successfully purged, false if the table does not exist.
* @throws UnsupportedOperationException If the table type is EXTERNAL_TABLE, it cannot be purged.
*/
@Override
public boolean purgeTable(NameIdentifier identifier) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}

@Override
public void close() throws IOException {}
}
Loading

0 comments on commit b904f10

Please sign in to comment.