From 182105a3dfb16cd6b3915d39e962de5d43c294eb Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Fri, 13 Dec 2024 11:11:11 +0800 Subject: [PATCH] Improve flink connector --- .../flink/connector/PartitionConverter.java | 4 +- .../flink/connector/catalog/BaseCatalog.java | 14 +-- .../connector/catalog/BaseCatalogFactory.java | 56 ++++++++++++ .../connector/hive/GravitinoHiveCatalog.java | 15 +--- .../hive/GravitinoHiveCatalogFactory.java | 44 +++++++++- .../store/GravitinoCatalogStore.java | 88 +++++++++++-------- 6 files changed, 162 insertions(+), 59 deletions(-) create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java index 5464c705a37..e8029e567d1 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PartitionConverter.java @@ -34,7 +34,7 @@ public interface PartitionConverter { * @param partitions The partition keys in Gravitino. * @return The partition keys in Flink. */ - public abstract List toFlinkPartitionKeys(Transform[] partitions); + List toFlinkPartitionKeys(Transform[] partitions); /** * Convert the partition keys to Gravitino partition keys. @@ -42,5 +42,5 @@ public interface PartitionConverter { * @param partitionsKey The partition keys in Flink. * @return The partition keys in Gravitino. */ - public abstract Transform[] toGravitinoPartitions(List partitionsKey); + Transform[] toGravitinoPartitions(List partitionsKey); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 6b76e31b8d2..1496742177f 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog { private final PropertiesConverter propertiesConverter; private final PartitionConverter partitionConverter; - protected BaseCatalog(String catalogName, String defaultDatabase) { + protected BaseCatalog( + String catalogName, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { super(catalogName, defaultDatabase); - this.propertiesConverter = getPropertiesConverter(); - this.partitionConverter = getPartitionConverter(); + this.propertiesConverter = propertiesConverter; + this.partitionConverter = partitionConverter; } protected abstract AbstractCatalog realCatalog(); @@ -508,10 +512,6 @@ public void alterPartitionColumnStatistics( throw new UnsupportedOperationException(); } - protected abstract PropertiesConverter getPropertiesConverter(); - - protected abstract PartitionConverter getPartitionConverter(); - protected CatalogBaseTable toFlinkTable(Table table) { org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder(); diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java new file mode 100644 index 00000000000..2144c5ac1ce --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.catalog; + +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public interface BaseCatalogFactory extends CatalogFactory { + + /** + * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}. + * + * @return + */ + String gravitinoCatalogProvider(); + + /** + * Define gravitino catalog type. + * + * @return + */ + Catalog.Type gravitinoCatalogType(); + + /** + * Define properties converter. + * + * @return + */ + PropertiesConverter propertiesConverter(); + + /** + * Define partition converter. + * + * @return + */ + PartitionConverter partitionConverter(); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java index b4754596858..3e5d31fd3c5 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalog.java @@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.factories.Factory; -import org.apache.gravitino.flink.connector.DefaultPartitionConverter; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; @@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog { GravitinoHiveCatalog( String catalogName, String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter, @Nullable HiveConf hiveConf, @Nullable String hiveVersion) { - super(catalogName, defaultDatabase); + super(catalogName, defaultDatabase, propertiesConverter, partitionConverter); this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion); } @@ -68,16 +69,6 @@ public Optional getFactory() { return hiveCatalog.getFactory(); } - @Override - protected PropertiesConverter getPropertiesConverter() { - return HivePropertiesConverter.INSTANCE; - } - - @Override - protected PartitionConverter getPartitionConverter() { - return DefaultPartitionConverter.INSTANCE; - } - @Override protected AbstractCatalog realCatalog() { return hiveCatalog; diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java index 91eaa4e1638..7421f6bdc96 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java @@ -28,8 +28,11 @@ import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; -import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; import org.apache.gravitino.flink.connector.utils.FactoryUtils; import org.apache.gravitino.flink.connector.utils.PropertyUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -38,7 +41,7 @@ * Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI * discovery in Flink. */ -public class GravitinoHiveCatalogFactory implements CatalogFactory { +public class GravitinoHiveCatalogFactory implements BaseCatalogFactory { private HiveCatalogFactory hiveCatalogFactory; @Override @@ -60,6 +63,8 @@ public Catalog createCatalog(Context context) { return new GravitinoHiveCatalog( context.getName(), helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE), + propertiesConverter(), + partitionConverter(), hiveConf, helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION)); } @@ -81,4 +86,39 @@ public Set> requiredOptions() { public Set> optionalOptions() { return hiveCatalogFactory.optionalOptions(); } + + /** + * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}. + * + * @return + */ + @Override + public String gravitinoCatalogProvider() { + return "hive"; + } + + /** + * Define gravitino catalog type. + * + * @return + */ + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + /** + * Define properties converter. + * + * @return + */ + @Override + public PropertiesConverter propertiesConverter() { + return HivePropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 2c210f21c2b..02d92990663 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -19,20 +19,26 @@ package org.apache.gravitino.flink.connector.store; +import com.google.common.collect.Streams; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.AbstractCatalogStore; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; import org.apache.flink.util.Preconditions; import org.apache.gravitino.Catalog; import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager; -import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; -import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +55,15 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) { public void storeCatalog(String catalogName, CatalogDescriptor descriptor) throws CatalogException { Configuration configuration = descriptor.getConfiguration(); - String provider = getGravitinoCatalogProvider(configuration); - Catalog.Type type = getGravitinoCatalogType(configuration); + BaseCatalogFactory catalogFactory = catalogFactory(configuration.toMap()); Map gravitinoProperties = - getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration); - gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties); + catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration); + gravitinoCatalogManager.createCatalog( + catalogName, + catalogFactory.gravitinoCatalogType(), + null, + catalogFactory.gravitinoCatalogProvider(), + gravitinoProperties); } @Override @@ -69,8 +79,8 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws public Optional getCatalog(String catalogName) throws CatalogException { try { Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName); - String provider = catalog.provider(); - PropertiesConverter propertiesConverter = getPropertiesConverter(provider); + BaseCatalogFactory catalogFactory = catalogFactory(catalog.provider()); + PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter(); Map flinkCatalogProperties = propertiesConverter.toFlinkCatalogProperties(catalog.properties()); CatalogDescriptor descriptor = @@ -96,43 +106,49 @@ public boolean contains(String catalogName) throws CatalogException { return gravitinoCatalogManager.contains(catalogName); } - private String getGravitinoCatalogProvider(Configuration configuration) { + private BaseCatalogFactory catalogFactory(Map configuration) { String catalogType = Preconditions.checkNotNull( - configuration.get(CommonCatalogOptions.CATALOG_TYPE), + configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()), "%s should not be null.", CommonCatalogOptions.CATALOG_TYPE); - switch (catalogType) { - case GravitinoHiveCatalogFactoryOptions.IDENTIFIER: - return "hive"; - default: - throw new IllegalArgumentException( - String.format("The catalog type is not supported:%s", catalogType)); - } + return discoverFactories( + catalogFactory -> + (catalogFactory instanceof BaseCatalogFactory) + && (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)), + String.format( + "Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.", + catalogType)); } - private Catalog.Type getGravitinoCatalogType(Configuration configuration) { - String catalogType = - Preconditions.checkNotNull( - configuration.get(CommonCatalogOptions.CATALOG_TYPE), - "%s should not be null.", - CommonCatalogOptions.CATALOG_TYPE); - - switch (catalogType) { - case GravitinoHiveCatalogFactoryOptions.IDENTIFIER: - return Catalog.Type.RELATIONAL; - default: - throw new IllegalArgumentException( - String.format("The catalog type is not supported:%s", catalogType)); - } + private BaseCatalogFactory catalogFactory(String provider) { + return discoverFactories( + catalogFactory -> + (catalogFactory instanceof BaseCatalogFactory) + && ((BaseCatalogFactory) catalogFactory) + .gravitinoCatalogProvider() + .equalsIgnoreCase(provider), + String.format( + "Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.", + provider)); } - private PropertiesConverter getPropertiesConverter(String provider) { - switch (provider) { - case "hive": - return HivePropertiesConverter.INSTANCE; + private static BaseCatalogFactory discoverFactories( + Predicate predicate, String errorMessage) { + try { + ServiceLoader serviceLoader = ServiceLoader.load(Factory.class); + List factories = + Streams.stream(serviceLoader.iterator()) + .filter(factory -> predicate.test(factory)) + .collect(Collectors.toList()); + // It should only match one. + if (factories.size() > 1) { + throw new RuntimeException(errorMessage); + } + return (BaseCatalogFactory) factories.get(0); + } catch (ServiceConfigurationError e) { + throw new RuntimeException("Could not load service provider for flink catalog factory."); } - throw new IllegalArgumentException("The provider is not supported:" + provider); } }