From 4c61a644846e18db6711b4d4dc7a05d0aff57827 Mon Sep 17 00:00:00 2001 From: FANNG Date: Tue, 28 May 2024 20:12:33 +0800 Subject: [PATCH] [#3554] feat(spark-connector): support spark multi Version (#3415) ### What changes were proposed in this pull request? 1. split spark connector to spark common which contains common logic and v3.x which contains adaptor logic 2. add separate GitHub action to do spark IT 3. ./gradlew :spark-connector:spark35-runtime:build to build corresponding spark connector jars ### Why are the changes needed? Fix: #3554 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests with corresponding spark version --- .../workflows/backend-integration-test.yml | 3 +- .github/workflows/spark-integration-test.yml | 109 +++++++++++++ build.gradle.kts | 4 +- catalogs/catalog-hive/build.gradle.kts | 2 +- .../build.gradle.kts | 2 +- gradle/libs.versions.toml | 11 +- integration-test/build.gradle.kts | 4 - settings.gradle.kts | 8 +- spark-connector/spark-common/build.gradle.kts | 154 ++++++++++++++++++ .../spark/connector/ConnectorConstants.java | 0 .../spark/connector/GravitinoSparkConfig.java | 0 .../spark/connector/PropertiesConverter.java | 0 .../connector/SparkTableChangeConverter.java | 97 +++++++++++ .../connector/SparkTransformConverter.java | 0 .../spark/connector/SparkTypeConverter.java | 10 +- .../spark/connector/catalog/BaseCatalog.java | 151 ++++------------- .../catalog/GravitinoCatalogManager.java | 0 .../connector/hive/GravitinoHiveCatalog.java | 7 +- .../hive/HivePropertiesConstants.java | 0 .../hive/HivePropertiesConverter.java | 0 .../spark/connector/hive/SparkHiveTable.java | 11 +- .../iceberg/GravitinoIcebergCatalog.java | 13 +- .../iceberg/IcebergPropertiesConstants.java | 0 .../iceberg/IcebergPropertiesConverter.java | 0 .../connector/iceberg/SparkIcebergTable.java | 11 +- .../plugin/GravitinoDriverPlugin.java | 18 +- .../plugin/GravitinoSparkPlugin.java | 0 
.../spark/connector/utils/ConnectorUtil.java | 0 .../utils/GravitinoTableInfoHelper.java | 7 +- .../connector/version/CatalogNameAdaptor.java | 41 +++++ .../TestSparkTransformConverter.java | 0 .../connector/TestSparkTypeConverter.java | 12 +- .../catalog/TestTransformTableChange.java | 72 +++----- .../hive/TestHivePropertiesConverter.java | 0 .../TestIcebergPropertiesConverter.java | 0 .../integration/test/SparkCommonIT.java | 13 +- .../integration/test/SparkEnvIT.java | 0 .../test/hive/SparkHiveCatalogIT.java | 2 +- .../SparkIcebergCatalogHiveBackendIT.java | 2 +- .../test/iceberg/SparkIcebergCatalogIT.java | 2 +- .../SparkIcebergCatalogRestBackendIT.java | 2 +- .../test/util/SparkMetadataColumnInfo.java | 0 .../integration/test/util/SparkTableInfo.java | 0 .../test/util/SparkTableInfoChecker.java | 0 .../integration/test/util/SparkUtilIT.java | 0 .../plugin/TestGravitinoDriverPlugin.java | 0 .../connector/utils/TestConnectorUtil.java | 0 .../src/test/resources/log4j2.properties | 0 spark-connector/v3.3/build.gradle.kts | 8 + .../spark-runtime}/build.gradle.kts | 11 +- spark-connector/v3.3/spark/build.gradle.kts | 139 ++++++++++++++++ .../hive/GravitinoHiveCatalogSpark33.java | 7 + .../GravitinoIcebergCatalogSpark33.java | 7 + .../test/hive/SparkHiveCatalogIT33.java | 21 +++ .../SparkIcebergCatalogHiveBackendIT33.java | 21 +++ .../SparkIcebergCatalogRestBackendIT33.java | 8 + .../version/TestCatalogNameAdaptor.java | 21 +++ .../src/test/resources/log4j2.properties | 33 ++++ spark-connector/v3.4/build.gradle.kts | 8 + .../v3.4/spark-runtime/build.gradle.kts | 45 +++++ .../spark}/build.gradle.kts | 62 ++++--- .../SparkTableChangeConverter34.java | 26 +++ .../spark/connector/SparkTypeConverter34.java | 31 ++++ .../hive/GravitinoHiveCatalogSpark34.java | 23 +++ .../GravitinoIcebergCatalogSpark34.java | 23 +++ .../TestSparkTableChangeConverter34.java | 41 +++++ .../connector/TestSparkTypeConverter34.java | 27 +++ .../test/hive/SparkHiveCatalogIT34.java | 21 +++ 
.../SparkIcebergCatalogHiveBackendIT34.java | 22 +++ .../SparkIcebergCatalogRestBackendIT34.java | 8 + .../version/TestCatalogNameAdaptor.java | 21 +++ .../src/test/resources/log4j2.properties | 33 ++++ spark-connector/v3.5/build.gradle.kts | 8 + .../v3.5/spark-runtime/build.gradle.kts | 45 +++++ spark-connector/v3.5/spark/build.gradle.kts | 141 ++++++++++++++++ .../hive/GravitinoHiveCatalogSpark35.java | 7 + .../GravitinoIcebergCatalogSpark35.java | 7 + .../test/hive/SparkHiveCatalogIT35.java | 21 +++ .../SparkIcebergCatalogHiveBackendIT35.java | 22 +++ .../SparkIcebergCatalogRestBackendIT35.java | 8 + .../version/TestCatalogNameAdaptor.java | 21 +++ .../src/test/resources/log4j2.properties | 33 ++++ 82 files changed, 1495 insertions(+), 253 deletions(-) create mode 100644 .github/workflows/spark-integration-test.yml create mode 100644 spark-connector/spark-common/build.gradle.kts rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java (100%) create mode 100644 spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter.java rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java (94%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java (72%) rename spark-connector/{spark-connector => 
spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java (89%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java (83%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java (95%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java (88%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java (90%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoSparkPlugin.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java (100%) rename spark-connector/{spark-connector => spark-common}/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java (94%) create mode 100644 
spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/version/CatalogNameAdaptor.java rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java (94%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java (79%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java (98%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java (99%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java (92%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java (99%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java (92%) rename spark-connector/{spark-connector => 
spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkMetadataColumnInfo.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfo.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkUtilIT.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java (100%) rename spark-connector/{spark-connector => spark-common}/src/test/resources/log4j2.properties (100%) create mode 100644 spark-connector/v3.3/build.gradle.kts rename spark-connector/{spark-connector-runtime => v3.3/spark-runtime}/build.gradle.kts (73%) create mode 100644 spark-connector/v3.3/spark/build.gradle.kts create mode 100644 spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark33.java create mode 100644 spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark33.java create mode 100644 spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java create mode 100644 spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java create mode 100644 spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java 
create mode 100644 spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java create mode 100644 spark-connector/v3.3/spark/src/test/resources/log4j2.properties create mode 100644 spark-connector/v3.4/build.gradle.kts create mode 100644 spark-connector/v3.4/spark-runtime/build.gradle.kts rename spark-connector/{spark-connector => v3.4/spark}/build.gradle.kts (71%) create mode 100644 spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter34.java create mode 100644 spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter34.java create mode 100644 spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java create mode 100644 spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTableChangeConverter34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT34.java create mode 100644 spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java create mode 100644 spark-connector/v3.4/spark/src/test/resources/log4j2.properties create mode 100644 spark-connector/v3.5/build.gradle.kts 
create mode 100644 spark-connector/v3.5/spark-runtime/build.gradle.kts create mode 100644 spark-connector/v3.5/spark/build.gradle.kts create mode 100644 spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java create mode 100644 spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java create mode 100644 spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT35.java create mode 100644 spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT35.java create mode 100644 spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT35.java create mode 100644 spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java create mode 100644 spark-connector/v3.5/spark/src/test/resources/log4j2.properties diff --git a/.github/workflows/backend-integration-test.yml b/.github/workflows/backend-integration-test.yml index 33d565209b8..15b0a47b2ec 100644 --- a/.github/workflows/backend-integration-test.yml +++ b/.github/workflows/backend-integration-test.yml @@ -40,7 +40,6 @@ jobs: - meta/** - server/** - server-common/** - - spark-connector/** - trino-connector/** - web/** - docs/open-api/** @@ -96,7 +95,7 @@ jobs: - name: Backend Integration Test id: integrationTest run: | - ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs + ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs -PskipSparkITs - name: Upload 
integrate tests reports uses: actions/upload-artifact@v3 diff --git a/.github/workflows/spark-integration-test.yml b/.github/workflows/spark-integration-test.yml new file mode 100644 index 00000000000..f73a366ad78 --- /dev/null +++ b/.github/workflows/spark-integration-test.yml @@ -0,0 +1,109 @@ +name: Spark Integration Test + +# Controls when the workflow will run +on: + # Triggers the workflow on push or pull request events but only for the "main" branch + push: + branches: [ "main", "branch-*" ] + pull_request: + branches: [ "main", "branch-*" ] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + changes: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: dorny/paths-filter@v2 + id: filter + with: + filters: | + source_changes: + - .github/** + - api/** + - bin/** + - catalogs/** + - clients/client-java/** + - clients/client-java-runtime/** + - clients/filesystem-hadoop3/** + - clients/filesystem-hadoop3-runtime/** + - common/** + - conf/** + - core/** + - dev/** + - gradle/** + - meta/** + - server/** + - server-common/** + - spark-connector/** + - docs/open-api/** + - build.gradle.kts + - gradle.properties + - gradlew + - settings.gradle.kts + outputs: + source_changes: ${{ steps.filter.outputs.source_changes }} + + # Integration test for AMD64 architecture + test-amd64-arch: + needs: changes + if: needs.changes.outputs.source_changes == 'true' + runs-on: ubuntu-latest + timeout-minutes: 90 + strategy: + matrix: + architecture: [linux/amd64] + java-version: [ 8, 11, 17 ] + test-mode: [ embedded, deploy ] + env: + PLATFORM: ${{ matrix.architecture }} + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' + + - name: Set up QEMU + uses: docker/setup-qemu-action@v2 + + - name: Check required command + run: | + dev/ci/check_commands.sh + + - name: Package
Gravitino + if : ${{ matrix.test-mode == 'deploy' }} + run: | + ./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }} + + - name: Setup debug Github Action + if: ${{ contains(github.event.pull_request.labels.*.name, 'debug action') }} + uses: csexton/debugger-action@master + + - name: Free up disk space + run: | + dev/ci/util_free_space.sh + + - name: Spark Integration Test + id: integrationTest + run: | + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + + - name: Upload integrate tests reports + uses: actions/upload-artifact@v3 + if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }} + with: + name: spark-connector-integrate-test-reports-${{ matrix.java-version }}-${{ matrix.test-mode }} + path: | + build/reports + spark-connector/v3.3/spark/build/spark-3.3-integration-test.log + spark-connector/v3.4/spark/build/spark-3.4-integration-test.log + spark-connector/v3.5/spark/build/spark-3.5-integration-test.log + distribution/package/logs/gravitino-server.out + distribution/package/logs/gravitino-server.log diff --git a/build.gradle.kts b/build.gradle.kts index 2aa23b51604..7aaee41fc0e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -617,7 +617,7 @@ tasks { register("copySubprojectDependencies", Copy::class) { subprojects.forEach() { if (!it.name.startsWith("catalog") 
&& - !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" && + !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && it.name != "trino-connector" && it.name != "integration-test" && it.name != "bundled-catalog" && it.name != "flink-connector" ) { from(it.configurations.runtimeClasspath) @@ -631,7 +631,7 @@ tasks { if (!it.name.startsWith("catalog") && !it.name.startsWith("client") && !it.name.startsWith("filesystem") && - !it.name.startsWith("spark-connector") && + !it.name.startsWith("spark") && it.name != "trino-connector" && it.name != "integration-test" && it.name != "bundled-catalog" && diff --git a/catalogs/catalog-hive/build.gradle.kts b/catalogs/catalog-hive/build.gradle.kts index 983989da982..9baa2abafac 100644 --- a/catalogs/catalog-hive/build.gradle.kts +++ b/catalogs/catalog-hive/build.gradle.kts @@ -11,7 +11,7 @@ plugins { } val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString() -val sparkVersion: String = libs.versions.spark.get() +val sparkVersion: String = libs.versions.spark34.get() val icebergVersion: String = libs.versions.iceberg.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() diff --git a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts index 2a846d36514..366f6a6b1de 100644 --- a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts +++ b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts @@ -11,7 +11,7 @@ plugins { } val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() -val sparkVersion: String = libs.versions.spark.get() +val sparkVersion: String = libs.versions.spark34.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index b45c3d1adb4..f4054616463 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -27,9 +27,15 @@ commons-collections4 = "4.4" commons-dbcp2 = "2.11.0" caffeine = "2.9.3" rocksdbjni = "7.10.2" -iceberg = '1.3.1' # 1.4.0 causes test to fail +iceberg = '1.3.1' # used for Gravitino Iceberg catalog and Iceberg REST service +iceberg4spark = "1.4.1" # used for compile spark connector trino = '426' -spark = "3.4.1" # 3.5.0 causes tests to fail +spark33 = "3.3.4" +spark34 = "3.4.3" +spark35 = "3.5.1" +kyuubi4spark33 = "1.7.4" +kyuubi4spark34 = "1.8.2" +kyuubi4spark35 = "1.9.0" scala-collection-compat = "2.7.0" scala-java-compat = "1.0.2" sqlite-jdbc = "3.42.0.0" @@ -49,7 +55,6 @@ selenium = "3.141.59" rauschig = "1.2.0" mybatis = "3.5.6" h2db = "1.4.200" -kyuubi = "1.8.2" kafka = "3.4.0" curator = "2.12.0" awaitility = "4.2.1" diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index d3ef5eb750d..591c5c67d97 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -23,10 +23,6 @@ dependencies { testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) - testImplementation(project(":spark-connector:spark-connector")) { - exclude("org.apache.hadoop", "hadoop-client-api") - exclude("org.apache.hadoop", "hadoop-client-runtime") - } testImplementation(libs.commons.cli) testImplementation(libs.commons.lang3) diff --git a/settings.gradle.kts b/settings.gradle.kts 
index 3c0baa6ac01..d645efd0b02 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,7 +28,13 @@ include( "clients:client-python" ) include("trino-connector") -include("spark-connector:spark-connector", "spark-connector:spark-connector-runtime") +include("spark-connector:spark-common", "spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3", "spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5") +project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark") +project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime") +project(":spark-connector:spark-3.4").projectDir = file("spark-connector/v3.4/spark") +project(":spark-connector:spark-runtime-3.4").projectDir = file("spark-connector/v3.4/spark-runtime") +project(":spark-connector:spark-3.5").projectDir = file("spark-connector/v3.5/spark") +project(":spark-connector:spark-runtime-3.5").projectDir = file("spark-connector/v3.5/spark-runtime") include("flink-connector") include("web") include("docs") diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts new file mode 100644 index 00000000000..3b485942f28 --- /dev/null +++ b/spark-connector/spark-common/build.gradle.kts @@ -0,0 +1,154 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +plugins { + `maven-publish` + id("java") + id("idea") + alias(libs.plugins.shadow) +} + +repositories { + mavenCentral() +} + +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark33.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg4spark.get() +val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() +val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() +val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() + +dependencies { + implementation(project(":catalogs:bundled-catalog", configuration = "shadow")) + implementation(libs.guava) + + compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) + compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + + compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") + compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") + compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") + compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion") + + annotationProcessor(libs.lombok) + compileOnly(libs.lombok) + + testAnnotationProcessor(libs.lombok) + testCompileOnly(libs.lombok) + + // use log from spark, spark3.3 use low version of log4j, to avoid java.lang.NoSuchMethodError: org.apache.logging.slf4j.Log4jLoggerFactory: method ()V not found + testImplementation(project(":api")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":clients:client-java")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":core")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server-common")) { + 
exclude("org.apache.logging.log4j") + } + testImplementation(project(":integration-test-common", "testArtifacts")) + + testImplementation(libs.hive2.common) { + exclude("org.apache.curator") + // use hadoop from Spark + exclude("org.apache.hadoop") + exclude("org.apache.logging.log4j") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + } + testImplementation(libs.hive2.metastore) { + exclude("co.cask.tephra") + exclude("com.github.joshelser") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.code.findbugs", "sr305") + exclude("com.tdunning", "json") + exclude("com.zaxxer", "HikariCP") + exclude("io.dropwizard.metricss") + exclude("javax.transaction", "transaction-api") + exclude("org.apache.avro") + exclude("org.apache.curator") + exclude("org.apache.hbase") + exclude("org.apache.hadoop") + exclude("org.apache.hive", "hive-common") + exclude("org.apache.hive", "hive-shims") + exclude("org.apache.logging.log4j") + exclude("org.apache.parquet", "parquet-hadoop-bundle") + exclude("org.apache.zookeeper") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.slf4j") + } + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mysql.driver) + testImplementation(libs.testcontainers) + + testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + // include spark-sql,spark-catalyst,hive-common,hdfs-client + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { + // conflict with Gravitino server jersey + 
exclude("org.glassfish.jersey.core") + exclude("org.glassfish.jersey.containers") + exclude("org.glassfish.jersey.inject") + } + testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks.test { + val skipUTs = project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + val skipSparkITs = project.hasProperty("skipSparkITs") + if (skipITs || skipSparkITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + doFirst { + environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") + } + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} + +tasks.clean { + delete("spark-warehouse") +} + +val testJar by tasks.registering(Jar::class) { + archiveClassifier.set("tests") + from(sourceSets["test"].output) +} + +configurations { + create("testArtifacts") +} + +artifacts { + add("testArtifacts", testJar) +} diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/ConnectorConstants.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java similarity index 100% rename from 
spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/PropertiesConverter.java diff --git a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter.java new file mode 100644 index 00000000000..c386e7fb721 --- /dev/null +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector; + +import com.google.common.base.Preconditions; +import org.apache.spark.sql.connector.catalog.TableChange; + +public class SparkTableChangeConverter { + private SparkTypeConverter sparkTypeConverter; + + public SparkTableChangeConverter(SparkTypeConverter sparkTypeConverter) { + this.sparkTypeConverter = sparkTypeConverter; + } + + public com.datastrato.gravitino.rel.TableChange toGravitinoTableChange(TableChange change) { + if (change instanceof TableChange.SetProperty) { + TableChange.SetProperty setProperty = (TableChange.SetProperty) change; + if (ConnectorConstants.COMMENT.equals(setProperty.property())) { + return com.datastrato.gravitino.rel.TableChange.updateComment(setProperty.value()); + } else { + return com.datastrato.gravitino.rel.TableChange.setProperty( + setProperty.property(), setProperty.value()); + } + } else if (change instanceof TableChange.RemoveProperty) { + TableChange.RemoveProperty removeProperty = (TableChange.RemoveProperty) change; + Preconditions.checkArgument( + ConnectorConstants.COMMENT.equals(removeProperty.property()) == false, + "Gravitino doesn't support remove table comment yet"); + return com.datastrato.gravitino.rel.TableChange.removeProperty(removeProperty.property()); + } else if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + return com.datastrato.gravitino.rel.TableChange.addColumn( + addColumn.fieldNames(), + sparkTypeConverter.toGravitinoType(addColumn.dataType()), + addColumn.comment(), + transformColumnPosition(addColumn.position()), + addColumn.isNullable()); + } else if (change instanceof TableChange.DeleteColumn) { + TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; + return com.datastrato.gravitino.rel.TableChange.deleteColumn( + deleteColumn.fieldNames(), deleteColumn.ifExists()); + } else if (change instanceof TableChange.UpdateColumnType) { + 
TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change; + return com.datastrato.gravitino.rel.TableChange.updateColumnType( + updateColumnType.fieldNames(), + sparkTypeConverter.toGravitinoType(updateColumnType.newDataType())); + } else if (change instanceof TableChange.RenameColumn) { + TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change; + return com.datastrato.gravitino.rel.TableChange.renameColumn( + renameColumn.fieldNames(), renameColumn.newName()); + } else if (change instanceof TableChange.UpdateColumnPosition) { + TableChange.UpdateColumnPosition sparkUpdateColumnPosition = + (TableChange.UpdateColumnPosition) change; + com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition gravitinoUpdateColumnPosition = + (com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition) + com.datastrato.gravitino.rel.TableChange.updateColumnPosition( + sparkUpdateColumnPosition.fieldNames(), + transformColumnPosition(sparkUpdateColumnPosition.position())); + Preconditions.checkArgument( + !(gravitinoUpdateColumnPosition.getPosition() + instanceof com.datastrato.gravitino.rel.TableChange.Default), + "Doesn't support alter column position without specifying position"); + return gravitinoUpdateColumnPosition; + } else if (change instanceof TableChange.UpdateColumnComment) { + TableChange.UpdateColumnComment updateColumnComment = + (TableChange.UpdateColumnComment) change; + return com.datastrato.gravitino.rel.TableChange.updateColumnComment( + updateColumnComment.fieldNames(), updateColumnComment.newComment()); + } else if (change instanceof TableChange.UpdateColumnNullability) { + TableChange.UpdateColumnNullability updateColumnNullability = + (TableChange.UpdateColumnNullability) change; + return com.datastrato.gravitino.rel.TableChange.updateColumnNullability( + updateColumnNullability.fieldNames(), updateColumnNullability.nullable()); + } else { + throw new UnsupportedOperationException( + 
String.format("Unsupported table change %s", change.getClass().getName())); + } + } + + private com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition( + TableChange.ColumnPosition columnPosition) { + if (null == columnPosition) { + return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos(); + } else if (columnPosition instanceof TableChange.First) { + return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first(); + } else if (columnPosition instanceof TableChange.After) { + TableChange.After after = (TableChange.After) columnPosition; + return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column()); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported table column position %s", columnPosition.getClass().getName())); + } + } +} diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java similarity index 94% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java index d699a0058b5..40c0e1fe249 100644 --- 
a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter.java @@ -30,13 +30,12 @@ import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.types.TimestampNTZType; import org.apache.spark.sql.types.TimestampType; import org.apache.spark.sql.types.VarcharType; /** Transform DataTypes between Gravitino and Spark. */ public class SparkTypeConverter { - public static Type toGravitinoType(DataType sparkType) { + public Type toGravitinoType(DataType sparkType) { if (sparkType instanceof ByteType) { return Types.ByteType.get(); } else if (sparkType instanceof ShortType) { @@ -68,8 +67,6 @@ public static Type toGravitinoType(DataType sparkType) { return Types.DateType.get(); } else if (sparkType instanceof TimestampType) { return Types.TimestampType.withTimeZone(); - } else if (sparkType instanceof TimestampNTZType) { - return Types.TimestampType.withoutTimeZone(); } else if (sparkType instanceof ArrayType) { ArrayType arrayType = (ArrayType) sparkType; return Types.ListType.of(toGravitinoType(arrayType.elementType()), arrayType.containsNull()); @@ -98,7 +95,7 @@ public static Type toGravitinoType(DataType sparkType) { throw new UnsupportedOperationException("Not support " + sparkType.toString()); } - public static DataType toSparkType(Type gravitinoType) { + public DataType toSparkType(Type gravitinoType) { if (gravitinoType instanceof Types.ByteType) { return DataTypes.ByteType; } else if (gravitinoType instanceof Types.ShortType) { @@ -131,9 +128,6 @@ public static DataType toSparkType(Type gravitinoType) { } else if (gravitinoType instanceof Types.TimestampType && ((Types.TimestampType) gravitinoType).hasTimeZone()) { return DataTypes.TimestampType; - } else if (gravitinoType instanceof 
Types.TimestampType - && !((Types.TimestampType) gravitinoType).hasTimeZone()) { - return DataTypes.TimestampNTZType; } else if (gravitinoType instanceof Types.ListType) { Types.ListType listType = (Types.ListType) gravitinoType; return DataTypes.createArrayType( diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java similarity index 72% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java index 208952987b0..be8864e478b 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/catalog/BaseCatalog.java @@ -13,26 +13,23 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; -import com.datastrato.gravitino.rel.expressions.literals.Literals; import com.datastrato.gravitino.spark.connector.ConnectorConstants; import com.datastrato.gravitino.spark.connector.PropertiesConverter; +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter.DistributionAndSortOrdersInfo; import com.datastrato.gravitino.spark.connector.SparkTypeConverter; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import 
javax.ws.rs.NotSupportedException; import org.apache.commons.lang3.StringUtils; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.NamespaceChange.SetProperty; @@ -41,6 +38,7 @@ import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -57,15 +55,18 @@ * initialization. */ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces { + // The specific Spark catalog to do IO operations, different catalogs have different spark catalog // implementations, like HiveTableCatalog for Hive, JDBCTableCatalog for JDBC, SparkCatalog for // Iceberg. protected TableCatalog sparkCatalog; - // The Gravitino catalog client to do schema operations. - protected Catalog gravitinoCatalogClient; protected PropertiesConverter propertiesConverter; protected SparkTransformConverter sparkTransformConverter; + private SparkTypeConverter sparkTypeConverter; + private SparkTableChangeConverter sparkTableChangeConverter; + // The Gravitino catalog client to do schema operations. 
+ private Catalog gravitinoCatalogClient; private final String metalakeName; private String catalogName; private final GravitinoCatalogManager gravitinoCatalogManager; @@ -97,6 +98,7 @@ protected abstract TableCatalog createAndInitSparkCatalog( * @param propertiesConverter transform properties between Gravitino and Spark * @param sparkTransformConverter sparkTransformConverter convert transforms between Gravitino and * Spark + * @param sparkTypeConverter sparkTypeConverter convert types between Gravitino and Spark * @return a specific Spark table */ protected abstract Table createSparkTable( @@ -105,7 +107,8 @@ protected abstract Table createSparkTable( Table sparkTable, TableCatalog sparkCatalog, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter); + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter); /** * Get a PropertiesConverter to transform properties between Gravitino and Spark. @@ -121,6 +124,15 @@ protected abstract Table createSparkTable( */ protected abstract SparkTransformConverter getSparkTransformConverter(); + protected SparkTypeConverter getSparkTypeConverter() { + return new SparkTypeConverter(); + } + + protected SparkTableChangeConverter getSparkTableChangeConverter( + SparkTypeConverter sparkTypeConverter) { + return new SparkTableChangeConverter(sparkTypeConverter); + } + @Override public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; @@ -132,6 +144,8 @@ public void initialize(String name, CaseInsensitiveStringMap options) { createAndInitSparkCatalog(name, options, gravitinoCatalogClient.properties()); this.propertiesConverter = getPropertiesConverter(); this.sparkTransformConverter = getSparkTransformConverter(); + this.sparkTypeConverter = getSparkTypeConverter(); + this.sparkTableChangeConverter = getSparkTableChangeConverter(sparkTypeConverter); } @Override @@ -165,13 +179,13 @@ public Identifier[] listTables(String[] 
namespace) throws NoSuchNamespaceExcepti @Override public Table createTable( - Identifier ident, Column[] columns, Transform[] transforms, Map properties) + Identifier ident, StructType schema, Transform[] transforms, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { NameIdentifier gravitinoIdentifier = NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()); com.datastrato.gravitino.rel.Column[] gravitinoColumns = - Arrays.stream(columns) - .map(column -> createGravitinoColumn(column)) + Arrays.stream(schema.fields()) + .map(structField -> createGravitinoColumn(structField)) .toArray(com.datastrato.gravitino.rel.Column[]::new); Map gravitinoProperties = @@ -203,7 +217,8 @@ public Table createTable( sparkTable, sparkCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + sparkTypeConverter); } catch (NoSuchSchemaException e) { throw new NoSuchNamespaceException(ident.namespace()); } catch (com.datastrato.gravitino.exceptions.TableAlreadyExistsException e) { @@ -223,25 +238,18 @@ public Table loadTable(Identifier ident) throws NoSuchTableException { sparkTable, sparkCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + sparkTypeConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } } - @SuppressWarnings("deprecation") - @Override - public Table createTable( - Identifier ident, StructType schema, Transform[] partitions, Map properties) - throws TableAlreadyExistsException, NoSuchNamespaceException { - throw new NotSupportedException("Deprecated create table method"); - } - @Override public Table alterTable(Identifier ident, TableChange... 
changes) throws NoSuchTableException { com.datastrato.gravitino.rel.TableChange[] gravitinoTableChanges = Arrays.stream(changes) - .map(BaseCatalog::transformTableChange) + .map(sparkTableChangeConverter::toGravitinoTableChange) .toArray(com.datastrato.gravitino.rel.TableChange[]::new); try { com.datastrato.gravitino.rel.Table gravitinoTable = @@ -257,7 +265,8 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT sparkTable, sparkCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + sparkTypeConverter); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -415,12 +424,12 @@ private String getCatalogDefaultNamespace() { return catalogDefaultNamespace[0]; } - private com.datastrato.gravitino.rel.Column createGravitinoColumn(Column sparkColumn) { + private com.datastrato.gravitino.rel.Column createGravitinoColumn(StructField structField) { return com.datastrato.gravitino.rel.Column.of( - sparkColumn.name(), - SparkTypeConverter.toGravitinoType(sparkColumn.dataType()), - sparkColumn.comment(), - sparkColumn.nullable(), + structField.name(), + sparkTypeConverter.toGravitinoType(structField.dataType()), + structField.getComment().isEmpty() ? 
null : structField.getComment().get(), + structField.nullable(), // Spark doesn't support autoIncrement false, // todo: support default value @@ -434,94 +443,6 @@ private String getDatabase(NameIdentifier gravitinoIdentifier) { return gravitinoIdentifier.namespace().level(2); } - @VisibleForTesting - static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange change) { - if (change instanceof TableChange.SetProperty) { - TableChange.SetProperty setProperty = (TableChange.SetProperty) change; - if (ConnectorConstants.COMMENT.equals(setProperty.property())) { - return com.datastrato.gravitino.rel.TableChange.updateComment(setProperty.value()); - } else { - return com.datastrato.gravitino.rel.TableChange.setProperty( - setProperty.property(), setProperty.value()); - } - } else if (change instanceof TableChange.RemoveProperty) { - TableChange.RemoveProperty removeProperty = (TableChange.RemoveProperty) change; - Preconditions.checkArgument( - ConnectorConstants.COMMENT.equals(removeProperty.property()) == false, - "Gravitino doesn't support remove table comment yet"); - return com.datastrato.gravitino.rel.TableChange.removeProperty(removeProperty.property()); - } else if (change instanceof TableChange.AddColumn) { - TableChange.AddColumn addColumn = (TableChange.AddColumn) change; - return com.datastrato.gravitino.rel.TableChange.addColumn( - addColumn.fieldNames(), - SparkTypeConverter.toGravitinoType(addColumn.dataType()), - addColumn.comment(), - transformColumnPosition(addColumn.position()), - addColumn.isNullable()); - } else if (change instanceof TableChange.DeleteColumn) { - TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; - return com.datastrato.gravitino.rel.TableChange.deleteColumn( - deleteColumn.fieldNames(), deleteColumn.ifExists()); - } else if (change instanceof TableChange.UpdateColumnType) { - TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change; - return 
com.datastrato.gravitino.rel.TableChange.updateColumnType( - updateColumnType.fieldNames(), - SparkTypeConverter.toGravitinoType(updateColumnType.newDataType())); - } else if (change instanceof TableChange.RenameColumn) { - TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change; - return com.datastrato.gravitino.rel.TableChange.renameColumn( - renameColumn.fieldNames(), renameColumn.newName()); - } else if (change instanceof TableChange.UpdateColumnPosition) { - TableChange.UpdateColumnPosition sparkUpdateColumnPosition = - (TableChange.UpdateColumnPosition) change; - com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition gravitinoUpdateColumnPosition = - (com.datastrato.gravitino.rel.TableChange.UpdateColumnPosition) - com.datastrato.gravitino.rel.TableChange.updateColumnPosition( - sparkUpdateColumnPosition.fieldNames(), - transformColumnPosition(sparkUpdateColumnPosition.position())); - Preconditions.checkArgument( - !(gravitinoUpdateColumnPosition.getPosition() - instanceof com.datastrato.gravitino.rel.TableChange.Default), - "Doesn't support alter column position without specifying position"); - return gravitinoUpdateColumnPosition; - } else if (change instanceof TableChange.UpdateColumnComment) { - TableChange.UpdateColumnComment updateColumnComment = - (TableChange.UpdateColumnComment) change; - return com.datastrato.gravitino.rel.TableChange.updateColumnComment( - updateColumnComment.fieldNames(), updateColumnComment.newComment()); - } else if (change instanceof TableChange.UpdateColumnNullability) { - TableChange.UpdateColumnNullability updateColumnNullability = - (TableChange.UpdateColumnNullability) change; - return com.datastrato.gravitino.rel.TableChange.updateColumnNullability( - updateColumnNullability.fieldNames(), updateColumnNullability.nullable()); - } else if (change instanceof TableChange.UpdateColumnDefaultValue) { - TableChange.UpdateColumnDefaultValue updateColumnDefaultValue = - 
(TableChange.UpdateColumnDefaultValue) change; - return com.datastrato.gravitino.rel.TableChange.updateColumnDefaultValue( - updateColumnDefaultValue.fieldNames(), - Literals.stringLiteral(updateColumnDefaultValue.newDefaultValue())); - } else { - throw new UnsupportedOperationException( - String.format("Unsupported table change %s", change.getClass().getName())); - } - } - - private static com.datastrato.gravitino.rel.TableChange.ColumnPosition transformColumnPosition( - TableChange.ColumnPosition columnPosition) { - if (null == columnPosition) { - return com.datastrato.gravitino.rel.TableChange.ColumnPosition.defaultPos(); - } else if (columnPosition instanceof TableChange.First) { - return com.datastrato.gravitino.rel.TableChange.ColumnPosition.first(); - } else if (columnPosition instanceof TableChange.After) { - TableChange.After after = (TableChange.After) columnPosition; - return com.datastrato.gravitino.rel.TableChange.ColumnPosition.after(after.column()); - } else { - throw new UnsupportedOperationException( - String.format( - "Unsupported table column position %s", columnPosition.getClass().getName())); - } - } - private Table loadSparkTable(Identifier ident) { try { return sparkCatalog.loadTable(ident); diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/catalog/GravitinoCatalogManager.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java 
b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java similarity index 89% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java index feff8ad760d..d24c540117b 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalog.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog; import java.util.Map; import org.apache.kyuubi.spark.connector.hive.HiveTable; @@ -36,14 +37,16 @@ protected org.apache.spark.sql.connector.catalog.Table createSparkTable( org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkHiveCatalog, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter) { + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { return new SparkHiveTable( identifier, gravitinoTable, (HiveTable) sparkTable, (HiveTableCatalog) sparkHiveCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + sparkTypeConverter); } @Override diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java similarity index 100% rename from 
spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConstants.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/HivePropertiesConverter.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java similarity index 83% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java index e27916af283..1b0381e8b9b 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/hive/SparkHiveTable.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; import java.util.Map; import org.apache.kyuubi.spark.connector.hive.HiveTable; @@ -28,11 
+29,17 @@ public SparkHiveTable( HiveTable hiveTable, HiveTableCatalog hiveTableCatalog, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter) { + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { super(SparkSession.active(), hiveTable.catalogTable(), hiveTableCatalog); this.gravitinoTableInfoHelper = new GravitinoTableInfoHelper( - false, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); + false, + identifier, + gravitinoTable, + propertiesConverter, + sparkTransformConverter, + sparkTypeConverter); } @Override diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java similarity index 95% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java index d2490c7cfd9..6da6b28e536 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalog.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -56,14 +57,16 @@ protected org.apache.spark.sql.connector.catalog.Table createSparkTable( 
org.apache.spark.sql.connector.catalog.Table sparkTable, TableCatalog sparkIcebergCatalog, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter) { + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { return new SparkIcebergTable( identifier, gravitinoTable, (SparkTable) sparkTable, (SparkCatalog) sparkIcebergCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + sparkTypeConverter); } @Override @@ -132,7 +135,8 @@ public org.apache.spark.sql.connector.catalog.Table loadTable(Identifier ident, sparkTable, sparkCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + getSparkTypeConverter()); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } @@ -151,7 +155,8 @@ public org.apache.spark.sql.connector.catalog.Table loadTable(Identifier ident, sparkTable, sparkCatalog, propertiesConverter, - sparkTransformConverter); + sparkTransformConverter, + getSparkTypeConverter()); } catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) { throw new NoSuchTableException(ident); } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java 
b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java similarity index 88% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java index f4df339f30c..e8056ce0c14 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/SparkIcebergTable.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.rel.Table; import com.datastrato.gravitino.spark.connector.PropertiesConverter; import com.datastrato.gravitino.spark.connector.SparkTransformConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import com.datastrato.gravitino.spark.connector.utils.GravitinoTableInfoHelper; import java.lang.reflect.Field; import java.util.Map; @@ -34,11 +35,17 @@ public SparkIcebergTable( SparkTable sparkTable, SparkCatalog sparkCatalog, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter) { + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { super(sparkTable.table(), !isCacheEnabled(sparkCatalog)); 
this.gravitinoTableInfoHelper = new GravitinoTableInfoHelper( - true, identifier, gravitinoTable, propertiesConverter, sparkTransformConverter); + true, + identifier, + gravitinoTable, + propertiesConverter, + sparkTransformConverter, + sparkTypeConverter); this.sparkTable = sparkTable; } diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java similarity index 90% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java index 5a5e56daab2..92962d1f1e1 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java @@ -11,8 +11,7 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig; import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalogManager; -import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalog; -import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog; +import com.datastrato.gravitino.spark.connector.version.CatalogNameAdaptor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; @@ -106,17 +105,10 @@ private void registerCatalog(SparkConf sparkConf, String catalogName, String pro return; } - String catalogClassName; - switch (provider.toLowerCase(Locale.ROOT)) { - case "hive": - catalogClassName = GravitinoHiveCatalog.class.getName(); - break; - case "lakehouse-iceberg": - 
catalogClassName = GravitinoIcebergCatalog.class.getName(); - break; - default: - LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider); - return; + String catalogClassName = CatalogNameAdaptor.getCatalogName(provider); + if (StringUtils.isBlank(catalogClassName)) { + LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider); + return; } String sparkCatalogConfigName = "spark.sql.catalog." + catalogName; diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoSparkPlugin.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoSparkPlugin.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoSparkPlugin.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoSparkPlugin.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java similarity index 100% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java rename to spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/utils/ConnectorUtil.java diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java similarity index 94% rename from spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java rename to 
spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java index a1ab61021c4..95de795f0f2 100644 --- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/utils/GravitinoTableInfoHelper.java @@ -36,18 +36,21 @@ public class GravitinoTableInfoHelper { private com.datastrato.gravitino.rel.Table gravitinoTable; private PropertiesConverter propertiesConverter; private SparkTransformConverter sparkTransformConverter; + private SparkTypeConverter sparkTypeConverter; public GravitinoTableInfoHelper( boolean isCaseSensitive, Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable, PropertiesConverter propertiesConverter, - SparkTransformConverter sparkTransformConverter) { + SparkTransformConverter sparkTransformConverter, + SparkTypeConverter sparkTypeConverter) { this.isCaseSensitive = isCaseSensitive; this.identifier = identifier; this.gravitinoTable = gravitinoTable; this.propertiesConverter = propertiesConverter; this.sparkTransformConverter = sparkTransformConverter; + this.sparkTypeConverter = sparkTypeConverter; } public String name() { @@ -69,7 +72,7 @@ public StructType schema() { } return StructField.apply( column.name(), - SparkTypeConverter.toSparkType(column.dataType()), + sparkTypeConverter.toSparkType(column.dataType()), column.nullable(), metadata); }) diff --git a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/version/CatalogNameAdaptor.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/version/CatalogNameAdaptor.java new file mode 100644 index 00000000000..4e1ffbb8a69 --- /dev/null +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/version/CatalogNameAdaptor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 
Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.version; + +import com.google.common.collect.ImmutableMap; +import java.util.Locale; +import java.util.Map; +import org.apache.spark.package$; +import org.apache.spark.util.VersionUtils$; + +public class CatalogNameAdaptor { + private static final Map catalogNames = + ImmutableMap.of( + "hive-3.3", "com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33", + "hive-3.4", "com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34", + "hive-3.5", "com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35", + "lakehouse-iceberg-3.3", + "com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33", + "lakehouse-iceberg-3.4", + "com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34", + "lakehouse-iceberg-3.5", + "com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35"); + + private static String sparkVersion() { + return package$.MODULE$.SPARK_VERSION(); + } + + private static String getCatalogName(String provider, int majorVersion, int minorVersion) { + String key = + String.format("%s-%d.%d", provider.toLowerCase(Locale.ROOT), majorVersion, minorVersion); + return catalogNames.get(key); + } + + public static String getCatalogName(String provider) { + int majorVersion = VersionUtils$.MODULE$.majorVersion(sparkVersion()); + int minorVersion = VersionUtils$.MODULE$.minorVersion(sparkVersion()); + return getCatalogName(provider, majorVersion, minorVersion); + } +} diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java similarity index 100% rename from 
spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java similarity index 94% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java index 0aca0140335..3bceffceee7 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter.java @@ -45,6 +45,7 @@ public class TestSparkTypeConverter { private Set notSupportGravitinoTypes = ImmutableSet.of(); private Set notSupportSparkTypes = ImmutableSet.of(); + private SparkTypeConverter sparkTypeConverter; @BeforeAll void init() { @@ -62,7 +63,6 @@ void init() { gravitinoToSparkTypeMapper.put(BooleanType.get(), DataTypes.BooleanType); gravitinoToSparkTypeMapper.put(DateType.get(), DataTypes.DateType); gravitinoToSparkTypeMapper.put(TimestampType.withTimeZone(), DataTypes.TimestampType); - gravitinoToSparkTypeMapper.put(TimestampType.withoutTimeZone(), DataTypes.TimestampNTZType); gravitinoToSparkTypeMapper.put( ListType.of(IntegerType.get(), true), DataTypes.createArrayType(DataTypes.IntegerType)); gravitinoToSparkTypeMapper.put( @@ -70,32 +70,34 @@ void init() { DataTypes.createMapType(DataTypes.IntegerType, DataTypes.StringType)); gravitinoToSparkTypeMapper.put(createGravitinoStructType(), createSparkStructType()); 
gravitinoToSparkTypeMapper.put(NullType.get(), DataTypes.NullType); + + this.sparkTypeConverter = new SparkTypeConverter(); } @Test void testConvertGravitinoTypeToSpark() { gravitinoToSparkTypeMapper.forEach( (gravitinoType, sparkType) -> - Assertions.assertEquals(sparkType, SparkTypeConverter.toSparkType(gravitinoType))); + Assertions.assertEquals(sparkType, sparkTypeConverter.toSparkType(gravitinoType))); notSupportGravitinoTypes.forEach( gravitinoType -> Assertions.assertThrowsExactly( UnsupportedOperationException.class, - () -> SparkTypeConverter.toSparkType(gravitinoType))); + () -> sparkTypeConverter.toSparkType(gravitinoType))); } @Test void testConvertSparkTypeToGravitino() { gravitinoToSparkTypeMapper.forEach( (gravitinoType, sparkType) -> - Assertions.assertEquals(gravitinoType, SparkTypeConverter.toGravitinoType(sparkType))); + Assertions.assertEquals(gravitinoType, sparkTypeConverter.toGravitinoType(sparkType))); notSupportSparkTypes.forEach( sparkType -> Assertions.assertThrowsExactly( UnsupportedOperationException.class, - () -> SparkTypeConverter.toGravitinoType(sparkType))); + () -> sparkTypeConverter.toGravitinoType(sparkType))); } /** Create a Gravitino StructType for testing. 
*/ diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java similarity index 79% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java index e7a4a5887b2..118dab42710 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/catalog/TestTransformTableChange.java @@ -6,22 +6,23 @@ package com.datastrato.gravitino.spark.connector.catalog; import com.datastrato.gravitino.rel.TableChange.UpdateComment; -import com.datastrato.gravitino.rel.expressions.literals.Literals; import com.datastrato.gravitino.spark.connector.ConnectorConstants; -import org.apache.spark.sql.connector.catalog.ColumnDefaultValue; +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; import org.apache.spark.sql.connector.catalog.TableChange; -import org.apache.spark.sql.connector.expressions.LiteralValue; import org.apache.spark.sql.types.DataTypes; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TestTransformTableChange { + private SparkTableChangeConverter sparkTableChangeConverter = + new SparkTableChangeConverter(new SparkTypeConverter()); @Test void testTransformSetProperty() { TableChange sparkSetProperty = TableChange.setProperty("key", "value"); com.datastrato.gravitino.rel.TableChange tableChange = - BaseCatalog.transformTableChange(sparkSetProperty); + 
sparkTableChangeConverter.toGravitinoTableChange(sparkSetProperty); Assertions.assertTrue( tableChange instanceof com.datastrato.gravitino.rel.TableChange.SetProperty); com.datastrato.gravitino.rel.TableChange.SetProperty gravitinoSetProperty = @@ -34,7 +35,7 @@ void testTransformSetProperty() { void testTransformRemoveProperty() { TableChange sparkRemoveProperty = TableChange.removeProperty("key"); com.datastrato.gravitino.rel.TableChange tableChange = - BaseCatalog.transformTableChange(sparkRemoveProperty); + sparkTableChangeConverter.toGravitinoTableChange(sparkRemoveProperty); Assertions.assertTrue( tableChange instanceof com.datastrato.gravitino.rel.TableChange.RemoveProperty); com.datastrato.gravitino.rel.TableChange.RemoveProperty gravitinoRemoveProperty = @@ -46,7 +47,7 @@ void testTransformRemoveProperty() { void testTransformUpdateComment() { TableChange sparkSetProperty = TableChange.setProperty(ConnectorConstants.COMMENT, "a"); com.datastrato.gravitino.rel.TableChange tableChange = - BaseCatalog.transformTableChange(sparkSetProperty); + sparkTableChangeConverter.toGravitinoTableChange(sparkSetProperty); Assertions.assertTrue( tableChange instanceof com.datastrato.gravitino.rel.TableChange.UpdateComment); Assertions.assertEquals("a", ((UpdateComment) tableChange).getNewComment()); @@ -54,7 +55,7 @@ void testTransformUpdateComment() { TableChange sparkRemoveProperty = TableChange.removeProperty(ConnectorConstants.COMMENT); Assertions.assertThrowsExactly( IllegalArgumentException.class, - () -> BaseCatalog.transformTableChange(sparkRemoveProperty)); + () -> sparkTableChangeConverter.toGravitinoTableChange(sparkRemoveProperty)); } @Test @@ -65,7 +66,7 @@ void testTransformRenameColumn() { TableChange.RenameColumn sparkRenameColumn = (TableChange.RenameColumn) TableChange.renameColumn(oldFieldsName, newFiledName); com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(sparkRenameColumn); + 
sparkTableChangeConverter.toGravitinoTableChange(sparkRenameColumn); Assertions.assertTrue( gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.RenameColumn); @@ -84,7 +85,7 @@ void testTransformUpdateColumnComment() { TableChange.UpdateColumnComment updateColumnComment = (TableChange.UpdateColumnComment) TableChange.updateColumnComment(fieldNames, newComment); com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(updateColumnComment); + sparkTableChangeConverter.toGravitinoTableChange(updateColumnComment); Assertions.assertTrue( gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.UpdateColumnComment); @@ -100,16 +101,12 @@ void testTransformAddColumn() { TableChange.ColumnPosition first = TableChange.ColumnPosition.first(); TableChange.ColumnPosition after = TableChange.ColumnPosition.after("col0"); - ColumnDefaultValue defaultValue = - new ColumnDefaultValue( - "CURRENT_DEFAULT", new LiteralValue("default_value", DataTypes.StringType)); TableChange.AddColumn sparkAddColumnFirst = (TableChange.AddColumn) - TableChange.addColumn( - new String[] {"col1"}, DataTypes.StringType, true, "", first, defaultValue); + TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType, true, "", first); com.datastrato.gravitino.rel.TableChange gravitinoChangeFirst = - BaseCatalog.transformTableChange(sparkAddColumnFirst); + sparkTableChangeConverter.toGravitinoTableChange(sparkAddColumnFirst); Assertions.assertTrue( gravitinoChangeFirst instanceof com.datastrato.gravitino.rel.TableChange.AddColumn); @@ -126,10 +123,9 @@ void testTransformAddColumn() { TableChange.AddColumn sparkAddColumnAfter = (TableChange.AddColumn) - TableChange.addColumn( - new String[] {"col1"}, DataTypes.StringType, true, "", after, defaultValue); + TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType, true, "", after); com.datastrato.gravitino.rel.TableChange gravitinoChangeAfter = - 
BaseCatalog.transformTableChange(sparkAddColumnAfter); + sparkTableChangeConverter.toGravitinoTableChange(sparkAddColumnAfter); Assertions.assertTrue( gravitinoChangeAfter instanceof com.datastrato.gravitino.rel.TableChange.AddColumn); @@ -146,10 +142,9 @@ void testTransformAddColumn() { TableChange.AddColumn sparkAddColumnDefault = (TableChange.AddColumn) - TableChange.addColumn( - new String[] {"col1"}, DataTypes.StringType, true, "", null, defaultValue); + TableChange.addColumn(new String[] {"col1"}, DataTypes.StringType, true, "", null); com.datastrato.gravitino.rel.TableChange gravitinoChangeDefault = - BaseCatalog.transformTableChange(sparkAddColumnDefault); + sparkTableChangeConverter.toGravitinoTableChange(sparkAddColumnDefault); Assertions.assertTrue( gravitinoChangeDefault instanceof com.datastrato.gravitino.rel.TableChange.AddColumn); @@ -170,7 +165,7 @@ void testTransformDeleteColumn() { TableChange.DeleteColumn sparkDeleteColumn = (TableChange.DeleteColumn) TableChange.deleteColumn(new String[] {"col1"}, true); com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(sparkDeleteColumn); + sparkTableChangeConverter.toGravitinoTableChange(sparkDeleteColumn); Assertions.assertTrue( gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.DeleteColumn); @@ -187,7 +182,7 @@ void testTransformUpdateColumnType() { (TableChange.UpdateColumnType) TableChange.updateColumnType(new String[] {"col1"}, DataTypes.StringType); com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(sparkUpdateColumnType); + sparkTableChangeConverter.toGravitinoTableChange(sparkUpdateColumnType); Assertions.assertTrue( gravitinoChange instanceof com.datastrato.gravitino.rel.TableChange.UpdateColumnType); @@ -209,7 +204,7 @@ void testTransformUpdateColumnPosition() { (TableChange.UpdateColumnPosition) TableChange.updateColumnPosition(new String[] {"col1"}, first); 
com.datastrato.gravitino.rel.TableChange gravitinoChangeFirst = - BaseCatalog.transformTableChange(sparkUpdateColumnFirst); + sparkTableChangeConverter.toGravitinoTableChange(sparkUpdateColumnFirst); Assertions.assertTrue( gravitinoChangeFirst @@ -227,7 +222,7 @@ void testTransformUpdateColumnPosition() { (TableChange.UpdateColumnPosition) TableChange.updateColumnPosition(new String[] {"col1"}, after); com.datastrato.gravitino.rel.TableChange gravitinoChangeAfter = - BaseCatalog.transformTableChange(sparkUpdateColumnAfter); + sparkTableChangeConverter.toGravitinoTableChange(sparkUpdateColumnAfter); Assertions.assertTrue( gravitinoChangeAfter @@ -248,7 +243,7 @@ void testTransformUpdateColumnNullability() { (TableChange.UpdateColumnNullability) TableChange.updateColumnNullability(new String[] {"col1"}, true); com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(sparkUpdateColumnNullability); + sparkTableChangeConverter.toGravitinoTableChange(sparkUpdateColumnNullability); Assertions.assertTrue( gravitinoChange @@ -262,29 +257,4 @@ void testTransformUpdateColumnNullability() { Assertions.assertEquals( sparkUpdateColumnNullability.nullable(), gravitinoUpdateColumnNullability.nullable()); } - - @Test - void testUpdateColumnDefaultValue() { - String[] fieldNames = new String[] {"col"}; - String newDedauleValue = "col_default_value"; - TableChange.UpdateColumnDefaultValue sparkUpdateColumnDefaultValue = - (TableChange.UpdateColumnDefaultValue) - TableChange.updateColumnDefaultValue(fieldNames, newDedauleValue); - - com.datastrato.gravitino.rel.TableChange gravitinoChange = - BaseCatalog.transformTableChange(sparkUpdateColumnDefaultValue); - - Assertions.assertTrue( - gravitinoChange - instanceof com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue); - com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue - gravitinoUpdateColumnDefaultValue = - 
(com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue) gravitinoChange; - - Assertions.assertArrayEquals( - sparkUpdateColumnDefaultValue.fieldNames(), gravitinoUpdateColumnDefaultValue.fieldName()); - Assertions.assertEquals( - Literals.stringLiteral(newDedauleValue), - gravitinoUpdateColumnDefaultValue.getNewDefaultValue()); - } } diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/hive/TestHivePropertiesConverter.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java similarity index 98% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java rename to 
spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java index 2fcedf44a65..f4251eb200c 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java @@ -20,9 +20,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -312,7 +310,8 @@ void testDropTable() { dropTableIfExists(tableName); Assertions.assertEquals(false, tableExists(tableName)); - Assertions.assertThrowsExactly(NoSuchTableException.class, () -> sql("DROP TABLE not_exists")); + // may throw NoSuchTableException or AnalysisException for different spark version + Assertions.assertThrows(Exception.class, () -> sql("DROP TABLE not_exists")); } @Test @@ -337,8 +336,9 @@ void testRenameTable() { () -> sql(String.format("ALTER TABLE %s RENAME TO %s", tableName, newTableName))); // rename a not existing tables - Assertions.assertThrowsExactly( - AnalysisException.class, () -> sql("ALTER TABLE not_exists1 RENAME TO not_exist2")); + // Spark will throw AnalysisException before 3.5, ExtendedAnalysisException in 3.5 + Assertions.assertThrows( + Exception.class, () -> sql("ALTER TABLE not_exists1 RENAME TO not_exist2")); } @Test @@ -917,7 +917,8 @@ private void writeToEmptyTableAndCheckData(String tableName) { String.format( "INSERT INTO %s VALUES (1, '1', 1),(2, '2', 2),(3, '3', 3),(4, '4', 4),(5, '5', 5)", tableName)); - List queryResult = 
getTableData(tableName); + // Spark3.5 may get the data without orders + List queryResult = getQueryData(getSelectAllSqlWithOrder(tableName, "id")); Assertions.assertEquals(5, queryResult.size()); Assertions.assertEquals("1,1,1;2,2,2;3,3,3;4,4,4;5,5,5", String.join(";", queryResult)); } diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java similarity index 99% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java index 2c4a4e03d30..497ad3d4a41 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java @@ -28,7 +28,7 @@ @Tag("gravitino-docker-it") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class SparkHiveCatalogIT extends SparkCommonIT { +public abstract class SparkHiveCatalogIT extends SparkCommonIT { @Override protected String getCatalogName() { diff --git 
a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java similarity index 92% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java index 573dafc42bc..d8af85fbf04 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT.java @@ -13,7 +13,7 @@ /** This class use Iceberg HiveCatalog for backend catalog. 
*/ @Tag("gravitino-docker-it") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class SparkIcebergCatalogHiveBackendIT extends SparkIcebergCatalogIT { +public abstract class SparkIcebergCatalogHiveBackendIT extends SparkIcebergCatalogIT { @Override protected Map getCatalogConfigs() { diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java similarity index 99% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java index 3628dd3dfd9..07683cbeecd 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java @@ -330,7 +330,7 @@ void testIcebergReservedProperties() throws NoSuchTableException { dropTableIfExists(tableName); sql( String.format( - "CREATE TABLE %s (id INT COMMENT 'id comment' NOT NULL, name STRING COMMENT '', age INT)", + "CREATE TABLE %s (id INT NOT NULL COMMENT 'id comment', name STRING COMMENT '', age INT)", tableName)); SparkIcebergTable sparkIcebergTable = getSparkIcebergTableInstance(tableName); diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java similarity index 92% rename from 
spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java index b3690ea92d4..8f7c5126df9 100644 --- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java @@ -13,7 +13,7 @@ /** This class use Iceberg RESTCatalog for test, and the real backend catalog is HiveCatalog. */ @Tag("gravitino-docker-it") @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class SparkIcebergCatalogRestBackendIT extends SparkIcebergCatalogIT { +public abstract class SparkIcebergCatalogRestBackendIT extends SparkIcebergCatalogIT { @Override protected Map getCatalogConfigs() { diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkMetadataColumnInfo.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkMetadataColumnInfo.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkMetadataColumnInfo.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkMetadataColumnInfo.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfo.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfo.java similarity index 100% rename from 
spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfo.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfo.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkTableInfoChecker.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkUtilIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkUtilIT.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkUtilIT.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/util/SparkUtilIT.java diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java diff --git 
a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java similarity index 100% rename from spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java rename to spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/utils/TestConnectorUtil.java diff --git a/spark-connector/spark-connector/src/test/resources/log4j2.properties b/spark-connector/spark-common/src/test/resources/log4j2.properties similarity index 100% rename from spark-connector/spark-connector/src/test/resources/log4j2.properties rename to spark-connector/spark-common/src/test/resources/log4j2.properties diff --git a/spark-connector/v3.3/build.gradle.kts b/spark-connector/v3.3/build.gradle.kts new file mode 100644 index 00000000000..2d4a7344247 --- /dev/null +++ b/spark-connector/v3.3/build.gradle.kts @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +tasks.all { + enabled = false +} \ No newline at end of file diff --git a/spark-connector/spark-connector-runtime/build.gradle.kts b/spark-connector/v3.3/spark-runtime/build.gradle.kts similarity index 73% rename from spark-connector/spark-connector-runtime/build.gradle.kts rename to spark-connector/v3.3/spark-runtime/build.gradle.kts index d60f4de402b..c2a597d531e 100644 --- a/spark-connector/spark-connector-runtime/build.gradle.kts +++ b/spark-connector/v3.3/spark-runtime/build.gradle.kts @@ -11,13 +11,20 @@ plugins { } val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() -val sparkVersion: String = libs.versions.spark.get() +val sparkVersion: String = libs.versions.spark33.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion" dependencies { implementation(project(":clients:client-java-runtime", configuration = "shadow")) - implementation(project(":spark-connector:spark-connector")) + when (sparkMajorVersion) { + "3.3" -> { + val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() + implementation(project(":spark-connector:spark-3.3")) + implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + } + else -> throw IllegalArgumentException("Unsupported Spark version: $sparkMajorVersion") + } } tasks.withType(ShadowJar::class.java) { diff --git a/spark-connector/v3.3/spark/build.gradle.kts b/spark-connector/v3.3/spark/build.gradle.kts new file mode 100644 index 00000000000..bc078d86e25 --- /dev/null +++ b/spark-connector/v3.3/spark/build.gradle.kts @@ -0,0 +1,139 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +plugins { + `maven-publish` + id("java") + id("idea") + alias(libs.plugins.shadow) +} + +repositories { + mavenCentral() +} + +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark33.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg4spark.get() +val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() +val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() +val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() + +dependencies { + implementation(project(":spark-connector:spark-common")) + compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") { + exclude("com.fasterxml.jackson") + } + compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + + testImplementation(project(":api")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":clients:client-java")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":core")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server-common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":spark-connector:spark-common", "testArtifacts")) { + exclude("com.fasterxml.jackson") + } + + testImplementation(libs.hive2.common) { + exclude("com.sun.jersey") + exclude("org.apache.curator") + // use hadoop from Spark + exclude("org.apache.hadoop") + exclude("org.apache.logging.log4j") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + } + + testImplementation(libs.hive2.metastore) { + exclude("co.cask.tephra") + 
exclude("com.github.joshelser") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.code.findbugs", "sr305") + exclude("com.tdunning", "json") + exclude("com.zaxxer", "HikariCP") + exclude("com.sun.jersey") + exclude("io.dropwizard.metricss") + exclude("javax.transaction", "transaction-api") + exclude("org.apache.avro") + exclude("org.apache.curator") + exclude("org.apache.hbase") + exclude("org.apache.hadoop") + exclude("org.apache.hive", "hive-common") + exclude("org.apache.hive", "hive-shims") + exclude("org.apache.logging.log4j") + exclude("org.apache.parquet", "parquet-hadoop-bundle") + exclude("org.apache.zookeeper") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.slf4j") + } + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mysql.driver) + testImplementation(libs.testcontainers) + + // org.apache.iceberg.rest.RESTSerializers#registerAll(ObjectMapper) has different method signature for iceberg-core and iceberg-spark-runtime package, we must make sure iceberg-core is in front to start up MiniGravitino server. 
+ testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + // include spark-sql,spark-catalyst,hive-common,hdfs-client + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { + // conflict with Gravitino server jersey + exclude("org.glassfish.jersey.core") + exclude("org.glassfish.jersey.containers") + exclude("org.glassfish.jersey.inject") + exclude("com.sun.jersey") + exclude("com.fasterxml.jackson") + exclude("com.fasterxml.jackson.core") + } + testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks.test { + val skipUTs = project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + val skipSparkITs = project.hasProperty("skipSparkITs") + if (skipITs || skipSparkITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + doFirst { + environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") + } + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} + +tasks.clean { + delete("spark-warehouse") +} diff --git a/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark33.java b/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark33.java new file mode 100644 index 00000000000..6170b0a55f9 --- /dev/null +++ 
b/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark33.java @@ -0,0 +1,7 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.hive; + +public class GravitinoHiveCatalogSpark33 extends GravitinoHiveCatalog {} diff --git a/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark33.java b/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark33.java new file mode 100644 index 00000000000..a560e1c4527 --- /dev/null +++ b/spark-connector/v3.3/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark33.java @@ -0,0 +1,7 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.iceberg; + +public class GravitinoIcebergCatalogSpark33 extends GravitinoIcebergCatalog {} diff --git a/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java new file mode 100644 index 00000000000..b8cdf5225ba --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT33.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.integration.test.hive; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkHiveCatalogIT33 extends SparkHiveCatalogIT { + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java new file mode 100644 index 00000000000..07b80d7c141 --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT33.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkIcebergCatalogHiveBackendIT33 extends SparkIcebergCatalogHiveBackendIT { + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." 
+ getCatalogName()); + Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java new file mode 100644 index 00000000000..4153145aa2e --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT33.java @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +public class SparkIcebergCatalogRestBackendIT33 extends SparkIcebergCatalogRestBackendIT {} diff --git a/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java new file mode 100644 index 00000000000..bdada706b0e --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.version; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33; +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestCatalogNameAdaptor { + @Test + void testSpark33() { + String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive"); + Assertions.assertEquals(GravitinoHiveCatalogSpark33.class.getName(), hiveCatalogName); + + String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); + Assertions.assertEquals(GravitinoIcebergCatalogSpark33.class.getName(), icebergCatalogName); + } +} diff --git a/spark-connector/v3.3/spark/src/test/resources/log4j2.properties b/spark-connector/v3.3/spark/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..ce232cc5cf2 --- /dev/null +++ b/spark-connector/v3.3/spark/src/test/resources/log4j2.properties @@ -0,0 +1,33 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. 
+# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-integration-test/build/integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger diff --git a/spark-connector/v3.4/build.gradle.kts b/spark-connector/v3.4/build.gradle.kts new file mode 100644 index 00000000000..2d4a7344247 --- /dev/null +++ b/spark-connector/v3.4/build.gradle.kts @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +tasks.all { + enabled = false +} \ No newline at end of file diff --git a/spark-connector/v3.4/spark-runtime/build.gradle.kts b/spark-connector/v3.4/spark-runtime/build.gradle.kts new file mode 100644 index 00000000000..c4af53d91b2 --- /dev/null +++ b/spark-connector/v3.4/spark-runtime/build.gradle.kts @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { + `maven-publish` + id("java") + alias(libs.plugins.shadow) +} + +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark34.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion" + +dependencies { + implementation(project(":clients:client-java-runtime", configuration = "shadow")) + when (sparkMajorVersion) { + "3.4" -> { + val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() + implementation(project(":spark-connector:spark-3.4")) + implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + } + else -> throw IllegalArgumentException("Unsupported Spark version: $sparkMajorVersion") + } +} + +tasks.withType(ShadowJar::class.java) { + isZip64 = true + configurations = listOf(project.configurations.runtimeClasspath.get()) + archiveFileName.set("$baseName-$version.jar") + archiveClassifier.set("") + + // Relocate dependencies to avoid conflicts + relocate("com.google", "com.datastrato.gravitino.shaded.com.google") + relocate("google", "com.datastrato.gravitino.shaded.google") + relocate("org.apache.hc", "com.datastrato.gravitino.shaded.org.apache.hc") +} + +tasks.jar { + dependsOn(tasks.named("shadowJar")) + archiveClassifier.set("empty") +} diff --git a/spark-connector/spark-connector/build.gradle.kts b/spark-connector/v3.4/spark/build.gradle.kts similarity index 71% rename from spark-connector/spark-connector/build.gradle.kts rename to spark-connector/v3.4/spark/build.gradle.kts index 1e9b2783359..811844c56c4 100644 --- a/spark-connector/spark-connector/build.gradle.kts +++ b/spark-connector/v3.4/spark/build.gradle.kts @@ -14,46 +14,55 @@ repositories { } val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() -val sparkVersion: String = libs.versions.spark.get() +val sparkVersion: String = libs.versions.spark34.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") -val icebergVersion: String = libs.versions.iceberg.get() -val kyuubiVersion: String = libs.versions.kyuubi.get() +val icebergVersion: String = libs.versions.iceberg4spark.get() +val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() dependencies { - implementation(project(":catalogs:bundled-catalog", configuration = "shadow")) - implementation(libs.guava) - implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") - - // unable to run IT in embedded mode if including client-java-runtime and common module + implementation(project(":spark-connector:spark-common")) + compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") { + exclude("com.fasterxml.jackson") + } compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") - compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") - compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") - compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion") - annotationProcessor(libs.lombok) - compileOnly(libs.lombok) - - testAnnotationProcessor(libs.lombok) - testCompileOnly(libs.lombok) - testImplementation(project(":api")) - testImplementation(project(":clients:client-java")) - testImplementation(project(":core")) - testImplementation(project(":common")) + testImplementation(project(":api")) { + 
exclude("org.apache.logging.log4j") + } + testImplementation(project(":clients:client-java")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":core")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":common")) { + exclude("org.apache.logging.log4j") + } testImplementation(project(":integration-test-common", "testArtifacts")) - testImplementation(project(":server")) - testImplementation(project(":server-common")) + testImplementation(project(":server")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server-common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":spark-connector:spark-common", "testArtifacts")) { + exclude("com.fasterxml.jackson") + } testImplementation(libs.hive2.common) { exclude("com.sun.jersey") exclude("org.apache.curator") + exclude("org.apache.logging.log4j") // use hadoop from Spark exclude("org.apache.hadoop") exclude("org.eclipse.jetty.aggregate", "jetty-all") exclude("org.eclipse.jetty.orbit", "javax.servlet") } + testImplementation(libs.hive2.metastore) { exclude("co.cask.tephra") exclude("com.github.joshelser") @@ -68,6 +77,9 @@ dependencies { exclude("org.apache.curator") exclude("org.apache.hbase") exclude("org.apache.hadoop") + exclude("org.apache.hive", "hive-common") + exclude("org.apache.hive", "hive-shims") + exclude("org.apache.logging.log4j") exclude("org.apache.parquet", "parquet-hadoop-bundle") exclude("org.apache.zookeeper") exclude("org.eclipse.jetty.aggregate", "jetty-all") @@ -83,6 +95,7 @@ dependencies { testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") // include 
spark-sql,spark-catalyst,hive-common,hdfs-client testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { // conflict with Gravitino server jersey @@ -90,6 +103,8 @@ dependencies { exclude("org.glassfish.jersey.containers") exclude("org.glassfish.jersey.inject") exclude("com.sun.jersey") + exclude("com.fasterxml.jackson") + exclude("com.fasterxml.jackson.core") } testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") @@ -104,7 +119,8 @@ tasks.test { } val skipITs = project.hasProperty("skipITs") - if (skipITs) { + val skipSparkITs = project.hasProperty("skipSparkITs") + if (skipITs || skipSparkITs) { // Exclude integration tests exclude("**/integration/**") } else { diff --git a/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter34.java b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter34.java new file mode 100644 index 00000000000..9f8a3804d5c --- /dev/null +++ b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTableChangeConverter34.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector; + +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import org.apache.spark.sql.connector.catalog.TableChange; + +public class SparkTableChangeConverter34 extends SparkTableChangeConverter { + public SparkTableChangeConverter34(SparkTypeConverter sparkTypeConverter) { + super(sparkTypeConverter); + } + + public com.datastrato.gravitino.rel.TableChange toGravitinoTableChange(TableChange change) { + if (change instanceof TableChange.UpdateColumnDefaultValue) { + TableChange.UpdateColumnDefaultValue updateColumnDefaultValue = + (TableChange.UpdateColumnDefaultValue) change; + return com.datastrato.gravitino.rel.TableChange.updateColumnDefaultValue( + updateColumnDefaultValue.fieldNames(), + Literals.stringLiteral(updateColumnDefaultValue.newDefaultValue())); + } else { + return super.toGravitinoTableChange(change); + } + } +} diff --git a/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter34.java b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter34.java new file mode 100644 index 00000000000..fda77e3d876 --- /dev/null +++ b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/SparkTypeConverter34.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector; + +import com.datastrato.gravitino.rel.types.Type; +import com.datastrato.gravitino.rel.types.Types; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.TimestampNTZType; + +public class SparkTypeConverter34 extends SparkTypeConverter { + + public Type toGravitinoType(DataType sparkType) { + if (sparkType instanceof TimestampNTZType) { + return Types.TimestampType.withoutTimeZone(); + } else { + return super.toGravitinoType(sparkType); + } + } + + public DataType toSparkType(Type gravitinoType) { + if (gravitinoType instanceof Types.TimestampType + && ((Types.TimestampType) gravitinoType).hasTimeZone() == false) { + return DataTypes.TimestampNTZType; + } else { + return super.toSparkType(gravitinoType); + } + } +} diff --git a/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java new file mode 100644 index 00000000000..fee8569cc9b --- /dev/null +++ b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark34.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.hive; + +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter; +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter34; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter34; + +public class GravitinoHiveCatalogSpark34 extends GravitinoHiveCatalog { + @Override + protected SparkTypeConverter getSparkTypeConverter() { + return new SparkTypeConverter34(); + } + + @Override + protected SparkTableChangeConverter getSparkTableChangeConverter( + SparkTypeConverter sparkTypeConverter) { + return new SparkTableChangeConverter34(sparkTypeConverter); + } +} diff --git a/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark34.java b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark34.java new file mode 100644 index 00000000000..c537b0c3be5 --- /dev/null +++ b/spark-connector/v3.4/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark34.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.iceberg; + +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter; +import com.datastrato.gravitino.spark.connector.SparkTableChangeConverter34; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter; +import com.datastrato.gravitino.spark.connector.SparkTypeConverter34; + +public class GravitinoIcebergCatalogSpark34 extends GravitinoIcebergCatalog { + @Override + protected SparkTypeConverter getSparkTypeConverter() { + return new SparkTypeConverter34(); + } + + @Override + protected SparkTableChangeConverter getSparkTableChangeConverter( + SparkTypeConverter sparkTypeConverter) { + return new SparkTableChangeConverter34(sparkTypeConverter); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTableChangeConverter34.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTableChangeConverter34.java new file mode 100644 index 00000000000..b455eceddb5 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTableChangeConverter34.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector; + +import com.datastrato.gravitino.rel.expressions.literals.Literals; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestSparkTableChangeConverter34 { + + private SparkTableChangeConverter sparkTableChangeConverter = + new SparkTableChangeConverter34(new SparkTypeConverter34()); + + @Test + void testUpdateColumnDefaultValue() { + String[] fieldNames = new String[] {"col"}; + String defaultValue = "col_default_value"; + TableChange.UpdateColumnDefaultValue sparkUpdateColumnDefaultValue = + (TableChange.UpdateColumnDefaultValue) + TableChange.updateColumnDefaultValue(fieldNames, defaultValue); + + com.datastrato.gravitino.rel.TableChange gravitinoChange = + sparkTableChangeConverter.toGravitinoTableChange(sparkUpdateColumnDefaultValue); + + Assertions.assertTrue( + gravitinoChange + instanceof com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue); + com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue + gravitinoUpdateColumnDefaultValue = + (com.datastrato.gravitino.rel.TableChange.UpdateColumnDefaultValue) gravitinoChange; + + Assertions.assertArrayEquals( + sparkUpdateColumnDefaultValue.fieldNames(), gravitinoUpdateColumnDefaultValue.fieldName()); + Assertions.assertEquals( + Literals.stringLiteral(defaultValue), + gravitinoUpdateColumnDefaultValue.getNewDefaultValue()); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter34.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter34.java new file mode 100644 index 00000000000..4b57722fa81 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTypeConverter34.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. 
+ * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector; + +import com.datastrato.gravitino.rel.types.Types.TimestampType; +import org.apache.spark.sql.types.DataTypes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@TestInstance(Lifecycle.PER_CLASS) +public class TestSparkTypeConverter34 { + private SparkTypeConverter sparkTypeConverter = new SparkTypeConverter34(); + + @Test + void testTimestampNTZ() { + Assertions.assertEquals( + TimestampType.withoutTimeZone(), + sparkTypeConverter.toGravitinoType(DataTypes.TimestampNTZType)); + Assertions.assertEquals( + DataTypes.TimestampNTZType, + sparkTypeConverter.toSparkType(TimestampType.withoutTimeZone())); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT34.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT34.java new file mode 100644 index 00000000000..883f19836ce --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT34.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.integration.test.hive; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkHiveCatalogIT34 extends SparkHiveCatalogIT { + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." 
+ getCatalogName()); + Assertions.assertEquals(GravitinoHiveCatalogSpark34.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT34.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT34.java new file mode 100644 index 00000000000..3e1c7228c03 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT34.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkIcebergCatalogHiveBackendIT34 extends SparkIcebergCatalogHiveBackendIT { + + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT34.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT34.java new file mode 100644 index 00000000000..a7bac70a576 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT34.java @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. 
+ * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +public class SparkIcebergCatalogRestBackendIT34 extends SparkIcebergCatalogRestBackendIT {} diff --git a/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java new file mode 100644 index 00000000000..c4c6e646c23 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.version; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark34; +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark34; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestCatalogNameAdaptor { + @Test + void testSpark34() { + String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive"); + Assertions.assertEquals(GravitinoHiveCatalogSpark34.class.getName(), hiveCatalogName); + + String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); + Assertions.assertEquals(GravitinoIcebergCatalogSpark34.class.getName(), icebergCatalogName); + } +} diff --git a/spark-connector/v3.4/spark/src/test/resources/log4j2.properties b/spark-connector/v3.4/spark/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..ce232cc5cf2 --- /dev/null +++ b/spark-connector/v3.4/spark/src/test/resources/log4j2.properties @@ -0,0 +1,33 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. 
+# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-integration-test/build/integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger diff --git a/spark-connector/v3.5/build.gradle.kts b/spark-connector/v3.5/build.gradle.kts new file mode 100644 index 00000000000..2d4a7344247 --- /dev/null +++ b/spark-connector/v3.5/build.gradle.kts @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +tasks.all { + enabled = false +} \ No newline at end of file diff --git a/spark-connector/v3.5/spark-runtime/build.gradle.kts b/spark-connector/v3.5/spark-runtime/build.gradle.kts new file mode 100644 index 00000000000..5fccd0368fd --- /dev/null +++ b/spark-connector/v3.5/spark-runtime/build.gradle.kts @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { + `maven-publish` + id("java") + alias(libs.plugins.shadow) +} + +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark35.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion" + +dependencies { + implementation(project(":clients:client-java-runtime", configuration = "shadow")) + when (sparkMajorVersion) { + "3.5" -> { + val kyuubiVersion: String = libs.versions.kyuubi4spark35.get() + implementation(project(":spark-connector:spark-3.5")) + implementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + } + else -> throw IllegalArgumentException("Unsupported Spark version: $sparkMajorVersion") + } +} + +tasks.withType(ShadowJar::class.java) { + isZip64 = true + configurations = listOf(project.configurations.runtimeClasspath.get()) + archiveFileName.set("$baseName-$version.jar") + archiveClassifier.set("") + + // Relocate dependencies to avoid conflicts + relocate("com.google", "com.datastrato.gravitino.shaded.com.google") + relocate("google", "com.datastrato.gravitino.shaded.google") + relocate("org.apache.hc", "com.datastrato.gravitino.shaded.org.apache.hc") +} + +tasks.jar { + dependsOn(tasks.named("shadowJar")) + archiveClassifier.set("empty") +} diff --git a/spark-connector/v3.5/spark/build.gradle.kts b/spark-connector/v3.5/spark/build.gradle.kts new file mode 100644 index 00000000000..1f83b55b568 --- /dev/null +++ b/spark-connector/v3.5/spark/build.gradle.kts @@ -0,0 +1,141 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +plugins { + `maven-publish` + id("java") + id("idea") + alias(libs.plugins.shadow) +} + +repositories { + mavenCentral() +} + +val scalaVersion: String = project.properties["scalaVersion"] as? 
String ?: extra["defaultScalaVersion"].toString() +val sparkVersion: String = libs.versions.spark35.get() +val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") +val icebergVersion: String = libs.versions.iceberg4spark.get() +val kyuubiVersion: String = libs.versions.kyuubi4spark35.get() +val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() +val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() + +dependencies { + implementation(project(":spark-connector:spark-3.4")) + implementation(project(":spark-connector:spark-common")) + compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion") { + exclude("com.fasterxml.jackson") + } + compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) + compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + + testImplementation(project(":api")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":clients:client-java")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":core")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server-common")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":spark-connector:spark-common", "testArtifacts")) { + exclude("com.fasterxml.jackson") + } + + testImplementation(libs.hive2.common) { + exclude("com.sun.jersey") + exclude("org.apache.curator") + exclude("org.apache.logging.log4j") + // use hadoop from Spark + exclude("org.apache.hadoop") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + 
exclude("org.eclipse.jetty.orbit", "javax.servlet") + } + + testImplementation(libs.hive2.metastore) { + exclude("co.cask.tephra") + exclude("com.github.joshelser") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.code.findbugs", "sr305") + exclude("com.tdunning", "json") + exclude("com.zaxxer", "HikariCP") + exclude("com.sun.jersey") + exclude("io.dropwizard.metricss") + exclude("javax.transaction", "transaction-api") + exclude("org.apache.avro") + exclude("org.apache.curator") + exclude("org.apache.hbase") + exclude("org.apache.hadoop") + exclude("org.apache.hive", "hive-common") + exclude("org.apache.hive", "hive-shims") + exclude("org.apache.logging.log4j") + exclude("org.apache.parquet", "parquet-hadoop-bundle") + exclude("org.apache.zookeeper") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.slf4j") + } + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testImplementation(libs.mysql.driver) + testImplementation(libs.testcontainers) + + // org.apache.iceberg.rest.RESTSerializers#registerAll(ObjectMapper) has different method signature for iceberg-core and iceberg-spark-runtime package, we must make sure iceberg-core is in front to start up MiniGravitino server. 
+ testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") + testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion") + testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion") + // include spark-sql,spark-catalyst,hive-common,hdfs-client + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") { + // conflict with Gravitino server jersey + exclude("org.glassfish.jersey.core") + exclude("org.glassfish.jersey.containers") + exclude("org.glassfish.jersey.inject") + exclude("com.sun.jersey") + exclude("com.fasterxml.jackson") + exclude("com.fasterxml.jackson.core") + } + testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion") + + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks.test { + val skipUTs = project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + val skipSparkITs = project.hasProperty("skipSparkITs") + if (skipITs || skipSparkITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + doFirst { + environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") + } + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} + +tasks.clean { + delete("spark-warehouse") +} diff --git a/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java b/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java new file mode 100644 index 00000000000..10735436582 --- /dev/null +++ 
b/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/hive/GravitinoHiveCatalogSpark35.java @@ -0,0 +1,7 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.hive; + +public class GravitinoHiveCatalogSpark35 extends GravitinoHiveCatalogSpark34 {} diff --git a/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java b/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java new file mode 100644 index 00000000000..92f70accde6 --- /dev/null +++ b/spark-connector/v3.5/spark/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/GravitinoIcebergCatalogSpark35.java @@ -0,0 +1,7 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.iceberg; + +public class GravitinoIcebergCatalogSpark35 extends GravitinoIcebergCatalogSpark34 {} diff --git a/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT35.java b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT35.java new file mode 100644 index 00000000000..0915dc274d6 --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT35.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.integration.test.hive; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkHiveCatalogIT35 extends SparkHiveCatalogIT { + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." + getCatalogName()); + Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT35.java b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT35.java new file mode 100644 index 00000000000..e7f94490582 --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogHiveBackendIT35.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SparkIcebergCatalogHiveBackendIT35 extends SparkIcebergCatalogHiveBackendIT { + + @Test + void testCatalogClassName() { + String catalogClass = + getSparkSession() + .sessionState() + .conf() + .getConfString("spark.sql.catalog." 
+ getCatalogName()); + Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), catalogClass); + } +} diff --git a/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT35.java b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT35.java new file mode 100644 index 00000000000..0132c06964d --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT35.java @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.spark.connector.integration.test.iceberg; + +public class SparkIcebergCatalogRestBackendIT35 extends SparkIcebergCatalogRestBackendIT {} diff --git a/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java new file mode 100644 index 00000000000..dba22816c21 --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/java/com/datastrato/gravitino/spark/connector/version/TestCatalogNameAdaptor.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.spark.connector.version; + +import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark35; +import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark35; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestCatalogNameAdaptor { + @Test + void testSpark35() { + String hiveCatalogName = CatalogNameAdaptor.getCatalogName("hive"); + Assertions.assertEquals(GravitinoHiveCatalogSpark35.class.getName(), hiveCatalogName); + + String icebergCatalogName = CatalogNameAdaptor.getCatalogName("lakehouse-iceberg"); + Assertions.assertEquals(GravitinoIcebergCatalogSpark35.class.getName(), icebergCatalogName); + } +} diff --git a/spark-connector/v3.5/spark/src/test/resources/log4j2.properties b/spark-connector/v3.5/spark/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..ce232cc5cf2 --- /dev/null +++ b/spark-connector/v3.5/spark/src/test/resources/log4j2.properties @@ -0,0 +1,33 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. 
+# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-integration-test/build/integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger