Skip to content

Commit

Permalink
Add spark connector (opensearch-project#1780)
Browse files Browse the repository at this point in the history
* Create Spark Connector

Signed-off-by: Vamsi Manohar <[email protected]>

* Add spark client and engine

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove vars

Signed-off-by: Rupal Mahajan <[email protected]>

* Spark connector draft

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix checkstyle errors

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix license header

Signed-off-by: Rupal Mahajan <[email protected]>

* Add spark storage test

Signed-off-by: Rupal Mahajan <[email protected]>

* Update comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix checkstyle in comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Update tests

Signed-off-by: Rupal Mahajan <[email protected]>

* Address PR comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Refactor class name

Signed-off-by: Rupal Mahajan <[email protected]>

* Address PR comment

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Vamsi Manohar <[email protected]>
Signed-off-by: Rupal Mahajan <[email protected]>
Co-authored-by: Vamsi Manohar <[email protected]>
  • Loading branch information
rupal-bq and vamsimanohar authored Jul 5, 2023
1 parent 4d6ac79 commit a816a58
Show file tree
Hide file tree
Showing 19 changed files with 942 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
public enum DataSourceType {
PROMETHEUS("prometheus"),
OPENSEARCH("opensearch"),
JDBC("jdbc");

SPARK("spark");
private String text;

DataSourceType(String text) {
Expand Down
1 change: 1 addition & 0 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ dependencies {
api project(':opensearch')
api project(':prometheus')
api project(':datasources')
api project(':spark')

testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
Expand Down
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
Expand Down Expand Up @@ -221,6 +222,7 @@ private DataSourceServiceImpl createDataSourceService() {
.add(new OpenSearchDataSourceFactory(
new OpenSearchNodeClient(this.client), pluginSettings))
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down
4 changes: 3 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ include 'legacy'
include 'sql'
include 'prometheus'
include 'benchmarks'
include 'datasources'
include 'datasources'
include 'spark'

73 changes: 73 additions & 0 deletions spark/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
}

repositories {
mavenCentral()
}

dependencies {
api project(':core')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
reports {
html.enabled true
xml.enabled true
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.spark.data.constants.*'
]
limit {
counter = 'LINE'
minimum = 1.0
}
limit {
counter = 'BRANCH'
minimum = 1.0
}
}
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
}))
}
}
check.dependsOn jacocoTestCoverageVerification
jacocoTestCoverageVerification.dependsOn jacocoTestReport
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.client;

import java.io.IOException;
import org.json.JSONObject;

/**
* Interface class for Spark Client.
*/
public interface SparkClient {
/**
* This method executes spark sql query.
*
* @param query spark sql query
* @return spark query response
*/
JSONObject sql(String query) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.implementation;

import static org.opensearch.sql.spark.functions.resolver.SparkSqlTableFunctionResolver.QUERY;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.spark.storage.SparkTable;
import org.opensearch.sql.storage.Table;

/**
* Spark SQL function implementation.
*/
public class SparkSqlFunctionImplementation extends FunctionExpression
implements TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final SparkClient sparkClient;

/**
* Constructor for spark sql function.
*
* @param functionName name of the function
* @param arguments a list of expressions
* @param sparkClient spark client
*/
public SparkSqlFunctionImplementation(
FunctionName functionName, List<Expression> arguments, SparkClient sparkClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.sparkClient = sparkClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Spark defined function [%s] is only "
+ "supported in SOURCE clause with spark connector catalog", functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s",
((NamedArgumentExpression) arg).getArgName(),
((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new SparkTable(sparkClient, buildQueryFromSqlFunction(arguments));
}

/**
* This method builds a spark query request.
*
* @param arguments spark sql function arguments
* @return spark query request
*/
private SparkQueryRequest buildQueryFromSqlFunction(List<Expression> arguments) {

SparkQueryRequest sparkQueryRequest = new SparkQueryRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
if (argName.equals(QUERY)) {
sparkQueryRequest.setSql((String) literalValue.value());
} else {
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return sparkQueryRequest;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.functions.implementation.SparkSqlFunctionImplementation;

/**
* Function resolver for sql function of spark connector.
*/
@RequiredArgsConstructor
public class SparkSqlTableFunctionResolver implements FunctionResolver {
private final SparkClient sparkClient;

public static final String SQL = "sql";
public static final String QUERY = "query";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of(SQL);
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING));
final List<String> argumentNames = List.of(QUERY);

FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
Boolean argumentsPassedByName = arguments.stream()
.noneMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
Boolean argumentsPassedByPosition = arguments.stream()
.allMatch(arg -> StringUtils.isEmpty(((NamedArgumentExpression) arg).getArgName()));
if (!(argumentsPassedByName || argumentsPassedByPosition)) {
throw new SemanticCheckException("Arguments should be either passed by name or position");
}

if (arguments.size() != argumentNames.size()) {
throw new SemanticCheckException(
String.format("Missing arguments:[%s]",
String.join(",", argumentNames.subList(arguments.size(), argumentNames.size()))));
}

if (argumentsPassedByPosition) {
List<Expression> namedArguments = new ArrayList<>();
for (int i = 0; i < arguments.size(); i++) {
namedArguments.add(new NamedArgumentExpression(argumentNames.get(i),
((NamedArgumentExpression) arguments.get(i)).getValue()));
}
return new SparkSqlFunctionImplementation(functionName, namedArguments, sparkClient);
}
return new SparkSqlFunctionImplementation(functionName, arguments, sparkClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(SQL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.functions.scan;

import lombok.AllArgsConstructor;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.request.SparkQueryRequest;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* TableScanBuilder for sql function of spark connector.
*/
@AllArgsConstructor
public class SparkSqlFunctionTableScanBuilder extends TableScanBuilder {

private final SparkClient sparkClient;

private final SparkQueryRequest sparkQueryRequest;

@Override
public TableScanOperator build() {
//TODO: return SqlFunctionTableScanOperator
return null;
}

@Override
public boolean pushDownProject(LogicalProject project) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.request;

import lombok.Data;

/**
* Spark query request.
*/
@Data
public class SparkQueryRequest {

/**
* SQL.
*/
private String sql;

}
Loading

0 comments on commit a816a58

Please sign in to comment.