
[#3515] feat(flink-connector): Support flink iceberg catalog #5914

Open

sunxiaojian wants to merge 1 commit into main from support-flink-iceberg-catalog

Conversation

sunxiaojian
Contributor

What changes were proposed in this pull request?

Support flink iceberg catalog

Why are the changes needed?

Fix: #3515

Does this PR introduce any user-facing change?

no

How was this patch tested?

FlinkIcebergCatalogIT
FlinkIcebergHiveCatalogIT

@sunxiaojian sunxiaojian changed the title [#3515]feat(flink-connector)Support flink iceberg catalog [#3515] feat(flink-connector): Support flink iceberg catalog Dec 19, 2024
@sunxiaojian sunxiaojian force-pushed the support-flink-iceberg-catalog branch 4 times, most recently from 24cd6b8 to ef294ba Compare December 19, 2024 07:37
@sunxiaojian
Contributor Author

@FANNG1 @coolderli PTAL

@FANNG1
Contributor

FANNG1 commented Dec 19, 2024

Cool! I'll review this PR, but may need some time :)

@sunxiaojian
Contributor Author

Cool! I'll review this PR, but may need some time :)

ok, thanks

Comment on lines 8 to 10
The Apache Gravitino Flink connector offers the capability to read and write Iceberg tables, with the metadata managed by the Gravitino server. To enable the use of the Iceberg catalog within the Flink connector, you must download the Iceberg Flink runtime JAR to the Flink classpath.

Contributor

Suggested change
The Apache Gravitino Flink connector offers the capability to read and write Iceberg tables, with the metadata managed by the Gravitino server. To enable the use of the Iceberg catalog within the Flink connector, you must download the Iceberg Flink runtime JAR to the Flink classpath.
The Apache Gravitino Flink connector can be used to read and write Iceberg tables, with the metadata managed by the Gravitino server.
To enable the Flink connector, you must download the Iceberg Flink runtime JAR and place it in the Flink classpath.
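
For illustration, here is a minimal sketch of querying an Iceberg table through the connector once both JARs are in place; the catalog, database, and table names are hypothetical, not part of this PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergCatalogQuickstart {
  public static void main(String[] args) {
    // Assumes the Gravitino Flink connector and the Iceberg Flink runtime JAR
    // are already on the Flink classpath, as described above.
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    // "iceberg_catalog" and the table below are hypothetical names.
    tableEnv.executeSql("USE CATALOG iceberg_catalog");
    tableEnv.executeSql("SELECT * FROM iceberg_database.sample_table").print();
  }
}
```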

- `INSERT INTO & OVERWRITE`
- `SELECT`

#### Not supported operations:
Contributor

Suggested change
#### Not supported operations:
#### Operations Not Supported:
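
For reference, the supported operations listed above amount to the following Flink SQL, shown in a minimal Table API sketch (catalog and table names are hypothetical):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SupportedOperationsSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    tableEnv.executeSql("USE CATALOG iceberg_catalog"); // hypothetical catalog name

    // Write paths: plain append and overwrite (INSERT OVERWRITE requires batch mode).
    tableEnv.executeSql("INSERT INTO iceberg_database.sample_table VALUES (1, 'a')");
    tableEnv.executeSql("INSERT OVERWRITE iceberg_database.sample_table VALUES (2, 'b')");

    // Read path.
    tableEnv.executeSql("SELECT * FROM iceberg_database.sample_table").print();
  }
}
```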


## Catalog properties

Gravitino Flink connector will transform the following property names defined in catalog properties to Flink Iceberg connector configuration.
Contributor

Suggested change
Gravitino Flink connector will transform the following property names defined in catalog properties to Flink Iceberg connector configuration.
The Gravitino Flink connector transforms the following properties in a catalog to Flink connector configuration.
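
As a rough sketch of what this transformation looks like, assuming a simple key-rename pass (the entries mirror the mapping table in this document; this is not the connector's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyMappingSketch {
  // Gravitino catalog property name -> Flink Iceberg connector property name.
  private static final Map<String, String> GRAVITINO_TO_FLINK = new HashMap<>();

  static {
    GRAVITINO_TO_FLINK.put("catalog-backend", "catalog-type");
    GRAVITINO_TO_FLINK.put("uri", "uri");
    GRAVITINO_TO_FLINK.put("warehouse", "warehouse");
    GRAVITINO_TO_FLINK.put("io-impl", "io-impl");
    GRAVITINO_TO_FLINK.put("oss-endpoint", "oss.endpoint");
  }

  // Rename any key with a known Flink equivalent; pass others through unchanged.
  static Map<String, String> toFlinkProperties(Map<String, String> gravitinoProperties) {
    Map<String, String> flinkProperties = new HashMap<>();
    gravitinoProperties.forEach(
        (key, value) -> flinkProperties.put(GRAVITINO_TO_FLINK.getOrDefault(key, key), value));
    return flinkProperties;
  }
}
```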


### S3

You need to add s3 secret to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`. Additionally, download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle) and place it in the classpath of Flink.
Contributor

Suggested change
You need to add s3 secret to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`. Additionally, download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle) and place it in the classpath of Flink.
You need to add an S3 secret to the Flink configuration using `s3.access-key-id` and `s3.secret-access-key`.
Additionally, you need to download the [Iceberg AWS bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-aws-bundle)
and place it in the Flink classpath.
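
A minimal sketch of setting these two keys on a Flink `Configuration` programmatically; the values are placeholders, and how the configuration reaches the session depends on the deployment:

```java
import org.apache.flink.configuration.Configuration;

public class S3CredentialsSketch {
  public static void main(String[] args) {
    // Placeholder values only; the two keys come from the text above.
    Configuration flinkConf = new Configuration();
    flinkConf.setString("s3.access-key-id", "<access-key-id>");
    flinkConf.setString("s3.secret-access-key", "<secret-access-key>");
    // The Iceberg AWS bundle must also be on the Flink classpath.
    System.out.println(flinkConf);
  }
}
```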


### OSS

You need to add OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`. Additionally, download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip) and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` in the classpath of Flink.
Contributor

Suggested change
You need to add OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`. Additionally, download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip) and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` in the classpath of Flink.
You need to add an OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`.
Additionally, you need to download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip),
and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to the Flink classpath.
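
Analogous to the S3 sketch above, a minimal placeholder-value sketch for the OSS keys:

```java
import org.apache.flink.configuration.Configuration;

public class OssCredentialsSketch {
  public static void main(String[] args) {
    // Placeholder values only; the two keys come from the text above.
    Configuration flinkConf = new Configuration();
    flinkConf.setString("client.access-key-id", "<access-key-id>");
    flinkConf.setString("client.access-key-secret", "<access-key-secret>");
    // The three Aliyun OSS SDK JARs listed above must also be on the Flink classpath.
    System.out.println(flinkConf);
  }
}
```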


### GCS

No extra configuration is needed. Please make sure the credential file is accessible by Flink, like using `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`, and download [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it to the classpath of Flink.
Contributor

Suggested change
No extra configuration is needed. Please make sure the credential file is accessible by Flink, like using `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`, and download [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it to the classpath of Flink.
No extra configuration is needed. Please make sure the credential file is accessible by Flink.
For example, `export GOOGLE_APPLICATION_CREDENTIALS=/xx/application_default_credentials.json`.
You need to download [Iceberg GCP bundle](https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-gcp-bundle) and place it in the Flink classpath.
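
If it is unclear whether the credential file is actually visible to the process running Flink, a small sanity check like the following can help; the environment variable name comes from the text above:

```java
import java.io.File;

public class GcsCredentialsCheck {
  public static void main(String[] args) {
    // Verify the credential file is set and readable by this JVM.
    String path = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
    if (path == null || !new File(path).canRead()) {
      throw new IllegalStateException(
          "GOOGLE_APPLICATION_CREDENTIALS is unset or unreadable: " + path);
    }
    System.out.println("Using GCS credentials at " + path);
  }
}
```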

FactoryUtils.createCatalogFactoryHelper(this, context);
return new GravitinoIcebergCatalog(
context.getName(),
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
Contributor

Do we have to use such a long name?
Maybe CatalogFactoryOptions or even FactoryOptions is enough in such a well-defined package hierarchy?

Contributor Author

Not recommended. If multiple CatalogFactoryOptions classes are referenced within the same class, the package name would have to be included every time.
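
A self-contained illustration of the collision being described, using the two JDK `Date` classes in place of the hypothetical duplicate `CatalogFactoryOptions` classes:

```java
import java.util.Date;

public class NameCollisionSketch {
  public static void main(String[] args) {
    // Java allows importing only one of two classes that share a short name;
    // the other must be fully qualified at every use site. The same would hold
    // for several classes all named CatalogFactoryOptions.
    Date utilDate = new Date();
    java.sql.Date sqlDate = new java.sql.Date(utilDate.getTime());
    System.out.println(utilDate + " / " + sqlDate);
  }
}
```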


String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";

@VisibleForTesting String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
Contributor

Most, if not all fields in this interface are constants, right?

Contributor Author

Yes, they are all constants
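
For context, fields declared in a Java interface are implicitly `public static final`, which is why every field here is a constant. A minimal illustration with hypothetical names, not the PR's actual interface:

```java
public interface IcebergBackendNames {
  // No modifiers needed: interface fields are implicitly public static final.
  String BACKEND_HIVE = "hive";
  String BACKEND_REST = "rest";
}
```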

@FANNG1
Contributor

FANNG1 commented Dec 26, 2024

Hi @sunxiaojian, sorry for the delay. I'm working on the issues for the 0.8 release and may not have enough time to review this PR these days.

@xunliu
Member

xunliu commented Dec 28, 2024

Hi @sunxiaojian, thank you for your contributions.
Can you send an email to me ([email protected])? I have something I need to discuss with you.

@sunxiaojian
Contributor Author

Hi @sunxiaojian, sorry for the delay. I'm working on the issues for the 0.8 release and may not have enough time to review this PR these days.

@FANNG1 OK, I will also handle the comments above as soon as possible.

@sunxiaojian
Contributor Author

sunxiaojian commented Dec 29, 2024

Hi @sunxiaojian, thank you for your contributions. Can you send an email to me ([email protected])? I have something I need to discuss with you.

@xunliu The email has been sent

@sunxiaojian sunxiaojian force-pushed the support-flink-iceberg-catalog branch 3 times, most recently from 4314cc7 to fc59502 Compare January 3, 2025 17:53
@sunxiaojian sunxiaojian force-pushed the support-flink-iceberg-catalog branch from fc59502 to 5acadb8 Compare January 4, 2025 03:33
- Metadata tables, like:
  - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots`
- Querying UDF
- `UPDATE` clause
Contributor

The `UPDATE` & `DELETE` clauses are not under Querying UDF, are they?

| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating |
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating |
| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating |
| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating |
Contributor

In 0.8, Gravitino relaxed the constraint on the secret key so it can be passed to the client; could you add the OSS AK/SK too?


private IcebergPropertiesConverter() {}

private static final Map<String, String> GRAVITINO_CONFIG_TO_FLINK_ICEBERG;
Contributor

Why not initialize the map directly?
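
What the reviewer is suggesting, sketched with `Map.of` (Java 9+); the entries are illustrative, not the connector's full mapping:

```java
import java.util.Map;

public class DirectMapInitSketch {
  // Bind the mapping at its declaration instead of filling it in a static
  // block. Map.of returns an immutable map.
  static final Map<String, String> GRAVITINO_CONFIG_TO_FLINK_ICEBERG =
      Map.of(
          "catalog-backend", "catalog-type",
          "uri", "uri",
          "warehouse", "warehouse");
}
```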

all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
Contributor

Gravitino supports custom catalog backends; should we add special logic here?

import org.junit.jupiter.api.Tag;

@Tag("gravitino-docker-test")
public class FlinkIcebergHiveCatalogIT extends FlinkIcebergCatalogIT {
Contributor

Does it support other catalog backends like JDBC or REST?

Contributor Author

It should be supported. This time, I only added Hive tests. In the future, I will add unit tests for JDBC and REST. There may be some minor adjustments in the logic, so detailed unit tests are necessary.

Contributor

OK, could you add the supported catalog backends to the document? We can update the document when we add tests for other catalog backends.

Development

Successfully merging this pull request may close these issues.

[Subtask] [flink-connector] Support iceberg catalog
5 participants