Skip to content

Commit

Permalink
[#4264] feat(iceberg): support access S3 with static access key for I…
Browse files Browse the repository at this point in the history
…ceberg REST server and Gravitino Iceberg catalog (#4250)

### What changes were proposed in this pull request?
1. support using static access-key-id and secret-access-key to access S3
data for iceberg rest server and gravitino iceberg catalog
2. refactor the code to reuse the iceberg catalog transform logic for
connector and Iceberg rest server and Iceberg catalog
3. add configuration to manage the s3 access key

### Why are the changes needed?

Fix: #4264 

### Does this PR introduce _any_ user-facing change?
yes, add some document

### How was this patch tested?

I tested Iceberg REST server and Iceberg catalog with `jdbc` catalog
backend and `hive` catalog backend to access S3 data.
  • Loading branch information
FANNG1 authored Aug 1, 2024
1 parent 9a3dfa1 commit fce0017
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 188 deletions.
1 change: 1 addition & 0 deletions catalogs/catalog-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ plugins {
// Try to avoid adding extra dependencies here, because this module is used by both catalogs and
// connectors.
dependencies {
implementation(libs.slf4j.api)
implementation(libs.guava)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ public class IcebergConstants {
public static final String URI = "uri";
public static final String CATALOG_BACKEND_NAME = "catalog-backend-name";

// IO properties
public static final String IO_IMPL = "io-impl";
public static final String GRAVITINO_S3_ENDPOINT = "s3-endpoint";
public static final String ICEBERG_S3_ENDPOINT = "s3.endpoint";
public static final String GRAVITINO_S3_ACCESS_KEY_ID = "s3-access-key-id";
public static final String ICEBERG_S3_ACCESS_KEY_ID = "s3.access-key-id";
public static final String GRAVITINO_S3_SECRET_ACCESS_KEY = "s3-secret-access-key";
public static final String ICEBERG_S3_SECRET_ACCESS_KEY = "s3.secret-access-key";
public static final String GRAVITINO_S3_REGION = "s3-region";
public static final String AWS_S3_REGION = "client.region";

// Iceberg Table properties constants

public static final String COMMENT = "comment";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.iceberg;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Helpers for translating Gravitino catalog property keys into the property keys that are passed
 * to Iceberg.
 */
public class IcebergPropertiesUtils {

  /**
   * Unmodifiable mapping from a Gravitino property key to the key that is handed to Iceberg. Some
   * keys are passed through unchanged (for example {@link IcebergConstants#URI}), others are
   * renamed (for example {@link IcebergConstants#GRAVITINO_S3_ENDPOINT} becomes {@link
   * IcebergConstants#ICEBERG_S3_ENDPOINT}).
   */
  public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;

  static {
    // Use the diamond operator instead of a raw HashMap to avoid an unchecked assignment.
    Map<String, String> map = new HashMap<>();
    map.put(IcebergConstants.CATALOG_BACKEND, IcebergConstants.CATALOG_BACKEND);
    map.put(IcebergConstants.GRAVITINO_JDBC_DRIVER, IcebergConstants.GRAVITINO_JDBC_DRIVER);
    map.put(IcebergConstants.GRAVITINO_JDBC_USER, IcebergConstants.ICEBERG_JDBC_USER);
    map.put(IcebergConstants.GRAVITINO_JDBC_PASSWORD, IcebergConstants.ICEBERG_JDBC_PASSWORD);
    map.put(IcebergConstants.URI, IcebergConstants.URI);
    map.put(IcebergConstants.WAREHOUSE, IcebergConstants.WAREHOUSE);
    map.put(IcebergConstants.CATALOG_BACKEND_NAME, IcebergConstants.CATALOG_BACKEND_NAME);
    // IO and S3 related properties.
    map.put(IcebergConstants.IO_IMPL, IcebergConstants.IO_IMPL);
    map.put(IcebergConstants.GRAVITINO_S3_ENDPOINT, IcebergConstants.ICEBERG_S3_ENDPOINT);
    map.put(IcebergConstants.GRAVITINO_S3_REGION, IcebergConstants.AWS_S3_REGION);
    map.put(IcebergConstants.GRAVITINO_S3_ACCESS_KEY_ID, IcebergConstants.ICEBERG_S3_ACCESS_KEY_ID);
    map.put(
        IcebergConstants.GRAVITINO_S3_SECRET_ACCESS_KEY,
        IcebergConstants.ICEBERG_S3_SECRET_ACCESS_KEY);
    GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);
  }

  /**
   * Converts Gravitino properties to Iceberg catalog properties, the common transform logic shared
   * by Spark connector, Iceberg REST server, Gravitino Iceberg catalog.
   *
   * <p>Only keys present in {@link #GRAVITINO_CONFIG_TO_ICEBERG} are copied; every other entry of
   * the input is ignored. The returned map is a new, mutable {@link HashMap}.
   *
   * @param gravitinoProperties a map of Gravitino configuration properties.
   * @return a map containing Iceberg catalog properties.
   */
  public static Map<String, String> toIcebergCatalogProperties(
      Map<String, String> gravitinoProperties) {
    Map<String, String> icebergProperties = new HashMap<>();
    gravitinoProperties.forEach(
        (key, value) -> {
          // The mapping table never holds null values, so a null result means "not a managed key".
          String icebergKey = GRAVITINO_CONFIG_TO_ICEBERG.get(key);
          if (icebergKey != null) {
            icebergProperties.put(icebergKey, value);
          }
        });
    return icebergProperties;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.gravitino.catalog.lakehouse.iceberg;

import static org.apache.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry;
import static org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import static org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;

import com.google.common.collect.ImmutableList;
Expand All @@ -35,41 +36,14 @@

public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
public static final String CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND;

public static final String GRAVITINO_JDBC_USER = IcebergConstants.GRAVITINO_JDBC_USER;
public static final String ICEBERG_JDBC_USER = IcebergConstants.ICEBERG_JDBC_USER;

public static final String GRAVITINO_JDBC_PASSWORD = IcebergConstants.GRAVITINO_JDBC_PASSWORD;
public static final String ICEBERG_JDBC_PASSWORD = IcebergConstants.ICEBERG_JDBC_PASSWORD;
public static final String ICEBERG_JDBC_INITIALIZE = IcebergConstants.ICEBERG_JDBC_INITIALIZE;

public static final String GRAVITINO_JDBC_DRIVER = IcebergConstants.GRAVITINO_JDBC_DRIVER;
public static final String WAREHOUSE = IcebergConstants.WAREHOUSE;
public static final String URI = IcebergConstants.URI;
public static final String CATALOG_BACKEND_NAME = IcebergConstants.CATALOG_BACKEND_NAME;

private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

// Map that maintains the mapping of keys in Gravitino to that in Iceberg, for example, users
// will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will
// change it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG =
ImmutableMap.of(
CATALOG_BACKEND,
CATALOG_BACKEND,
GRAVITINO_JDBC_DRIVER,
GRAVITINO_JDBC_DRIVER,
GRAVITINO_JDBC_USER,
ICEBERG_JDBC_USER,
GRAVITINO_JDBC_PASSWORD,
ICEBERG_JDBC_PASSWORD,
URI,
URI,
WAREHOUSE,
WAREHOUSE,
CATALOG_BACKEND_NAME,
CATALOG_BACKEND_NAME);

public static final Map<String, String> KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND =
ImmutableMap.of(
KerberosConfig.PRINCIPAL_KEY,
Expand Down Expand Up @@ -98,7 +72,17 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad
false),
stringRequiredPropertyEntry(URI, "Iceberg catalog uri config", false, false),
stringRequiredPropertyEntry(
WAREHOUSE, "Iceberg catalog warehouse config", false, false));
WAREHOUSE, "Iceberg catalog warehouse config", false, false),
stringOptionalPropertyEntry(
IcebergConstants.IO_IMPL, "FileIO implement for Iceberg", true, null, false),
stringOptionalPropertyEntry(
IcebergConstants.GRAVITINO_S3_ACCESS_KEY_ID, "s3 access-key-id", true, null, true),
stringOptionalPropertyEntry(
IcebergConstants.GRAVITINO_S3_SECRET_ACCESS_KEY,
"s3 secret-access-key",
true,
null,
true));
HashMap<String, PropertyEntry<?>> result = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES);
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
Expand All @@ -111,18 +95,15 @@ protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return PROPERTIES_METADATA;
}

public Map<String, String> transformProperties(Map<String, String> properties) {
Map<String, String> gravitinoConfig = Maps.newHashMap();
properties.forEach(
(key, value) -> {
if (GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
gravitinoConfig.put(GRAVITINO_CONFIG_TO_ICEBERG.get(key), value);
}

if (KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.containsKey(key)) {
gravitinoConfig.put(KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.get(key), value);
public Map<String, String> transformProperties(Map<String, String> gravitinoProperties) {
Map<String, String> icebergProperties =
IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
gravitinoProperties.forEach(
(k, v) -> {
if (KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.containsKey(k)) {
icebergProperties.put(KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.get(k), v);
}
});
return gravitinoConfig;
return icebergProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -54,6 +55,7 @@
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
import org.apache.gravitino.integration.test.container.ContainerSuite;
Expand Down Expand Up @@ -219,7 +221,10 @@ private void createCatalog() {
icebergCatalogProperties.put(
IcebergConfig.CATALOG_BACKEND_NAME.getKey(), icebergCatalogBackendName);

icebergCatalog = IcebergCatalogUtil.loadCatalogBackend(TYPE, icebergCatalogProperties);
icebergCatalog =
IcebergCatalogUtil.loadCatalogBackend(
IcebergCatalogBackend.valueOf(TYPE.toUpperCase(Locale.ROOT)),
new IcebergConfig(icebergCatalogProperties));
if (icebergCatalog instanceof SupportsNamespaces) {
icebergSupportsNamespaces = (org.apache.iceberg.catalog.SupportsNamespaces) icebergCatalog;
}
Expand Down
20 changes: 20 additions & 0 deletions common/src/main/java/org/apache/gravitino/utils/MapUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;

/** Utility class for working with maps. */
public class MapUtils {
Expand All @@ -47,6 +48,25 @@ public static Map<String, String> getPrefixMap(Map<String, String> m, String pre
return Collections.unmodifiableMap(configs);
}

/**
 * Returns an unmodifiable map containing only the entries whose key matches the predicate.
 *
 * @param m The map to filter.
 * @param predicate The predicate expression to filter the keys.
 * @return An unmodifiable map with all entries whose key matches the predicate.
 */
public static Map<String, String> getFilteredMap(
    Map<String, String> m, Predicate<? super String> predicate) {
  // The predicate is parameterized (instead of a raw Predicate) so that the test call is
  // type-checked and callers' lambdas receive a String rather than an Object.
  Map<String, String> configs = Maps.newHashMap();
  m.forEach(
      (k, v) -> {
        if (predicate.test(k)) {
          configs.put(k, v);
        }
      });

  return Collections.unmodifiableMap(configs);
}

/**
* Returns an unmodifiable map.
*
Expand Down
18 changes: 17 additions & 1 deletion conf/gravitino-iceberg-rest-server.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,20 @@ gravitino.iceberg-rest.responseHeaderSize = 131072
# The Iceberg catalog backend, it's recommended to change to hive or jdbc
gravitino.iceberg-rest.catalog-backend = memory
# The warehouse directory of Iceberg catalog
gravitino.iceberg-rest.warehouse = /tmp/
gravitino.iceberg-rest.warehouse = /tmp

# THE CONFIGURATION EXAMPLE FOR JDBC CATALOG BACKEND WITH S3 SUPPORT

# gravitino.iceberg-rest.catalog-backend = jdbc
# gravitino.iceberg-rest.jdbc-driver = org.postgresql.Driver
# gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432/postgres
# gravitino.iceberg-rest.jdbc-user = postgres
# gravitino.iceberg-rest.jdbc-password = abc123
# gravitino.iceberg-rest.jdbc-initialize = true
# change to s3a://test/my/key/prefix for Hive catalog backend
# gravitino.iceberg-rest.warehouse = s3://test/my/key/prefix
# gravitino.iceberg-rest.io-impl = org.apache.iceberg.aws.s3.S3FileIO
# gravitino.iceberg-rest.s3-access-key-id = xxx
# gravitino.iceberg-rest.s3-secret-access-key = xxx
# gravitino.iceberg-rest.s3-endpoint = http://192.168.215.4:9010
# gravitino.iceberg-rest.s3-region = xxx
20 changes: 19 additions & 1 deletion docs/iceberg-rest-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ The `clients` property for example:
`catalog-impl` has no effect.
:::

### S3 configuration

Gravitino Iceberg REST service supports using static access-key-id and secret-access-key to access S3 data.

| Configuration item | Description | Default value | Required | Since Version |
|-----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
| `gravitino.iceberg-rest.io-impl` | The IO implementation for `FileIO` in Iceberg, use `org.apache.iceberg.aws.s3.S3FileIO` for S3. | (none) | No | 0.6.0 |
| `gravitino.iceberg-rest.s3-access-key-id` | The static access key ID used to access S3 data. | (none) | No | 0.6.0 |
| `gravitino.iceberg-rest.s3-secret-access-key` | The static secret access key used to access S3 data. | (none) | No | 0.6.0 |
| `gravitino.iceberg-rest.s3-endpoint` | An alternative endpoint of the S3 service. This could be used for S3FileIO with any S3-compatible object storage service that has a different endpoint, or to access a private S3 endpoint in a virtual private cloud. | (none) | No | 0.6.0 |
| `gravitino.iceberg-rest.s3-region` | The region of the S3 service, like `us-west-2`. | (none) | No | 0.6.0 |

For other Iceberg S3 properties not managed by Gravitino, such as `s3.sse.type`, you can configure them directly by adding the `gravitino.iceberg-rest.` prefix, for example `gravitino.iceberg-rest.s3.sse.type`.

:::info
Please set `gravitino.iceberg-rest.warehouse` to `s3://{bucket_name}/{prefix_name}` for the JDBC catalog backend, or `s3a://{bucket_name}/{prefix_name}` for the Hive catalog backend.
:::

### HDFS configuration

The Gravitino Iceberg REST catalog service adds the HDFS configuration files `core-site.xml` and `hdfs-site.xml` from the directory defined by `gravitino.auxService.iceberg-rest.classpath`, for example, `catalogs/lakehouse-iceberg/conf`, to the classpath.
Expand Down Expand Up @@ -163,7 +181,7 @@ For example, we can configure Spark catalog options to use Gravitino Iceberg RES
--conf spark.sql.catalog.rest.uri=http://127.0.0.1:9001/iceberg/
```

You may need to adjust the Iceberg Spark runtime jar file name according to the real version number in your environment.
You may need to adjust the Iceberg Spark runtime jar file name according to the real version number in your environment. If you want to access data stored in S3, you need to download the [iceberg-aws-bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle) jar and place it in the classpath of Spark. No extra configuration is needed because S3-related properties are transferred from the Iceberg REST server to the Iceberg REST client automatically.

### Exploring Apache Iceberg with Apache Spark SQL

Expand Down
18 changes: 18 additions & 0 deletions docs/lakehouse-iceberg-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ If you have a JDBC Iceberg catalog prior, you must set `catalog-backend-name` to
You must download the corresponding JDBC driver to the `catalogs/lakehouse-iceberg/libs` directory.
:::

#### S3

Supports using static access-key-id and secret-access-key to access S3 data.

| Configuration item | Description | Default value | Required | Since Version |
|------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
| `io-impl` | The IO implementation for `FileIO` in Iceberg, use `org.apache.iceberg.aws.s3.S3FileIO` for S3. | (none) | No | 0.6.0 |
| `s3-access-key-id` | The static access key ID used to access S3 data. | (none) | No | 0.6.0 |
| `s3-secret-access-key` | The static secret access key used to access S3 data. | (none) | No | 0.6.0 |
| `s3-endpoint` | An alternative endpoint of the S3 service. This could be used for S3FileIO with any S3-compatible object storage service that has a different endpoint, or to access a private S3 endpoint in a virtual private cloud. | (none) | No | 0.6.0 |
| `s3-region` | The region of the S3 service, like `us-west-2`. | (none) | No | 0.6.0 |

For other Iceberg S3 properties not managed by Gravitino, such as `s3.sse.type`, you can configure them directly by adding the `gravitino.bypass.` prefix, for example `gravitino.bypass.s3.sse.type`.

:::info
Please set the `warehouse` catalog property to `s3://{bucket_name}/{prefix_name}` for the JDBC catalog backend, or `s3a://{bucket_name}/{prefix_name}` for the Hive catalog backend.
:::

### Catalog operations

Please refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details.
Expand Down
Loading

0 comments on commit fce0017

Please sign in to comment.