Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add option to use LakeFormation in S3Glue data source. #2639

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class GlueDataSourceFactory implements DataSourceFactory {
"glue.indexstore.opensearch.auth.password";
public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
"glue.indexstore.opensearch.region";
public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled";

@Override
public DataSourceType getDataSourceType() {
Expand Down
8 changes: 5 additions & 3 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ s3Glue connector provides a way to query s3 files using glue as metadata store a
This page covers s3Glue datasource configuration and also how to query an s3Glue datasource.

Required resources for s3 Glue Connector
===================================
========================================
* ``EMRServerless Spark Execution Engine Config Setting``: Since we execute s3Glue queries on top of spark execution engine, we require this configuration.
More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_
* ``S3``: This is where the data lies.
Expand All @@ -42,6 +42,7 @@ Glue Connector Properties.
* Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password``
* AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn``
* ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth]
* ``glue.lakeformation.enabled`` determines whether AWS Lake Formation is enabled for queries. Default value is ``"false"`` if not specified

Sample Glue dataSource configuration
========================================
Expand All @@ -56,7 +57,7 @@ Glue datasource configuration::
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://localhost:9200",
"glue.indexstore.opensearch.auth" :"basicauth",
"glue.indexstore.opensearch.auth.username" :"username"
"glue.indexstore.opensearch.auth.username" :"username",
"glue.indexstore.opensearch.auth.password" :"password"
},
"resultIndex": "query_execution_result"
Expand All @@ -71,6 +72,7 @@ Glue datasource configuration::
"glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200",
"glue.indexstore.opensearch.auth" :"awssigv4",
"glue.indexstore.opensearch.auth.region" :"awssigv4",
"glue.lakeformation.enabled": "true"
},
"resultIndex": "query_execution_result"
}]
Expand All @@ -86,4 +88,4 @@ Sample Queries

These queries would work only top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_

Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
Expand All @@ -21,6 +22,7 @@
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
Expand Down Expand Up @@ -117,6 +119,11 @@ public Builder dataSource(DataSourceMetadata metadata) {
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
config.put(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled));
config.put(FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled));

setFlintIndexStoreHost(
parseUri(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,9 @@ public class SparkConstants {
public static final String SPARK_CATALOG_CATALOG_IMPL =
"spark.sql.catalog.spark_catalog.catalog-impl";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";

public static final String EMR_LAKEFORMATION_OPTION =
"spark.emr-serverless.lakeformation.enabled";
public static final String FLINT_ACCELERATE_USING_COVERING_INDEX =
"spark.flint.optimizer.covering.enabled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,52 @@ void testDispatchSelectQuery() {
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithLakeFormation() {
  // Tags expected on the EMR-Serverless batch job for the my_glue datasource.
  HashMap<String, String> expectedTags = new HashMap<>();
  expectedTags.put(DATASOURCE_TAG_KEY, "my_glue");
  expectedTags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
  expectedTags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());

  String sqlQuery = "select * from my_glue.default.http_logs";
  // lakeFormationEnabled=true so the expected spark-submit string carries the
  // lakeformation/covering-index conf pair flipped accordingly.
  String expectedSparkParams =
      constructExpectedSparkSubmitParameterString(
          "sigv4",
          Map.of(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"),
          sqlQuery,
          true);
  StartJobRequest expectedJobRequest =
      new StartJobRequest(
          "TEST_CLUSTER:batch",
          EMRS_APPLICATION_ID,
          EMRS_EXECUTION_ROLE,
          expectedSparkParams,
          expectedTags,
          false,
          "query_execution_result_my_glue");

  // Stub the datasource lookup to return lake-formation-enabled metadata,
  // and the EMR-S client to acknowledge the expected job request.
  DataSourceMetadata lakeFormationMetadata =
      constructMyGlueDataSourceMetadataWithLakeFormation();
  when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue"))
      .thenReturn(lakeFormationMetadata);
  when(emrServerlessClient.startJobRun(expectedJobRequest)).thenReturn(EMR_JOB_ID);

  DispatchQueryResponse response =
      sparkQueryDispatcher.dispatch(
          new DispatchQueryRequest(
              EMRS_APPLICATION_ID,
              sqlQuery,
              "my_glue",
              LangType.SQL,
              EMRS_EXECUTION_ROLE,
              TEST_CLUSTER_NAME));

  // The dispatcher must submit exactly the expected job and surface its id.
  verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
  Assertions.assertEquals(expectedJobRequest, startJobRequestArgumentCaptor.getValue());
  Assertions.assertEquals(EMR_JOB_ID, response.getJobId());
  verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
HashMap<String, String> tags = new HashMap<>();
Expand Down Expand Up @@ -936,13 +982,17 @@ void testDispatchQueryWithExtraSparkSubmitParameters() {

/**
 * Builds the expected spark-submit parameter string with Lake Formation disabled —
 * the default configuration used by most test cases in this class.
 */
private String constructExpectedSparkSubmitParameterString(
    String authType, Map<String, String> authProperties, String queryText) {
  return constructExpectedSparkSubmitParameterString(authType, authProperties, queryText, false);
}

private String constructExpectedSparkSubmitParameterString(
String auth, Map<String, String> authParams, String query, boolean lakeFormationEnabled) {
StringBuilder authParamConfigBuilder = new StringBuilder();
for (String key : authParams.keySet()) {
authParamConfigBuilder.append(" --conf ");
authParamConfigBuilder.append(" --conf ");
authParamConfigBuilder.append(key);
authParamConfigBuilder.append("=");
authParamConfigBuilder.append(authParams.get(key));
authParamConfigBuilder.append(" ");
}
query = "\"" + query + "\"";
return " --class org.apache.spark.sql.FlintJob --conf"
Expand Down Expand Up @@ -978,9 +1028,13 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
+ " --conf spark.flint.datasource.name=my_glue "
+ " --conf spark.flint.datasource.name=my_glue --conf"
+ " spark.emr-serverless.lakeformation.enabled="
+ Boolean.toString(lakeFormationEnabled)
+ " --conf spark.flint.optimizer.covering.enabled="
+ Boolean.toString(!lakeFormationEnabled)
+ authParamConfigBuilder
+ " --conf spark.flint.job.query="
+ " --conf spark.flint.job.query="
+ query
+ " ";
}
Expand Down Expand Up @@ -1056,6 +1110,25 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() {
.build();
}

/**
 * Builds my_glue datasource metadata whose properties have
 * {@code glue.lakeformation.enabled} set to {@code "true"}, using sigv4 auth
 * against an eu-west-1 index store.
 */
private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() {
  Map<String, String> props = new HashMap<>();
  props.put("glue.auth.type", "iam_role");
  props.put("glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole");
  props.put(
      "glue.indexstore.opensearch.uri",
      "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com");
  props.put("glue.indexstore.opensearch.auth", "awssigv4");
  props.put("glue.indexstore.opensearch.region", "eu-west-1");
  // The property under test: enables AWS Lake Formation for this datasource.
  props.put("glue.lakeformation.enabled", "true");
  return new DataSourceMetadata.Builder()
      .setName("my_glue")
      .setConnector(DataSourceType.S3GLUE)
      .setProperties(props)
      .build();
}

private DataSourceMetadata constructPrometheusDataSourceType() {
return new DataSourceMetadata.Builder()
.setName("my_prometheus")
Expand Down
Loading