From c5dfbf2e498df00b0059e95d1c31565c5cfc14fe Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:15:15 -0700 Subject: [PATCH] Add iceberg support to EMR serverless jobs. (#2602) (#2608) (cherry picked from commit 39c022271275aa1723f4f3cebf96e78e515c1722) Signed-off-by: Adi Suresh Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../asyncquery/model/SparkSubmitParameters.java | 14 ++++++++++++-- .../sql/spark/data/constants/SparkConstants.java | 11 +++++++++++ .../spark/dispatcher/SparkQueryDispatcherTest.java | 9 ++++++--- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 11e418f42f..3942c9a772 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -56,7 +56,13 @@ private Builder() { DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); config.put( SPARK_JAR_PACKAGES_KEY, - SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE); + SPARK_STANDALONE_PACKAGE + + "," + + SPARK_LAUNCHER_PACKAGE + + "," + + PPL_STANDALONE_PACKAGE + + "," + + ICEBERG_SPARK_RUNTIME_PACKAGE); config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); @@ -67,8 +73,12 @@ private Builder() { config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + config.put( + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); + config.put(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); + config.put(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); } public static Builder builder() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index ceb1b4da54..0a574ef730 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -94,4 +94,15 @@ public class SparkConstants { public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId"; public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL"; + + public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog"; + public static final String ICEBERG_SESSION_CATALOG = + "org.apache.iceberg.spark.SparkSessionCatalog"; + public static final String ICEBERG_SPARK_EXTENSION = + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; + public static final String ICEBERG_SPARK_RUNTIME_PACKAGE = + "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0"; + public static final String SPARK_CATALOG_CATALOG_IMPL = + "spark.sql.catalog.spark_catalog.catalog-impl"; + public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index da66400769..1f250a0aea 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -950,7 +950,7 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory" + " --conf" - + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT" + + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0" + " --conf" + " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots" + " --conf" @@ -965,10 +965,13 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider" + " --conf" - + " spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions" + + " spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions" + " --conf" + " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" - + " --conf" + + " --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog " + + " --conf" + + " spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog " + + " --conf" + " spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + " --conf" + " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"