Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flags for Iceberg and Lake Formation and Security Lake as a data source type. #2858

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,35 @@ public class SparkConstants {
public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL";

public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog";
public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl";
public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region";
public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN =
SPARK_CATALOG + ".client.assume-role.arn";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION =
SPARK_CATALOG + ".client.assume-role.region";
public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY =
SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller";
public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id";
public static final String SPARK_CATALOG_GLUE_LF_ENABLED =
SPARK_CATALOG + ".glue.lakeformation-enabled";

public static final String ICEBERG_SESSION_CATALOG =
"org.apache.iceberg.spark.SparkSessionCatalog";
public static final String ICEBERG_SPARK_EXTENSION =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String ICEBERG_SPARK_RUNTIME_PACKAGE =
"/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar";
public static final String SPARK_CATALOG_CATALOG_IMPL =
"spark.sql.catalog.spark_catalog.catalog-impl";
public static final String ICEBERG_SPARK_JARS =
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to specify the specific version? When do we notice the issue if we have version inconsistency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hard to say. Some things I observed here is that using the Iceberg version in EMR was causing issues in EMR versions prior to 7.2 (Spark 3.5.1). So specifying iceberg from Maven central is more stable than that. On the AWS version, I'm not sure. The AWS sdk v2 is only used with Iceberg in the EMR 6.x versions, and this version doesn't conflict. That's not to say it couldn't be an issue in EMR 7.x.

public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY =
"org.apache.iceberg.aws.AssumeRoleAwsClientFactory";
public static final String ICEBERG_LF_CLIENT_FACTORY =
"org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory";
// The following option is needed in Iceberg 1.5 when reading timestamp types that do not
// contain timezone in parquet files. The timezone is assumed to be GMT.
public static final String ICEBERG_TS_WO_TZ =
asuresh8 marked this conversation as resolved.
Show resolved Hide resolved
"spark.sql.iceberg.handle-timestamp-without-timezone";

public static final String EMR_LAKEFORMATION_OPTION =
"spark.emr-serverless.lakeformation.enabled";
public static final String FLINT_ACCELERATE_USING_COVERING_INDEX =
"spark.flint.optimizer.covering.enabled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public void deleteConfigItem(String key) {
config.remove(key);
}

public String getConfigItem(String key) {
return config.get(key);
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE;
Expand Down Expand Up @@ -71,7 +64,6 @@ private void setDefaultConfigs() {
setConfigItem(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE);
setConfigItem(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
Expand All @@ -85,12 +77,8 @@ private void setDefaultConfigs() {
setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
}

private void setConfigItem(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,39 +203,6 @@ void testDispatchSelectQuery() {
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithLakeFormation() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
HashMap<String, String> tags = new HashMap<>();
tags.put(DATASOURCE_TAG_KEY, MY_GLUE);
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query);
StartJobRequest expected =
new StartJobRequest(
"TEST_CLUSTER:batch",
null,
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
sparkSubmitParameters,
tags,
false,
"query_execution_result_my_glue");
when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation();
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
MY_GLUE, asyncQueryRequestContext))
.thenReturn(dataSourceMetadata);

DispatchQueryResponse dispatchQueryResponse =
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
Expand Down Expand Up @@ -1085,7 +1052,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String
+ getConfParam(
"spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory",
"spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
"spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT",
"spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots",
"spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/",
Expand All @@ -1097,10 +1063,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String
"spark.datasource.flint.scheme=SCHEMA",
"spark.datasource.flint.auth=basic",
"spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog")
"spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
+ getConfParam("spark.flint.job.query=" + query)
+ (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "")
+ getConfParam(
Expand Down Expand Up @@ -1149,25 +1113,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() {
.build();
}

private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() {

Map<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put(
"glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole");
properties.put(
"glue.indexstore.opensearch.uri",
"https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "eu-west-1");
properties.put("glue.lakeformation.enabled", "true");
return new DataSourceMetadata.Builder()
.setName(MY_GLUE)
.setConnector(DataSourceType.S3GLUE)
.setProperties(properties)
.build();
}

private DataSourceMetadata constructPrometheusDataSourceType() {
return new DataSourceMetadata.Builder()
.setName("my_prometheus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.parameter;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -93,6 +94,7 @@ public void testOverrideConfigItem() {
params.setConfigItem(SPARK_JARS_KEY, "Overridden");
String result = params.toString();

assertEquals("Overridden", params.getConfigItem(SPARK_JARS_KEY));
assertTrue(result.contains(String.format("%s=Overridden", SPARK_JARS_KEY)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@

package org.opensearch.sql.spark.parameter;

import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY;
Expand All @@ -25,39 +26,108 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_JARS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_TS_WO_TZ;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;

import com.amazonaws.arn.Arn;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

@RequiredArgsConstructor
public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer {
public static final String FLINT_BASIC_AUTH = "basic";
public static final String FALSE = "false";
public static final String TRUE = "true";

private final SparkExecutionEngineConfigClusterSettingLoader settingLoader;

@Override
public void compose(
DataSourceMetadata metadata,
SparkSubmitParameters params,
DispatchQueryRequest dispatchQueryRequest,
AsyncQueryRequestContext context) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final Optional<SparkExecutionEngineConfigClusterSetting> maybeClusterSettings =
settingLoader.load();
if (!maybeClusterSettings.isPresent()) {
throw new RuntimeException("No cluster settings present");
}
final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get();
final String region = clusterSetting.getRegion();

final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final String accountId = Arn.fromString(roleArn).getAccountId();

params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName());

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled));
params.setConfigItem(
FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled));
final boolean icebergEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED));
if (icebergEnabled) {
params.setConfigItem(
SPARK_JAR_PACKAGES_KEY,
params.getConfigItem(SPARK_JAR_PACKAGES_KEY) + "," + ICEBERG_SPARK_JARS);
params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
params.setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);

params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region);
params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region);
params.setConfigItem(ICEBERG_TS_WO_TZ, TRUE);

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
if (lakeFormationEnabled) {
final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG);
if (StringUtils.isBlank(sessionTag)) {
throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required");
}

params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE);
params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE);
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY);
params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag);
} else {
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY);
}
}

setFlintIndexStoreHost(
params,
Expand Down
Loading
Loading