From 9695c28ccf5ebb8e7e0f2f17553a63270b88adef Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Thu, 19 Dec 2024 10:01:42 +0800 Subject: [PATCH] [#5867] feat(flink-connector): Improve flink connector (#5868) ### What changes were proposed in this pull request? 1. Remove useless code 2. Optimize the logic of the store, and when adding connector support, there is no need to modify the logic of the store. ### Why are the changes needed? Fix : [#5867](https://github.com/apache/gravitino/issues/5867) ### Does this PR introduce any user-facing change? No. ### How was this patch tested? FlinkHiveCatalogIT --- .../flink/connector/PartitionConverter.java | 4 +- .../flink/connector/catalog/BaseCatalog.java | 14 +-- .../connector/catalog/BaseCatalogFactory.java | 56 +++++++++++ .../connector/hive/GravitinoHiveCatalog.java | 15 +-- .../hive/GravitinoHiveCatalogFactory.java | 49 +++++++++- .../store/GravitinoCatalogStore.java | 96 ++++++++++++------- 6 files changed, 176 insertions(+), 58 deletions(-) create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java index 5464c705a37..e8029e567d1 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java @@ -34,7 +34,7 @@ public interface PartitionConverter { * @param partitions The partition keys in Gravitino. * @return The partition keys in Flink. */ - public abstract List toFlinkPartitionKeys(Transform[] partitions); + List toFlinkPartitionKeys(Transform[] partitions); /** * Convert the partition keys to Gravitino partition keys. @@ -42,5 +42,5 @@ public interface PartitionConverter { * @param partitionsKey The partition keys in Flink. * @return The partition keys in Gravitino. */ - public abstract Transform[] toGravitinoPartitions(List partitionsKey); + Transform[] toGravitinoPartitions(List partitionsKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 6b76e31b8d2..1496742177f 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog { private final PropertiesConverter propertiesConverter; private final PartitionConverter partitionConverter; - protected BaseCatalog(String catalogName, String defaultDatabase) { + protected BaseCatalog( + String catalogName, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { super(catalogName, defaultDatabase); - this.propertiesConverter = getPropertiesConverter(); - this.partitionConverter = getPartitionConverter(); + this.propertiesConverter = propertiesConverter; + this.partitionConverter = partitionConverter; } protected abstract AbstractCatalog realCatalog(); @@ -508,10 +512,6 @@ public void alterPartitionColumnStatistics( throw new UnsupportedOperationException(); } - protected abstract PropertiesConverter getPropertiesConverter(); - - protected abstract PartitionConverter getPartitionConverter(); - protected CatalogBaseTable toFlinkTable(Table table) { org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder(); diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java new file mode 100644 index 00000000000..5086b532571 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.catalog; + +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public interface BaseCatalogFactory extends CatalogFactory { + + /** + * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}. + * + * @return The requested gravitino catalog provider. + */ + String gravitinoCatalogProvider(); + + /** + * Define gravitino catalog type {@link Catalog.Type}. + * + * @return The requested gravitino catalog type. + */ + Catalog.Type gravitinoCatalogType(); + + /** + * Define properties converter {@link PropertiesConverter}. + * + * @return The requested property converter. + */ + PropertiesConverter propertiesConverter(); + + /** + * Define partition converter. + * + * @return The requested partition converter. + */ + PartitionConverter partitionConverter(); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java index b4754596858..3e5d31fd3c5 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java @@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.factories.Factory; -import org.apache.gravitino.flink.connector.DefaultPartitionConverter; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; @@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog { GravitinoHiveCatalog( String catalogName, String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter, @Nullable HiveConf hiveConf, @Nullable String hiveVersion) { - super(catalogName, defaultDatabase); + super(catalogName, defaultDatabase, propertiesConverter, partitionConverter); this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion); } @@ -68,16 +69,6 @@ public Optional getFactory() { return hiveCatalog.getFactory(); } - @Override - protected PropertiesConverter getPropertiesConverter() { - return HivePropertiesConverter.INSTANCE; - } - - @Override - protected PartitionConverter getPartitionConverter() { - return DefaultPartitionConverter.INSTANCE; - } - @Override protected AbstractCatalog realCatalog() { return hiveCatalog; diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java index 91eaa4e1638..23607ebb402 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java @@ -28,8 +28,11 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; -import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; import org.apache.gravitino.flink.connector.utils.FactoryUtils; import org.apache.gravitino.flink.connector.utils.PropertyUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -38,7 +41,7 @@ * Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI * discovery in Flink. */ -public class GravitinoHiveCatalogFactory implements CatalogFactory { +public class GravitinoHiveCatalogFactory implements BaseCatalogFactory { private HiveCatalogFactory hiveCatalogFactory; @Override @@ -60,6 +63,8 @@ public Catalog createCatalog(Context context) { return new GravitinoHiveCatalog( context.getName(), helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE), + propertiesConverter(), + partitionConverter(), hiveConf, helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION)); } @@ -81,4 +86,44 @@ public Set> requiredOptions() { public Set> optionalOptions() { return hiveCatalogFactory.optionalOptions(); } + + /** + * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}. + * + * @return The requested gravitino catalog provider. + */ + @Override + public String gravitinoCatalogProvider() { + return "hive"; + } + + /** + * Define gravitino catalog type {@link org.apache.gravitino.Catalog.Type}. + * + * @return The requested gravitino catalog type. + */ + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + /** + * Define properties converter {@link PropertiesConverter}. + * + * @return The requested property converter. + */ + @Override + public PropertiesConverter propertiesConverter() { + return HivePropertiesConverter.INSTANCE; + } + + /** + * Define partition converter. + * + * @return The requested partition converter. + */ + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 2c210f21c2b..92e778ce297 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -19,20 +19,25 @@ package org.apache.gravitino.flink.connector.store; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.ServiceLoader; import java.util.Set; +import java.util.function.Predicate; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.AbstractCatalogStore; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; import org.apache.flink.util.Preconditions; import org.apache.gravitino.Catalog; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager; -import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; -import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +54,15 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) { public void storeCatalog(String catalogName, CatalogDescriptor descriptor) throws CatalogException { Configuration configuration = descriptor.getConfiguration(); - String provider = getGravitinoCatalogProvider(configuration); - Catalog.Type type = getGravitinoCatalogType(configuration); + BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap()); Map gravitinoProperties = - getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration); - gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties); + catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration); + gravitinoCatalogManager.createCatalog( + catalogName, + catalogFactory.gravitinoCatalogType(), + null, + catalogFactory.gravitinoCatalogProvider(), + gravitinoProperties); } @Override @@ -69,8 +78,8 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws public Optional getCatalog(String catalogName) throws CatalogException { try { Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName); - String provider = catalog.provider(); - PropertiesConverter propertiesConverter = getPropertiesConverter(provider); + BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider()); + PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter(); Map flinkCatalogProperties = propertiesConverter.toFlinkCatalogProperties(catalog.properties()); CatalogDescriptor descriptor = @@ -96,43 +105,60 @@ public boolean contains(String catalogName) throws CatalogException { return gravitinoCatalogManager.contains(catalogName); } - private String getGravitinoCatalogProvider(Configuration configuration) { + private BaseCatalogFactory getCatalogFactory(Map configuration) { String catalogType = Preconditions.checkNotNull( - configuration.get(CommonCatalogOptions.CATALOG_TYPE), + configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()), "%s should not be null.", CommonCatalogOptions.CATALOG_TYPE); - switch (catalogType) { - case GravitinoHiveCatalogFactoryOptions.IDENTIFIER: - return "hive"; - default: - throw new IllegalArgumentException( - String.format("The catalog type is not supported:%s", catalogType)); - } + return discoverFactories( + catalogFactory -> (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)), + String.format( + "Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.", + catalogType)); } - private Catalog.Type getGravitinoCatalogType(Configuration configuration) { - String catalogType = - Preconditions.checkNotNull( - configuration.get(CommonCatalogOptions.CATALOG_TYPE), - "%s should not be null.", - CommonCatalogOptions.CATALOG_TYPE); + private BaseCatalogFactory getCatalogFactory(String provider) { + return discoverFactories( + catalogFactory -> + ((BaseCatalogFactory) catalogFactory) + .gravitinoCatalogProvider() + .equalsIgnoreCase(provider), + String.format( + "Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.", + provider)); + } - switch (catalogType) { - case GravitinoHiveCatalogFactoryOptions.IDENTIFIER: - return Catalog.Type.RELATIONAL; - default: - throw new IllegalArgumentException( - String.format("The catalog type is not supported:%s", catalogType)); + private BaseCatalogFactory discoverFactories(Predicate predicate, String errorMessage) { + Iterator serviceLoaderIterator = ServiceLoader.load(Factory.class).iterator(); + final List factories = new ArrayList<>(); + while (true) { + try { + if (!serviceLoaderIterator.hasNext()) { + break; + } + Factory catalogFactory = serviceLoaderIterator.next(); + if (catalogFactory instanceof BaseCatalogFactory && predicate.test(catalogFactory)) { + factories.add(catalogFactory); + } + } catch (Throwable t) { + if (t instanceof NoClassDefFoundError) { + LOG.debug( + "NoClassDefFoundError when loading a " + Factory.class.getCanonicalName() + ".", t); + } else { + throw new RuntimeException("Unexpected error when trying to load service provider.", t); + } + } } - } - private PropertiesConverter getPropertiesConverter(String provider) { - switch (provider) { - case "hive": - return HivePropertiesConverter.INSTANCE; + if (factories.isEmpty()) { + throw new RuntimeException("Failed to correctly match the Flink catalog factory."); + } + // It should only match one. + if (factories.size() > 1) { + throw new RuntimeException(errorMessage); } - throw new IllegalArgumentException("The provider is not supported:" + provider); + return (BaseCatalogFactory) factories.get(0); } }