Add EMR client to spark connector #1790

Merged 41 commits on Jul 11, 2023.

Commits
70588d6
Create Spark Connector
vamsimanohar Jun 8, 2023
2b8ceb7
Add spark client and engine
rupal-bq Jun 21, 2023
f595ff4
Remove vars
rupal-bq Jun 21, 2023
95a86d5
Merge branch 'main' into create-spark-connector
rupal-bq Jun 26, 2023
51700bd
Merge branch 'main' into create-spark-connector
rupal-bq Jun 26, 2023
d0d5042
Spark connector draft
rupal-bq Jun 26, 2023
5ac01b2
nit
rupal-bq Jun 26, 2023
4fcf4ba
Fix checkstyle errors
rupal-bq Jun 26, 2023
52a044d
nit
rupal-bq Jun 26, 2023
69ca116
Fix license header
rupal-bq Jun 26, 2023
901e762
Add spark storage test
rupal-bq Jun 26, 2023
1b81453
Update comments
rupal-bq Jun 26, 2023
106218e
Fix checkstyle in comments
rupal-bq Jun 26, 2023
6da93f3
Merge branch 'main' into create-spark-connector
rupal-bq Jun 26, 2023
b953aa6
Update tests
rupal-bq Jun 26, 2023
2f899cd
Add emr client
rupal-bq Jun 27, 2023
8ebab1a
Set default values for flint args
rupal-bq Jun 27, 2023
7dcad73
Validate emr auth type
rupal-bq Jun 27, 2023
23f195f
Add default constants for flint
rupal-bq Jun 27, 2023
63e3f6d
Update unit tests
rupal-bq Jun 28, 2023
0df37a8
Address PR comments
rupal-bq Jun 28, 2023
99d561a
tests draft
rupal-bq Jul 4, 2023
776bc36
Refactor class name
rupal-bq Jul 4, 2023
0eb5e53
Merge branch 'main' into create-spark-connector
rupal-bq Jul 4, 2023
680268a
Merge branch 'create-spark-connector' into emr-client
rupal-bq Jul 4, 2023
890c8c7
Rename classes and update tests
rupal-bq Jul 4, 2023
9fc496a
Update scan operator test
rupal-bq Jul 5, 2023
be15dae
Address PR comment
rupal-bq Jul 5, 2023
e74e036
Fix Connection pool shut down issue
rupal-bq Jul 5, 2023
dbf0367
Update emr unit tests
rupal-bq Jul 6, 2023
b8209bd
Merge branch 'create-spark-connector' into emr-client
rupal-bq Jul 6, 2023
2f73d40
Merge branch 'main' into emr-client
rupal-bq Jul 6, 2023
7f8c2ab
Update doc and tests
rupal-bq Jul 6, 2023
cd9a4fe
nit
rupal-bq Jul 6, 2023
9e34ce3
Update EMR clinet impl tests
rupal-bq Jul 6, 2023
c029ae5
Address PR comments
rupal-bq Jul 6, 2023
9d249ee
Make spark & flint jars configurable
rupal-bq Jul 6, 2023
f9d7382
Address comments
rupal-bq Jul 7, 2023
c6e0076
Add spark application id in logs
rupal-bq Jul 7, 2023
cc6d8cb
nit
rupal-bq Jul 7, 2023
5bbda31
Delete result when not required
rupal-bq Jul 7, 2023
2 changes: 2 additions & 0 deletions DEVELOPER_GUIDE.rst
@@ -147,6 +147,7 @@ The plugin codebase is in standard layout of Gradle project::
├── plugin
├── protocol
├── ppl
├── spark
├── sql
├── sql-cli
├── sql-jdbc
@@ -161,6 +162,7 @@ Here are sub-folders (Gradle modules) for plugin source code:
- ``core``: core query engine.
- ``opensearch``: OpenSearch storage engine.
- ``prometheus``: Prometheus storage engine.
- ``spark``: Spark storage engine.
- ``protocol``: request/response protocol formatter.
- ``common``: common util code.
- ``integ-test``: integration and comparison test.
92 changes: 92 additions & 0 deletions docs/user/ppl/admin/spark_connector.rst
@@ -0,0 +1,92 @@
.. highlight:: sh

====================
Spark Connector
====================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

This page covers the spark connector properties for dataSource configuration
and the nuances associated with the spark connector.


Spark Connector Properties in DataSource Configuration
========================================================
Spark Connector Properties.

* ``spark.connector`` [Required].
  * This parameter provides the spark client information for connection.
* ``spark.sql.application`` [Optional].
  * This parameter provides the spark sql application jar. Default value is ``s3://spark-datasource/sql-job.jar``.
* ``emr.cluster`` [Required].
  * This parameter provides the emr cluster id information.
* ``emr.auth.type`` [Required]
  * This parameter provides the authentication type information.
  * The Spark EMR connector currently supports the ``awssigv4`` authentication mechanism, and the following parameters are required:
    ``emr.auth.region``, ``emr.auth.access_key`` and ``emr.auth.secret_key``.
* ``spark.datasource.flint.*`` [Optional]
  * These parameters provide the OpenSearch domain host information for flint integration.
* ``spark.datasource.flint.integration`` [Optional]
  * Default value for the integration jar is ``s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar``.
* ``spark.datasource.flint.host`` [Optional]
  * Default value for host is ``localhost``.
* ``spark.datasource.flint.port`` [Optional]
  * Default value for port is ``9200``.
* ``spark.datasource.flint.scheme`` [Optional]
  * Default value for scheme is ``http``.
* ``spark.datasource.flint.auth`` [Optional]
  * Default value for auth is ``false``.
* ``spark.datasource.flint.region`` [Optional]
  * Default value for region is ``us-west-2``.

Example spark dataSource configuration
========================================

AWSSigV4 Auth::

    [{
        "name" : "my_spark",
        "connector": "spark",
        "properties" : {
            "spark.connector": "emr",
            "emr.cluster" : "{{clusterId}}",
            "emr.auth.type" : "awssigv4",
            "emr.auth.region" : "us-east-1",
            "emr.auth.access_key" : "{{accessKey}}",
            "emr.auth.secret_key" : "{{secretKey}}",
            "spark.datasource.flint.host" : "{{opensearchHost}}",
            "spark.datasource.flint.port" : "{{opensearchPort}}",
            "spark.datasource.flint.scheme" : "{{opensearchScheme}}",
            "spark.datasource.flint.auth" : "{{opensearchAuth}}",
            "spark.datasource.flint.region" : "{{opensearchRegion}}"
        }
    }]


Spark SQL Support
==================

`sql` Function
----------------------------
Spark connector offers the `sql` function. This function can be used to run a spark sql query.
The function takes a spark sql query as input. The argument can be passed either by name or by position:
`source=my_spark.sql('select 1')`
or
`source=my_spark.sql(query='select 1')`
Example::

    > source=my_spark.sql('select 1')
    +---+
    | 1 |
    |---|
    | 1 |
    +---+
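The property rules documented on this page (required keys, and the ``awssigv4``-only auth type with its extra required keys) can be sketched as a small validation helper. This is a hypothetical illustration, not plugin code; the class and method names are invented:

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the documented property rules:
// spark.connector, emr.cluster and emr.auth.type are required, and
// awssigv4 auth additionally requires region, access key and secret key.
public class SparkDataSourceConfigCheck {

  private static final List<String> REQUIRED =
      List.of("spark.connector", "emr.cluster", "emr.auth.type");
  private static final List<String> SIGV4_REQUIRED =
      List.of("emr.auth.region", "emr.auth.access_key", "emr.auth.secret_key");

  public static void validate(Map<String, String> properties) {
    for (String key : REQUIRED) {
      if (!properties.containsKey(key)) {
        throw new IllegalArgumentException("Missing required property: " + key);
      }
    }
    // Only awssigv4 is supported by the EMR connector per the docs above.
    if (!"awssigv4".equals(properties.get("emr.auth.type"))) {
      throw new IllegalArgumentException(
          "Unsupported emr.auth.type: " + properties.get("emr.auth.type"));
    }
    for (String key : SIGV4_REQUIRED) {
      if (!properties.containsKey(key)) {
        throw new IllegalArgumentException("Missing required property: " + key);
      }
    }
  }

  public static void main(String[] args) {
    validate(Map.of(
        "spark.connector", "emr",
        "emr.cluster", "j-XXXXXXXX",
        "emr.auth.type", "awssigv4",
        "emr.auth.region", "us-east-1",
        "emr.auth.access_key", "accessKey",
        "emr.auth.secret_key", "secretKey"));
    System.out.println("valid");
  }
}
```

A configuration missing any of these keys would be rejected up front rather than failing later at EMR submission time.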

9 changes: 6 additions & 3 deletions spark/build.gradle
@@ -19,10 +19,12 @@ dependencies {
 
   implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
   implementation group: 'org.json', name: 'json', version: '20230227'
+  implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
 
   testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
-  testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
-  testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
+  testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
+  testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
+  testImplementation 'junit:junit:4.13.1'
 }
 
 test {
@@ -51,7 +53,8 @@ jacocoTestCoverageVerification {
     rule {
       element = 'CLASS'
       excludes = [
-        'org.opensearch.sql.spark.data.constants.*'
+        'org.opensearch.sql.spark.data.constants.*',
+        'org.opensearch.sql.spark.helper.*'
       ]
       limit {
         counter = 'LINE'
@@ -0,0 +1,122 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INTEGRATION_JAR;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.EMRHelper;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrClientImpl implements SparkClient {
  private final EMRHelper emr;
  private final FlintHelper flint;
  private final String sparkApplicationJar;
  private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
  private SparkResponse sparkResponse;

  /**
   * Constructor for EMR Client Implementation.
   *
   * @param emr EMR helper
   * @param flint OpenSearch args for flint integration jar
   * @param sparkResponse Response object to help with retrieving results from OpenSearch index
   */
  public EmrClientImpl(EMRHelper emr, FlintHelper flint,
      SparkResponse sparkResponse, String sparkApplicationJar) {
    this.emr = emr;
    this.flint = flint;
    this.sparkResponse = sparkResponse;
    this.sparkApplicationJar =
        sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
  }

  @Override
  public JSONObject sql(String query) throws IOException {
    runEmrApplication(query);
    return sparkResponse.getResultFromOpensearchIndex();
  }

  @VisibleForTesting
  String runEmrApplication(String query) {

    HadoopJarStepConfig stepConfig = new HadoopJarStepConfig()
        .withJar("command-runner.jar")
        .withArgs("spark-submit",
            "--class", "org.opensearch.sql.SQLJob",
            "--jars",
            flint.getFlintIntegrationJar(),
            sparkApplicationJar,
            query,
            SPARK_INDEX_NAME,
            flint.getFlintHost(),
            flint.getFlintPort(),
            flint.getFlintScheme(),
            flint.getFlintAuth(),
            flint.getFlintRegion()
        );

    StepConfig emrstep = new StepConfig()
        .withName("Spark Application")
        .withActionOnFailure(ActionOnFailure.CONTINUE)
        .withHadoopJarStep(stepConfig);

    AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
        .withJobFlowId(emr.getEmrCluster())
        .withSteps(emrstep);

    AddJobFlowStepsResult result = emr.addStep(request);
    logger.info("Spark application step IDs: " + result.getStepIds());

    String stepId = result.getStepIds().get(0);
    DescribeStepRequest stepRequest = new DescribeStepRequest()
        .withClusterId(emr.getEmrCluster())
        .withStepId(stepId);

    waitForStepExecution(stepRequest);
    sparkResponse.setValue(stepId);

    return stepId;
  }

  @SneakyThrows
  private void waitForStepExecution(DescribeStepRequest stepRequest) {
    // Wait for the step to complete
    boolean completed = false;
    while (!completed) {
      // Get the step status
      StepStatus statusDetail = emr.getStepStatus(stepRequest);
      // Check if the step has completed
      if (statusDetail.getState().equals("COMPLETED")) {
        completed = true;
        logger.info("EMR step completed successfully.");
      } else if (statusDetail.getState().equals("FAILED")
          || statusDetail.getState().equals("CANCELLED")) {
        logger.error("EMR step failed or cancelled.");
        throw new RuntimeException("Spark SQL application failed.");
      } else {
        // Sleep for some time before checking the status again
        Thread.sleep(2500);
      }
    }
  }

}
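The `waitForStepExecution` loop above polls the step state every 2.5 seconds with no upper bound on attempts. The same pattern can be sketched in isolation with an added attempt limit; this is a hypothetical, self-contained illustration (the `Supplier` stands in for `emr.getStepStatus(...).getState()` and `StepPoller` is not part of the PR):

```java
import java.util.function.Supplier;

// Sketch of the EMR step-polling pattern with a bounded number of attempts.
// The Supplier abstracts away the AWS SDK call that fetches the step state.
public class StepPoller {

  public static String waitForTerminalState(Supplier<String> stateSupplier,
      int maxAttempts, long sleepMillis) throws InterruptedException {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      String state = stateSupplier.get();
      switch (state) {
        case "COMPLETED":
          return state;                  // success: stop polling
        case "FAILED":
        case "CANCELLED":
          throw new RuntimeException("Spark SQL application failed: " + state);
        default:
          Thread.sleep(sleepMillis);     // PENDING/RUNNING: wait and retry
      }
    }
    throw new RuntimeException("Timed out waiting for EMR step completion");
  }

  public static void main(String[] args) throws InterruptedException {
    // Simulate a step that reports RUNNING twice before completing.
    int[] calls = {0};
    Supplier<String> fake = () -> ++calls[0] < 3 ? "RUNNING" : "COMPLETED";
    System.out.println(waitForTerminalState(fake, 10, 1));
  }
}
```

An attempt bound (or overall timeout) avoids polling forever if a step hangs in a non-terminal state, at the cost of choosing a sensible limit for long-running queries.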
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.data.constants;

public class SparkConstants {
  public static final String EMR = "emr";
  public static final String STEP_ID_FIELD = "stepId.keyword";
  public static final String SPARK_SQL_APPLICATION_JAR = "s3://spark-datasource/sql-job.jar";
  public static final String SPARK_INDEX_NAME = ".query_execution_result";
  public static final String FLINT_INTEGRATION_JAR =
      "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
  public static final String FLINT_DEFAULT_HOST = "localhost";
  public static final String FLINT_DEFAULT_PORT = "9200";
  public static final String FLINT_DEFAULT_SCHEME = "http";
  public static final String FLINT_DEFAULT_AUTH = "-1";
  public static final String FLINT_DEFAULT_REGION = "us-west-2";
}
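These `FLINT_DEFAULT_*` constants back the null-fallback pattern used in `EmrClientImpl` (`sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar`). A minimal sketch of how a helper can apply them is below; `FlintArgs` is a hypothetical stand-in, not the PR's `FlintHelper`:

```java
// Hypothetical value holder: each null constructor argument falls back to a
// default, mirroring the constants defined in SparkConstants above.
public class FlintArgs {
  static final String DEFAULT_HOST = "localhost";
  static final String DEFAULT_PORT = "9200";
  static final String DEFAULT_SCHEME = "http";
  static final String DEFAULT_REGION = "us-west-2";

  final String host;
  final String port;
  final String scheme;
  final String region;

  FlintArgs(String host, String port, String scheme, String region) {
    this.host = host == null ? DEFAULT_HOST : host;
    this.port = port == null ? DEFAULT_PORT : port;
    this.scheme = scheme == null ? DEFAULT_SCHEME : scheme;
    this.region = region == null ? DEFAULT_REGION : region;
  }

  public static void main(String[] args) {
    // Only the scheme is overridden; everything else uses defaults.
    FlintArgs a = new FlintArgs(null, null, "https", null);
    System.out.println(a.scheme + "://" + a.host + ":" + a.port);  // prints https://localhost:9200
  }
}
```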