Skip to content

Commit

Permalink
[#4163]feat(trino-connector): Support create user customized Trino co…
Browse files Browse the repository at this point in the history
…nnectors (#4167)

### What changes were proposed in this pull request?

Make the CatalogConnectorFactory pluggable to support customized Trino
connectors

### Why are the changes needed?

Fix: #4163 

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Manually Test
  • Loading branch information
diqiu50 authored Jul 30, 2024
1 parent 6901f38 commit adac3db
Show file tree
Hide file tree
Showing 16 changed files with 257 additions and 104 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ tasks.rat {
"dev/docker/**/*.conf",
"dev/docker/kerberos-hive/kadm5.acl",
"**/*.log",
"**/testsets",
"**/licenses/*.txt",
"**/licenses/*.md",
"integration-test/**/*.sql",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public class TrinoQueryITBase {

protected static boolean started = false;

protected static String gravitinoUri = "http://127.0.0.1:8090";
protected static String trinoUri = "http://127.0.0.1:8080";
protected static String hiveMetastoreUri = "thrift://127.0.0.1:9083";
protected static String hdfsUri = "hdfs://127.0.0.1:9000";
protected static String mysqlUri = "jdbc:mysql://127.0.0.1";
protected static String postgresqlUri = "jdbc:postgresql://127.0.0.1";
public static String testHost = "127.0.0.1";
public static String gravitinoUri = String.format("http://%s:8090", testHost);
public static String trinoUri = String.format("http://%s:8080", testHost);
public static String hiveMetastoreUri = String.format("thrift://%s:9083", testHost);
public static String hdfsUri = String.format("hdfs://%s:9000", testHost);
public static String mysqlUri = String.format("jdbc:mysql://%s", testHost);
public static String postgresqlUri = String.format("jdbc:postgresql://%s", testHost);

protected static GravitinoAdminClient gravitinoClient;
protected static TrinoITContainers trinoITContainers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public static void main(String[] args) throws Exception {
false,
"Generate the output file for the test set, the default value is 'false'");

options.addOption(
"test_host",
true,
"Host address for all test services (include gravitino server, trino, hive, postgresql, mysql..., "
+ "if the services are not running on the same host. set the arguments like --gravitino_uri=xxx and --trino_uri=xxx), "
+ "all test services use 127.0.0.1 as default, if --auto is set to 'all', this option is ignored");
options.addOption(
"gravitino_uri",
true,
Expand Down Expand Up @@ -129,6 +135,8 @@ public static void main(String[] args) throws Exception {

TrinoQueryIT.ciTestsets.clear();

String testHost = commandLine.getOptionValue("test_host");
TrinoQueryIT.testHost = Strings.isBlank(testHost) ? TrinoQueryIT.testHost : testHost;
String gravitinoUri = commandLine.getOptionValue("gravitino_uri");
TrinoQueryIT.gravitinoUri =
Strings.isBlank(gravitinoUri) ? TrinoQueryIT.gravitinoUri : gravitinoUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ public class GravitinoConfig {
"true",
false);

private static final ConfigEntry GRAVITINO_CLOUD_REGION_CODE =
new ConfigEntry(
"gravitino.cloud.region-code",
"The property to specify the region code of the cloud that the catalog is running on.",
"",
false);

private static final ConfigEntry GRAVITINO_CATALOG_CONNECTOR_FACTORY_CLASS_NAME =
new ConfigEntry(
"gravitino.catalog.connector.factory.class.name",
"The class name for the custom CatalogConnectorFactory. The class must implement the CatalogConnectorFactory interface",
"",
false);

private static final ConfigEntry TRINO_JDBC_USER =
new ConfigEntry("trino.jdbc.user", "The jdbc user name of Trino", "admin", false);

Expand Down Expand Up @@ -138,6 +152,11 @@ public String getTrinoJdbcURI() {
}
}

public String getRegion() {
return config.getOrDefault(
GRAVITINO_CLOUD_REGION_CODE.key, GRAVITINO_CLOUD_REGION_CODE.defaultValue);
}

public String getCatalogConfigDirectory() {
if (config.containsKey(TRINO_CATALOG_CONFIG_DIR)) {
return config.get(TRINO_CATALOG_CONFIG_DIR);
Expand All @@ -155,6 +174,12 @@ public String getTrinoPassword() {
return config.getOrDefault(TRINO_JDBC_PASSWORD.key, TRINO_JDBC_PASSWORD.defaultValue);
}

public String getCatalogConnectorFactoryClassName() {
return config.getOrDefault(
GRAVITINO_CATALOG_CONNECTOR_FACTORY_CLASS_NAME.key,
GRAVITINO_CATALOG_CONNECTOR_FACTORY_CLASS_NAME.defaultValue);
}

public String toCatalogConfig() {
List<String> stringList = new ArrayList<>();
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.gravitino.trino.connector;

import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand All @@ -27,10 +29,12 @@
import io.trino.spi.connector.ConnectorFactory;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorFactory;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorManager;
import org.apache.gravitino.trino.connector.catalog.CatalogRegister;
import org.apache.gravitino.trino.connector.catalog.DefaultCatalogConnectorFactory;
import org.apache.gravitino.trino.connector.system.GravitinoSystemConnector;
import org.apache.gravitino.trino.connector.system.storedprocdure.GravitinoStoredProcedureFactory;
import org.apache.gravitino.trino.connector.system.table.GravitinoSystemTableFactory;
Expand Down Expand Up @@ -76,8 +80,8 @@ public Connector create(
if (catalogConnectorManager == null) {
try {
CatalogRegister catalogRegister = new CatalogRegister();
CatalogConnectorFactory catalogConnectorFactory = new CatalogConnectorFactory();

CatalogConnectorFactory catalogConnectorFactory = createCatalogConnectorFactory(config);
catalogConnectorManager =
new CatalogConnectorManager(catalogRegister, catalogConnectorFactory);
catalogConnectorManager.config(config, clientProvider().get());
Expand All @@ -87,7 +91,7 @@ public Connector create(
} catch (Exception e) {
String message = "Initialization of the GravitinoConnector failed" + e.getMessage();
LOG.error(message);
throw new TrinoException(GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR, message, e);
throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e);
}
}
}
Expand Down Expand Up @@ -120,4 +124,24 @@ public Connector create(
Supplier<GravitinoAdminClient> clientProvider() {
return () -> null;
}

private CatalogConnectorFactory createCatalogConnectorFactory(GravitinoConfig config) {
// Create a CatalogConnectorFactory. If we specify a customized class name for the
// CatalogConnectorFactory,
// it creates a user-customized CatalogConnectorFactory; otherwise, it creates a
// DefaultCatalogConnectorFactory.
String className = config.getCatalogConnectorFactoryClassName();
if (StringUtils.isEmpty(className)) {
return new DefaultCatalogConnectorFactory(config);
}

try {
Class<?> clazz = Class.forName(className);
Object obj = clazz.getDeclaredConstructor(GravitinoConfig.class).newInstance(config);
return (CatalogConnectorFactory) obj;
} catch (Exception e) {
throw new TrinoException(
GRAVITINO_RUNTIME_ERROR, "Can not create CatalogConnectorFactory", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.artifact.Artifact;
Expand All @@ -51,31 +50,17 @@ public class GravitinoConnectorPluginManager {

public static final String APP_CLASS_LOADER_NAME = "app";

public static final String CONNECTOR_HIVE = "hive";
public static final String CONNECTOR_ICEBERG = "iceberg";
public static final String CONNECTOR_MYSQL = "mysql";
public static final String CONNECTOR_POSTGRESQL = "postgresql";
public static final String CONNECTOR_MEMORY = "memory";

private static final String PLUGIN_NAME_PREFIX = "gravitino-";
private static final String PLUGIN_CLASSLOADER_CLASS_NAME = "io.trino.server.PluginClassLoader";

private static volatile GravitinoConnectorPluginManager instance;

private Class<?> pluginLoaderClass;

private static final Set<String> usePlugins =
Set.of(
CONNECTOR_HIVE,
CONNECTOR_ICEBERG,
CONNECTOR_MYSQL,
CONNECTOR_POSTGRESQL,
CONNECTOR_MEMORY);

private final Map<String, Plugin> connectorPlugins = new HashMap<>();
private final ClassLoader appClassloader;

public GravitinoConnectorPluginManager(ClassLoader classLoader) {
private GravitinoConnectorPluginManager(ClassLoader classLoader) {
this.appClassloader = classLoader;

try {
Expand Down Expand Up @@ -132,11 +117,12 @@ private void loadPluginsFromFile() {
.getPath();
String pluginDir = Paths.get(jarPath).getParent().getParent().toString();

// Load all plugins
for (String pluginName : usePlugins) {
loadPlugin(pluginDir, pluginName);
LOG.info("Load plugin {}/{} successful", pluginDir, pluginName);
}
Arrays.stream(new File(pluginDir).listFiles())
.forEach(
file -> {
loadPlugin(pluginDir, file.getName());
LOG.info("Load plugin {}/{} successful", pluginDir, file.getName());
});
} catch (Exception e) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR, "Error while loading plugins from file", e);
Expand All @@ -146,16 +132,10 @@ private void loadPluginsFromFile() {
private void loadPlugin(String pluginPath, String pluginName) {
String dirName = pluginPath + "/" + pluginName;
File directory = new File(dirName);
if (!directory.exists()) {
LOG.warn("Can not found plugin {} in directory {}", pluginName, dirName);
return;
}

File[] pluginFiles = directory.listFiles();
if (pluginFiles == null || pluginFiles.length == 0) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR,
"Can not found any files plugin directory " + dirName);
LOG.warn("Can not load plugin {} from empty directory {}", pluginName, dirName);
return;
}
List<URL> files =
Arrays.stream(pluginFiles)
Expand Down Expand Up @@ -196,24 +176,20 @@ private void loadPluginWithUrls(List<URL> urls, String pluginName) {
ServiceLoader.load(Plugin.class, (ClassLoader) pluginClassLoader);
List<Plugin> pluginList = ImmutableList.copyOf(serviceLoader);
if (pluginList.isEmpty()) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR,
String.format("The %s plugin does not found connector SIP interface", pluginName));
LOG.warn("The {} plugin directory does not found connector SIP interface", pluginName);
return;
}
Plugin plugin = pluginList.get(0);
if (plugin.getConnectorFactories() == null
|| !plugin.getConnectorFactories().iterator().hasNext()) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_CREATE_INTERNAL_CONNECTOR_ERROR,
String.format("The %s plugin does not contains any ConnectorFactories", pluginName));
LOG.warn("The {} plugin does not contains any ConnectorFactories ", pluginName);
return;
}
connectorPlugins.put(pluginName, pluginList.get(0));

} catch (Exception e) {
throw new TrinoException(
GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR,
"Failed to create Plugin class loader " + pluginName,
e);
GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR, "Failed to load Plugin " + pluginName, e);
}
}

Expand All @@ -228,7 +204,8 @@ private void loadPluginsFromBundle() {

String value = GravitinoConfig.trinoConfig.getProperty(TRINO_PLUGIN_BUNDLES);
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
splitter.splitToList(value).stream()
splitter
.splitToList(value)
.forEach(
v -> {
int start = v.indexOf("trino-");
Expand All @@ -240,10 +217,11 @@ private void loadPluginsFromBundle() {
return;
}
String key = v.substring(start, end).replace("trino-", "");
if (!usePlugins.contains(key)) {
return;
try {
loadPluginByPom(artifactResolver.resolvePom(new File(v)), key);
} catch (Throwable t) {
LOG.error("Fatal error in load plugin by {}", v, t);
}
loadPluginByPom(artifactResolver.resolvePom(new File(v)), key);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,17 @@
*/
package org.apache.gravitino.trino.connector.catalog;

import io.trino.spi.TrinoException;
import java.util.HashMap;
import org.apache.gravitino.trino.connector.GravitinoErrorCode;
import org.apache.gravitino.trino.connector.catalog.hive.HiveConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.iceberg.IcebergConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.mysql.MySQLConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.postgresql.PostgreSQLConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.memory.MemoryConnectorAdapter;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class use to create CatalogConnectorContext instance by given catalog. */
public class CatalogConnectorFactory {
private static final Logger LOG = LoggerFactory.getLogger(CatalogConnectorFactory.class);

private final HashMap<String, CatalogConnectorContext.Builder> catalogBuilders = new HashMap<>();

public CatalogConnectorFactory() {
catalogBuilders.put("hive", new CatalogConnectorContext.Builder(new HiveConnectorAdapter()));
catalogBuilders.put(
"memory", new CatalogConnectorContext.Builder(new MemoryConnectorAdapter()));
catalogBuilders.put(
"lakehouse-iceberg", new CatalogConnectorContext.Builder(new IcebergConnectorAdapter()));
catalogBuilders.put(
"jdbc-mysql", new CatalogConnectorContext.Builder(new MySQLConnectorAdapter()));
catalogBuilders.put(
"jdbc-postgresql", new CatalogConnectorContext.Builder(new PostgreSQLConnectorAdapter()));
}

public CatalogConnectorContext.Builder createCatalogConnectorContextBuilder(
GravitinoCatalog catalog) {
String catalogProvider = catalog.getProvider();
CatalogConnectorContext.Builder builder = catalogBuilders.get(catalogProvider);
if (builder == null) {
String message = String.format("Unsupported catalog provider %s.", catalogProvider);
LOG.error(message);
throw new TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_CATALOG_PROVIDER, message);
}

// Avoid using the same builder object to prevent catalog creation errors.
return builder.clone(catalog);
}
/**
* This interface is used to create a CatalogConnectorContext builder connector by Gravitino catalog
*/
public interface CatalogConnectorFactory {
/**
* Create a CatalogConnectorContext builder by Gravitino catalog
*
* @param catalog Gravitino catalog
* @return CatalogConnectorContext builder
*/
CatalogConnectorContext.Builder createCatalogConnectorContextBuilder(GravitinoCatalog catalog);
}
Loading

0 comments on commit adac3db

Please sign in to comment.