Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit of new job APIs #2050

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
import org.opensearch.sql.spark.transport.TransportDeleteJobRequestAction;
import org.opensearch.sql.spark.transport.TransportGetJobRequestAction;
import org.opensearch.sql.spark.transport.TransportGetQueryResultRequestAction;
import org.opensearch.sql.spark.transport.model.CreateJobActionResponse;
import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse;
import org.opensearch.sql.spark.transport.model.GetJobActionResponse;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
Expand Down Expand Up @@ -131,7 +140,8 @@ public List<RestHandler> getRestHandlers(
new RestSqlStatsAction(settings, restController),
new RestPPLStatsAction(settings, restController),
new RestQuerySettingsAction(settings, restController),
new RestDataSourceQueryAction());
new RestDataSourceQueryAction(),
new RestJobManagementAction());
}

/** Register action and handler so that transportClient can find proxy for action. */
Expand All @@ -155,7 +165,20 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(
new ActionType<>(
TransportDeleteDataSourceAction.NAME, DeleteDataSourceActionResponse::new),
TransportDeleteDataSourceAction.class));
TransportDeleteDataSourceAction.class),
new ActionHandler<>(
new ActionType<>(TransportCreateJobRequestAction.NAME, CreateJobActionResponse::new),
TransportCreateJobRequestAction.class),
new ActionHandler<>(
new ActionType<>(TransportGetJobRequestAction.NAME, GetJobActionResponse::new),
TransportGetJobRequestAction.class),
new ActionHandler<>(
new ActionType<>(
TransportGetQueryResultRequestAction.NAME, GetJobQueryResultActionResponse::new),
TransportGetQueryResultRequestAction.class),
new ActionHandler<>(
new ActionType<>(TransportDeleteJobRequestAction.NAME, DeleteJobActionResponse::new),
TransportDeleteJobRequestAction.class));
}

@Override
Expand Down
5 changes: 4 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
testImplementation 'junit:junit:4.13.1'
testImplementation "org.opensearch.test:framework:${opensearch_version}"
}

test {
Expand Down Expand Up @@ -53,7 +54,9 @@ jacocoTestCoverageVerification {
rule {
element = 'CLASS'
excludes = [
'org.opensearch.sql.spark.data.constants.*'
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.rest;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.spark.rest.model.CreateJobRequest;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
import org.opensearch.sql.spark.transport.TransportDeleteJobRequestAction;
import org.opensearch.sql.spark.transport.TransportGetJobRequestAction;
import org.opensearch.sql.spark.transport.TransportGetQueryResultRequestAction;
import org.opensearch.sql.spark.transport.model.CreateJobActionRequest;
import org.opensearch.sql.spark.transport.model.CreateJobActionResponse;
import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest;
import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse;
import org.opensearch.sql.spark.transport.model.GetJobActionRequest;
import org.opensearch.sql.spark.transport.model.GetJobActionResponse;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest;
import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse;

public class RestJobManagementAction extends BaseRestHandler {

public static final String JOB_ACTIONS = "job_actions";
public static final String BASE_JOB_ACTION_URL = "/_plugins/_query/_jobs";

private static final Logger LOG = LogManager.getLogger(RestJobManagementAction.class);

@Override
public String getName() {
return JOB_ACTIONS;
}

@Override
public List<Route> routes() {
return ImmutableList.of(

/*
*
* Create a new job with spark execution engine.
* Request URL: POST
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse]
*/
new Route(POST, BASE_JOB_ACTION_URL),

/*
*
* GET jobs with in spark execution engine.
* Request URL: GET
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse]
*/
new Route(GET, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")),
new Route(GET, BASE_JOB_ACTION_URL),

/*
*
* Cancel a job within spark execution engine.
* Request URL: DELETE
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse]
*/
new Route(DELETE, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")),

/*
* GET query result from job {{jobId}} execution.
* Request URL: GET
* Request body:
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest]
* Response body:
* Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse]
*/
new Route(GET, String.format(Locale.ROOT, "%s/{%s}/result", BASE_JOB_ACTION_URL, "jobId")));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
switch (restRequest.method()) {
case POST:
return executePostRequest(restRequest, nodeClient);
case GET:
return executeGetRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.METHOD_NOT_ALLOWED, String.valueOf(restRequest.method())));
}
}

private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
CreateJobRequest submitJobRequest =
CreateJobRequest.fromXContentParser(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportCreateJobRequestAction.ACTION_TYPE,
new CreateJobActionRequest(submitJobRequest),
new ActionListener<>() {
@Override
public void onResponse(CreateJobActionResponse createJobActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.CREATED,
"application/json; charset=UTF-8",
submitJobRequest.getQuery()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) {
Boolean isResultRequest = restRequest.rawPath().contains("result");
if (isResultRequest) {
return executeGetJobQueryResultRequest(nodeClient, restRequest);
} else {
return executeGetJobRequest(nodeClient, restRequest);
}
}

private RestChannelConsumer executeGetJobQueryResultRequest(
NodeClient nodeClient, RestRequest restRequest) {
String jobId = restRequest.param("jobId");
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportGetQueryResultRequestAction.ACTION_TYPE,
new GetJobQueryResultActionRequest(jobId),
new ActionListener<>() {
@Override
public void onResponse(
GetJobQueryResultActionResponse getJobQueryResultActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
getJobQueryResultActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeGetJobRequest(NodeClient nodeClient, RestRequest restRequest) {
String jobId = restRequest.param("jobId");
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportGetJobRequestAction.ACTION_TYPE,
new GetJobActionRequest(jobId),
new ActionListener<>() {
@Override
public void onResponse(GetJobActionResponse getJobActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
getJobActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private void handleException(Exception e, RestChannel restChannel) {
if (e instanceof OpenSearchException) {
OpenSearchException exception = (OpenSearchException) e;
reportError(restChannel, exception, exception.status());
} else {
LOG.error("Error happened during request handling", e);
if (isClientError(e)) {
reportError(restChannel, e, BAD_REQUEST);
} else {
reportError(restChannel, e, SERVICE_UNAVAILABLE);
}
}
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {
String jobId = restRequest.param("jobId");
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportDeleteJobRequestAction.ACTION_TYPE,
new DeleteJobActionRequest(jobId),
new ActionListener<>() {
@Override
public void onResponse(DeleteJobActionResponse deleteJobActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
deleteJobActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private void reportError(final RestChannel channel, final Exception e, final RestStatus status) {
channel.sendResponse(
new BytesRestResponse(status, new ErrorMessage(e, status.getStatus()).toString()));
}

private static boolean isClientError(Exception e) {
return e instanceof IllegalArgumentException || e instanceof IllegalStateException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.rest.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.opensearch.core.xcontent.XContentParser;

@Data
@AllArgsConstructor
public class CreateJobRequest {

private String query;

public static CreateJobRequest fromXContentParser(XContentParser parser) throws IOException {
String query = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
return new CreateJobRequest(query);
}
}
Loading