Skip to content

Commit

Permalink
[#5867] feat(flink-connector): Improve flink connector (#5868)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Remove unused code.
2. Optimize the store logic so that adding support for a new connector no
longer requires modifying the store.

### Why are the changes needed?

Fixes: [#5867](#5867)

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
FlinkHiveCatalogIT
  • Loading branch information
sunxiaojian authored Dec 19, 2024
1 parent a4190e1 commit 9695c28
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public interface PartitionConverter {
* @param partitions The partition keys in Gravitino.
* @return The partition keys in Flink.
*/
public abstract List<String> toFlinkPartitionKeys(Transform[] partitions);
List<String> toFlinkPartitionKeys(Transform[] partitions);

/**
* Convert the partition keys to Gravitino partition keys.
*
* @param partitionsKey The partition keys in Flink.
* @return The partition keys in Gravitino.
*/
public abstract Transform[] toGravitinoPartitions(List<String> partitionsKey);
Transform[] toGravitinoPartitions(List<String> partitionsKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog {
private final PropertiesConverter propertiesConverter;
private final PartitionConverter partitionConverter;

protected BaseCatalog(String catalogName, String defaultDatabase) {
protected BaseCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, defaultDatabase);
this.propertiesConverter = getPropertiesConverter();
this.partitionConverter = getPartitionConverter();
this.propertiesConverter = propertiesConverter;
this.partitionConverter = partitionConverter;
}

protected abstract AbstractCatalog realCatalog();
Expand Down Expand Up @@ -508,10 +512,6 @@ public void alterPartitionColumnStatistics(
throw new UnsupportedOperationException();
}

protected abstract PropertiesConverter getPropertiesConverter();

protected abstract PartitionConverter getPartitionConverter();

protected CatalogBaseTable toFlinkTable(Table table) {
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.catalog;

import org.apache.flink.table.factories.CatalogFactory;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;

/**
 * Base SPI factory for Gravitino-backed Flink catalogs. Beyond the standard Flink {@link
 * CatalogFactory} contract, an implementation exposes the Gravitino-side metadata (provider name
 * and catalog type) plus the converters used to translate catalog properties and partition keys
 * between Flink and Gravitino, so the catalog store can work with any connector generically.
 */
public interface BaseCatalogFactory extends CatalogFactory {

  /**
   * Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}. The store uses
   * this name (e.g. "hive") to match a Gravitino catalog to its Flink factory.
   *
   * @return The requested gravitino catalog provider.
   */
  String gravitinoCatalogProvider();

  /**
   * Define gravitino catalog type {@link Catalog.Type} (e.g. RELATIONAL), used when creating the
   * catalog on the Gravitino side.
   *
   * @return The requested gravitino catalog type.
   */
  Catalog.Type gravitinoCatalogType();

  /**
   * Define properties converter {@link PropertiesConverter} that maps catalog properties between
   * the Flink and Gravitino representations.
   *
   * @return The requested property converter.
   */
  PropertiesConverter propertiesConverter();

  /**
   * Define partition converter that maps partition keys between the Flink and Gravitino
   * representations.
   *
   * @return The requested partition converter.
   */
  PartitionConverter partitionConverter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
Expand All @@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog {
GravitinoHiveCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter,
@Nullable HiveConf hiveConf,
@Nullable String hiveVersion) {
super(catalogName, defaultDatabase);
super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
}

Expand All @@ -68,16 +69,6 @@ public Optional<Factory> getFactory() {
return hiveCatalog.getFactory();
}

@Override
protected PropertiesConverter getPropertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

@Override
protected PartitionConverter getPartitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}

@Override
protected AbstractCatalog realCatalog() {
return hiveCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
import org.apache.gravitino.flink.connector.utils.PropertyUtils;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -38,7 +41,7 @@
* Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI
* discovery in Flink.
*/
public class GravitinoHiveCatalogFactory implements CatalogFactory {
public class GravitinoHiveCatalogFactory implements BaseCatalogFactory {
private HiveCatalogFactory hiveCatalogFactory;

@Override
Expand All @@ -60,6 +63,8 @@ public Catalog createCatalog(Context context) {
return new GravitinoHiveCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
propertiesConverter(),
partitionConverter(),
hiveConf,
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
}
Expand All @@ -81,4 +86,44 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
return hiveCatalogFactory.optionalOptions();
}

/**
* Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
*
* @return The requested gravitino catalog provider.
*/
@Override
public String gravitinoCatalogProvider() {
return "hive";
}

/**
* Define gravitino catalog type {@link org.apache.gravitino.Catalog.Type}.
*
* @return The requested gravitino catalog type.
*/
@Override
public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}

/**
* Define properties converter {@link PropertiesConverter}.
*
* @return The requested property converter.
*/
@Override
public PropertiesConverter propertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

/**
* Define partition converter.
*
* @return The requested partition converter.
*/
@Override
public PartitionConverter partitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@

package org.apache.gravitino.flink.connector.store;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,11 +54,15 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
throws CatalogException {
Configuration configuration = descriptor.getConfiguration();
String provider = getGravitinoCatalogProvider(configuration);
Catalog.Type type = getGravitinoCatalogType(configuration);
BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap());
Map<String, String> gravitinoProperties =
getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties);
catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
}

@Override
Expand All @@ -69,8 +78,8 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
try {
Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
String provider = catalog.provider();
PropertiesConverter propertiesConverter = getPropertiesConverter(provider);
BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider());
PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter();
Map<String, String> flinkCatalogProperties =
propertiesConverter.toFlinkCatalogProperties(catalog.properties());
CatalogDescriptor descriptor =
Expand All @@ -96,43 +105,60 @@ public boolean contains(String catalogName) throws CatalogException {
return gravitinoCatalogManager.contains(catalogName);
}

private String getGravitinoCatalogProvider(Configuration configuration) {
private BaseCatalogFactory getCatalogFactory(Map<String, String> configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return "hive";
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
}
return discoverFactories(
catalogFactory -> (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)),
String.format(
"Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.",
catalogType));
}

private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);
private BaseCatalogFactory getCatalogFactory(String provider) {
return discoverFactories(
catalogFactory ->
((BaseCatalogFactory) catalogFactory)
.gravitinoCatalogProvider()
.equalsIgnoreCase(provider),
String.format(
"Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.",
provider));
}

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return Catalog.Type.RELATIONAL;
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, String errorMessage) {
Iterator<Factory> serviceLoaderIterator = ServiceLoader.load(Factory.class).iterator();
final List<Factory> factories = new ArrayList<>();
while (true) {
try {
if (!serviceLoaderIterator.hasNext()) {
break;
}
Factory catalogFactory = serviceLoaderIterator.next();
if (catalogFactory instanceof BaseCatalogFactory && predicate.test(catalogFactory)) {
factories.add(catalogFactory);
}
} catch (Throwable t) {
if (t instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a " + Factory.class.getCanonicalName() + ".", t);
} else {
throw new RuntimeException("Unexpected error when trying to load service provider.", t);
}
}
}
}

private PropertiesConverter getPropertiesConverter(String provider) {
switch (provider) {
case "hive":
return HivePropertiesConverter.INSTANCE;
if (factories.isEmpty()) {
throw new RuntimeException("Failed to correctly match the Flink catalog factory.");
}
// It should only match one.
if (factories.size() > 1) {
throw new RuntimeException(errorMessage);
}
throw new IllegalArgumentException("The provider is not supported:" + provider);
return (BaseCatalogFactory) factories.get(0);
}
}

0 comments on commit 9695c28

Please sign in to comment.