From 2a96e3032b2e517c424e03202b7f73390988240a Mon Sep 17 00:00:00 2001
From: Peidian li <38486782+coolderli@users.noreply.github.com>
Date: Fri, 24 May 2024 12:00:22 +0800
Subject: [PATCH] [#3362] feat(flink-connector): Add the code skeleton for flink-connector (#2635)

### What changes were proposed in this pull request?

- Support `GravitinoCatalogStore` to register catalogs with Gravitino. In this PR, we add support for creating the Hive catalog.

### Why are the changes needed?

- Fix #3362

### Does this PR introduce _any_ user-facing change?

- Support Flink in Gravitino.

### How was this patch tested?

- Added UTs.

---
 .../workflows/backend-integration-test.yml | 2 +-
 .github/workflows/build.yml | 1 +
 .github/workflows/flink-integration-test.yml | 108 +++++++
 build.gradle.kts | 5 +-
 flink-connector/build.gradle.kts | 131 ++++++++
 .../flink/connector/PropertiesConverter.java | 39 +++
 .../flink/connector/catalog/BaseCatalog.java | 281 ++++++++++++++++++
 .../catalog/GravitinoCatalogManager.java | 148 +++++++++
 .../connector/hive/GravitinoHiveCatalog.java | 39 +++
 .../hive/GravitinoHiveCatalogFactory.java | 70 +++++
 .../GravitinoHiveCatalogFactoryOptions.java | 22 ++
 .../hive/HivePropertiesConverter.java | 64 ++++
 .../store/GravitinoCatalogStore.java | 124 ++++++++
 .../store/GravitinoCatalogStoreFactory.java | 71 +++++
 .../GravitinoCatalogStoreFactoryOptions.java | 27 ++
 .../flink/connector/utils/FactoryUtils.java | 91 ++++++
 .../flink/connector/utils/PropertyUtils.java | 32 ++
 .../org.apache.flink.table.factories.Factory | 7 +
 .../hive/TestHivePropertiesConverter.java | 57 ++++
 .../integration/test/FlinkEnvIT.java | 62 ++++
 .../test/hive/FlinkHiveCatalogIT.java | 238 +++++++++++++++
 .../test/resources/flink-tests/hive-site.xml | 30 ++
 .../src/test/resources/log4j2.properties | 59 ++++
 gradle/libs.versions.toml | 1 +
 integration-test/build.gradle.kts | 1 -
 settings.gradle.kts | 1 +
 26 files changed, 1707 insertions(+), 4 deletions(-)
 create mode 100644 .github/workflows/flink-integration-test.yml
 create mode 100644 flink-connector/build.gradle.kts
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryOptions.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStore.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/FactoryUtils.java
 create mode 100644 flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/PropertyUtils.java
 create mode
100644 flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java create mode 100644 flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java create mode 100644 flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java create mode 100644 flink-connector/src/test/resources/flink-tests/hive-site.xml create mode 100644 flink-connector/src/test/resources/log4j2.properties diff --git a/.github/workflows/backend-integration-test.yml b/.github/workflows/backend-integration-test.yml index 8c37dc9d28a..29db0ee9880 100644 --- a/.github/workflows/backend-integration-test.yml +++ b/.github/workflows/backend-integration-test.yml @@ -96,7 +96,7 @@ jobs: - name: Backend Integration Test id: integrationTest run: | - ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs + ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs - name: Upload integrate tests reports uses: actions/upload-artifact@v3 diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f339bb5bbac..32ca9a1adc7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -41,6 +41,7 @@ jobs: - server/** - server-common/** - spark-connector/** + - flink-connector/** - trino-connector/** - web/** - docs/open-api/** diff --git a/.github/workflows/flink-integration-test.yml b/.github/workflows/flink-integration-test.yml new file mode 100644 index 00000000000..818618eebbc --- /dev/null +++ b/.github/workflows/flink-integration-test.yml @@ -0,0 +1,108 @@ +name: Flink Integration Test + +# Controls when the workflow will run +on: + # Triggers the workflow on push or pull request events but only for the "main" branch + push: + branches: [ "main", "branch-*" ] + pull_request: + branches: [ "main", "branch-*" ] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + changes: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: dorny/paths-filter@v2 + id: filter + with: + filters: | + source_changes: + - .github/** + - api/** + - bin/** + - catalogs/** + - clients/client-java/** + - clients/client-java-runtime/** + - clients/filesystem-hadoop3/** + - clients/filesystem-hadoop3-runtime/** + - common/** + - conf/** + - core/** + - dev/** + - gradle/** + - meta/** + - server/** + - server-common/** + - flink-connector/** + - docs/open-api/** + - build.gradle.kts + - gradle.properties + - gradlew + - setting.gradle.kts + outputs: + source_changes: ${{ steps.filter.outputs.source_changes }} + + # Integration test for AMD64 architecture + test-amd64-arch: + needs: changes + if: needs.changes.outputs.source_changes == 'true' + runs-on: ubuntu-latest + timeout-minutes: 30 + strategy: + matrix: + architecture: [linux/amd64] + java-version: [ 8, 11, 17 ] + env: + PLATFORM: ${{ matrix.architecture }} + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - 
name: Check required command + run: | + dev/ci/check_commands.sh + + - name: Package Gravitino + run: | + ./gradlew build -x test -PjdkVersion=${{ matrix.java-version }} + ./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }} + + - name: Setup debug Github Action + if: ${{ contains(github.event.pull_request.labels.*.name, 'debug action') }} + uses: csexton/debugger-action@master + + - name: Free up disk space + run: | + dev/ci/util_free_space.sh + + - name: Flink Integration Test + id: integrationTest + run: | + ./gradlew --rerun-tasks -PskipTests -PtestMode=embedded -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**" + ./gradlew --rerun-tasks -PskipTests -PtestMode=deploy -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**" + + - name: Upload integrate tests reports + uses: actions/upload-artifact@v3 + if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }} + with: + name: flink-connector-integrate-test-reports-${{ matrix.java-version }} + path: | + build/reports + flink-connector/build/flink-connector-integration-test.log + flink-connector/build/*.tar + distribution/package/logs/gravitino-server.out + distribution/package/logs/gravitino-server.log + catalogs/**/*.log + catalogs/**/*.tar \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index de2aecbef87..2aa23b51604 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -618,7 +618,7 @@ tasks { subprojects.forEach() { if (!it.name.startsWith("catalog") && !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" && - it.name != "integration-test" && it.name != "bundled-catalog" + it.name != "integration-test" && it.name != "bundled-catalog" && it.name != "flink-connector" ) { from(it.configurations.runtimeClasspath) into("distribution/package/libs") @@ -634,7 +634,8 @@ tasks { !it.name.startsWith("spark-connector") && it.name != "trino-connector" && it.name != "integration-test" && - it.name != "bundled-catalog" + it.name != "bundled-catalog" && + it.name != "flink-connector" ) { dependsOn("${it.name}:build") from("${it.name}/build/libs") diff --git a/flink-connector/build.gradle.kts b/flink-connector/build.gradle.kts new file mode 100644 index 00000000000..28cb0cb9e93 --- /dev/null +++ b/flink-connector/build.gradle.kts @@ -0,0 +1,131 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +plugins { + `maven-publish` + id("java") + id("idea") +} + +repositories { + mavenCentral() +} + +val flinkVersion: String = libs.versions.flink.get() +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() + +dependencies { + implementation(project(":api")) + implementation(project(":common")) + implementation(project(":core")) + implementation(project(":clients:client-java")) + implementation(project(":catalogs:bundled-catalog", configuration = "shadow")) + + implementation(libs.bundles.log4j) + implementation(libs.commons.lang3) + implementation(libs.guava) + implementation(libs.httpclient5) + implementation(libs.jackson.databind) + implementation(libs.jackson.annotations) + implementation(libs.jackson.datatype.jdk8) + implementation(libs.jackson.datatype.jsr310) + + implementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") + implementation("org.apache.flink:flink-table-common:$flinkVersion") + implementation("org.apache.flink:flink-table-api-java:$flinkVersion") + + implementation(libs.hive2.exec) { + artifact { + classifier = "core" + } + exclude("com.fasterxml.jackson.core") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.protobuf") + exclude("org.apache.avro") + exclude("org.apache.calcite") + exclude("org.apache.calcite.avatica") + exclude("org.apache.curator") + exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") + exclude("org.apache.logging.log4j") + exclude("org.apache.zookeeper") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.openjdk.jol") + exclude("org.pentaho") + exclude("org.slf4j") + } + + testAnnotationProcessor(libs.lombok) + + testCompileOnly(libs.lombok) + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) + testImplementation(project(":server-common")) + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mockito.core) + testImplementation(libs.sqlite.jdbc) + testImplementation(libs.testcontainers) + testImplementation(libs.testcontainers.junit.jupiter) + testImplementation(libs.testcontainers.mysql) + + testImplementation(libs.hadoop2.common) { + exclude("*") + } + testImplementation(libs.hadoop2.mapreduce.client.core) { + exclude("*") + } + testImplementation(libs.hive2.common) { + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + } + testImplementation(libs.hive2.metastore) { + exclude("co.cask.tephra") + exclude("com.github.joshelser") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.code.findbugs", "sr305") + exclude("com.tdunning", "json") + exclude("com.zaxxer", "HikariCP") + exclude("io.dropwizard.metricss") + exclude("javax.transaction", "transaction-api") + exclude("org.apache.avro") + exclude("org.apache.curator") + exclude("org.apache.hbase") + exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") + exclude("org.apache.logging.log4j") + exclude("org.apache.parquet", "parquet-hadoop-bundle") + exclude("org.apache.zookeeper") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.slf4j") + } + testImplementation("org.apache.flink:flink-table-api-bridge-base:$flinkVersion") { + exclude("commons-cli", "commons-cli") + exclude("commons-io", "commons-io") + exclude("com.google.code.findbugs", "jsr305") + } + testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion") + + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks.test { + val skipUTs = 
project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + val skipFlinkITs = project.hasProperty("skipFlinkITs") + if (skipITs || skipFlinkITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java new file mode 100644 index 00000000000..7d1822039bb --- /dev/null +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java @@ -0,0 +1,39 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.flink.connector; + +import java.util.Map; +import org.apache.flink.configuration.Configuration; + +/** + * PropertiesConverter is used to convert properties between Flink properties and Gravitino + * properties + */ +public interface PropertiesConverter { + + String FLINK_PROPERTY_PREFIX = "flink.bypass."; + + /** + * Converts properties from application provided properties and Flink connector properties to + * Gravitino properties. + * + * @param flinkConf The configuration provided by Flink. + * @return properties for the Gravitino connector. + */ + default Map toGravitinoCatalogProperties(Configuration flinkConf) { + return flinkConf.toMap(); + } + + /** + * Converts properties from Gravitino properties to Flink connector properties. + * + * @param gravitinoProperties The properties provided by Gravitino. + * @return properties for the Flink connector. + */ + default Map toFlinkCatalogProperties(Map gravitinoProperties) { + return gravitinoProperties; + } +} diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java new file mode 100644 index 00000000000..51e08c396d7 --- /dev/null +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java @@ -0,0 +1,281 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
new file mode 100644
index 00000000000..51e08c396d7
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.flink.connector.catalog;
+
+import java.util.List;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+
+/**
+ * The BaseCatalog that provides a default implementation for all methods in the {@link
+ * org.apache.flink.table.catalog.Catalog} interface.
+ */ +public abstract class BaseCatalog extends AbstractCatalog { + protected BaseCatalog(String catalogName, String defaultDatabase) { + super(catalogName, defaultDatabase); + } + + @Override + public void open() throws CatalogException {} + + @Override + public void close() throws CatalogException {} + + @Override + public List listDatabases() throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogDatabase getDatabase(String s) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean databaseExists(String s) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createDatabase(String s, CatalogDatabase catalogDatabase, boolean b) + throws DatabaseAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropDatabase(String s, boolean b, boolean b1) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listTables(String s) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listViews(String s) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath objectPath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tableExists(ObjectPath objectPath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTable(ObjectPath objectPath, boolean b) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath objectPath, String s, boolean b) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitions(ObjectPath objectPath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitions( + ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, + CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listPartitionsByFilter( + ObjectPath objectPath, List list) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogPartition getPartition( + ObjectPath objectPath, 
CatalogPartitionSpec catalogPartitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean partitionExists(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) + throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createPartition( + ObjectPath objectPath, + CatalogPartitionSpec catalogPartitionSpec, + CatalogPartition catalogPartition, + boolean b) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, + PartitionAlreadyExistsException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, boolean b) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath objectPath, + CatalogPartitionSpec catalogPartitionSpec, + CatalogPartition catalogPartition, + boolean b) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public List listFunctions(String s) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogFunction getFunction(ObjectPath objectPath) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean functionExists(ObjectPath objectPath) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void createFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath objectPath, boolean b) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath objectPath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableStatistics( + ObjectPath objectPath, CatalogTableStatistics catalogTableStatistics, boolean b) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath objectPath, CatalogColumnStatistics catalogColumnStatistics, 
boolean b) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath objectPath, + CatalogPartitionSpec catalogPartitionSpec, + CatalogTableStatistics catalogTableStatistics, + boolean b) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath objectPath, + CatalogPartitionSpec catalogPartitionSpec, + CatalogColumnStatistics catalogColumnStatistics, + boolean b) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/GravitinoCatalogManager.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/GravitinoCatalogManager.java new file mode 100644 index 00000000000..ed1cb527cc8 --- /dev/null +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/GravitinoCatalogManager.java @@ -0,0 +1,148 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.flink.connector.catalog; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.client.GravitinoAdminClient; +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** GravitinoCatalogManager is used to retrieve catalogs from Gravitino server. */ +public class GravitinoCatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogManager.class); + private static GravitinoCatalogManager gravitinoCatalogManager; + + private volatile boolean isClosed = false; + private final String metalakeName; + private final GravitinoMetalake metalake; + private final GravitinoAdminClient gravitinoClient; + + private GravitinoCatalogManager(String gravitinoUri, String metalakeName) { + this.metalakeName = metalakeName; + this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build(); + this.metalake = gravitinoClient.loadMetalake(metalakeName); + } + + /** + * Create GravitinoCatalogManager with Gravitino server uri and metalake name. + * + * @param gravitinoUri Gravitino server uri + * @param metalakeName Metalake name + * @return GravitinoCatalogManager + */ + public static GravitinoCatalogManager create(String gravitinoUri, String metalakeName) { + Preconditions.checkState( + gravitinoCatalogManager == null, "Should not create duplicate GravitinoCatalogManager"); + gravitinoCatalogManager = new GravitinoCatalogManager(gravitinoUri, metalakeName); + return gravitinoCatalogManager; + } + + /** + * Get GravitinoCatalogManager instance. + * + * @return GravitinoCatalogManager + */ + public static GravitinoCatalogManager get() { + Preconditions.checkState( + gravitinoCatalogManager != null, "GravitinoCatalogManager has not created yet"); + Preconditions.checkState( + !gravitinoCatalogManager.isClosed, "GravitinoCatalogManager is already closed"); + return gravitinoCatalogManager; + } + + /** + * Close GravitinoCatalogManager. + * + *
<p>After close, GravitinoCatalogManager cannot be used anymore.
+   */
+  public void close() {
+    Preconditions.checkState(!isClosed, "Gravitino Catalog is already closed");
+    isClosed = true;
+    gravitinoClient.close();
+  }
+
+  /**
+   * Get GravitinoCatalog by name.
+   *
+   * @param name Catalog name
+   * @return The Gravitino Catalog
+   */
+  public Catalog getGravitinoCatalogInfo(String name) {
+    Catalog catalog = metalake.loadCatalog(name);
+    Preconditions.checkArgument(
+        Catalog.Type.RELATIONAL.equals(catalog.type()), "Only support relational catalog");
+    LOG.info("Load catalog {} from Gravitino successfully.", name);
+    return catalog;
+  }
+
+  /**
+   * Get the metalake.
+   *
+   * @return the metalake name.
+   */
+  public String getMetalakeName() {
+    return metalakeName;
+  }
+
+  /**
+   * Create catalog in Gravitino.
+   *
+   * @param catalogName Catalog name
+   * @param type Catalog type
+   * @param comment Catalog comment
+   * @param provider Catalog provider
+   * @param properties Catalog properties
+   * @return Catalog
+   */
+  public Catalog createCatalog(
+      String catalogName,
+      Catalog.Type type,
+      String comment,
+      String provider,
+      Map<String, String> properties) {
+    return metalake.createCatalog(catalogName, type, provider, comment, properties);
+  }
+
+  /**
+   * Drop catalog in Gravitino.
+   *
+   * @param catalogName Catalog name
+   * @return boolean
+   */
+  public boolean dropCatalog(String catalogName) {
+    return metalake.dropCatalog(catalogName);
+  }
+
+  /**
+   * List catalogs in Gravitino.
+   *
+   * @return Set of catalog names
+   */
+  public Set<String> listCatalogs() {
+    NameIdentifier[] catalogNames = metalake.listCatalogs();
+    LOG.info(
+        "Load metalake {}'s catalogs. catalogs: {}.",
+        metalake.name(),
+        Arrays.toString(catalogNames));
+    return Arrays.stream(catalogNames).map(NameIdentifier::name).collect(Collectors.toSet());
+  }
+
+  /**
+   * Check if catalog exists in Gravitino.
+   *
+   * @param catalogName Catalog name
+   * @return boolean
+   */
+  public boolean contains(String catalogName) {
+    return metalake.catalogExists(catalogName);
+  }
+}
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
new file mode 100644
index 00000000000..26ae6ded871
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.hive;
+
+import com.datastrato.gravitino.flink.connector.catalog.BaseCatalog;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.factories.Factory;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * The GravitinoHiveCatalog class is an implementation of the BaseCatalog class that is used to
+ * proxy the HiveCatalog class.
+ */
+public class GravitinoHiveCatalog extends BaseCatalog {
+
+  private HiveCatalog hiveCatalog;
+
+  GravitinoHiveCatalog(
+      String catalogName,
+      String defaultDatabase,
+      @Nullable HiveConf hiveConf,
+      @Nullable String hiveVersion) {
+    super(catalogName, defaultDatabase);
+    this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, hiveVersion);
+  }
+
+  public HiveConf getHiveConf() {
+    return hiveCatalog.getHiveConf();
+  }
+
+  @Override
+  public Optional<Factory> getFactory() {
+    return hiveCatalog.getFactory();
+  }
+}
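Note: only getFactory() is proxied so far, so Hive's table factories are picked up; the remaining Catalog methods still fall through to BaseCatalog's UnsupportedOperationException defaults. Follow-up changes would presumably delegate the same way, e.g. (illustrative sketch, not in this patch):

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
      // Forward to the wrapped HiveCatalog instead of the BaseCatalog default.
      return hiveCatalog.databaseExists(databaseName);
    }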
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
new file mode 100644
index 00000000000..7d7adde40d7
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.flink.connector.hive;
+
+import static com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
+
+import com.datastrato.gravitino.flink.connector.utils.FactoryUtils;
+import com.datastrato.gravitino.flink.connector.utils.PropertyUtils;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Factory for creating instances of {@link GravitinoHiveCatalog}. It will be created by SPI
+ * discovery in Flink.
+ */
+public class GravitinoHiveCatalogFactory implements CatalogFactory {
+  private HiveCatalogFactory hiveCatalogFactory;
+
+  @Override
+  public Catalog createCatalog(Context context) {
+    this.hiveCatalogFactory = new HiveCatalogFactory();
+    final FactoryUtil.CatalogFactoryHelper helper =
+        FactoryUtils.createCatalogFactoryHelper(this, context);
+    helper.validateExcept(
+        PropertyUtils.HIVE_PREFIX,
+        PropertyUtils.HADOOP_PREFIX,
+        PropertyUtils.DFS_PREFIX,
+        PropertyUtils.FS_PREFIX);
+
+    String hiveConfDir = helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_CONF_DIR);
+    String hadoopConfDir = helper.getOptions().get(HiveCatalogFactoryOptions.HADOOP_CONF_DIR);
+    HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+    // Put the hadoop properties managed by Gravitino into the hiveConf
+    PropertyUtils.getHadoopAndHiveProperties(context.getOptions()).forEach(hiveConf::set);
+    return new GravitinoHiveCatalog(
+        context.getName(),
+        helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
+        hiveConf,
+        helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.<ConfigOption<?>>builder()
+        .addAll(hiveCatalogFactory.requiredOptions())
+        .add(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS)
+        .build();
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return hiveCatalogFactory.optionalOptions();
+  }
+}
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryOptions.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryOptions.java
new file mode 100644
index 00000000000..9dc451b10f0
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalogFactoryOptions.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class GravitinoHiveCatalogFactoryOptions {
+
+  /** Identifier for the {@link GravitinoHiveCatalog}. */
+  public static final String IDENTIFIER = "gravitino-hive";
+
+  public static final ConfigOption<String> HIVE_METASTORE_URIS =
+      ConfigOptions.key(HiveConf.ConfVars.METASTOREURIS.varname)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "The Hive metastore URIs; it takes higher priority than hive.metastore.uris in hive-site.xml");
+}
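With the factory registered via SPI, the catalog can be created from Flink SQL; the conf dir and metastore URI below are illustrative (FlinkHiveCatalogIT further down exercises the same statement):

    CREATE CATALOG my_hive WITH (
      'type' = 'gravitino-hive',
      'hive-conf-dir' = '/path/to/hive-conf',
      'hive.metastore.uris' = 'thrift://127.0.0.1:9083'
    );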
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
new file mode 100644
index 00000000000..f0c6b12facf
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.flink.connector.hive;
+
+import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
+
+import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class HivePropertiesConverter implements PropertiesConverter {
+
+  private HivePropertiesConverter() {}
+
+  public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();
+
+  private static final Map<String, String> HIVE_CONFIG_TO_GRAVITINO =
+      ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, METASTORE_URIS);
+  private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
+      ImmutableMap.of(METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);
+
+  @Override
+  public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
+    Map<String, String> gravitinoProperties = Maps.newHashMap();
+
+    for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
+      String hiveKey = HIVE_CONFIG_TO_GRAVITINO.get(entry.getKey());
+      if (hiveKey != null) {
+        gravitinoProperties.put(hiveKey, entry.getValue());
+      } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
+        gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
+      } else {
+        gravitinoProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return gravitinoProperties;
+  }
+
+  @Override
+  public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
+    Map<String, String> flinkCatalogProperties = Maps.newHashMap();
+    flinkCatalogProperties.put(
+        CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
+
+    gravitinoProperties.forEach(
+        (key, value) -> {
+          String flinkConfigKey = key;
+          if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
+            flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
+          }
+          flinkCatalogProperties.put(
+              GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value);
+        });
+    return flinkCatalogProperties;
+  }
+}
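A worked example of the two directions (values are illustrative; the behavior matches TestHivePropertiesConverter below):

    Flink -> Gravitino:
      hive.metastore.uris = thrift://127.0.0.1:9084   ->  metastore.uris = thrift://127.0.0.1:9084
      hive-conf-dir = src/test/resources/flink-tests  ->  flink.bypass.hive-conf-dir = src/test/resources/flink-tests
    Gravitino -> Flink:
      metastore.uris = thrift://127.0.0.1:9084        ->  hive.metastore.uris = thrift://127.0.0.1:9084
      flink.bypass.key = value                        ->  key = value
      (and 'type' = 'gravitino-hive' is always added)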
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStore.java
new file mode 100644
index 00000000000..e5c1795e145
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStore.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.flink.connector.store;
+
+import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.datastrato.gravitino.flink.connector.catalog.GravitinoCatalogManager;
+import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import com.datastrato.gravitino.flink.connector.hive.HivePropertiesConverter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalogStore;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** GravitinoCatalogStore is used to store catalog information to Gravitino server. */
+public class GravitinoCatalogStore extends AbstractCatalogStore {
+  private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogStore.class);
+  private final GravitinoCatalogManager gravitinoCatalogManager;
+
+  public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
+    this.gravitinoCatalogManager = catalogManager;
+  }
+
+  @Override
+  public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
+      throws CatalogException {
+    Configuration configuration = descriptor.getConfiguration();
+    String provider = getGravitinoCatalogProvider(configuration);
+    Catalog.Type type = getGravitinoCatalogType(configuration);
+    Map<String, String> gravitinoProperties =
+        getPropertiesConverter(provider).toGravitinoCatalogProperties(configuration);
+    gravitinoCatalogManager.createCatalog(catalogName, type, null, provider, gravitinoProperties);
+  }
+
+  @Override
+  public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
+    try {
+      gravitinoCatalogManager.dropCatalog(catalogName);
+    } catch (Exception e) {
+      throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName), e);
+    }
+  }
+
+  @Override
+  public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
+    try {
+      Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
+      String provider = catalog.provider();
+      PropertiesConverter propertiesConverter = getPropertiesConverter(provider);
+      Map<String, String> flinkCatalogProperties =
+          propertiesConverter.toFlinkCatalogProperties(catalog.properties());
+      CatalogDescriptor descriptor =
+          CatalogDescriptor.of(catalogName, Configuration.fromMap(flinkCatalogProperties));
+      return Optional.of(descriptor);
+    } catch (Exception e) {
+      LOG.warn("Failed to get the catalog: {}", catalogName, e);
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public Set<String> listCatalogs() throws CatalogException {
+    try {
+      return gravitinoCatalogManager.listCatalogs();
+    } catch (Exception e) {
+      throw new CatalogException("Failed to list catalogs.", e);
+    }
+  }
+
+  @Override
+  public boolean contains(String catalogName) throws CatalogException {
+    return gravitinoCatalogManager.contains(catalogName);
+  }
+
+  private String getGravitinoCatalogProvider(Configuration configuration) {
+    String catalogType =
+        Preconditions.checkNotNull(
+            configuration.get(CommonCatalogOptions.CATALOG_TYPE),
+            "%s should not be null.",
+            CommonCatalogOptions.CATALOG_TYPE);
+
+    switch (catalogType) {
+      case GravitinoHiveCatalogFactoryOptions.IDENTIFIER:
+        return
"hive"; + default: + throw new IllegalArgumentException( + String.format("The catalog type is not supported:%s", catalogType)); + } + } + + private Catalog.Type getGravitinoCatalogType(Configuration configuration) { + String catalogType = + Preconditions.checkNotNull( + configuration.get(CommonCatalogOptions.CATALOG_TYPE), + "%s should not be null.", + CommonCatalogOptions.CATALOG_TYPE); + + switch (catalogType) { + case GravitinoHiveCatalogFactoryOptions.IDENTIFIER: + return Catalog.Type.RELATIONAL; + default: + throw new IllegalArgumentException( + String.format("The catalog type is not supported:%s", catalogType)); + } + } + + private PropertiesConverter getPropertiesConverter(String provider) { + switch (provider) { + case "hive": + return HivePropertiesConverter.INSTANCE; + } + throw new IllegalArgumentException("The provider is not supported:" + provider); + } +} diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java new file mode 100644 index 00000000000..c33c22f40f5 --- /dev/null +++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.flink.connector.store; + +import static com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO; +import static com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_METALAKE; +import static com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_URI; +import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper; + +import com.datastrato.gravitino.flink.connector.catalog.GravitinoCatalogManager; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.FactoryUtil; + +/** The Factory for creating {@link GravitinoCatalogStore}. 
*/
+public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {
+  private GravitinoCatalogManager catalogManager;
+
+  @Override
+  public CatalogStore createCatalogStore() {
+    return new GravitinoCatalogStore(catalogManager);
+  }
+
+  @Override
+  public void open(Context context) throws CatalogException {
+    FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
+        createCatalogStoreFactoryHelper(this, context);
+    factoryHelper.validate();
+
+    ReadableConfig options = factoryHelper.getOptions();
+    String gravitinoUri =
+        Preconditions.checkNotNull(
+            options.get(GRAVITINO_URI), "The %s must be set.", GRAVITINO_URI.key());
+    String gravitinoName =
+        Preconditions.checkNotNull(
+            options.get(GRAVITINO_METALAKE), "The %s must be set.", GRAVITINO_METALAKE.key());
+    this.catalogManager = GravitinoCatalogManager.create(gravitinoUri, gravitinoName);
+  }
+
+  @Override
+  public void close() throws CatalogException {
+    if (catalogManager != null) {
+      catalogManager.close();
+    }
+  }
+
+  @Override
+  public String factoryIdentifier() {
+    return GRAVITINO;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return ImmutableSet.of(GRAVITINO_METALAKE, GRAVITINO_URI);
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+}
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
new file mode 100644
index 00000000000..45af304169f
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.flink.connector.store;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class GravitinoCatalogStoreFactoryOptions {
+
+  private GravitinoCatalogStoreFactoryOptions() {}
+
+  public static final String GRAVITINO = "gravitino";
+
+  public static final ConfigOption<String> GRAVITINO_URI =
+      ConfigOptions.key("gravitino.uri")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The uri of Gravitino server");
+  public static final ConfigOption<String> GRAVITINO_METALAKE =
+      ConfigOptions.key("gravitino.metalake")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The name of Gravitino metalake");
+}
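These options are read through Flink's catalog store configuration, i.e. keys under table.catalog-store.gravitino.*. A flink-conf.yaml sketch (endpoint and metalake name are illustrative; FlinkEnvIT below sets the same keys programmatically):

    table.catalog-store.kind: gravitino
    table.catalog-store.gravitino.gravitino.uri: http://127.0.0.1:8090
    table.catalog-store.gravitino.gravitino.metalake: my_metalake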
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/FactoryUtils.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/FactoryUtils.java
new file mode 100644
index 00000000000..610005a4d3e
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/FactoryUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.utils;
+
+import static org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions;
+import static org.apache.flink.table.factories.FactoryUtil.validateWatermarkOptions;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FactoryUtils {
+
+  private FactoryUtils() {}
+
+  private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);
+
+  /**
+   * Utility for working with {@link Factory}s. The {@link GravitinoCatalogFactoryHelper} overrides
+   * the {@link FactoryUtil.CatalogFactoryHelper#validate()} method to validate the options. For the
+   * unconsumed option keys, it logs a warning instead of throwing an exception.
+   */
+  public static class GravitinoCatalogFactoryHelper extends FactoryUtil.CatalogFactoryHelper {
+
+    private GravitinoCatalogFactoryHelper(
+        CatalogFactory catalogFactory, CatalogFactory.Context context) {
+      super(catalogFactory, context);
+    }
+
+    @Override
+    public void validate() {
+      validateFactoryOptions(factory, allOptions);
+      ignoreUnconsumedKeys(
+          factory.factoryIdentifier(),
+          allOptions.keySet(),
+          consumedOptionKeys,
+          deprecatedOptionKeys);
+      validateWatermarkOptions(factory.factoryIdentifier(), allOptions);
+    }
+  }
+
+  /**
+   * Validates unconsumed option keys. Logs a warning for each unconsumed option key instead of
+   * throwing an exception.
+   */
+  private static void ignoreUnconsumedKeys(
+      String factoryIdentifier,
+      Set<String> allOptionKeys,
+      Set<String> consumedOptionKeys,
+      Set<String> deprecatedOptionKeys) {
+    final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
+    remainingOptionKeys.removeAll(consumedOptionKeys);
+    if (!remainingOptionKeys.isEmpty()) {
+      LOG.warn(
+          "Unsupported options found for '{}'.\n\n"
+              + "Unsupported options that will be ignored:\n\n"
+              + "{}\n\n"
+              + "Supported options:\n\n"
+              + "{}",
+          factoryIdentifier,
+          remainingOptionKeys.stream().sorted().collect(Collectors.joining("\n")),
+          consumedOptionKeys.stream()
+              .map(
+                  k -> {
+                    if (deprecatedOptionKeys.contains(k)) {
+                      return String.format("%s (deprecated)", k);
+                    }
+                    return k;
+                  })
+              .sorted()
+              .collect(Collectors.joining("\n")));
+    }
+  }
+
+  /**
+   * Creates a utility that helps validating options for a {@link CatalogFactory}.
+   *
+   * <p>Note: This utility checks for left-over options in the final step.
+   */
+  public static FactoryUtil.CatalogFactoryHelper createCatalogFactoryHelper(
+      CatalogFactory factory, CatalogFactory.Context context) {
+    return new FactoryUtils.GravitinoCatalogFactoryHelper(factory, context);
+  }
+}
diff --git a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/PropertyUtils.java b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/PropertyUtils.java
new file mode 100644
index 00000000000..f6269112046
--- /dev/null
+++ b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/PropertyUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.utils;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PropertyUtils {
+
+  public static final String HIVE_PREFIX = "hive.";
+  public static final String HADOOP_PREFIX = "hadoop.";
+  public static final String FS_PREFIX = "fs.";
+  public static final String DFS_PREFIX = "dfs.";
+
+  public static Map<String, String> getHadoopAndHiveProperties(Map<String, String> properties) {
+    if (properties == null) {
+      return Collections.emptyMap();
+    }
+
+    return properties.entrySet().stream()
+        .filter(
+            entry ->
+                entry.getKey().startsWith(HADOOP_PREFIX)
+                    || entry.getKey().startsWith(FS_PREFIX)
+                    || entry.getKey().startsWith(DFS_PREFIX)
+                    || entry.getKey().startsWith(HIVE_PREFIX))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+}
diff --git a/flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..d5cf8980797
--- /dev/null
+++ b/flink-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,7 @@
+#
+# Copyright 2024 Datastrato Pvt Ltd.
+# This software is licensed under the Apache License version 2.
+#
+
+com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactory
+com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory
\ No newline at end of file
diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
new file mode 100644
index 00000000000..e9497c341e7
--- /dev/null
+++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.hive;
+
+import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestHivePropertiesConverter {
+
+  private static final HivePropertiesConverter CONVERTER = HivePropertiesConverter.INSTANCE;
+
+  @Test
+  public void testToGravitinoCatalogProperties() {
+    Configuration configuration =
+        Configuration.fromMap(
+            ImmutableMap.of(
+                "hive-conf-dir",
+                "src/test/resources/flink-tests",
+                "flink.bypass.key",
+                "value",
+                HiveConf.ConfVars.METASTOREURIS.varname,
+                "thrift://127.0.0.1:9084"));
+    Map<String, String> properties = CONVERTER.toGravitinoCatalogProperties(configuration);
+    Assertions.assertEquals(3, properties.size());
+    Assertions.assertEquals(
+        "src/test/resources/flink-tests",
+        properties.get("flink.bypass.hive-conf-dir"),
+        "This will add the prefix");
+    Assertions.assertEquals(
+        "value", properties.get("flink.bypass.key"), "The prefix already exists");
+    Assertions.assertEquals(
+        "thrift://127.0.0.1:9084",
+        properties.get(METASTORE_URIS),
+        "The key is converted to Gravitino Config");
+  }
+
+  @Test
+  public void testToFlinkCatalogProperties() {
+    Map<String, String> catalogProperties =
+        ImmutableMap.of("flink.bypass.key", "value", "metastore.uris", "thrift://xxx");
+    Map<String, String> flinkCatalogProperties =
+        CONVERTER.toFlinkCatalogProperties(catalogProperties);
+    Assertions.assertEquals(3, flinkCatalogProperties.size());
+    Assertions.assertEquals("value", flinkCatalogProperties.get("key"));
+    Assertions.assertEquals(
+        GravitinoHiveCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type"));
+    Assertions.assertEquals("thrift://xxx", flinkCatalogProperties.get("hive.metastore.uris"));
+  }
+}
diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
new file mode 100644
index 00000000000..6524e2eb5fc
--- /dev/null
+++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */ +package com.datastrato.gravitino.flink.connector.integration.test; + +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.flink.connector.PropertiesConverter; +import com.datastrato.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import java.util.Collections; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableEnvironment; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FlinkEnvIT extends AbstractIT { + private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvIT.class); + protected static final String gravitinoMetalake = "flink"; + + protected static GravitinoMetalake metalake; + protected static TableEnvironment tableEnv; + + private static String gravitinoUri = "http://127.0.0.1:8090"; + + @BeforeAll + static void startUp() { + // Start Gravitino server + initGravitinoEnv(); + initMetalake(); + initFlinkEnv(); + LOG.info("Startup Flink env successfully, Gravitino uri: {}.", gravitinoUri); + } + + @AfterAll + static void stop() {} + + protected String flinkByPass(String key) { + return PropertiesConverter.FLINK_PROPERTY_PREFIX + key; + } + + private static void initGravitinoEnv() { + // Gravitino server is already started by AbstractIT, just construct gravitinoUrl + int gravitinoPort = getGravitinoServerPort(); + gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort); + } + + private static void initMetalake() { + metalake = client.createMetalake(gravitinoMetalake, "", Collections.emptyMap()); + } + + private static void initFlinkEnv() { + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", gravitinoMetalake); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", gravitinoUri); + tableEnv = TableEnvironment.create(configuration); + } +} diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java new file mode 100644 index 00000000000..ffb7b9481b3 --- /dev/null +++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -0,0 +1,238 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */
+package com.datastrato.gravitino.flink.connector.integration.test.hive;
+
+import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
+
+import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalog;
+import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import com.datastrato.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FlinkHiveCatalogIT extends FlinkEnvIT {
+
+  private static final String DEFAULT_CATALOG = "default_catalog";
+
+  @Test
+  public void testCreateGravitinoHiveCatalog() {
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+
+    // Create a new catalog.
+    String catalogName = "gravitino_hive";
+    Configuration configuration = new Configuration();
+    configuration.set(
+        CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
+    configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, "src/test/resources/flink-tests");
+    configuration.set(
+        GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, "thrift://127.0.0.1:9084");
+    CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, configuration);
+    tableEnv.createCatalog(catalogName, catalogDescriptor);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    // Check the catalog properties.
+    com.datastrato.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
+    Map<String, String> properties = gravitinoCatalog.properties();
+    Assertions.assertEquals("thrift://127.0.0.1:9084", properties.get(METASTORE_URIS));
+    Map<String, String> flinkProperties =
+        gravitinoCatalog.properties().entrySet().stream()
+            .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Assertions.assertEquals(2, flinkProperties.size());
+    Assertions.assertEquals(
+        "src/test/resources/flink-tests",
+        flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
+    Assertions.assertEquals(
+        GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+        flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+
+    // Get the created catalog.
+    Optional<Catalog> catalog = tableEnv.getCatalog(catalogName);
+    Assertions.assertTrue(catalog.isPresent());
+    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+
+    // List catalogs.
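+    // Expect two catalogs: default_catalog and the newly created one.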
+    String[] catalogs = tableEnv.listCatalogs();
+    Assertions.assertEquals(2, catalogs.length, "Should create a new catalog");
+    Assertions.assertTrue(
+        Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog.");
+
+    Assertions.assertEquals(
+        DEFAULT_CATALOG,
+        tableEnv.getCurrentCatalog(),
+        "The current catalog should be default_catalog in Flink");
+
+    // Change the current catalog to the newly created catalog.
+    tableEnv.useCatalog(catalogName);
+    Assertions.assertEquals(
+        catalogName,
+        tableEnv.getCurrentCatalog(),
+        "The current catalog should be the one just created.");
+
+    // Drop the catalog. Dropping a catalog is only supported via SQL.
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+    tableEnv.executeSql("drop catalog " + catalogName);
+    Assertions.assertFalse(metalake.catalogExists(catalogName));
+
+    Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+    Assertions.assertFalse(droppedCatalog.isPresent(), "The catalog should be dropped");
+  }
+
+  @Test
+  public void testCreateGravitinoHiveCatalogUsingSQL() {
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+
+    // Create a new catalog.
+    String catalogName = "gravitino_hive_sql";
+    tableEnv.executeSql(
+        String.format(
+            "create catalog %s with ("
+                + "'type'='gravitino-hive', "
+                + "'hive-conf-dir'='src/test/resources/flink-tests',"
+                + "'hive.metastore.uris'='thrift://127.0.0.1:9084',"
+                + "'unknown.key'='unknown.value'"
+                + ")",
+            catalogName));
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    // Check the properties of the created catalog.
+    com.datastrato.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
+    Map<String, String> properties = gravitinoCatalog.properties();
+    Assertions.assertEquals("thrift://127.0.0.1:9084", properties.get(METASTORE_URIS));
+    Map<String, String> flinkProperties =
+        properties.entrySet().stream()
+            .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Assertions.assertEquals(3, flinkProperties.size());
+    Assertions.assertEquals(
+        "src/test/resources/flink-tests",
+        flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
+    Assertions.assertEquals(
+        GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+        flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+    Assertions.assertEquals(
+        "unknown.value",
+        flinkProperties.get(flinkByPass("unknown.key")),
+        "The unknown.key does not cause a failure and is saved in Gravitino.");
+
+    // Get the created catalog.
+    Optional<Catalog> catalog = tableEnv.getCatalog(catalogName);
+    Assertions.assertTrue(catalog.isPresent());
+    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+
+    // List catalogs.
+    String[] catalogs = tableEnv.listCatalogs();
+    Assertions.assertEquals(2, catalogs.length, "Should create a new catalog");
+    Assertions.assertTrue(
+        Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog.");
+
+    // Use SQL to list catalogs.
+    TableResult result = tableEnv.executeSql("show catalogs");
+    Assertions.assertEquals(
+        2, Lists.newArrayList(result.collect()).size(), "Should have 2 catalogs");
+
+    Assertions.assertEquals(
+        DEFAULT_CATALOG,
+        tableEnv.getCurrentCatalog(),
+        "The current catalog should be default_catalog in Flink");
+
+    // Change the current catalog to the newly created catalog.
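+    // useCatalog only switches the session's current catalog; nothing changes on the Gravitino side.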
+    tableEnv.useCatalog(catalogName);
+    Assertions.assertEquals(
+        catalogName,
+        tableEnv.getCurrentCatalog(),
+        "The current catalog should be the one just created.");
+
+    // Drop the catalog. Dropping a catalog is only supported via SQL.
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+    tableEnv.executeSql("drop catalog " + catalogName);
+    Assertions.assertFalse(metalake.catalogExists(catalogName));
+
+    Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+    Assertions.assertFalse(droppedCatalog.isPresent(), "The catalog should be dropped");
+  }
+
+  @Test
+  public void testCreateGravitinoHiveCatalogRequireOptions() {
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+
+    // Creating the catalog should fail because the required options are missing.
+    String catalogName = "gravitino_hive_sql2";
+    Assertions.assertThrows(
+        ValidationException.class,
+        () -> {
+          tableEnv.executeSql(
+              String.format(
+                  "create catalog %s with ("
+                      + "'type'='gravitino-hive', "
+                      + "'hive-conf-dir'='src/test/resources/flink-tests'"
+                      + ")",
+                  catalogName));
+        },
+        "The hive.metastore.uris option is required.");
+
+    Assertions.assertFalse(metalake.catalogExists(catalogName));
+  }
+
+  @Test
+  public void testGetCatalogFromGravitino() {
+    // List catalogs.
+    String[] catalogs = tableEnv.listCatalogs();
+    Assertions.assertEquals(1, catalogs.length, "Only the default catalog exists");
+
+    // Create a new catalog.
+    String catalogName = "hive_catalog_in_gravitino";
+    com.datastrato.gravitino.Catalog gravitinoCatalog =
+        metalake.createCatalog(
+            catalogName,
+            com.datastrato.gravitino.Catalog.Type.RELATIONAL,
+            "hive",
+            null,
+            ImmutableMap.of(
+                "flink.bypass.hive-conf-dir",
+                "src/test/resources/flink-tests",
+                "flink.bypass.hive.test",
+                "hive.config",
+                "metastore.uris",
+                "thrift://127.0.0.1:9084"));
+    Assertions.assertNotNull(gravitinoCatalog);
+    Assertions.assertEquals(catalogName, gravitinoCatalog.name());
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+    Assertions.assertEquals(2, tableEnv.listCatalogs().length, "Should create a new catalog");
+
+    // Get the catalog from Gravitino.
+    Optional<Catalog> flinkHiveCatalog = tableEnv.getCatalog(catalogName);
+    Assertions.assertTrue(flinkHiveCatalog.isPresent());
+    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, flinkHiveCatalog.get());
+    GravitinoHiveCatalog gravitinoHiveCatalog = (GravitinoHiveCatalog) flinkHiveCatalog.get();
+    HiveConf hiveConf = gravitinoHiveCatalog.getHiveConf();
+    Assertions.assertTrue(hiveConf.size() > 0, "Should have the hive conf");
+    Assertions.assertEquals("hive.config", hiveConf.get("hive.test"));
+    Assertions.assertEquals(
+        "thrift://127.0.0.1:9084", hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+
+    // Drop the catalog.
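+    // The SQL drop should remove the catalog from both the Flink session and Gravitino.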
+    tableEnv.useCatalog(DEFAULT_CATALOG);
+    tableEnv.executeSql("drop catalog " + catalogName);
+    Assertions.assertFalse(metalake.catalogExists(catalogName));
+    Assertions.assertEquals(
+        1, tableEnv.listCatalogs().length, "The created catalog should be dropped.");
+  }
+}
diff --git a/flink-connector/src/test/resources/flink-tests/hive-site.xml b/flink-connector/src/test/resources/flink-tests/hive-site.xml
new file mode 100644
index 00000000000..2504c845832
--- /dev/null
+++ b/flink-connector/src/test/resources/flink-tests/hive-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Copyright 2024 Datastrato Pvt Ltd.
+  This software is licensed under the Apache License version 2.
+-->
+<configuration>
+  <property>
+    <name>hive.metastore.sasl.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.uris</name>
+    <value>thrift://127.0.0.1:9083</value>
+  </property>
+
+  <property>
+    <name>hadoop.security.authentication</name>
+    <value>simple</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>hdfs://tmp</value>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/flink-connector/src/test/resources/log4j2.properties b/flink-connector/src/test/resources/log4j2.properties
new file mode 100644
index 00000000000..922ea245882
--- /dev/null
+++ b/flink-connector/src/test/resources/log4j2.properties
@@ -0,0 +1,59 @@
+#
+# Copyright 2024 Datastrato Pvt Ltd.
+# This software is licensed under the Apache License version 2.
+#
+
+# Set to debug or trace if log4j initialization is failing
+status = info
+
+# Name of the configuration
+name = ConsoleLogConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = consoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+# Log files location
+property.logPath = ${sys:gravitino.log.path:-flink-connector/build/integration-test.log}
+
+# File appender configuration
+appender.file.type = File
+appender.file.name = fileLogger
+appender.file.fileName = ${logPath}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Root logger level
+rootLogger.level = info
+
+# Root logger referring to console and file appenders
+rootLogger.appenderRef.stdout.ref = consoleLogger
+rootLogger.appenderRef.file.ref = fileLogger
+
+# File appender configuration for testcontainers
+appender.testcontainersFile.type = File
+appender.testcontainersFile.name = testcontainersLogger
+appender.testcontainersFile.fileName = build/testcontainers.log
+appender.testcontainersFile.layout.type = PatternLayout
+appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Logger for testcontainers
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = debug
+logger.testcontainers.additivity = false
+logger.testcontainers.appenderRef.file.ref = testcontainersLogger
+
+logger.tc.name = tc
+logger.tc.level = debug
+logger.tc.additivity = false
+logger.tc.appenderRef.file.ref = testcontainersLogger
+
+logger.docker.name = com.github.dockerjava
+logger.docker.level = warn
+logger.docker.additivity = false
+logger.docker.appenderRef.file.ref = testcontainersLogger
+
+logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire
+logger.http.level = off
\ No newline at end of file
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 754ed0afbf1..b45c3d1adb4 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -55,6 +55,7 @@ curator = "2.12.0"
 awaitility = "4.2.1"
 servlet = "3.1.0"
 jodd = "3.5.2"
+flink = "1.18.0"
 protobuf-plugin = "0.9.2"
 spotless-plugin = '6.11.0'
diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts
index 74617f8a5ed..d3ef5eb750d 100644
--- a/integration-test/build.gradle.kts
+++ b/integration-test/build.gradle.kts
@@ -2,7 +2,6 @@
  * Copyright 2023 Datastrato Pvt Ltd.
  * This software is licensed under the Apache License version 2.
  */
-import java.util.*
 
 plugins {
   `maven-publish`
diff --git a/settings.gradle.kts b/settings.gradle.kts
index be21feb8790..3c0baa6ac01 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -29,6 +29,7 @@ include(
 )
 include("trino-connector")
 include("spark-connector:spark-connector", "spark-connector:spark-connector-runtime")
+include("flink-connector")
 include("web")
 include("docs")
 include("integration-test-common")