Add Security Lake data source type.
This change adds Security Lake as a data source type. The Security Lake
data source is simply a set of specific options on top of the base S3Glue
data source.

Signed-off-by: Adi Suresh <[email protected]>
asuresh8 committed Aug 13, 2024
1 parent 532a14d commit c1027d7
Showing 10 changed files with 292 additions and 4 deletions.
@@ -111,13 +111,13 @@ public class SparkConstants {
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String ICEBERG_SPARK_JARS =
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30";
public static final String ICEBERG_SPARK_RUNTIME_PACKAGE =
"/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY =
"org.apache.iceberg.aws.AssumeRoleAwsClientFactory";
public static final String ICEBERG_LF_CLIENT_FACTORY =
"org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory";
// The following option is needed in Iceberg 1.5 when reading timestamp types that do not
// contain timezone in parquet files. The timezone is assumed to be GMT.
public static final String ICEBERG_TS_WO_TZ =
"spark.sql.iceberg.handle-timestamp-without-timezone";

@@ -5,6 +5,7 @@

package org.opensearch.sql.spark.parameter;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -93,6 +94,7 @@ public void testOverrideConfigItem() {
params.setConfigItem(SPARK_JARS_KEY, "Overridden");
String result = params.toString();

assertEquals("Overridden", params.getConfigItem(SPARK_JARS_KEY));
assertTrue(result.contains(String.format("%s=Overridden", SPARK_JARS_KEY)));
}

@@ -161,6 +161,9 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider
SparkParameterComposerCollection collection = new SparkParameterComposerCollection();
collection.register(
DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
return new SparkSubmitParametersBuilderProvider(collection);
}
@@ -276,6 +276,10 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
DataSourceType.S3GLUE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
sparkParameterComposerCollection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider =
new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection);
QueryHandlerFactory queryHandlerFactory =
@@ -15,12 +15,13 @@ public class DataSourceType {
public static DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH");
public static DataSourceType SPARK = new DataSourceType("SPARK");
public static DataSourceType S3GLUE = new DataSourceType("S3GLUE");
public static DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE");

// Map from uppercase DataSourceType name to DataSourceType object
private static Map<String, DataSourceType> knownValues = new HashMap<>();

static {
-    register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE);
+    register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE);
}

private final String name;
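
A usage sketch for the registry above; fromString is assumed to resolve a name against knownValues (its body lies outside this hunk):

// Assumed behavior: lookup is keyed by the uppercase name registered above.
DataSourceType type = DataSourceType.fromString("SECURITY_LAKE");
assert type == DataSourceType.SECURITY_LAKE;
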
@@ -0,0 +1,57 @@
package org.opensearch.sql.datasources.glue;

import java.util.Map;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

public class SecurityLakeDataSourceFactory extends GlueDataSourceFactory {

private final Settings pluginSettings;

public static final String TRUE = "true";

public SecurityLakeDataSourceFactory(final Settings pluginSettings) {
super(pluginSettings);
this.pluginSettings = pluginSettings;
}

@Override
public DataSourceType getDataSourceType() {
return DataSourceType.SECURITY_LAKE;
}

@Override
public DataSource createDataSource(DataSourceMetadata metadata) {
validateProperties(metadata.getProperties());
metadata.getProperties().put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE);
metadata.getProperties().put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE);
return super.createDataSource(metadata);
}

private void validateProperties(Map<String, String> properties) {
// validate Iceberg and Lake Formation config
if (properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED) != null
&& !BooleanUtils.toBoolean(properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED))) {
throw new IllegalArgumentException(
GlueDataSourceFactory.GLUE_ICEBERG_ENABLED
+ " cannot be false when using Security Lake data source.");
}

if (properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED) != null
&& !BooleanUtils.toBoolean(
properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED))) {
throw new IllegalArgumentException(
GLUE_LAKEFORMATION_ENABLED + " cannot be false when using Security Lake data source.");
}

if (StringUtils.isBlank(properties.get(GLUE_LAKEFORMATION_SESSION_TAG))) {
throw new IllegalArgumentException(
GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG
+ " must be specified when using Security Lake data source");
}
}
}
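
A usage sketch under assumed setup (a Settings instance named settings, obtained from plugin wiring): the factory forces Iceberg and Lake Formation on, so a caller only supplies the session tag alongside the usual Glue properties.

// Sketch only: property keys mirror the tests below; settings comes from plugin wiring.
Map<String, String> props = new HashMap<>();
props.put("glue.auth.type", "iam_role");
props.put("glue.auth.role_arn", "role_arn");
props.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
props.put("glue.indexstore.opensearch.auth", "noauth");
props.put("glue.lakeformation.session_tag", "session_tag");
DataSourceMetadata metadata =
    new DataSourceMetadata.Builder()
        .setName("my_sl")
        .setConnector(DataSourceType.SECURITY_LAKE)
        .setProperties(props)
        .build();
DataSource dataSource = new SecurityLakeDataSourceFactory(settings).createDataSource(metadata);
// After creation, props contains glue.iceberg.enabled=true and glue.lakeformation.enabled=true.
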
@@ -0,0 +1,141 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

@ExtendWith(MockitoExtension.class)
public class SecurityLakeSourceFactoryTest {

@Mock private Settings settings;

@Test
void testGetConnectorType() {
SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
new SecurityLakeDataSourceFactory(settings);
Assertions.assertEquals(
DataSourceType.SECURITY_LAKE, securityLakeDataSourceFactory.getDataSourceType());
}

@Test
@SneakyThrows
void testCreateSecurityLakeDataSource() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
new SecurityLakeDataSourceFactory(settings);

Map<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");
properties.put("glue.lakeformation.session_tag", "session_tag");
DataSourceMetadata metadata =
new DataSourceMetadata.Builder()
.setName("my_sl")
.setConnector(DataSourceType.SECURITY_LAKE)
.setProperties(properties)
.build();
DataSource dataSource = securityLakeDataSourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.SECURITY_LAKE, dataSource.getConnectorType());

Assertions.assertEquals(
SecurityLakeDataSourceFactory.TRUE,
properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED));
Assertions.assertEquals(
SecurityLakeDataSourceFactory.TRUE,
properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED));
}

@Test
@SneakyThrows
void testCreateSecurityLakeDataSourceIcebergCannotBeDisabled() {
SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
new SecurityLakeDataSourceFactory(settings);

Map<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");
properties.put("glue.iceberg.enabled", "false");
DataSourceMetadata metadata =
new DataSourceMetadata.Builder()
.setName("my_sl")
.setConnector(DataSourceType.SECURITY_LAKE)
.setProperties(properties)
.build();

Assertions.assertThrows(
IllegalArgumentException.class,
() -> securityLakeDataSourceFactory.createDataSource(metadata));
}

@Test
@SneakyThrows
void testCreateSecurityLakeDataSourceLakeFormationCannotBeDisabled() {
SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
new SecurityLakeDataSourceFactory(settings);

Map<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");
properties.put("glue.iceberg.enabled", "true");
properties.put("glue.lakeformation.enabled", "false");
DataSourceMetadata metadata =
new DataSourceMetadata.Builder()
.setName("my_sl")
.setConnector(DataSourceType.SECURITY_LAKE)
.setProperties(properties)
.build();

Assertions.assertThrows(
IllegalArgumentException.class,
() -> securityLakeDataSourceFactory.createDataSource(metadata));
}

@Test
@SneakyThrows
void testCreateSecurityLakeDataSourceWithNoSessionTags() {
SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
new SecurityLakeDataSourceFactory(settings);

HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "noauth");
properties.put("glue.indexstore.opensearch.region", "us-west-2");
properties.put("glue.iceberg.enabled", "true");
properties.put("glue.lakeformation.enabled", "true");

DataSourceMetadata metadata =
new DataSourceMetadata.Builder()
.setName("my_sl")
.setConnector(DataSourceType.SECURITY_LAKE)
.setProperties(properties)
.build();

Assertions.assertThrows(
IllegalArgumentException.class,
() -> securityLakeDataSourceFactory.createDataSource(metadata));
}
}
2 changes: 1 addition & 1 deletion docs/user/ppl/admin/connectors/s3glue_connector.rst
@@ -73,7 +73,7 @@ Glue datasource configuration::
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200",
"glue.indexstore.opensearch.auth" :"awssigv4",
"glue.indexstore.opensearch.auth.region" :"awssigv4"
"glue.indexstore.opensearch.auth.region" :"us-east-1"
},
"resultIndex": "query_execution_result"
}]
78 changes: 78 additions & 0 deletions docs/user/ppl/admin/connectors/security_lake_connector.rst
@@ -0,0 +1,78 @@
.. highlight:: sh

=======================
Security Lake Connector
=======================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

The Security Lake connector provides a way to query Security Lake tables.

Required resources for Security Lake Connector
==============================================
* ``EMRServerless Spark Execution Engine Config Setting``: Security Lake queries run on top of the Spark execution engine, so this configuration is required.
  More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_
* ``S3``: This is where the data resides.
* ``Glue``: Metadata store; Glue takes care of table metadata.
* ``Lake Formation``: AWS service that performs authorization on Security Lake tables.
* ``Security Lake``: AWS service that orchestrates creation of S3 files, Glue tables, and Lake Formation permissions.
* ``OpenSearch IndexStore``: The index for S3 data lies in OpenSearch, which also acts as a temporary buffer for query results.

We currently support only EMR Serverless as the Spark execution engine and Glue as the metadata store; more support will be added in the future.

Security Lake Connector Properties.

* ``resultIndex`` is a parameter specific to the Glue-based connectors. It stores the results of queries executed on the data source. If not specified, it defaults to ``.query_execution_result``.
* ``glue.auth.type`` [Required]
    * This parameter provides the authentication type required for the execution engine to connect to Glue.
    * The connector currently supports only ``iam_role`` authentication, and the parameter below is required.
        * ``glue.auth.role_arn``
* ``glue.indexstore.opensearch.*`` [Required]
    * This parameter provides the OpenSearch domain host information for the connector. The OpenSearch instance is used for writing index data back and also acts as a temporary buffer for query results.
    * ``glue.indexstore.opensearch.uri`` [Required]
    * ``glue.indexstore.opensearch.auth`` [Required]
        * Accepted values include ["noauth", "basicauth", "awssigv4"]
        * Basic auth requires ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password``
        * AWSSigV4 auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn``
    * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth]
* ``glue.lakeformation.session_tag`` [Required]
    * The session tag to use when assuming the data source role.

Sample Security Lake datasource configuration
=============================================

Security Lake datasource configuration::

[{
"name" : "my_sl",
"connector": "security_lake",
"properties" : {
"glue.auth.type": "iam_role",
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200",
"glue.indexstore.opensearch.auth" :"awssigv4",
"glue.indexstore.opensearch.auth.region" :"us-east-1",
"glue.lakeformation.session_tag": "sesson_tag"
},
"resultIndex": "query_execution_result"
}]

Sample Security Lake datasource queries
=======================================

Sample Queries

* Select Query : ``select * from my_sl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 limit 1``
* Create Covering Index Query: ``create index srcip_time on my_sl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 (src_endpoint.ip, time) WITH (auto_refresh=true)``

These queries work only on top of the async query APIs. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_
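
As an illustration, the select query above can be submitted through the async query API along these lines (request shape per the linked documentation; the response returns a queryId used to fetch results)::

    POST /_plugins/_async_query
    {
        "datasource": "my_sl",
        "lang": "sql",
        "query": "select * from my_sl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 limit 1"
    }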

Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -60,6 +60,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.glue.SecurityLakeDataSourceFactory;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.rest.RestDataSourceQueryAction;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
@@ -326,6 +327,7 @@ private DataSourceServiceImpl createDataSourceService() {
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.add(new SecurityLakeDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);