
[#1785] improve (trino-connector): Gravitino Trino connector support origin Trino connector configuration in the catalog properties (#3054)

### What changes were proposed in this pull request?

The Gravitino Trino connector supports passing Trino connector configuration
in the catalog properties.
The configuration key must start with `trino.bypass.`.
For example, use `trino.bypass.hive.config.resources` to pass
`hive.config.resources` to the Gravitino Hive catalog in the Trino runtime.

### Why are the changes needed?

Fix: #1785

### Does this PR introduce _any_ user-facing change?

Adds documentation about passing Trino connector properties in
`trino-connector/supported-catalog.md`.

### How was this patch tested?

Unit tests and integration tests (UT and IT).

---------

Co-authored-by: Qi Yu <[email protected]>
diqiu50 and yuqi1129 authored Apr 22, 2024
1 parent 4cf2ed2 commit 59dc495
Showing 25 changed files with 737 additions and 274 deletions.
@@ -15,6 +15,8 @@
/** Transforming between gravitino schema/table/column property and engine property. */
public abstract class PropertyConverter {

protected static final String TRINO_PROPERTIES_PREFIX = "trino.bypass.";

private static final Logger LOG = LoggerFactory.getLogger(PropertyConverter.class);
/**
* Mapping that maps engine properties to Gravitino properties. It will return a map that holds
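The new `TRINO_PROPERTIES_PREFIX` constant drives the bypass mechanism. A minimal sketch of the prefix handling — the helper name `extractTrinoProperties` is illustrative, not the actual Gravitino API:

```java
import java.util.HashMap;
import java.util.Map;

public class TrinoBypassSketch {

  /** Prefix marking catalog properties that should be forwarded to the Trino connector. */
  static final String TRINO_PROPERTIES_PREFIX = "trino.bypass.";

  /** Keep only the prefixed keys, and strip the prefix before forwarding. */
  static Map<String, String> extractTrinoProperties(Map<String, String> catalogProperties) {
    Map<String, String> trinoProperties = new HashMap<>();
    for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
      if (entry.getKey().startsWith(TRINO_PROPERTIES_PREFIX)) {
        trinoProperties.put(
            entry.getKey().substring(TRINO_PROPERTIES_PREFIX.length()), entry.getValue());
      }
    }
    return trinoProperties;
  }

  public static void main(String[] args) {
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("metastore.uris", "thrift://trino-ci-hive:9083");
    catalogProperties.put("trino.bypass.hive.config.resources", "/tmp/hive-site.xml");

    Map<String, String> trinoProperties = extractTrinoProperties(catalogProperties);
    // Only the bypassed key survives, with the prefix removed.
    System.out.println(trinoProperties.size());
    System.out.println(trinoProperties.get("hive.config.resources"));
  }
}
```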
2 changes: 2 additions & 0 deletions docs/apache-hive-catalog.md
@@ -40,6 +40,8 @@ The Hive catalog supports creating, updating, and deleting databases and tables
| `kerberos.check-interval-sec` | The interval to check the validity of the principal | 60 | No | 0.4.0 |
| `kerberos.keytab-fetch-timeout-sec` | The timeout to fetch the keytab | 60 | No | 0.4.0 |

When you use Gravitino with Trino, you can pass Trino Hive connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.hive.config.resources` to pass `hive.config.resources` to the Gravitino Hive catalog in the Trino runtime.

### Catalog operations

Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details.
2 changes: 2 additions & 0 deletions docs/jdbc-mysql-catalog.md
@@ -37,6 +37,8 @@ You can pass to a MySQL data source any property that isn't defined by Gravitino

Check the relevant data source configuration in [data source properties](https://commons.apache.org/proper/commons-dbcp/configuration.html)

When you use Gravitino with Trino, you can pass Trino MySQL connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.join-pushdown.strategy` to pass `join-pushdown.strategy` to the Gravitino MySQL catalog in the Trino runtime.

If you use a JDBC catalog, you must provide `jdbc-url`, `jdbc-driver`, `jdbc-user` and `jdbc-password` to catalog properties.

| Configuration item | Description | Default value | Required | Since Version |
2 changes: 2 additions & 0 deletions docs/jdbc-postgresql-catalog.md
@@ -35,6 +35,8 @@ Gravitino saves some system information in schema and table comment, like `(From
Any property that isn't defined by Gravitino can be passed to the PostgreSQL data source by adding the `gravitino.bypass.` prefix as a catalog property. For example, the catalog property `gravitino.bypass.maxWaitMillis` passes `maxWaitMillis` to the data source.
You can check the relevant data source configuration in [data source properties](https://commons.apache.org/proper/commons-dbcp/configuration.html)

When you use Gravitino with Trino, you can pass Trino PostgreSQL connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.join-pushdown.strategy` to pass `join-pushdown.strategy` to the Gravitino PostgreSQL catalog in the Trino runtime.

If you use JDBC catalog, you must provide `jdbc-url`, `jdbc-driver`, `jdbc-database`, `jdbc-user` and `jdbc-password` to catalog properties.

| Configuration item | Description | Default value | Required | Since Version |
2 changes: 2 additions & 0 deletions docs/lakehouse-iceberg-catalog.md
@@ -41,6 +41,8 @@ Builds with Hadoop 2.10.x, there may be compatibility issues when accessing Hado

Any property not defined by Gravitino that carries the `gravitino.bypass.` prefix is passed to the Iceberg catalog properties and HDFS configuration. For example, if you specify `gravitino.bypass.list-all-tables`, `list-all-tables` is passed to the Iceberg catalog properties.

When you use Gravitino with Trino, you can pass Trino Iceberg connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.iceberg.table-statistics-enabled` to pass `iceberg.table-statistics-enabled` to the Gravitino Iceberg catalog in the Trino runtime.

#### JDBC catalog

If you are using JDBC catalog, you must provide `jdbc-user`, `jdbc-password` and `jdbc-driver` to catalog properties.
24 changes: 24 additions & 0 deletions docs/trino-connector/supported-catalog.md
@@ -122,6 +122,30 @@ call gravitino.system.alter_catalog(
If you need more information about catalogs, refer to:
[Create a Catalog](../manage-relational-metadata-using-gravitino.md#create-a-catalog).

## Passing Trino connector configuration
A Gravitino catalog is implemented by a Trino connector, so you can pass Trino connector configuration to the Gravitino catalog.
For example, to set the `hive.config.resources` configuration for the Hive catalog, pass the configuration to the
Gravitino catalog like this:

```sql
call gravitino.system.create_catalog(
'gt_hive',
'hive',
map(
array['metastore.uris', 'trino.bypass.hive.config.resources'],
array['thrift://trino-ci-hive:9083', '/tmp/hive-site.xml,/tmp/core-site.xml']
)
);
```

The `trino.bypass.` prefix on a configuration key tells the Gravitino connector to pass the remainder of the key through to the underlying Trino connector in the Trino runtime.
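Under that rule, the properties in a `create_catalog` call split into two groups: keys that Gravitino consumes itself and keys that are forwarded to the Trino connector with the prefix stripped. A rough sketch of the split, using the Hive example's keys (hypothetical helper code, not the connector's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogPropertySplit {

  static final String TRINO_BYPASS_PREFIX = "trino.bypass.";

  public static void main(String[] args) {
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("metastore.uris", "thrift://trino-ci-hive:9083");
    catalogProperties.put(
        "trino.bypass.hive.config.resources", "/tmp/hive-site.xml,/tmp/core-site.xml");

    Map<String, String> gravitinoProperties = new HashMap<>();
    Map<String, String> trinoProperties = new HashMap<>();
    for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
      if (entry.getKey().startsWith(TRINO_BYPASS_PREFIX)) {
        // Forwarded to the Trino connector, prefix stripped.
        trinoProperties.put(
            entry.getKey().substring(TRINO_BYPASS_PREFIX.length()), entry.getValue());
      } else {
        // Handled by Gravitino itself.
        gravitinoProperties.put(entry.getKey(), entry.getValue());
      }
    }

    System.out.println(gravitinoProperties.get("metastore.uris"));
    System.out.println(trinoProperties.get("hive.config.resources"));
  }
}
```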

For more Trino connector configuration options, refer to:
- [Hive catalog](https://trino.io/docs/current/connector/hive.html#hive-general-configuration-properties)
- [Iceberg catalog](https://trino.io/docs/current/connector/iceberg.html#general-configuration)
- [MySQL catalog](https://trino.io/docs/current/connector/mysql.html#general-configuration-properties)
- [PostgreSQL catalog](https://trino.io/docs/current/connector/postgresql.html#general-configuration-properties)

## Data type mapping between Trino and Gravitino

The Gravitino connector currently supports the following data type conversions between Trino and Gravitino. Depending on the specific catalog, Gravitino may not support some data type conversions; for example,
@@ -825,16 +825,20 @@ void testHiveCatalogCreatedByGravitino() throws InterruptedException {
"thrift://%s:%s",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT))
.put("hive.immutable-partitions", "true")
.put("hive.target-max-file-size", "1GB")
.put("hive.create-empty-bucket-files", "true")
.put("hive.validate-bucketing", "true")
.put("trino.bypass.hive.immutable-partitions", "true")
.put("trino.bypass.hive.target-max-file-size", "1GB")
.put("trino.bypass.hive.create-empty-bucket-files", "true")
.put("trino.bypass.hive.validate-bucketing", "true")
.build());
Catalog catalog = createdMetalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName));
Assertions.assertEquals("true", catalog.properties().get("hive.immutable-partitions"));
Assertions.assertEquals("1GB", catalog.properties().get("hive.target-max-file-size"));
Assertions.assertEquals("true", catalog.properties().get("hive.create-empty-bucket-files"));
Assertions.assertEquals("true", catalog.properties().get("hive.validate-bucketing"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.immutable-partitions"));
Assertions.assertEquals(
"1GB", catalog.properties().get("trino.bypass.hive.target-max-file-size"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.create-empty-bucket-files"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.validate-bucketing"));

String sql = String.format("show catalogs like '%s'", catalogName);
boolean success = checkTrinoHasLoaded(sql, 30);
@@ -863,17 +867,21 @@ void testWrongHiveCatalogProperty() throws InterruptedException {
"thrift://%s:%s",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT))
.put("hive.immutable-partitions", "true")
.put("trino.bypass.hive.immutable-partitions", "true")
// It should be a size like '1GB' or '1MB'; we make it wrong on purpose.
.put("hive.target-max-file-size", "xxxx")
.put("hive.create-empty-bucket-files", "true")
.put("hive.validate-bucketing", "true")
.put("trino.bypass.hive.target-max-file-size", "xxxx")
.put("trino.bypass.hive.create-empty-bucket-files", "true")
.put("trino.bypass.hive.validate-bucketing", "true")
.build());
Catalog catalog = createdMetalake.loadCatalog(NameIdentifier.of(metalakeName, catalogName));
Assertions.assertEquals("true", catalog.properties().get("hive.immutable-partitions"));
Assertions.assertEquals("xxxx", catalog.properties().get("hive.target-max-file-size"));
Assertions.assertEquals("true", catalog.properties().get("hive.create-empty-bucket-files"));
Assertions.assertEquals("true", catalog.properties().get("hive.validate-bucketing"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.immutable-partitions"));
Assertions.assertEquals(
"xxxx", catalog.properties().get("trino.bypass.hive.target-max-file-size"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.create-empty-bucket-files"));
Assertions.assertEquals(
"true", catalog.properties().get("trino.bypass.hive.validate-bucketing"));

String sql = String.format("show catalogs like '%s'", catalogName);
checkTrinoHasLoaded(sql, 6);
@@ -12,7 +12,7 @@ select * from gravitino.system.catalog where name = 'gt_mysql_xxx1';
call gravitino.system.alter_catalog(
'gt_mysql_xxx1',
map(
array['join-pushdown.strategy', 'test_key'],
array['trino.bypass.join-pushdown.strategy', 'test_key'],
array['EAGER', 'test_value']
)
);
@@ -22,15 +22,15 @@ select * from gravitino.system.catalog where name = 'gt_mysql_xxx1';
call gravitino.system.alter_catalog(
'gt_mysql_xxx1',
map(),
array['join-pushdown.strategy']
array['trino.bypass.join-pushdown.strategy']
);

select * from gravitino.system.catalog where name = 'gt_mysql_xxx1';

call gravitino.system.alter_catalog(
catalog => 'gt_mysql_xxx1',
set_properties => map(
array['join-pushdown.strategy'],
array['trino.bypass.join-pushdown.strategy'],
array['EAGER']
),
remove_properties => array['test_key']
@@ -4,14 +4,14 @@ CALL

CALL

"gt_mysql_xxx1","jdbc-mysql","{""join-pushdown.strategy"":""EAGER"",""jdbc-url"":""jdbc:mysql://%/?useSSL=false"",""jdbc-user"":""trino"",""jdbc-password"":""ds123"",""jdbc-driver"":""com.mysql.cj.jdbc.Driver"",""test_key"":""test_value""}"
"gt_mysql_xxx1","jdbc-mysql","{""jdbc-url"":""jdbc:mysql://%/?useSSL=false"",""jdbc-user"":""trino"",""jdbc-password"":""ds123"",""jdbc-driver"":""com.mysql.cj.jdbc.Driver"",""test_key"":""test_value"",""trino.bypass.join-pushdown.strategy"":""EAGER""}"

CALL

"gt_mysql_xxx1","jdbc-mysql","{""jdbc-url"":""jdbc:mysql://%/?useSSL=false"",""jdbc-user"":""trino"",""jdbc-password"":""ds123"",""jdbc-driver"":""com.mysql.cj.jdbc.Driver"",""test_key"":""test_value""}"

CALL

"gt_mysql_xxx1","jdbc-mysql","{""join-pushdown.strategy"":""EAGER"",""jdbc-url"":""jdbc:mysql://%/?useSSL=false"",""jdbc-user"":""trino"",""jdbc-password"":""ds123"",""jdbc-driver"":""com.mysql.cj.jdbc.Driver""}"
"gt_mysql_xxx1","jdbc-mysql","{""jdbc-url"":""jdbc:mysql://%/?useSSL=false"",""jdbc-user"":""trino"",""jdbc-password"":""ds123"",""jdbc-driver"":""com.mysql.cj.jdbc.Driver"",""trino.bypass.join-pushdown.strategy"":""EAGER""}"

CALL
CALL
@@ -2,7 +2,7 @@ call gravitino.system.create_catalog(
'gt_mysql',
'jdbc-mysql',
map(
array['jdbc-url', 'jdbc-user', 'jdbc-password', 'jdbc-driver', 'join-pushdown.strategy'],
array['jdbc-url', 'jdbc-user', 'jdbc-password', 'jdbc-driver', 'trino.bypass.join-pushdown.strategy'],
array['${mysql_uri}/?useSSL=false', 'trino', 'ds123', 'com.mysql.cj.jdbc.Driver', 'EAGER']
)
);
@@ -2,7 +2,7 @@ call gravitino.system.create_catalog(
'gt_postgresql',
'jdbc-postgresql',
map(
array['jdbc-url', 'jdbc-user', 'jdbc-password', 'jdbc-database', 'jdbc-driver', 'join-pushdown.strategy'],
array['jdbc-url', 'jdbc-user', 'jdbc-password', 'jdbc-database', 'jdbc-driver', 'trino.bypass.join-pushdown.strategy'],
array['${postgresql_uri}/gt_db', 'trino', 'ds123', 'gt_db', 'org.postgresql.Driver', 'EAGER']
)
);
@@ -167,8 +167,7 @@ public void loadCatalogs(GravitinoMetalake metalake) {
(NameIdentifier nameIdentifier) -> {
try {
Catalog catalog = metalake.loadCatalog(nameIdentifier);
GravitinoCatalog gravitinoCatalog =
new GravitinoCatalog(metalake.name(), catalog, config.simplifyCatalogNames());
GravitinoCatalog gravitinoCatalog = new GravitinoCatalog(metalake.name(), catalog);
if (catalogConnectors.containsKey(getTrinoCatalogName(gravitinoCatalog))) {
// Reload catalogs that have been updated in Gravitino server.
reloadCatalog(metalake, gravitinoCatalog);
