Skip to content

Commit

Permalink
Improve flink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Dec 17, 2024
1 parent ac575f5 commit 2a164b2
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public interface PartitionConverter {
* @param partitions The partition keys in Gravitino.
* @return The partition keys in Flink.
*/
public abstract List<String> toFlinkPartitionKeys(Transform[] partitions);
List<String> toFlinkPartitionKeys(Transform[] partitions);

/**
* Convert the partition keys to Gravitino partition keys.
*
* @param partitionsKey The partition keys in Flink.
* @return The partition keys in Gravitino.
*/
public abstract Transform[] toGravitinoPartitions(List<String> partitionsKey);
Transform[] toGravitinoPartitions(List<String> partitionsKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ public abstract class BaseCatalog extends AbstractCatalog {
private final PropertiesConverter propertiesConverter;
private final PartitionConverter partitionConverter;

protected BaseCatalog(String catalogName, String defaultDatabase) {
protected BaseCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(catalogName, defaultDatabase);
this.propertiesConverter = getPropertiesConverter();
this.partitionConverter = getPartitionConverter();
this.propertiesConverter = propertiesConverter;
this.partitionConverter = partitionConverter;
}

protected abstract AbstractCatalog realCatalog();
Expand Down Expand Up @@ -508,10 +512,6 @@ public void alterPartitionColumnStatistics(
throw new UnsupportedOperationException();
}

protected abstract PropertiesConverter getPropertiesConverter();

protected abstract PartitionConverter getPartitionConverter();

protected CatalogBaseTable toFlinkTable(Table table) {
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.catalog;

import org.apache.flink.table.factories.CatalogFactory;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;

public interface BaseCatalogFactory extends CatalogFactory {

/**
* Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
*
* @return
*/
String gravitinoCatalogProvider();

/**
* Define gravitino catalog type.
*
* @return
*/
Catalog.Type gravitinoCatalogType();

/**
* Define properties converter.
*
* @return
*/
PropertiesConverter propertiesConverter();

/**
* Define partition converter.
*
* @return
*/
PartitionConverter partitionConverter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
Expand All @@ -41,9 +40,11 @@ public class GravitinoHiveCatalog extends BaseCatalog {
GravitinoHiveCatalog(
String catalogName,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter,
@Nullable HiveConf hiveConf,
@Nullable String hiveVersion) {
super(catalogName, defaultDatabase);
super(catalogName, defaultDatabase, propertiesConverter, partitionConverter);
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
}

Expand All @@ -68,16 +69,6 @@ public Optional<Factory> getFactory() {
return hiveCatalog.getFactory();
}

@Override
protected PropertiesConverter getPropertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

@Override
protected PartitionConverter getPartitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}

@Override
protected AbstractCatalog realCatalog() {
return hiveCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;
import org.apache.gravitino.flink.connector.utils.PropertyUtils;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -38,7 +41,7 @@
* Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI
* discovery in Flink.
*/
public class GravitinoHiveCatalogFactory implements CatalogFactory {
public class GravitinoHiveCatalogFactory implements BaseCatalogFactory {
private HiveCatalogFactory hiveCatalogFactory;

@Override
Expand All @@ -60,6 +63,8 @@ public Catalog createCatalog(Context context) {
return new GravitinoHiveCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
propertiesConverter(),
partitionConverter(),
hiveConf,
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
}
Expand All @@ -81,4 +86,39 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
return hiveCatalogFactory.optionalOptions();
}

/**
* Define gravitino catalog provider {@link org.apache.gravitino.CatalogProvider}.
*
* @return
*/
@Override
public String gravitinoCatalogProvider() {
return "hive";
}

/**
* Define gravitino catalog type.
*
* @return
*/
@Override
public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
return org.apache.gravitino.Catalog.Type.RELATIONAL;
}

/**
* Define properties converter.
*
* @return
*/
@Override
public PropertiesConverter propertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}

@Override
public PartitionConverter partitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@

package org.apache.gravitino.flink.connector.store;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.hive.HivePropertiesConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,11 +54,15 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
throws CatalogException {
Configuration configuration = descriptor.getConfiguration();
String provider = getGravitinoCatalogProvider(configuration);
Catalog.Type type = getGravitinoCatalogType(configuration);
BaseCatalogFactory catalogFactory = catalogFactory(configuration.toMap());
Map<String, String> gravitinoProperties =
getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties);
catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
}

@Override
Expand All @@ -69,8 +78,8 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
try {
Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
String provider = catalog.provider();
PropertiesConverter propertiesConverter = getPropertiesConverter(provider);
BaseCatalogFactory catalogFactory = catalogFactory(catalog.provider());
PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter();
Map<String, String> flinkCatalogProperties =
propertiesConverter.toFlinkCatalogProperties(catalog.properties());
CatalogDescriptor descriptor =
Expand All @@ -96,43 +105,56 @@ public boolean contains(String catalogName) throws CatalogException {
return gravitinoCatalogManager.contains(catalogName);
}

private String getGravitinoCatalogProvider(Configuration configuration) {
private BaseCatalogFactory catalogFactory(Map<String, String> configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
configuration.get(CommonCatalogOptions.CATALOG_TYPE.key()),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return "hive";
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
}
return discoverFactories(
catalogFactory -> (catalogFactory.factoryIdentifier().equalsIgnoreCase(catalogType)),
String.format(
"Flink catalog type [%s] matched multiple flink catalog factories, it should only match one.",
catalogType));
}

private Catalog.Type getGravitinoCatalogType(Configuration configuration) {
String catalogType =
Preconditions.checkNotNull(
configuration.get(CommonCatalogOptions.CATALOG_TYPE),
"%s should not be null.",
CommonCatalogOptions.CATALOG_TYPE);

switch (catalogType) {
case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
return Catalog.Type.RELATIONAL;
default:
throw new IllegalArgumentException(
String.format("The catalog type is not supported:%s", catalogType));
}
private BaseCatalogFactory catalogFactory(String provider) {
return discoverFactories(
catalogFactory ->
((BaseCatalogFactory) catalogFactory)
.gravitinoCatalogProvider()
.equalsIgnoreCase(provider),
String.format(
"Gravitino catalog provider [%s] matched multiple flink catalog factories, it should only match one.",
provider));
}

private PropertiesConverter getPropertiesConverter(String provider) {
switch (provider) {
case "hive":
return HivePropertiesConverter.INSTANCE;
private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, String errorMessage) {
Iterator<Factory> serviceLoaderIterator = ServiceLoader.load(Factory.class).iterator();
final List<Factory> factories = new ArrayList<>();
while (true) {
try {
if (!serviceLoaderIterator.hasNext()) {
break;
}
Factory catalogFactory = serviceLoaderIterator.next();
if (catalogFactory instanceof BaseCatalogFactory && predicate.test(catalogFactory)) {
factories.add(catalogFactory);
}
} catch (Throwable t) {
if (t instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a " + Factory.class.getCanonicalName() + ".", t);
} else {
throw new RuntimeException("Unexpected error when trying to load service provider.", t);
}
}
}
// It should only match one.
if (factories.size() > 1) {
throw new RuntimeException(errorMessage);
}
throw new IllegalArgumentException("The provider is not supported:" + provider);
return (BaseCatalogFactory) factories.get(0);
}
}

0 comments on commit 2a164b2

Please sign in to comment.