diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index f20de87d61..d2c1e61c3a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -82,7 +82,16 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; +import org.opensearch.sql.spark.rest.RestJobManagementAction; import org.opensearch.sql.spark.storage.SparkStorageFactory; +import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction; +import org.opensearch.sql.spark.transport.TransportDeleteJobRequestAction; +import org.opensearch.sql.spark.transport.TransportGetJobRequestAction; +import org.opensearch.sql.spark.transport.TransportGetQueryResultRequestAction; +import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; +import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; +import org.opensearch.sql.spark.transport.model.GetJobActionResponse; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -131,7 +140,8 @@ public List getRestHandlers( new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), new RestQuerySettingsAction(settings, restController), - new RestDataSourceQueryAction()); + new RestDataSourceQueryAction(), + new RestJobManagementAction()); } /** Register action and handler so that transportClient can find proxy for action. */ @@ -155,7 +165,20 @@ public List getRestHandlers( new ActionHandler<>( new ActionType<>( TransportDeleteDataSourceAction.NAME, DeleteDataSourceActionResponse::new), - TransportDeleteDataSourceAction.class)); + TransportDeleteDataSourceAction.class), + new ActionHandler<>( + new ActionType<>(TransportCreateJobRequestAction.NAME, CreateJobActionResponse::new), + TransportCreateJobRequestAction.class), + new ActionHandler<>( + new ActionType<>(TransportGetJobRequestAction.NAME, GetJobActionResponse::new), + TransportGetJobRequestAction.class), + new ActionHandler<>( + new ActionType<>( + TransportGetQueryResultRequestAction.NAME, GetJobQueryResultActionResponse::new), + TransportGetQueryResultRequestAction.class), + new ActionHandler<>( + new ActionType<>(TransportDeleteJobRequestAction.NAME, DeleteJobActionResponse::new), + TransportDeleteJobRequestAction.class)); } @Override diff --git a/spark/build.gradle b/spark/build.gradle index 89842e5ea8..b93e3327ce 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -25,6 +25,7 @@ dependencies { testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0' testImplementation 'junit:junit:4.13.1' + testImplementation "org.opensearch.test:framework:${opensearch_version}" } test { @@ -53,7 +54,9 @@ jacocoTestCoverageVerification { rule { element = 'CLASS' excludes = [ - 'org.opensearch.sql.spark.data.constants.*' + 'org.opensearch.sql.spark.data.constants.*', + 'org.opensearch.sql.spark.rest.*', + 'org.opensearch.sql.spark.transport.model.*' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java b/spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java new file mode 100644 index 0000000000..669cbb6aca --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/RestJobManagementAction.java @@ -0,0 +1,262 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.rest; + +import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE; +import static org.opensearch.rest.RestRequest.Method.DELETE; +import static org.opensearch.rest.RestRequest.Method.GET; +import static org.opensearch.rest.RestRequest.Method.POST; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.sql.datasources.exceptions.ErrorMessage; +import org.opensearch.sql.datasources.utils.Scheduler; +import org.opensearch.sql.spark.rest.model.CreateJobRequest; +import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction; +import org.opensearch.sql.spark.transport.TransportDeleteJobRequestAction; +import org.opensearch.sql.spark.transport.TransportGetJobRequestAction; +import org.opensearch.sql.spark.transport.TransportGetQueryResultRequestAction; +import org.opensearch.sql.spark.transport.model.CreateJobActionRequest; +import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; +import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest; +import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; +import org.opensearch.sql.spark.transport.model.GetJobActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobActionResponse; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse; + +public class RestJobManagementAction extends BaseRestHandler { + + public static final String JOB_ACTIONS = "job_actions"; + public static final String BASE_JOB_ACTION_URL = "/_plugins/_query/_jobs"; + + private static final Logger LOG = LogManager.getLogger(RestJobManagementAction.class); + + @Override + public String getName() { + return JOB_ACTIONS; + } + + @Override + public List routes() { + return ImmutableList.of( + + /* + * + * Create a new job with spark execution engine. + * Request URL: POST + * Request body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] + * Response body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] + */ + new Route(POST, BASE_JOB_ACTION_URL), + + /* + * + * GET jobs with in spark execution engine. + * Request URL: GET + * Request body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] + * Response body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] + */ + new Route(GET, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")), + new Route(GET, BASE_JOB_ACTION_URL), + + /* + * + * Cancel a job within spark execution engine. + * Request URL: DELETE + * Request body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionRequest] + * Response body: + * Ref [org.opensearch.sql.spark.transport.model.SubmitJobActionResponse] + */ + new Route(DELETE, String.format(Locale.ROOT, "%s/{%s}", BASE_JOB_ACTION_URL, "jobId")), + + /* + * GET query result from job {{jobId}} execution. + * Request URL: GET + * Request body: + * Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest] + * Response body: + * Ref [org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse] + */ + new Route(GET, String.format(Locale.ROOT, "%s/{%s}/result", BASE_JOB_ACTION_URL, "jobId"))); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) + throws IOException { + switch (restRequest.method()) { + case POST: + return executePostRequest(restRequest, nodeClient); + case GET: + return executeGetRequest(restRequest, nodeClient); + case DELETE: + return executeDeleteRequest(restRequest, nodeClient); + default: + return restChannel -> + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, String.valueOf(restRequest.method()))); + } + } + + private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClient nodeClient) + throws IOException { + CreateJobRequest submitJobRequest = + CreateJobRequest.fromXContentParser(restRequest.contentParser()); + return restChannel -> + Scheduler.schedule( + nodeClient, + () -> + nodeClient.execute( + TransportCreateJobRequestAction.ACTION_TYPE, + new CreateJobActionRequest(submitJobRequest), + new ActionListener<>() { + @Override + public void onResponse(CreateJobActionResponse createJobActionResponse) { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.CREATED, + "application/json; charset=UTF-8", + submitJobRequest.getQuery())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private RestChannelConsumer executeGetRequest(RestRequest restRequest, NodeClient nodeClient) { + Boolean isResultRequest = restRequest.rawPath().contains("result"); + if (isResultRequest) { + return executeGetJobQueryResultRequest(nodeClient, restRequest); + } else { + return executeGetJobRequest(nodeClient, restRequest); + } + } + + private RestChannelConsumer executeGetJobQueryResultRequest( + NodeClient nodeClient, RestRequest restRequest) { + String jobId = restRequest.param("jobId"); + return restChannel -> + Scheduler.schedule( + nodeClient, + () -> + nodeClient.execute( + TransportGetQueryResultRequestAction.ACTION_TYPE, + new GetJobQueryResultActionRequest(jobId), + new ActionListener<>() { + @Override + public void onResponse( + GetJobQueryResultActionResponse getJobQueryResultActionResponse) { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.OK, + "application/json; charset=UTF-8", + getJobQueryResultActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private RestChannelConsumer executeGetJobRequest(NodeClient nodeClient, RestRequest restRequest) { + String jobId = restRequest.param("jobId"); + return restChannel -> + Scheduler.schedule( + nodeClient, + () -> + nodeClient.execute( + TransportGetJobRequestAction.ACTION_TYPE, + new GetJobActionRequest(jobId), + new ActionListener<>() { + @Override + public void onResponse(GetJobActionResponse getJobActionResponse) { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.OK, + "application/json; charset=UTF-8", + getJobActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private void handleException(Exception e, RestChannel restChannel) { + if (e instanceof OpenSearchException) { + OpenSearchException exception = (OpenSearchException) e; + reportError(restChannel, exception, exception.status()); + } else { + LOG.error("Error happened during request handling", e); + if (isClientError(e)) { + reportError(restChannel, e, BAD_REQUEST); + } else { + reportError(restChannel, e, SERVICE_UNAVAILABLE); + } + } + } + + private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) { + String jobId = restRequest.param("jobId"); + return restChannel -> + Scheduler.schedule( + nodeClient, + () -> + nodeClient.execute( + TransportDeleteJobRequestAction.ACTION_TYPE, + new DeleteJobActionRequest(jobId), + new ActionListener<>() { + @Override + public void onResponse(DeleteJobActionResponse deleteJobActionResponse) { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.OK, + "application/json; charset=UTF-8", + deleteJobActionResponse.getResult())); + } + + @Override + public void onFailure(Exception e) { + handleException(e, restChannel); + } + })); + } + + private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { + channel.sendResponse( + new BytesRestResponse(status, new ErrorMessage(e, status.getStatus()).toString())); + } + + private static boolean isClientError(Exception e) { + return e instanceof IllegalArgumentException || e instanceof IllegalStateException; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateJobRequest.java new file mode 100644 index 0000000000..ef29e857c8 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/rest/model/CreateJobRequest.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.rest.model; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.opensearch.core.xcontent.XContentParser; + +@Data +@AllArgsConstructor +public class CreateJobRequest { + + private String query; + + public static CreateJobRequest fromXContentParser(XContentParser parser) throws IOException { + String query = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + if (fieldName.equals("query")) { + query = parser.textOrNull(); + } else { + throw new IllegalArgumentException("Unknown field: " + fieldName); + } + } + return new CreateJobRequest(query); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java new file mode 100644 index 0000000000..0c43566134 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestAction.java @@ -0,0 +1,39 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.CreateJobActionRequest; +import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportCreateJobRequestAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/jobs/write"; + public static final ActionType ACTION_TYPE = + new ActionType<>(NAME, CreateJobActionResponse::new); + + @Inject + public TransportCreateJobRequestAction( + TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, CreateJobActionRequest::new); + } + + @Override + protected void doExecute( + Task task, CreateJobActionRequest request, ActionListener listener) { + String responseContent = "submitted_job"; + listener.onResponse(new CreateJobActionResponse(responseContent)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestAction.java new file mode 100644 index 0000000000..dcccb76272 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestAction.java @@ -0,0 +1,39 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest; +import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportDeleteJobRequestAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/jobs/delete"; + public static final ActionType ACTION_TYPE = + new ActionType<>(NAME, DeleteJobActionResponse::new); + + @Inject + public TransportDeleteJobRequestAction( + TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, DeleteJobActionRequest::new); + } + + @Override + protected void doExecute( + Task task, DeleteJobActionRequest request, ActionListener listener) { + String responseContent = "deleted_job"; + listener.onResponse(new DeleteJobActionResponse(responseContent)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetJobRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetJobRequestAction.java new file mode 100644 index 0000000000..96e002bd81 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetJobRequestAction.java @@ -0,0 +1,52 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.GetJobActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportGetJobRequestAction + extends HandledTransportAction { + + public static final String NAME = "cluster:admin/opensearch/ql/jobs/read"; + public static final ActionType ACTION_TYPE = + new ActionType<>(NAME, GetJobActionResponse::new); + + @Inject + public TransportGetJobRequestAction( + TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, GetJobActionRequest::new); + } + + @Override + protected void doExecute( + Task task, GetJobActionRequest request, ActionListener listener) { + String responseContent; + if (request.getJobId() == null) { + responseContent = handleGetAllJobs(); + } else { + responseContent = handleGetJob(request.getJobId()); + } + listener.onResponse(new GetJobActionResponse(responseContent)); + } + + private String handleGetAllJobs() { + return "All Jobs Information."; + } + + private String handleGetJob(String jobId) { + return String.format("Job %s details.", jobId); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestAction.java b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestAction.java new file mode 100644 index 0000000000..6aba1b48b6 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestAction.java @@ -0,0 +1,42 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportGetQueryResultRequestAction + extends HandledTransportAction< + GetJobQueryResultActionRequest, GetJobQueryResultActionResponse> { + + public static final String NAME = "cluster:admin/opensearch/ql/jobs/result"; + public static final ActionType ACTION_TYPE = + new ActionType<>(NAME, GetJobQueryResultActionResponse::new); + + @Inject + public TransportGetQueryResultRequestAction( + TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, GetJobQueryResultActionRequest::new); + } + + @Override + protected void doExecute( + Task task, + GetJobQueryResultActionRequest request, + ActionListener listener) { + String responseContent = "job result"; + listener.onResponse(new GetJobQueryResultActionResponse(responseContent)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionRequest.java new file mode 100644 index 0000000000..cbdcb617af --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionRequest.java @@ -0,0 +1,34 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.Getter; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.sql.spark.rest.model.CreateJobRequest; + +public class CreateJobActionRequest extends ActionRequest { + + @Getter private CreateJobRequest createJobRequest; + + /** Constructor of CreateJobActionRequest from StreamInput. */ + public CreateJobActionRequest(StreamInput in) throws IOException { + super(in); + } + + public CreateJobActionRequest(CreateJobRequest createJobRequest) { + this.createJobRequest = createJobRequest; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionResponse.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionResponse.java new file mode 100644 index 0000000000..ce76d4a20d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/CreateJobActionResponse.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class CreateJobActionResponse extends ActionResponse { + + @Getter private final String result; + + public CreateJobActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionRequest.java new file mode 100644 index 0000000000..eaf379047a --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionRequest.java @@ -0,0 +1,30 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.AllArgsConstructor; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; + +@AllArgsConstructor +public class DeleteJobActionRequest extends ActionRequest { + + private String jobId; + + /** Constructor of SubmitJobActionRequest from StreamInput. */ + public DeleteJobActionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionResponse.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionResponse.java new file mode 100644 index 0000000000..38be57c21d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/DeleteJobActionResponse.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class DeleteJobActionResponse extends ActionResponse { + + @Getter private final String result; + + public DeleteJobActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionRequest.java new file mode 100644 index 0000000000..f8969cde15 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionRequest.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; + +@NoArgsConstructor +@AllArgsConstructor +public class GetJobActionRequest extends ActionRequest { + + @Getter private String jobId; + + /** Constructor of GetJobActionRequest from StreamInput. */ + public GetJobActionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionResponse.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionResponse.java new file mode 100644 index 0000000000..f904afdb4e --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobActionResponse.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class GetJobActionResponse extends ActionResponse { + + @Getter private final String result; + + public GetJobActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionRequest.java new file mode 100644 index 0000000000..1de7bae2c7 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionRequest.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; + +@AllArgsConstructor +public class GetJobQueryResultActionRequest extends ActionRequest { + + @Getter private String jobId; + + /** Constructor of GetJobQueryResultActionRequest from StreamInput. */ + public GetJobQueryResultActionRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionResponse.java b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionResponse.java new file mode 100644 index 0000000000..a7a8002c67 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/model/GetJobQueryResultActionResponse.java @@ -0,0 +1,31 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport.model; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class GetJobQueryResultActionResponse extends ActionResponse { + + @Getter private final String result; + + public GetJobQueryResultActionResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput streamOutput) throws IOException { + streamOutput.writeString(result); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestActionTest.java b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestActionTest.java new file mode 100644 index 0000000000..4357899368 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportCreateJobRequestActionTest.java @@ -0,0 +1,55 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import java.util.HashSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.rest.model.CreateJobRequest; +import org.opensearch.sql.spark.transport.model.CreateJobActionRequest; +import org.opensearch.sql.spark.transport.model.CreateJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +@ExtendWith(MockitoExtension.class) +public class TransportCreateJobRequestActionTest { + + @Mock private TransportService transportService; + @Mock private TransportCreateJobRequestAction action; + @Mock private Task task; + @Mock private ActionListener actionListener; + + @Captor private ArgumentCaptor createJobActionResponseArgumentCaptor; + + @BeforeEach + public void setUp() { + action = + new TransportCreateJobRequestAction(transportService, new ActionFilters(new HashSet<>())); + } + + @Test + public void testDoExecute() { + CreateJobRequest createJobRequest = new CreateJobRequest("source = my_glue.default.alb_logs"); + CreateJobActionRequest request = new CreateJobActionRequest(createJobRequest); + + action.doExecute(task, request, actionListener); + Mockito.verify(actionListener).onResponse(createJobActionResponseArgumentCaptor.capture()); + CreateJobActionResponse createJobActionResponse = + createJobActionResponseArgumentCaptor.getValue(); + Assertions.assertEquals("submitted_job", createJobActionResponse.getResult()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestActionTest.java b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestActionTest.java new file mode 100644 index 0000000000..828b264343 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportDeleteJobRequestActionTest.java @@ -0,0 +1,53 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import java.util.HashSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.DeleteJobActionRequest; +import org.opensearch.sql.spark.transport.model.DeleteJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +@ExtendWith(MockitoExtension.class) +public class TransportDeleteJobRequestActionTest { + + @Mock private TransportService transportService; + @Mock private TransportDeleteJobRequestAction action; + @Mock private Task task; + @Mock private ActionListener actionListener; + + @Captor private ArgumentCaptor deleteJobActionResponseArgumentCaptor; + + @BeforeEach + public void setUp() { + action = + new TransportDeleteJobRequestAction(transportService, new ActionFilters(new HashSet<>())); + } + + @Test + public void testDoExecute() { + DeleteJobActionRequest request = new DeleteJobActionRequest("jobId"); + + action.doExecute(task, request, actionListener); + Mockito.verify(actionListener).onResponse(deleteJobActionResponseArgumentCaptor.capture()); + DeleteJobActionResponse deleteJobActionResponse = + deleteJobActionResponseArgumentCaptor.getValue(); + Assertions.assertEquals("deleted_job", deleteJobActionResponse.getResult()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetJobRequestActionTest.java b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetJobRequestActionTest.java new file mode 100644 index 0000000000..06d1ee8baf --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetJobRequestActionTest.java @@ -0,0 +1,60 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import java.util.HashSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.GetJobActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +@ExtendWith(MockitoExtension.class) +public class TransportGetJobRequestActionTest { + + @Mock private TransportService transportService; + @Mock private TransportGetJobRequestAction action; + @Mock private Task task; + @Mock private ActionListener actionListener; + + @Captor private ArgumentCaptor getJobActionResponseArgumentCaptor; + + @BeforeEach + public void setUp() { + action = new TransportGetJobRequestAction(transportService, new ActionFilters(new HashSet<>())); + } + + @Test + public void testDoExecuteWithSingleJob() { + GetJobActionRequest request = new GetJobActionRequest("abcd"); + + action.doExecute(task, request, actionListener); + Mockito.verify(actionListener).onResponse(getJobActionResponseArgumentCaptor.capture()); + GetJobActionResponse getJobActionResponse = getJobActionResponseArgumentCaptor.getValue(); + Assertions.assertEquals("Job abcd details.", getJobActionResponse.getResult()); + } + + @Test + public void testDoExecuteWithAllJobs() { + GetJobActionRequest request = new GetJobActionRequest(); + action.doExecute(task, request, actionListener); + Mockito.verify(actionListener).onResponse(getJobActionResponseArgumentCaptor.capture()); + GetJobActionResponse getJobActionResponse = getJobActionResponseArgumentCaptor.getValue(); + Assertions.assertEquals("All Jobs Information.", getJobActionResponse.getResult()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestActionTest.java b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestActionTest.java new file mode 100644 index 0000000000..f22adead49 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/transport/TransportGetQueryResultRequestActionTest.java @@ -0,0 +1,54 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.transport; + +import java.util.HashSet; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.core.action.ActionListener; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionRequest; +import org.opensearch.sql.spark.transport.model.GetJobQueryResultActionResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +@ExtendWith(MockitoExtension.class) +public class TransportGetQueryResultRequestActionTest { + + @Mock private TransportService transportService; + @Mock private TransportGetQueryResultRequestAction action; + @Mock private Task task; + @Mock private ActionListener actionListener; + + @Captor + private ArgumentCaptor createJobActionResponseArgumentCaptor; + + @BeforeEach + public void setUp() { + action = + new TransportGetQueryResultRequestAction( + transportService, new ActionFilters(new HashSet<>())); + } + + @Test + public void testDoExecuteForSingleJob() { + GetJobQueryResultActionRequest request = new GetJobQueryResultActionRequest("jobId"); + action.doExecute(task, request, actionListener); + Mockito.verify(actionListener).onResponse(createJobActionResponseArgumentCaptor.capture()); + GetJobQueryResultActionResponse getJobQueryResultActionResponse = + createJobActionResponseArgumentCaptor.getValue(); + Assertions.assertEquals("job result", getJobQueryResultActionResponse.getResult()); + } +} diff --git a/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..ca6ee9cea8 --- /dev/null +++ b/spark/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file