[#3554] feat(spark-connector): support multiple Spark versions (#3415)
### What changes were proposed in this pull request?
1. Split the Spark connector into a `spark-common` module, which holds the version-agnostic logic, and per-version `v3.x` modules, which hold the adaptor logic.
2. Add a separate GitHub Actions workflow to run the Spark integration tests.
3. Build the connector jars for a specific Spark version via the corresponding runtime module, e.g. `./gradlew :spark-connector:spark-runtime-3.5:build` (module names per the settings.gradle.kts change in this PR).
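
The resulting module layout (per the settings.gradle.kts change further down; the runtime modules build the deployable connector jars, per item 3):

    spark-connector/
      spark-common/          # shared, version-agnostic connector logic and tests
      v3.3/spark/            # :spark-connector:spark-3.3 (version adaptor)
      v3.3/spark-runtime/    # :spark-connector:spark-runtime-3.3 (connector jar)
      v3.4/spark/, v3.4/spark-runtime/    # same pair for Spark 3.4
      v3.5/spark/, v3.5/spark-runtime/    # same pair for Spark 3.5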

### Why are the changes needed?

Fix: #3554 

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests, run against the corresponding Spark versions.
FANNG1 authored May 28, 2024
1 parent 15fdac5 commit 4c61a64
Showing 82 changed files with 1,495 additions and 253 deletions.
.github/workflows/backend-integration-test.yml (3 changes: 1 addition & 2 deletions)

@@ -40,7 +40,6 @@ jobs:
              - meta/**
              - server/**
              - server-common/**
-             - spark-connector/**
              - trino-connector/**
              - web/**
              - docs/open-api/**
@@ -96,7 +95,7 @@ jobs:
      - name: Backend Integration Test
        id: integrationTest
        run: |
-         ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs
+         ./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs -PskipSparkITs
      - name: Upload integrate tests reports
        uses: actions/upload-artifact@v3
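The new `-PskipSparkITs` flag is honored by the Spark connector's test task (see spark-connector/spark-common/build.gradle.kts later in this diff). A minimal sketch of the mechanism, assuming the Gradle Kotlin DSL:

    tasks.test {
      // The property's presence alone (not its value) toggles the exclusion,
      // mirroring the skipITs handling in the connector build script below.
      if (project.hasProperty("skipSparkITs")) {
        exclude("**/integration/**") // drop the Spark integration tests
      }
    }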
.github/workflows/spark-integration-test.yml (new file: 109 additions)

@@ -0,0 +1,109 @@
name: Spark Integration Test

# Controls when the workflow will run
on:
  # Triggers the workflow on push or pull request events but only for the "main" branch
  push:
    branches: [ "main", "branch-*" ]
  pull_request:
    branches: [ "main", "branch-*" ]

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  changes:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            source_changes:
              - .github/**
              - api/**
              - bin/**
              - catalogs/**
              - clients/client-java/**
              - clients/client-java-runtime/**
              - clients/filesystem-hadoop3/**
              - clients/filesystem-hadoop3-runtime/**
              - common/**
              - conf/**
              - core/**
              - dev/**
              - gradle/**
              - meta/**
              - server/**
              - server-common/**
              - spark-connector/**
              - docs/open-api/**
              - build.gradle.kts
              - gradle.properties
              - gradlew
              - setting.gradle.kts
    outputs:
      source_changes: ${{ steps.filter.outputs.source_changes }}

  # Integration test for AMD64 architecture
  test-amd64-arch:
    needs: changes
    if: needs.changes.outputs.source_changes == 'true'
    runs-on: ubuntu-latest
    timeout-minutes: 90
    strategy:
      matrix:
        architecture: [linux/amd64]
        java-version: [ 8, 11, 17 ]
        test-mode: [ embedded, deploy ]
    env:
      PLATFORM: ${{ matrix.architecture }}
    steps:
      - uses: actions/checkout@v3

      - uses: actions/setup-java@v3
        with:
          java-version: ${{ matrix.java-version }}
          distribution: 'temurin'

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2

      - name: Check required command
        run: |
          dev/ci/check_commands.sh

      - name: Package Gravitino
        if: ${{ matrix.test-mode == 'deploy' }}
        run: |
          ./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }}

      - name: Setup debug Github Action
        if: ${{ contains(github.event.pull_request.labels.*.name, 'debug action') }}
        uses: csexton/debugger-action@master

      - name: Free up disk space
        run: |
          dev/ci/util_free_space.sh

      - name: Spark Integration Test
        id: integrationTest
        run: |
          ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
          ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
          ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"

      - name: Upload integrate tests reports
        uses: actions/upload-artifact@v3
        if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }}
        with:
          name: spark-connector-integrate-test-reports-${{ matrix.java-version }}-${{ matrix.test-mode }}
          path: |
            build/reports
            spark-connector/v3.3/spark/build/spark-3.3-integration-test.log
            spark-connector/v3.4/spark/build/spark-3.4-integration-test.log
            spark-connector/v3.5/spark/build/spark-3.5-integration-test.log
            distribution/package/logs/gravitino-server.out
            distribution/package/logs/gravitino-server.log
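
Each of the six matrix cells (three JDKs x two test modes) runs the integration suite against all three Spark versions. A single cell can be reproduced locally by issuing the same command the workflow runs, with the matrix variables filled in, e.g. `./gradlew --rerun-tasks -PskipTests -PtestMode=embedded -PjdkVersion=8 :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"`.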
build.gradle.kts (4 changes: 2 additions & 2 deletions)

@@ -617,7 +617,7 @@ tasks {
register("copySubprojectDependencies", Copy::class) {
subprojects.forEach() {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "bundled-catalog" && it.name != "flink-connector"
) {
from(it.configurations.runtimeClasspath)
@@ -631,7 +631,7 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("client") &&
!it.name.startsWith("filesystem") &&
!it.name.startsWith("spark-connector") &&
!it.name.startsWith("spark") &&
it.name != "trino-connector" &&
it.name != "integration-test" &&
it.name != "bundled-catalog" &&
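The prefix check is broadened from `spark-connector` to `spark` because `it.name` is the subproject's simple name: the new child projects are named `spark-common`, `spark-3.3`, `spark-runtime-3.3`, and so on (per the settings.gradle.kts change below), and only the wider prefix keeps them all out of the copied dependency set.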
catalogs/catalog-hive/build.gradle.kts (2 changes: 1 addition & 1 deletion)

@@ -11,7 +11,7 @@ plugins {
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
- val sparkVersion: String = libs.versions.spark.get()
+ val sparkVersion: String = libs.versions.spark34.get()
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

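Both catalog builds previously read the shared `spark` alias, which this PR removes from the version catalog (see gradle/libs.versions.toml below); they now pin explicitly to the Spark 3.4 line via `spark34`.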
catalogs/catalog-lakehouse-iceberg/build.gradle.kts (2 changes: 1 addition & 1 deletion)

@@ -11,7 +11,7 @@ plugins {
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
- val sparkVersion: String = libs.versions.spark.get()
+ val sparkVersion: String = libs.versions.spark34.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()
gradle/libs.versions.toml (11 changes: 8 additions & 3 deletions)

@@ -27,9 +27,15 @@ commons-collections4 = "4.4"
commons-dbcp2 = "2.11.0"
caffeine = "2.9.3"
rocksdbjni = "7.10.2"
- iceberg = '1.3.1' # 1.4.0 causes test to fail
+ iceberg = '1.3.1' # used for Gravitino Iceberg catalog and Iceberg REST service
+ iceberg4spark = "1.4.1" # used for compile spark connector
trino = '426'
- spark = "3.4.1" # 3.5.0 causes tests to fail
+ spark33 = "3.3.4"
+ spark34 = "3.4.3"
+ spark35 = "3.5.1"
+ kyuubi4spark33 = "1.7.4"
+ kyuubi4spark34 = "1.8.2"
+ kyuubi4spark35 = "1.9.0"
scala-collection-compat = "2.7.0"
scala-java-compat = "1.0.2"
sqlite-jdbc = "3.42.0.0"
@@ -49,7 +55,6 @@ selenium = "3.141.59"
rauschig = "1.2.0"
mybatis = "3.5.6"
h2db = "1.4.200"
kyuubi = "1.8.2"
kafka = "3.4.0"
curator = "2.12.0"
awaitility = "4.2.1"
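For reference, this is how the new catalog entries are consumed in the build scripts elsewhere in this diff (accessor names are derived from the TOML keys):

    val sparkVersion: String = libs.versions.spark33.get()          // "3.3.4"
    val icebergVersion: String = libs.versions.iceberg4spark.get()  // "1.4.1"
    val kyuubiVersion: String = libs.versions.kyuubi4spark33.get()  // "1.7.4"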
integration-test/build.gradle.kts (4 changes: 0 additions & 4 deletions)

@@ -23,10 +23,6 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":spark-connector:spark-connector")) {
exclude("org.apache.hadoop", "hadoop-client-api")
exclude("org.apache.hadoop", "hadoop-client-runtime")
}

testImplementation(libs.commons.cli)
testImplementation(libs.commons.lang3)
settings.gradle.kts (8 changes: 7 additions & 1 deletion)

@@ -28,7 +28,13 @@ include(
"clients:client-python"
)
include("trino-connector")
include("spark-connector:spark-connector", "spark-connector:spark-connector-runtime")
include("spark-connector:spark-common", "spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3", "spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5")
project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark")
project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime")
project(":spark-connector:spark-3.4").projectDir = file("spark-connector/v3.4/spark")
project(":spark-connector:spark-runtime-3.4").projectDir = file("spark-connector/v3.4/spark-runtime")
project(":spark-connector:spark-3.5").projectDir = file("spark-connector/v3.5/spark")
project(":spark-connector:spark-runtime-3.5").projectDir = file("spark-connector/v3.5/spark-runtime")
include("flink-connector")
include("web")
include("docs")
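The include and the six projectDir mappings follow a strict naming pattern; a hedged alternative sketch (not what this PR does) would derive them from a version list, keeping only the `spark-common` include separate:

    // Hypothetical refactor of the block above, same effect:
    include("spark-connector:spark-common")
    listOf("3.3", "3.4", "3.5").forEach { v ->
      include("spark-connector:spark-$v", "spark-connector:spark-runtime-$v")
      project(":spark-connector:spark-$v").projectDir = file("spark-connector/v$v/spark")
      project(":spark-connector:spark-runtime-$v").projectDir = file("spark-connector/v$v/spark-runtime")
    }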
spark-connector/spark-common/build.gradle.kts (new file: 154 additions)

@@ -0,0 +1,154 @@
/*
 * Copyright 2024 Datastrato Pvt Ltd.
 * This software is licensed under the Apache License version 2.
 */
plugins {
  `maven-publish`
  id("java")
  id("idea")
  alias(libs.plugins.shadow)
}

repositories {
  mavenCentral()
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark33.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg4spark.get()
val kyuubiVersion: String = libs.versions.kyuubi4spark33.get()
val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
  implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
  implementation(libs.guava)

  compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
  compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
  compileOnly("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")

  compileOnly("org.apache.spark:spark-catalyst_$scalaVersion:$sparkVersion")
  compileOnly("org.apache.spark:spark-core_$scalaVersion:$sparkVersion")
  compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
  compileOnly("org.scala-lang.modules:scala-java8-compat_$scalaVersion:$scalaJava8CompatVersion")

  annotationProcessor(libs.lombok)
  compileOnly(libs.lombok)

  testAnnotationProcessor(libs.lombok)
  testCompileOnly(libs.lombok)

  // use log from spark, spark3.3 use low version of log4j, to avoid java.lang.NoSuchMethodError: org.apache.logging.slf4j.Log4jLoggerFactory: method <init>()V not found
  testImplementation(project(":api")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":clients:client-java")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":core")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":common")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":server")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":server-common")) {
    exclude("org.apache.logging.log4j")
  }
  testImplementation(project(":integration-test-common", "testArtifacts"))

  testImplementation(libs.hive2.common) {
    exclude("org.apache.curator")
    // use hadoop from Spark
    exclude("org.apache.hadoop")
    exclude("org.apache.logging.log4j")
    exclude("org.eclipse.jetty.aggregate", "jetty-all")
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
  }
  testImplementation(libs.hive2.metastore) {
    exclude("co.cask.tephra")
    exclude("com.github.joshelser")
    exclude("com.google.code.findbugs", "jsr305")
    exclude("com.google.code.findbugs", "sr305")
    exclude("com.tdunning", "json")
    exclude("com.zaxxer", "HikariCP")
    exclude("io.dropwizard.metricss")
    exclude("javax.transaction", "transaction-api")
    exclude("org.apache.avro")
    exclude("org.apache.curator")
    exclude("org.apache.hbase")
    exclude("org.apache.hadoop")
    exclude("org.apache.hive", "hive-common")
    exclude("org.apache.hive", "hive-shims")
    exclude("org.apache.logging.log4j")
    exclude("org.apache.parquet", "parquet-hadoop-bundle")
    exclude("org.apache.zookeeper")
    exclude("org.eclipse.jetty.aggregate", "jetty-all")
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.slf4j")
  }
  testImplementation(libs.junit.jupiter.api)
  testImplementation(libs.junit.jupiter.params)
  testImplementation(libs.mysql.driver)
  testImplementation(libs.testcontainers)

  testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
  testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
  testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
  testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
  // include spark-sql,spark-catalyst,hive-common,hdfs-client
  testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
    // conflict with Gravitino server jersey
    exclude("org.glassfish.jersey.core")
    exclude("org.glassfish.jersey.containers")
    exclude("org.glassfish.jersey.inject")
  }
  testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:$scalaCollectionCompatVersion")

  testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks.test {
  val skipUTs = project.hasProperty("skipTests")
  if (skipUTs) {
    // Only run integration tests
    include("**/integration/**")
  }

  val skipITs = project.hasProperty("skipITs")
  val skipSparkITs = project.hasProperty("skipSparkITs")
  if (skipITs || skipSparkITs) {
    // Exclude integration tests
    exclude("**/integration/**")
  } else {
    dependsOn(tasks.jar)

    doFirst {
      environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
    }

    val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
    init(this)
  }
}

tasks.clean {
  delete("spark-warehouse")
}

val testJar by tasks.registering(Jar::class) {
  archiveClassifier.set("tests")
  from(sourceSets["test"].output)
}

configurations {
  create("testArtifacts")
}

artifacts {
  add("testArtifacts", testJar)
}