Skip to content

Commit

Permalink
[apache#2738] feat(catalog-lakehouse-paimon): introduce code skeleton…
Browse files Browse the repository at this point in the history
… for Paimon catalog
  • Loading branch information
SteNicholas committed Apr 1, 2024
1 parent ababbb1 commit 3d12272
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ tasks {
dependsOn(
":catalogs:catalog-hive:copyLibAndConfig",
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-lakehouse-paimon:copyLibAndConfig",
":catalogs:catalog-jdbc-doris:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig",
":catalogs:catalog-jdbc-postgresql:copyLibAndConfig",
Expand Down
2 changes: 2 additions & 0 deletions catalogs/bundled-catalog/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation(project(":catalogs:catalog-jdbc-mysql"))
implementation(project(":catalogs:catalog-jdbc-postgresql"))
implementation(project(":catalogs:catalog-lakehouse-iceberg"))
implementation(project(":catalogs:catalog-lakehouse-paimon"))
implementation(project(":core"))
implementation(libs.slf4j.api)
}
Expand Down Expand Up @@ -80,6 +81,7 @@ tasks.jar {
tasks.compileJava {
dependsOn(":catalogs:catalog-jdbc-postgresql:runtimeJars")
dependsOn(":catalogs:catalog-lakehouse-iceberg:runtimeJars")
dependsOn(":catalogs:catalog-lakehouse-paimon:runtimeJars")
dependsOn(":catalogs:catalog-jdbc-mysql:runtimeJars")
dependsOn(":catalogs:catalog-hive:runtimeJars")
dependsOn(":catalogs:catalog-hadoop:runtimeJars")
Expand Down
54 changes: 54 additions & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-lakehouse-paimon"

plugins {
`maven-publish`
id("java")
id("idea")
}

dependencies {
implementation(project(":api"))
implementation(project(":common"))
implementation(project(":core"))
implementation(libs.guava)
}

tasks {
val runtimeJars by registering(Copy::class) {
from(configurations.runtimeClasspath)
into("build/libs")
}

val copyCatalogLibs by registering(Copy::class) {
dependsOn("jar", "runtimeJars")
from("build/libs")
into("$rootDir/distribution/package/catalogs/lakehouse-paimon/libs")
}

val copyCatalogConfig by registering(Copy::class) {
from("src/main/resources")
into("$rootDir/distribution/package/catalogs/lakehouse-paimon/conf")

include("lakehouse-paimon.conf")

rename { original ->
if (original.endsWith(".template")) {
original.replace(".template", "")
} else {
original
}
}

exclude { details ->
details.file.isDirectory()
}
}

register("copyLibAndConfig", Copy::class) {
dependsOn(copyCatalogLibs, copyCatalogConfig)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;

/** Implementation of a Paimon catalog in Gravitino. */
public class PaimonCatalog extends BaseCatalog<PaimonCatalog> {

/** @return The short name of the catalog. */
@Override
public String shortName() {
return "lakehouse-paimon";
}

/**
* Creates a new instance of {@link PaimonCatalogOperations} with the provided configuration.
*
* @param config The configuration map for the Paimon catalog operations.
* @return A new instance of {@link PaimonCatalogOperations}.
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
return new PaimonCatalogOperations();
}

/** @return The Paimon catalog operations as {@link PaimonCatalogOperations}. */
@Override
public SupportsSchemas asSchemas() {
return (PaimonCatalogOperations) ops();
}

/** @return The Paimon catalog operations as {@link PaimonCatalogOperations}. */
@Override
public TableCatalog asTableCatalog() {
return (PaimonCatalogOperations) ops();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

/** The type of Paimon catalog metastore. */
public enum PaimonCatalogMetastore {
FILE,
HIVE,
JDBC
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.io.IOException;
import java.util.Map;

/** Operations for interacting with the Paimon catalog in Gravitino. */
public class PaimonCatalogOperations implements CatalogOperations, SupportsSchemas, TableCatalog {

private PaimonCatalogPropertiesMetadata paimonCatalogPropertiesMetadata;

private PaimonTablePropertiesMetadata paimonTablePropertiesMetadata;

private PaimonSchemaPropertiesMetadata paimonSchemaPropertiesMetadata;

/**
* Initializes the Paimon catalog operations with the provided configuration.
*
* @param conf The configuration map for the Paimon catalog operations.
* @param info The catalog info associated with this operations instance.
* @throws RuntimeException if initialization fails.
*/
@Override
public void initialize(Map<String, String> conf, CatalogInfo info) throws RuntimeException {
this.paimonCatalogPropertiesMetadata = new PaimonCatalogPropertiesMetadata();
this.paimonTablePropertiesMetadata = new PaimonTablePropertiesMetadata();
this.paimonSchemaPropertiesMetadata = new PaimonSchemaPropertiesMetadata();
}

/**
* Lists the schemas under the given namespace.
*
* @param namespace The namespace to list the schemas for.
* @return An array of {@link NameIdentifier} representing the schemas.
* @throws NoSuchCatalogException If the provided namespace is invalid or does not exist.
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
throw new UnsupportedOperationException();
}

/**
* Creates a new schema with the provided identifier, comment, and metadata.
*
* @param ident The identifier of the schema to create.
* @param comment The comment for the schema.
* @param properties The properties for the schema.
* @return The created {@link Schema}.
* @throws NoSuchCatalogException If the provided namespace is invalid or does not exist.
* @throws SchemaAlreadyExistsException If a schema with the same name already exists.
*/
@Override
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
}

/**
* Loads the schema with the provided identifier.
*
* @param ident The identifier of the schema to load.
* @return The loaded {@link Schema}.
* @throws NoSuchSchemaException If the schema with the provided identifier does not exist.
*/
@Override
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Alters the schema with the provided identifier according to the specified changes.
*
* @param ident The identifier of the schema to alter.
* @param changes The changes to apply to the schema.
* @return The altered {@link Schema}.
* @throws NoSuchSchemaException If the schema with the provided identifier does not exist.
*/
@Override
public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Drops the schema with the provided identifier.
*
* @param ident The identifier of the schema to drop.
* @param cascade If set to true, drops all the tables in the schema as well.
* @return true if the schema was dropped successfully, false otherwise.
* @throws NonEmptySchemaException If the schema is not empty and 'cascade' is set to false.
*/
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
throw new UnsupportedOperationException();
}

/**
* Lists all the tables under the specified namespace.
*
* @param namespace The namespace to list tables for.
* @return An array of {@link NameIdentifier} representing the tables in the namespace.
* @throws NoSuchSchemaException If the schema with the provided namespace does not exist.
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
}

/**
* Loads a table from the Paimon.
*
* @param tableIdent The identifier of the table to load.
* @return The loaded PaimonTable instance representing the table.
* @throws NoSuchTableException If the specified table does not exist in the Paimon.
*/
@Override
public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
throw new UnsupportedOperationException();
}

/**
* Apply the {@link TableChange change} to an existing Paimon table.
*
* @param tableIdent The identifier of the table to alter.
* @param changes The changes to apply to the table.
* @return This method always throws UnsupportedOperationException.
* @throws NoSuchTableException This exception will not be thrown in this method.
* @throws IllegalArgumentException This exception will not be thrown in this method.
*/
@Override
public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
throw new UnsupportedOperationException();
}

/**
* Drops a table from the Paimon.
*
* @param tableIdent The identifier of the table to drop.
* @return true if the table is successfully dropped; false if the table does not exist.
*/
@Override
public boolean dropTable(NameIdentifier tableIdent) {
throw new UnsupportedOperationException();
}

/**
* Creates a new table in the Paimon.
*
* @param tableIdent The identifier of the table to create.
* @param columns The array of columns for the new table.
* @param comment The comment for the new table.
* @param properties The properties for the new table.
* @param partitioning The partitioning for the new table.
* @param indexes The indexes for the new table.
* @return The newly created PaimonTable instance.
* @throws NoSuchSchemaException If the schema for the table does not exist.
* @throws TableAlreadyExistsException If the table with the same name already exists.
*/
@Override
public Table createTable(
NameIdentifier tableIdent,
Column[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning,
Distribution distribution,
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
throw new UnsupportedOperationException();
}

/**
* Purges a table from the Paimon.
*
* @param tableIdent The identifier of the table to purge.
* @return true if the table is successfully purged; false if the table does not exist.
* @throws UnsupportedOperationException If the table type is EXTERNAL_TABLE, it cannot be purged.
*/
@Override
public boolean purgeTable(NameIdentifier tableIdent) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}

@Override
public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
return paimonTablePropertiesMetadata;
}

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
return paimonCatalogPropertiesMetadata;
}

@Override
public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException {
return paimonSchemaPropertiesMetadata;
}

@Override
public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Paimon catalog doesn't support fileset related operations");
}

@Override
public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"Paimon catalog doesn't support topic related operations");
}

@Override
public void close() throws IOException {}
}
Loading

0 comments on commit 3d12272

Please sign in to comment.