Skip to content

Commit

Permalink
[apache#4962] feat(trino-connector): Support the Trino cascading conn…
Browse files Browse the repository at this point in the history
…ector (apache#4935)

### What changes were proposed in this pull request?

Support the Trino cascading connector

### Why are the changes needed?

Fix: apache#4962 

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add some ut and Manually test
  • Loading branch information
diqiu50 authored Sep 25, 2024
1 parent ed7fefb commit eaffd52
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.gravitino.trino.connector.catalog.iceberg.IcebergConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.mysql.MySQLConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.postgresql.PostgreSQLConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.trino.TrinoClusterConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.memory.MemoryConnectorAdapter;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.slf4j.Logger;
Expand All @@ -35,25 +36,49 @@
public class DefaultCatalogConnectorFactory implements CatalogConnectorFactory {
private static final Logger LOG = LoggerFactory.getLogger(DefaultCatalogConnectorFactory.class);

private static final String HIVE_CONNECTOR_PROVIDER_NAME = "hive";
private static final String ICEBERG_CONNECTOR_PROVIDER_NAME = "lakehouse-iceberg";
private static final String MEMORY_CONNECTOR_PROVIDER_NAME = "memory";
private static final String MYSQL_CONNECTOR_PROVIDER_NAME = "jdbc-mysql";
private static final String POSTGRESQL_CONNECTOR_PROVIDER_NAME = "jdbc-postgresql";
private static final String TRINO_CLUSTER_CONNECTOR_PROVIDER_NAME = "trino-cluster";

protected final HashMap<String, CatalogConnectorContext.Builder> catalogBuilders =
new HashMap<>();
protected final String region;

public DefaultCatalogConnectorFactory(GravitinoConfig config) {
catalogBuilders.put("hive", new CatalogConnectorContext.Builder(new HiveConnectorAdapter()));
this.region = config.getRegion();

catalogBuilders.put(
HIVE_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new HiveConnectorAdapter()));
catalogBuilders.put(
MEMORY_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new MemoryConnectorAdapter()));
catalogBuilders.put(
"memory", new CatalogConnectorContext.Builder(new MemoryConnectorAdapter()));
ICEBERG_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new IcebergConnectorAdapter()));
catalogBuilders.put(
"lakehouse-iceberg", new CatalogConnectorContext.Builder(new IcebergConnectorAdapter()));
MYSQL_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new MySQLConnectorAdapter()));
catalogBuilders.put(
"jdbc-mysql", new CatalogConnectorContext.Builder(new MySQLConnectorAdapter()));
POSTGRESQL_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new PostgreSQLConnectorAdapter()));
catalogBuilders.put(
"jdbc-postgresql", new CatalogConnectorContext.Builder(new PostgreSQLConnectorAdapter()));
TRINO_CLUSTER_CONNECTOR_PROVIDER_NAME,
new CatalogConnectorContext.Builder(new TrinoClusterConnectorAdapter()));
LOG.info("Start the DefaultCatalogConnectorFactory");
}

public CatalogConnectorContext.Builder createCatalogConnectorContextBuilder(
GravitinoCatalog catalog) {
String catalogProvider = catalog.getProvider();

if (!catalog.isSameRegion(region)) {
catalogProvider = TRINO_CLUSTER_CONNECTOR_PROVIDER_NAME;
}

CatalogConnectorContext.Builder builder = catalogBuilders.get(catalogProvider);
if (builder == null) {
String message = String.format("Unsupported catalog provider %s.", catalogProvider);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.trino.connector.catalog.jdbc.trino;

import static org.apache.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;
import static org.apache.gravitino.trino.connector.catalog.jdbc.JDBCCatalogPropertyConverter.JDBC_CONNECTION_PASSWORD_KEY;
import static org.apache.gravitino.trino.connector.catalog.jdbc.JDBCCatalogPropertyConverter.JDBC_CONNECTION_URL_KEY;
import static org.apache.gravitino.trino.connector.catalog.jdbc.JDBCCatalogPropertyConverter.JDBC_CONNECTION_USER_KEY;

import io.trino.spi.TrinoException;
import io.trino.spi.session.PropertyMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;

/**
* Support trino cluster connector. Transforming cluster connector configuration and components into
* Gravitino connector.
*/
public class TrinoClusterConnectorAdapter implements CatalogConnectorAdapter {

private static final String CONNECTOR_CLUSTER = "trino";
public static final String TRINO_CLUSTER_URL_KEY = "cloud.trino.connection-url";
public static final String TRINO_CLUSTER_USER_KEY = "cloud.trino.connection-user";
public static final String TRINO_CLUSTER_PASSWORD_KEY = "cloud.trino.connection-password";
public static final String TRINO_CLUSTER_DEFAULT_USER = "admin";

private final HasPropertyMeta propertyMetadata;

public TrinoClusterConnectorAdapter() {
this.propertyMetadata = new TrinoClusterPropertyMeta();
}

@Override
public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog) {
Map<String, String> config = new HashMap<>();
String jdbcUrl = catalog.getProperty(TRINO_CLUSTER_URL_KEY, "");
if (StringUtils.isEmpty(jdbcUrl)) {
throw new TrinoException(
GRAVITINO_MISSING_CONFIG, "Missing jdbc url config for the cluster catalog");
}
jdbcUrl += "/" + catalog.getName();
config.put(JDBC_CONNECTION_URL_KEY, jdbcUrl);

String user = catalog.getProperty(TRINO_CLUSTER_USER_KEY, TRINO_CLUSTER_DEFAULT_USER);
config.put(JDBC_CONNECTION_USER_KEY, user);

String password = catalog.getProperty(TRINO_CLUSTER_PASSWORD_KEY, "");
config.put(JDBC_CONNECTION_PASSWORD_KEY, password);

return config;
}

@Override
public String internalConnectorName() {
return CONNECTOR_CLUSTER;
}

@Override
public CatalogConnectorMetadataAdapter getMetadataAdapter() {
return new TrinoClusterMetadataAdapter(
getTableProperties(), Collections.emptyList(), Collections.emptyList());
}

@Override
public List<PropertyMetadata<?>> getTableProperties() {
return propertyMetadata.getTablePropertyMetadata();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.trino.connector.catalog.jdbc.trino;

import io.trino.spi.session.PropertyMetadata;
import java.util.List;
import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter;
import org.apache.gravitino.trino.connector.util.GeneralDataTypeTransformer;

/** Support Trino cluster connector for testing. */
public class TrinoClusterMetadataAdapter extends CatalogConnectorMetadataAdapter {

public TrinoClusterMetadataAdapter(
List<PropertyMetadata<?>> schemaProperties,
List<PropertyMetadata<?>> tableProperties,
List<PropertyMetadata<?>> columnProperties) {

super(schemaProperties, tableProperties, columnProperties, new GeneralDataTypeTransformer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.trino.connector.catalog.jdbc.trino;

import org.apache.gravitino.trino.connector.catalog.HasPropertyMeta;

public class TrinoClusterPropertyMeta implements HasPropertyMeta {}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.property.PropertyConverter;
import org.apache.gravitino.trino.connector.catalog.jdbc.mysql.MySQLConnectorAdapter;
import org.apache.gravitino.trino.connector.catalog.jdbc.postgresql.PostgreSQLConnectorAdapter;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.apache.gravitino.trino.connector.metadata.TestGravitinoCatalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -63,72 +58,4 @@ public void testTrinoPropertyKeyToGravitino() {
propertyConverter.gravitinoToEngineProperties(gravitinoPropertiesWithoutPassword);
});
}

@Test
@SuppressWarnings("unchecked")
public void testBuildPostgreSqlConnectorProperties() throws Exception {
String name = "test_catalog";
Map<String, String> properties =
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:postgresql://localhost:5432/test")
.put("jdbc-user", "test")
.put("jdbc-password", "test")
.put("trino.bypass.join-pushdown.strategy", "EAGER")
.put("unknown-key", "1")
.put("trino.bypass.unknown-key", "1")
.build();
Catalog mockCatalog =
TestGravitinoCatalog.mockCatalog(
name, "jdbc-postgresql", "test catalog", Catalog.Type.RELATIONAL, properties);
PostgreSQLConnectorAdapter adapter = new PostgreSQLConnectorAdapter();

Map<String, String> config =
adapter.buildInternalConnectorConfig(new GravitinoCatalog("test", mockCatalog));

// test converted properties
Assertions.assertEquals(config.get("connection-url"), "jdbc:postgresql://localhost:5432/test");
Assertions.assertEquals(config.get("connection-user"), "test");
Assertions.assertEquals(config.get("connection-password"), "test");

// test trino passing properties
Assertions.assertEquals(config.get("join-pushdown.strategy"), "EAGER");

// test unknown properties
Assertions.assertNull(config.get("hive.unknown-key"));
Assertions.assertNull(config.get("trino.bypass.unknown-key"));
}

@Test
@SuppressWarnings("unchecked")
public void testBuildMySqlConnectorProperties() throws Exception {
String name = "test_catalog";
Map<String, String> properties =
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:mysql://localhost:5432/test")
.put("jdbc-user", "test")
.put("jdbc-password", "test")
.put("trino.bypass.join-pushdown.strategy", "EAGER")
.put("unknown-key", "1")
.put("trino.bypass.unknown-key", "1")
.build();
Catalog mockCatalog =
TestGravitinoCatalog.mockCatalog(
name, "jdbc-postgresql", "test catalog", Catalog.Type.RELATIONAL, properties);
MySQLConnectorAdapter adapter = new MySQLConnectorAdapter();

Map<String, String> config =
adapter.buildInternalConnectorConfig(new GravitinoCatalog("test", mockCatalog));

// test converted properties
Assertions.assertEquals(config.get("connection-url"), "jdbc:mysql://localhost:5432/test");
Assertions.assertEquals(config.get("connection-user"), "test");
Assertions.assertEquals(config.get("connection-password"), "test");

// test trino passing properties
Assertions.assertEquals(config.get("join-pushdown.strategy"), "EAGER");

// test unknown properties
Assertions.assertNull(config.get("hive.unknown-key"));
Assertions.assertNull(config.get("trino.bypass.unknown-key"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.trino.connector.catalog.jdbc.mysql;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog;
import org.apache.gravitino.trino.connector.metadata.TestGravitinoCatalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestMySQLCatalogPropertyConverter {

@Test
@SuppressWarnings("unchecked")
public void testBuildMySqlConnectorProperties() throws Exception {
String name = "test_catalog";
Map<String, String> properties =
ImmutableMap.<String, String>builder()
.put("jdbc-url", "jdbc:mysql://localhost:5432/test")
.put("jdbc-user", "test")
.put("jdbc-password", "test")
.put("trino.bypass.join-pushdown.strategy", "EAGER")
.put("unknown-key", "1")
.put("trino.bypass.unknown-key", "1")
.build();
Catalog mockCatalog =
TestGravitinoCatalog.mockCatalog(
name, "jdbc-postgresql", "test catalog", Catalog.Type.RELATIONAL, properties);
MySQLConnectorAdapter adapter = new MySQLConnectorAdapter();

Map<String, String> config =
adapter.buildInternalConnectorConfig(new GravitinoCatalog("test", mockCatalog));

// test converted properties
Assertions.assertEquals(config.get("connection-url"), "jdbc:mysql://localhost:5432/test");
Assertions.assertEquals(config.get("connection-user"), "test");
Assertions.assertEquals(config.get("connection-password"), "test");

// test trino passing properties
Assertions.assertEquals(config.get("join-pushdown.strategy"), "EAGER");

// test unknown properties
Assertions.assertNull(config.get("hive.unknown-key"));
Assertions.assertNull(config.get("trino.bypass.unknown-key"));
}
}
Loading

0 comments on commit eaffd52

Please sign in to comment.