Skip to content

Commit

Permalink
Add flags for Iceberg and Lake Formation and Security Lake as a data …
Browse files Browse the repository at this point in the history
…source type. (#2858)

Previously, Iceberg catalog was set as the default catalog. This poses
problems as the behavior to fall back to default Spark catalog is only
correct in some versions of Iceberg. Rather than always opt into
Iceberg, Iceberg should be an option.

Additionally, the Lake Formation flag enabled Lake Formation for the EMR
job. This did not work as expected because EMR system space does not
work with Flint. Instead Lake Formation can be enabled using the Iceberg
catalog implementation.

This changes adds Security Lake as a data source type. Security Lake as
a data source is simply specific options set on top of the base S3Glue
data source.
---------

Signed-off-by: Adi Suresh <[email protected]>
(cherry picked from commit 05c961e)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Aug 13, 2024
1 parent 3a2de37 commit 47f5361
Show file tree
Hide file tree
Showing 17 changed files with 760 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,35 @@ public class SparkConstants {
public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL";

public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog";
public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl";
public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region";
public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN =
SPARK_CATALOG + ".client.assume-role.arn";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION =
SPARK_CATALOG + ".client.assume-role.region";
public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY =
SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller";
public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id";
public static final String SPARK_CATALOG_GLUE_LF_ENABLED =
SPARK_CATALOG + ".glue.lakeformation-enabled";

public static final String ICEBERG_SESSION_CATALOG =
"org.apache.iceberg.spark.SparkSessionCatalog";
public static final String ICEBERG_SPARK_EXTENSION =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String ICEBERG_SPARK_RUNTIME_PACKAGE =
"/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar";
public static final String SPARK_CATALOG_CATALOG_IMPL =
"spark.sql.catalog.spark_catalog.catalog-impl";
public static final String ICEBERG_SPARK_JARS =
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY =
"org.apache.iceberg.aws.AssumeRoleAwsClientFactory";
public static final String ICEBERG_LF_CLIENT_FACTORY =
"org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory";
// The following option is needed in Iceberg 1.5 when reading timestamp types that do not
// contain timezone in parquet files. The timezone is assumed to be GMT.
public static final String ICEBERG_TS_WO_TZ =
"spark.sql.iceberg.handle-timestamp-without-timezone";

public static final String EMR_LAKEFORMATION_OPTION =
"spark.emr-serverless.lakeformation.enabled";
public static final String FLINT_ACCELERATE_USING_COVERING_INDEX =
"spark.flint.optimizer.covering.enabled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public void deleteConfigItem(String key) {
config.remove(key);
}

public String getConfigItem(String key) {
return config.get(key);
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE;
Expand Down Expand Up @@ -71,7 +64,6 @@ private void setDefaultConfigs() {
setConfigItem(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE);
setConfigItem(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
Expand All @@ -85,12 +77,8 @@ private void setDefaultConfigs() {
setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
}

private void setConfigItem(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,39 +203,6 @@ void testDispatchSelectQuery() {
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithLakeFormation() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
HashMap<String, String> tags = new HashMap<>();
tags.put(DATASOURCE_TAG_KEY, MY_GLUE);
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query);
StartJobRequest expected =
new StartJobRequest(
"TEST_CLUSTER:batch",
null,
EMRS_APPLICATION_ID,
EMRS_EXECUTION_ROLE,
sparkSubmitParameters,
tags,
false,
"query_execution_result_my_glue");
when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation();
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
MY_GLUE, asyncQueryRequestContext))
.thenReturn(dataSourceMetadata);

DispatchQueryResponse dispatchQueryResponse =
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
Expand Down Expand Up @@ -1085,7 +1052,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String
+ getConfParam(
"spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory",
"spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
"spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT",
"spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots",
"spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/",
Expand All @@ -1097,10 +1063,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String
"spark.datasource.flint.scheme=SCHEMA",
"spark.datasource.flint.auth=basic",
"spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog")
"spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
+ getConfParam("spark.flint.job.query=" + query)
+ (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "")
+ getConfParam(
Expand Down Expand Up @@ -1149,25 +1113,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() {
.build();
}

private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() {

Map<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put(
"glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole");
properties.put(
"glue.indexstore.opensearch.uri",
"https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com");
properties.put("glue.indexstore.opensearch.auth", "awssigv4");
properties.put("glue.indexstore.opensearch.region", "eu-west-1");
properties.put("glue.lakeformation.enabled", "true");
return new DataSourceMetadata.Builder()
.setName(MY_GLUE)
.setConnector(DataSourceType.S3GLUE)
.setProperties(properties)
.build();
}

private DataSourceMetadata constructPrometheusDataSourceType() {
return new DataSourceMetadata.Builder()
.setName("my_prometheus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.parameter;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -93,6 +94,7 @@ public void testOverrideConfigItem() {
params.setConfigItem(SPARK_JARS_KEY, "Overridden");
String result = params.toString();

assertEquals("Overridden", params.getConfigItem(SPARK_JARS_KEY));
assertTrue(result.contains(String.format("%s=Overridden", SPARK_JARS_KEY)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@

package org.opensearch.sql.spark.parameter;

import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY;
Expand All @@ -25,39 +26,108 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_JARS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_TS_WO_TZ;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;

import com.amazonaws.arn.Arn;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

@RequiredArgsConstructor
public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer {
public static final String FLINT_BASIC_AUTH = "basic";
public static final String FALSE = "false";
public static final String TRUE = "true";

private final SparkExecutionEngineConfigClusterSettingLoader settingLoader;

@Override
public void compose(
DataSourceMetadata metadata,
SparkSubmitParameters params,
DispatchQueryRequest dispatchQueryRequest,
AsyncQueryRequestContext context) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final Optional<SparkExecutionEngineConfigClusterSetting> maybeClusterSettings =
settingLoader.load();
if (!maybeClusterSettings.isPresent()) {
throw new RuntimeException("No cluster settings present");
}
final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get();
final String region = clusterSetting.getRegion();

final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final String accountId = Arn.fromString(roleArn).getAccountId();

params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName());

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled));
params.setConfigItem(
FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled));
final boolean icebergEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED));
if (icebergEnabled) {
params.setConfigItem(
SPARK_JAR_PACKAGES_KEY,
params.getConfigItem(SPARK_JAR_PACKAGES_KEY) + "," + ICEBERG_SPARK_JARS);
params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
params.setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);

params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region);
params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region);
params.setConfigItem(ICEBERG_TS_WO_TZ, TRUE);

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
if (lakeFormationEnabled) {
final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG);
if (StringUtils.isBlank(sessionTag)) {
throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required");
}

params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE);
params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE);
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY);
params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag);
} else {
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY);
}
}

setFlintIndexStoreHost(
params,
Expand Down
Loading

0 comments on commit 47f5361

Please sign in to comment.