Skip to content

Commit

Permalink
Add EMR client to spark connector (#1790)
Browse files Browse the repository at this point in the history
* Create Spark Connector

Signed-off-by: Vamsi Manohar <[email protected]>

* Add spark client and engine

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove vars

Signed-off-by: Rupal Mahajan <[email protected]>

* Spark connector draft

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix checkstyle errors

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix license header

Signed-off-by: Rupal Mahajan <[email protected]>

* Add spark storage test

Signed-off-by: Rupal Mahajan <[email protected]>

* Update comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix checkstyle in comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Update tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Add emr client

Signed-off-by: Rupal Mahajan <[email protected]>

* Set default values for flint args

Signed-off-by: Rupal Mahajan <[email protected]>

* Validate emr auth type

Signed-off-by: Rupal Mahajan <[email protected]>

* Add default constants for flint

Signed-off-by: Rupal Mahajan <[email protected]>

* Update unit tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Address PR comments

Signed-off-by: Rupal Mahajan <[email protected]>

* tests draft

Signed-off-by: Rupal Mahajan <[email protected]>

* Refactor class name

Signed-off-by: Rupal Mahajan <[email protected]>

* Rename classes and update tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Update scan operator test

Signed-off-by: Rupal Mahajan <[email protected]>

* Address PR comment

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix Connection pool shut down issue

Signed-off-by: Rupal Mahajan <[email protected]>

* Update emr unit tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Update doc and tests

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Update EMR clinet impl tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Address PR comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Make spark & flint jars configurable

Signed-off-by: Rupal Mahajan <[email protected]>

* Address comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Add spark application id in logs

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Delete result when not required

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: Rupal Mahajan <[email protected]>
Co-authored-by: Vamsi Manohar <[email protected]>
  • Loading branch information
rupal-bq and vamsimanohar authored Jul 11, 2023
1 parent ab375e8 commit 392a720
Show file tree
Hide file tree
Showing 30 changed files with 1,559 additions and 44 deletions.
2 changes: 2 additions & 0 deletions DEVELOPER_GUIDE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ The plugin codebase is in standard layout of Gradle project::
├── plugin
├── protocol
├── ppl
├── spark
├── sql
├── sql-cli
├── sql-jdbc
Expand All @@ -161,6 +162,7 @@ Here are sub-folders (Gradle modules) for plugin source code:
- ``core``: core query engine.
- ``opensearch``: OpenSearch storage engine.
- ``prometheus``: Prometheus storage engine.
- ``spark`` : Spark storage engine
- ``protocol``: request/response protocol formatter.
- ``common``: common util code.
- ``integ-test``: integration and comparison test.
Expand Down
92 changes: 92 additions & 0 deletions docs/user/ppl/admin/spark_connector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
.. highlight:: sh

====================
Spark Connector
====================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

This page covers spark connector properties for dataSource configuration
and the nuances associated with spark connector.


Spark Connector Properties in DataSource Configuration
========================================================
Spark Connector Properties.

* ``spark.connector`` [Required].
* This parameters provides the spark client information for connection.
* ``spark.sql.application`` [Optional].
* This parameters provides the spark sql application jar. Default value is ``s3://spark-datasource/sql-job.jar``.
* ``emr.cluster`` [Required].
* This parameters provides the emr cluster id information.
* ``emr.auth.type`` [Required]
* This parameters provides the authentication type information.
* Spark emr connector currently supports ``awssigv4`` authentication mechanism and following parameters are required.
* ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key``
* ``spark.datasource.flint.*`` [Optional]
* This parameters provides the Opensearch domain host information for flint integration.
* ``spark.datasource.flint.integration`` [Optional]
* Default value for integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar``.
* ``spark.datasource.flint.host`` [Optional]
* Default value for host is ``localhost``.
* ``spark.datasource.flint.port`` [Optional]
* Default value for port is ``9200``.
* ``spark.datasource.flint.scheme`` [Optional]
* Default value for scheme is ``http``.
* ``spark.datasource.flint.auth`` [Optional]
* Default value for auth is ``false``.
* ``spark.datasource.flint.region`` [Optional]
* Default value for auth is ``us-west-2``.

Example spark dataSource configuration
========================================

AWSSigV4 Auth::

[{
"name" : "my_spark",
"connector": "spark",
"properties" : {
"spark.connector": "emr",
"emr.cluster" : "{{clusterId}}",
"emr.auth.type" : "awssigv4",
"emr.auth.region" : "us-east-1",
"emr.auth.access_key" : "{{accessKey}}"
"emr.auth.secret_key" : "{{secretKey}}"
"spark.datasource.flint.host" : "{{opensearchHost}}",
"spark.datasource.flint.port" : "{{opensearchPort}}",
"spark.datasource.flint.scheme" : "{{opensearchScheme}}",
"spark.datasource.flint.auth" : "{{opensearchAuth}}",
"spark.datasource.flint.region" : "{{opensearchRegion}}",
}
}]


Spark SQL Support
==================

`sql` Function
----------------------------
Spark connector offers `sql` function. This function can be used to run spark sql query.
The function takes spark sql query as input. Argument should be either passed by name or positionArguments should be either passed by name or position.
`source=my_spark.sql('select 1')`
or
`source=my_spark.sql(query='select 1')`
Example::

> source=my_spark.sql('select 1')
+---+
| 1 |
|---+
| 1 |
+---+

6 changes: 4 additions & 2 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ dependencies {

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
testImplementation 'junit:junit:4.13.1'
}

test {
Expand Down
121 changes: 121 additions & 0 deletions spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrClientImpl implements SparkClient {
private final AmazonElasticMapReduce emr;
private final String emrCluster;
private final FlintHelper flint;
private final String sparkApplicationJar;
private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
private SparkResponse sparkResponse;

/**
* Constructor for EMR Client Implementation.
*
* @param emr EMR helper
* @param flint Opensearch args for flint integration jar
* @param sparkResponse Response object to help with retrieving results from Opensearch index
*/
public EmrClientImpl(AmazonElasticMapReduce emr, String emrCluster, FlintHelper flint,
SparkResponse sparkResponse, String sparkApplicationJar) {
this.emr = emr;
this.emrCluster = emrCluster;
this.flint = flint;
this.sparkResponse = sparkResponse;
this.sparkApplicationJar =
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
}

@Override
public JSONObject sql(String query) throws IOException {
runEmrApplication(query);
return sparkResponse.getResultFromOpensearchIndex();
}

@VisibleForTesting
void runEmrApplication(String query) {

HadoopJarStepConfig stepConfig = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--class","org.opensearch.sql.SQLJob",
"--jars",
flint.getFlintIntegrationJar(),
sparkApplicationJar,
query,
SPARK_INDEX_NAME,
flint.getFlintHost(),
flint.getFlintPort(),
flint.getFlintScheme(),
flint.getFlintAuth(),
flint.getFlintRegion()
);

StepConfig emrstep = new StepConfig()
.withName("Spark Application")
.withActionOnFailure(ActionOnFailure.CONTINUE)
.withHadoopJarStep(stepConfig);

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
.withJobFlowId(emrCluster)
.withSteps(emrstep);

AddJobFlowStepsResult result = emr.addJobFlowSteps(request);
logger.info("EMR step ID: " + result.getStepIds());

String stepId = result.getStepIds().get(0);
DescribeStepRequest stepRequest = new DescribeStepRequest()
.withClusterId(emrCluster)
.withStepId(stepId);

waitForStepExecution(stepRequest);
sparkResponse.setValue(stepId);
}

@SneakyThrows
private void waitForStepExecution(DescribeStepRequest stepRequest) {
// Wait for the step to complete
boolean completed = false;
while (!completed) {
// Get the step status
StepStatus statusDetail = emr.describeStep(stepRequest).getStep().getStatus();
// Check if the step has completed
if (statusDetail.getState().equals("COMPLETED")) {
completed = true;
logger.info("EMR step completed successfully.");
} else if (statusDetail.getState().equals("FAILED")
|| statusDetail.getState().equals("CANCELLED")) {
logger.error("EMR step failed or cancelled.");
throw new RuntimeException("Spark SQL application failed.");
} else {
// Sleep for some time before checking the status again
Thread.sleep(2500);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.data.constants;

public class SparkConstants {
public static final String EMR = "emr";
public static final String STEP_ID_FIELD = "stepId.keyword";
public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
public static final String SPARK_INDEX_NAME = ".query_execution_result";
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
public static final String FLINT_DEFAULT_AUTH = "-1";
public static final String FLINT_DEFAULT_REGION = "us-west-2";
}
Loading

0 comments on commit 392a720

Please sign in to comment.