Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
hdygxsj committed Dec 19, 2024
1 parent 7ad9377 commit e357efe
Show file tree
Hide file tree
Showing 9 changed files with 603 additions and 2 deletions.
13 changes: 12 additions & 1 deletion flink-connector/flink/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ repositories {
mavenCentral()
}

var paimonVersion: String = libs.versions.paimon.get()
val flinkVersion: String = libs.versions.flink.get()
val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")

Expand All @@ -37,15 +38,22 @@ val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")
val scalaVersion: String = "2.12"
val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion"

tasks.compileJava {
dependsOn(":catalogs:catalog-lakehouse-paimon:runtimeJars")
}

dependencies {
implementation(project(":core"))
implementation(project(":catalogs:catalog-common"))

implementation(libs.guava)

compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))

compileOnly(project(":catalogs:catalog-lakehouse-paimon"))
compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion")

compileOnly(libs.hive2.exec) {
artifact {
Expand Down Expand Up @@ -75,6 +83,7 @@ dependencies {
testImplementation(project(":clients:client-java"))
testImplementation(project(":core"))
testImplementation(project(":common"))
testImplementation(project(":catalogs:catalog-lakehouse-paimon"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
Expand All @@ -90,6 +99,7 @@ dependencies {
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion")
testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion")

testImplementation(libs.hive2.exec) {
artifact {
Expand Down Expand Up @@ -170,6 +180,7 @@ tasks.test {
} else {
dependsOn(tasks.jar)
dependsOn(":catalogs:catalog-hive:jar")
dependsOn(":catalogs:catalog-lakehouse-paimon:jar")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.paimon;

import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;

/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to
* proxy the PaimonCatalog class.
*/
public class GravitinoPaimonCatalog extends BaseCatalog {

private AbstractCatalog paimonCatalog;

protected GravitinoPaimonCatalog(
String catalogName,
AbstractCatalog paimonCatalog,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter);
}

@Override
protected AbstractCatalog realCatalog() {
return paimonCatalog;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.paimon;

import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;

/**
* Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI
* discovery in Flink.
*/
public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory {

@Override
public Catalog createCatalog(Context context) {
FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context);
return new GravitinoPaimonCatalog(
context.getName(), catalog, propertiesConverter(), partitionConverter());
}

@Override
public String factoryIdentifier() {
return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return ImmutableSet.of(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND);
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}

@Override
public String gravitinoCatalogProvider() {
return "lakehouse-paimon";
}

@Override
public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}

@Override
public PropertiesConverter propertiesConverter() {
return PaimonPropertiesConverter.INSTANCE;
}

@Override
public PartitionConverter partitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.paimon;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class GravitinoPaimonCatalogFactoryOptions {

/** Identifier for the {@link GravitinoPaimonCatalog}. */
public static final String IDENTIFIER = "gravitino-paimon";

public static ConfigOption<String> CATALOG_BACKEND =
ConfigOptions.key("catalog.backend")
.stringType()
.defaultValue("fileSystem")
.withDescription("");

public static ConfigOption<String> WAREHOUSE =
ConfigOptions.key("warehouse").stringType().noDefaultValue();

public static ConfigOption<String> URI = ConfigOptions.key("uri").stringType().noDefaultValue();

public static ConfigOption<String> JDBC_USER =
ConfigOptions.key("jdbc.user").stringType().noDefaultValue();

public static ConfigOption<String> JDBC_PASSWORD =
ConfigOptions.key("jdbc.password").stringType().noDefaultValue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.paimon;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.paimon.options.CatalogOptions;

public class PaimonPropertiesConverter implements PropertiesConverter {

public static final PaimonPropertiesConverter INSTANCE = new PaimonPropertiesConverter();

private PaimonPropertiesConverter() {}

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoCatalogProperties = flinkConf.toMap();
String warehouse = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE);
gravitinoCatalogProperties.put(PaimonConfig.CATALOG_WAREHOUSE.getKey(), warehouse);
String backendType = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND);
gravitinoCatalogProperties.put(
PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, backendType);
if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) {
gravitinoCatalogProperties.put(
PaimonConfig.CATALOG_URI.getKey(),
flinkConf.get(GravitinoPaimonCatalogFactoryOptions.URI));
gravitinoCatalogProperties.put(
PaimonConfig.CATALOG_JDBC_USER.getKey(),
flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER));
gravitinoCatalogProperties.put(
PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(),
flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD));
} else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) {
throw new UnsupportedOperationException(
"The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore.");
}
return gravitinoCatalogProperties;
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> flinkCatalogProperties = Maps.newHashMap();
flinkCatalogProperties.putAll(gravitinoProperties);
String backendType =
flinkCatalogProperties.get(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND);
if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) {
flinkCatalogProperties.put(CatalogOptions.METASTORE.key(), backendType);
flinkCatalogProperties.put(
GravitinoPaimonCatalogFactoryOptions.URI.key(),
gravitinoProperties.get(PaimonConfig.CATALOG_URI.getKey()));
flinkCatalogProperties.put(
GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(),
gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_USER.getKey()));
flinkCatalogProperties.put(
GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(),
gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey()));
} else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) {
throw new UnsupportedOperationException(
"The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore.");
}
flinkCatalogProperties.put(
GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), backendType);
flinkCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
return flinkCatalogProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#

org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory
Loading

0 comments on commit e357efe

Please sign in to comment.