Skip to content

Commit

Permalink
Introduce SparkParameterComposerCollection (#2774)
Browse files Browse the repository at this point in the history
* Introduce SparkParameterComposerCollection

Signed-off-by: Tomoyuki Morita <[email protected]>

* Fix comments

Signed-off-by: Tomoyuki Morita <[email protected]>

* Fix integ test

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Jul 12, 2024
1 parent d3ecb2d commit a151a7d
Show file tree
Hide file tree
Showing 37 changed files with 1,387 additions and 706 deletions.
6 changes: 4 additions & 2 deletions async-query-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Following is the list of extension points where the consumer of the library need
- [QueryIdProvider](src/main/java/org/opensearch/sql/spark/dispatcher/QueryIdProvider.java)
- [SessionIdProvider](src/main/java/org/opensearch/sql/spark/execution/session/SessionIdProvider.java)
- [SessionConfigSupplier](src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java)
- [EMRServerlessClientFactory](src/main/java/org/opensearch/sql/spark/client/EMRServerlessClientFactory.java)
- [SparkExecutionEngineConfigSupplier](src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java)
- [DataSourceSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/DataSourceSparkParameterComposer.java)
- [GeneralSparkParameterComposer](src/main/java/org/opensearch/sql/spark/parameter/GeneralSparkParameterComposer.java)
- [SparkSubmitParameterModifier](src/main/java/org/opensearch/sql/spark/config/SparkSubmitParameterModifier.java) To be deprecated in favor of GeneralSparkParameterComposer

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package org.opensearch.sql.spark.config;

import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilder;

/**
* Interface for extension point to allow modification of spark submit parameter. modifyParameter
* method is called after the default spark submit parameter is build.
* method is called after the default spark submit parameter is build. To be deprecated in favor of
* {@link org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer}
*/
public interface SparkSubmitParameterModifier {
void modifyParameters(SparkSubmitParameters parameters);
void modifyParameters(SparkSubmitParametersBuilder parametersBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand All @@ -25,6 +24,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -37,6 +37,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
protected final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -80,12 +81,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.query(dispatchQueryRequest.getQuery())
.build()
.dataSource(
context.getDataSourceMetadata(),
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
.toString(),
tags,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand All @@ -32,6 +31,7 @@
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -46,6 +46,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
private final JobExecutionResponseReader jobExecutionResponseReader;
private final LeaseManager leaseManager;
private final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -112,12 +113,16 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getAccountId(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.builder()
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.className(FLINT_SESSION_CLASS_NAME)
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.build()
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier()),
.dataSource(
dataSourceMetadata,
dispatchQueryRequest,
context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext()),
tags,
dataSourceMetadata.getResultIndex(),
dataSourceMetadata.getName()),
Expand Down
Loading

0 comments on commit a151a7d

Please sign in to comment.