Skip to content

Commit

Permalink
Add iceberg support to EMR serverless jobs. (#2602)
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 authored Apr 8, 2024
1 parent 36b423c commit 39c0222
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ private Builder() {
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
SPARK_STANDALONE_PACKAGE
+ ","
+ SPARK_LAUNCHER_PACKAGE
+ ","
+ PPL_STANDALONE_PACKAGE
+ ","
+ ICEBERG_SPARK_RUNTIME_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
Expand All @@ -67,8 +73,12 @@ private Builder() {
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
config.put(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
config.put(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
config.put(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
}

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,15 @@ public class SparkConstants {
public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";

public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL";

public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog";
public static final String ICEBERG_SESSION_CATALOG =
"org.apache.iceberg.spark.SparkSessionCatalog";
public static final String ICEBERG_SPARK_EXTENSION =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String ICEBERG_SPARK_RUNTIME_PACKAGE =
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0";
public static final String SPARK_CATALOG_CATALOG_IMPL =
"spark.sql.catalog.spark_catalog.catalog-impl";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
}
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"
+ " --conf"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0"
+ " --conf"
+ " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots"
+ " --conf"
Expand All @@ -968,10 +968,13 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"
+ " --conf"
+ " spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions"
+ " spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions"
+ " --conf"
+ " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
+ " --conf"
+ " --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog "
+ " --conf"
+ " spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog "
+ " --conf"
+ " spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf"
+ " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
Expand Down

0 comments on commit 39c0222

Please sign in to comment.