Skip to content

Commit

Permalink
Add Security Lake data source type.
Browse files Browse the repository at this point in the history
This change adds Security Lake as a data source type. Security Lake as
a data source is simply specific options set on top of the base S3Glue
data source.

Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Aug 7, 2024
1 parent 74ed049 commit af13394
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.inject.Singleton;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
Expand Down Expand Up @@ -161,6 +162,8 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider
SparkParameterComposerCollection collection = new SparkParameterComposerCollection();
collection.register(
DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(
DataSourceType.SECURITY_LAKE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
return new SparkSubmitParametersBuilderProvider(collection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
DataSourceType.S3GLUE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
sparkParameterComposerCollection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider =
new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection);
QueryHandlerFactory queryHandlerFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public class DataSourceType {
public static DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH");
public static DataSourceType SPARK = new DataSourceType("SPARK");
public static DataSourceType S3GLUE = new DataSourceType("S3GLUE");
public static DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE");

// Map from uppercase DataSourceType name to DataSourceType object
private static Map<String, DataSourceType> knownValues = new HashMap<>();

static {
register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE);
register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE);
}

private final String name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.sql.datasources.glue;

import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

/**
 * Factory for data sources of type {@link DataSourceType#SECURITY_LAKE}.
 *
 * <p>Delegates data source creation to {@link GlueDataSourceFactory} after checking the supplied
 * properties and forcing the Iceberg and Lake Formation flags to {@link #TRUE}.
 */
public class SecurityLakeDataSourceFactory extends GlueDataSourceFactory {

  /** Value written for the flags this factory always switches on. */
  public static final String TRUE = "true";

  // Kept alongside the copy handed to the superclass; not read anywhere in this class.
  private final Settings pluginSettings;

  /**
   * Creates the factory.
   *
   * @param pluginSettings plugin settings, also passed to the {@link GlueDataSourceFactory}
   *     constructor
   */
  public SecurityLakeDataSourceFactory(final Settings pluginSettings) {
    super(pluginSettings);
    this.pluginSettings = pluginSettings;
  }

  /** Returns {@link DataSourceType#SECURITY_LAKE}. */
  @Override
  public DataSourceType getDataSourceType() {
    return DataSourceType.SECURITY_LAKE;
  }

  /**
   * Validates the metadata properties, sets the Iceberg and Lake Formation flags to {@link #TRUE}
   * in the metadata's property map, and then delegates to the Glue factory.
   *
   * @param metadata data source metadata; its property map is modified by this call
   * @return the data source built by {@link GlueDataSourceFactory#createDataSource}
   * @throws IllegalArgumentException if either flag is present but not truthy, or if the Lake
   *     Formation session tag is blank
   */
  @Override
  public DataSource createDataSource(DataSourceMetadata metadata) {
    final Map<String, String> props = metadata.getProperties();
    validateProperties(props);
    props.put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE);
    props.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE);
    return super.createDataSource(metadata);
  }

  /**
   * Checks, in order: Iceberg flag, Lake Formation flag, Lake Formation session tag. The first
   * failing check throws.
   */
  private void validateProperties(Map<String, String> properties) {
    requireNotDisabled(properties, GlueDataSourceFactory.GLUE_ICEBERG_ENABLED);
    requireNotDisabled(properties, GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED);

    final String sessionTag = properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG);
    if (StringUtils.isBlank(sessionTag)) {
      throw new IllegalArgumentException(
          GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG
              + " must be specified when using Security Lake data source");
    }
  }

  /**
   * Rejects a flag that is present but that {@link BooleanUtils#toBoolean(String)} does not read
   * as true. An absent flag is accepted.
   */
  private static void requireNotDisabled(Map<String, String> properties, String key) {
    final String configured = properties.get(key);
    if (configured == null || BooleanUtils.toBoolean(configured)) {
      return;
    }
    throw new IllegalArgumentException(
        key + " cannot be false when using Security Lake data source.");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

/**
 * Unit tests for {@link SecurityLakeDataSourceFactory}.
 *
 * <p>Each negative test starts from a property set that is valid except for the single condition
 * under test, and asserts that the exception message names the offending property. This ensures
 * the test fails if the validation it targets is removed, rather than passing because some other
 * validation happened to throw.
 */
@ExtendWith(MockitoExtension.class)
public class SecurityLakeSourceFactoryTest {

  @Mock private Settings settings;

  @Test
  void testGetConnectorType() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);
    Assertions.assertEquals(
        DataSourceType.SECURITY_LAKE, securityLakeDataSourceFactory.getDataSourceType());
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSource() {
    when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
        .thenReturn(Collections.emptyList());
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = validProperties();
    DataSource dataSource =
        securityLakeDataSourceFactory.createDataSource(metadataFor(properties));

    Assertions.assertEquals(DataSourceType.SECURITY_LAKE, dataSource.getConnectorType());
    // The factory is expected to switch both flags on in the map that was handed to it.
    Assertions.assertEquals(
        SecurityLakeDataSourceFactory.TRUE,
        properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED));
    Assertions.assertEquals(
        SecurityLakeDataSourceFactory.TRUE,
        properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSourceIcebergCannotBeDisabled() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = validProperties();
    properties.put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, "false");
    DataSourceMetadata metadata = metadataFor(properties);

    IllegalArgumentException thrown =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        thrown.getMessage().contains(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSourceLakeFormationCannotBeDisabled() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = validProperties();
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, "false");
    DataSourceMetadata metadata = metadataFor(properties);

    IllegalArgumentException thrown =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        thrown.getMessage().contains(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateGlueDataSourceWithLakeFormationNoSessionTags() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = validProperties();
    properties.remove("glue.lakeformation.session_tag");
    DataSourceMetadata metadata = metadataFor(properties);

    IllegalArgumentException thrown =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        thrown.getMessage().contains(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG));
  }

  /** Returns a fresh, mutable property map that passes Security Lake validation. */
  private static Map<String, String> validProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("glue.auth.type", "iam_role");
    properties.put("glue.auth.role_arn", "role_arn");
    properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
    properties.put("glue.indexstore.opensearch.auth", "noauth");
    properties.put("glue.indexstore.opensearch.region", "us-west-2");
    properties.put("glue.lakeformation.session_tag", "session_tag");
    return properties;
  }

  /** Builds Security Lake metadata named {@code my_sl} around the given property map. */
  private static DataSourceMetadata metadataFor(Map<String, String> properties) {
    return new DataSourceMetadata.Builder()
        .setName("my_sl")
        .setConnector(DataSourceType.SECURITY_LAKE)
        .setProperties(properties)
        .build();
  }
}
78 changes: 78 additions & 0 deletions docs/user/ppl/admin/connectors/security_lake_connector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
.. highlight:: sh

=======================
Security Lake Connector
=======================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

Security Lake connector provides a way to query Security Lake tables.

Required resources for Security Lake Connector
==============================================
* ``EMRServerless Spark Execution Engine Config Setting``: Since we execute Security Lake queries on top of the Spark execution engine, we require this configuration.
More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_
* ``S3``: This is where the data lies.
* ``Glue``: Metadata store: Glue takes care of table metadata.
* ``Lake Formation``: AWS service that performs authorization on Security Lake tables
* ``Security Lake``: AWS service that orchestrates creation of S3 files, Glue tables, and Lake Formation permissions.
* ``Opensearch IndexStore``: Index for s3 data lies in opensearch and also acts as temporary buffer for query results.

We currently only support emr-serverless as the Spark execution engine and Glue as the metadata store. We will add more support in the future.

Security Lake Connector Properties.

* ``resultIndex`` is a new parameter specific to the Glue connector. It stores the results of queries executed on the data source. If unavailable, it defaults to ``.query_execution_result``.
* ``glue.auth.type`` [Required]
* This parameter provides the authentication type information required for the execution engine to connect to Glue.
* The Security Lake connector currently only supports ``iam_role`` authentication, and the parameter below is required.
* ``glue.auth.role_arn``
* ``glue.indexstore.opensearch.*`` [Required]
* This parameter provides the OpenSearch domain host information for the Glue connector. This OpenSearch instance is used for writing index data back and also acts as a temporary buffer for query results.
* ``glue.indexstore.opensearch.uri`` [Required]
* ``glue.indexstore.opensearch.auth`` [Required]
* Accepted values include ["noauth", "basicauth", "awssigv4"]
* Basic Auth requires ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password``
* AWSSigV4 Auth requires ``glue.indexstore.opensearch.region`` and ``glue.auth.role_arn``
* ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth]
* ``glue.lakeformation.session_tag`` [Required]
* What session tag to use when assuming the data source role.

Sample Security Lake dataSource configuration
=============================================

Security Lake datasource configuration::

[{
"name" : "my_sl",
"connector": "security_lake",
"properties" : {
"glue.auth.type": "iam_role",
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200",
"glue.indexstore.opensearch.auth" :"awssigv4",
"glue.indexstore.opensearch.region" :"us-west-2",
"glue.lakeformation.session_tag": "session_tag"
},
"resultIndex": "query_execution_result"
}]

Sample Security Lake datasource queries APIs
============================================

Sample Queries

* Select Query : ``select * from my_sl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 limit 1``
* Create Covering Index Query: ``create index srcip_time on my_sl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 (src_endpoint.ip, time) WITH (auto_refresh=true)``

These queries work only on top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_

Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.glue.SecurityLakeDataSourceFactory;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.rest.RestDataSourceQueryAction;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
Expand Down Expand Up @@ -326,6 +327,7 @@ private DataSourceServiceImpl createDataSourceService() {
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.add(new SecurityLakeDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down

0 comments on commit af13394

Please sign in to comment.