Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5867] feat(flink-connector) Improve flink connector #5868

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public interface PartitionConverter {
* @param partitions The partition keys in Gravitino.
* @return The partition keys in Flink.
*/
public abstract List<String> toFlinkPartitionKeys(Transform[] partitions);
List<String> toFlinkPartitionKeys(Transform[] partitions);

/**
* Convert the partition keys to Gravitino partition keys.
*
* @param partitionsKey The partition keys in Flink.
* @return The partition keys in Gravitino.
*/
public abstract Transform[] toGravitinoPartitions(List<String> partitionsKey);
Transform[] toGravitinoPartitions(List<String> partitionsKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog {
private final PropertiesConverter propertiesConverter;
private final PartitionConverter partitionConverter;

protected BaseCatalog(String catalogName, String defaultDatabase) {
protected BaseCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, defaultDatabase);
this.propertiesConverter = getPropertiesConverter();
this.partitionConverter = getPartitionConverter();
this.propertiesConverter = propertiesConverter;
this.partitionConverter = partitionConverter;
}

protected abstract AbstractCatalog realCatalog();
Expand Down Expand Up @@ -508,10 +512,6 @@ public void alterPartitionColumnStatistics(
throw new UnsupportedOperationException();
}

protected abstract PropertiesConverter getPropertiesConverter();

protected abstract PartitionConverter getPartitionConverter();

protected CatalogBaseTable toFlinkTable(Table table) {
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.catalog;

import org.apache.flink.table.factories.CatalogFactory;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;

/**
 * SPI contract for Gravitino-backed Flink catalog factories.
 *
 * <p>Implementations are discovered through Flink's {@link CatalogFactory} service-loader
 * mechanism and describe how a Flink catalog maps onto a Gravitino catalog: which Gravitino
 * provider and catalog type back it, and which converters translate catalog properties and
 * partition keys between the two systems.
 */
public interface BaseCatalogFactory extends CatalogFactory {

  /**
   * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
   *
   * @return The requested gravitino catalog provider name (e.g. {@code "hive"}).
   */
  String gravitinoCatalogProvider();

  /**
   * Define gravitino catalog type {@link Catalog.Type}.
   *
   * @return The requested gravitino catalog type (e.g. relational).
   */
  Catalog.Type gravitinoCatalogType();

  /**
   * Define properties converter {@link PropertiesConverter} used to translate catalog properties
   * between Flink and Gravitino.
   *
   * @return The requested property converter.
   */
  PropertiesConverter propertiesConverter();

  /**
   * Define partition converter used to translate partition keys between Flink and Gravitino.
   *
   * @return The requested partition converter.
   */
  PartitionConverter partitionConverter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
Expand All @@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog {
GravitinoHiveCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter,
@Nullable HiveConf hiveConf,
@Nullable String hiveVersion) {
super(catalogName, defaultDatabase);
super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
}

Expand All @@ -68,16 +69,6 @@ public Optional<Factory> getFactory() {
return hiveCatalog.getFactory();
}

@Override
protected PropertiesConverter getPropertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

@Override
protected PartitionConverter getPartitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}

@Override
protected AbstractCatalog realCatalog() {
return hiveCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
import org.apache.gravitino.flink.connector.utils.PropertyUtils;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -38,7 +41,7 @@
* Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI
* discovery in Flink.
*/
public class GravitinoHiveCatalogFactory implements CatalogFactory {
public class GravitinoHiveCatalogFactory implements BaseCatalogFactory {
private HiveCatalogFactory hiveCatalogFactory;

@Override
Expand All @@ -60,6 +63,8 @@ public Catalog createCatalog(Context context) {
return new GravitinoHiveCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
propertiesConverter(),
partitionConverter(),
hiveConf,
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
}
Expand All @@ -81,4 +86,44 @@ public Set<ConfigOption<?>> requiredOptions() {
/**
 * Optional configuration options supported by this factory.
 *
 * @return the options of the wrapped Flink {@code HiveCatalogFactory}, to which this call delegates
 */
public Set<ConfigOption<?>> optionalOptions() {
return hiveCatalogFactory.optionalOptions();
}

/**
 * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
 *
 * @return The requested gravitino catalog provider; always {@code "hive"} for this factory.
 */
@Override
public String gravitinoCatalogProvider() {
// Provider identifier under which Hive catalogs are registered on the Gravitino side.
return "hive";
}

/**
 * Define gravitino catalog type {@link org.apache.gravitino.Catalog.Type}.
 *
 * @return The requested gravitino catalog type; always {@code RELATIONAL}, since Hive catalogs
 *     manage table-based metadata.
 */
@Override
public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}

/**
 * Define properties converter {@link PropertiesConverter}.
 *
 * @return The requested property converter; the shared Hive-specific singleton {@code
 *     HivePropertiesConverter.INSTANCE}.
 */
@Override
public PropertiesConverter propertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

/**
 * Define partition converter.
 *
 * @return The requested partition converter; the shared default singleton {@code
 *     DefaultPartitionConverter.INSTANCE}.
 */
@Override
public PartitionConverter partitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@

package org.apache.gravitino.flink.connector.store;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,11 +54,15 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
throws CatalogException {
Configuration configuration = descriptor.getConfiguration();
String provider = getGravitinoCatalogProvider(configuration);
Catalog.Type type = getGravitinoCatalogType(configuration);
BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap());
Map<String, String> gravitinoProperties =
getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties);
catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
}

@Override
Expand All @@ -69,8 +78,8 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
try {
Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
String provider = catalog.provider();
PropertiesConverter propertiesConverter = getPropertiesConverter(provider);
BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider());
PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter();
Map<String, String> flinkCatalogProperties =
propertiesConverter.toFlinkCatalogProperties(catalog.properties());
CatalogDescriptor descriptor =
Expand All @@ -96,43 +105,60 @@ public boolean contains(String catalogName) throws CatalogException {
return gravitinoCatalogManager.contains(catalogName);
}

private String getGravitinoCatalogProvider(Configuration configuration) {
private BaseCatalogFactory getCatalogFactory(Map<String, String> configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return "hive";
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
}
return discoverFactories(
catalogFactory -> (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)),
String.format(
"Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.",
catalogType));
}

private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);
private BaseCatalogFactory getCatalogFactory(String provider) {
return discoverFactories(
catalogFactory ->
((BaseCatalogFactory) catalogFactory)
.gravitinoCatalogProvider()
.equalsIgnoreCase(provider),
String.format(
"Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.",
provider));
}

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return Catalog.Type.RELATIONAL;
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, String errorMessage) {
Iterator<Factory> serviceLoaderIterator = ServiceLoader.load(Factory.class).iterator();
final List<Factory> factories = new ArrayList<>();
while (true) {
try {
if (!serviceLoaderIterator.hasNext()) {
break;
}
Factory catalogFactory = serviceLoaderIterator.next();
if (catalogFactory instanceof BaseCatalogFactory && predicate.test(catalogFactory)) {
factories.add(catalogFactory);
}
} catch (Throwable t) {
if (t instanceof NoClassDefFoundError) {
LOG.debug(
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
"NoClassDefFoundError when loading a " + Factory.class.getCanonicalName() + ".", t);
} else {
throw new RuntimeException("Unexpected error when trying to load service provider.", t);
}
}
}
}

private PropertiesConverter getPropertiesConverter(String provider) {
switch (provider) {
case "hive":
return HivePropertiesConverter.INSTANCE;
if (factories.isEmpty()) {
throw new RuntimeException("Failed to correctly match the Flink catalog factory.");
}
// It should only match one.
if (factories.size() > 1) {
throw new RuntimeException(errorMessage);
}
throw new IllegalArgumentException("The provider is not supported:" + provider);
return (BaseCatalogFactory) factories.get(0);
FANNG1 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading