Skip to content

Commit

Permalink
Runner implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 1, 2024
1 parent 503b659 commit 07a53d9
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -30,7 +34,6 @@
* plugin.
*/
public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner {

private static final Logger log = LogManager.getLogger(AsyncQueryScheduledQueryJob.class);

public static AsyncQueryScheduledQueryJob INSTANCE = new AsyncQueryScheduledQueryJob();
Expand All @@ -42,6 +45,7 @@ public static AsyncQueryScheduledQueryJob getJobRunnerInstance() {
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private AsyncQueryExecutorService asyncQueryExecutorService;

private AsyncQueryScheduledQueryJob() {
// Singleton class, use getJobRunnerInstance method instead of constructor
Expand All @@ -59,8 +63,13 @@ public void setClient(Client client) {
this.client = client;
}

public void setAsyncQueryExecutorService(AsyncQueryExecutorService asyncQueryExecutorService) {
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
// Parser will convert jobParameter to OpenSearchScheduleQueryJobRequest
if (!(jobParameter instanceof OpenSearchScheduleQueryJobRequest)) {
throw new IllegalStateException(
"Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
Expand All @@ -79,15 +88,26 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
throw new IllegalStateException("Client is not initialized.");
}

if (this.asyncQueryExecutorService == null) {
throw new IllegalStateException("AsyncQueryExecutorService is not initialized.");
}

Runnable runnable =
() -> {
doRefresh(jobParameter.getName());
doRefresh((OpenSearchScheduleQueryJobRequest) jobParameter);
};
threadPool.generic().submit(runnable);
}

void doRefresh(String refreshIndex) {
// TODO: add logic to refresh index
log.info("Scheduled refresh index job on : " + refreshIndex);
void doRefresh(OpenSearchScheduleQueryJobRequest request) {
// TODO: use internal logic to create refresh index query?
log.info("Scheduled refresh index job on : " + request.getName());
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest(
request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
CreateAsyncQueryResponse createAsyncQueryResponse =
asyncQueryExecutorService.createAsyncQuery(
createAsyncQueryRequest, new NullAsyncQueryRequestContext());
log.info("Created async query with queryId: " + createAsyncQueryResponse.getQueryId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.ArgumentMatchers.any;

import java.time.Instant;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -26,6 +27,10 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.LangType;

public class AsyncQueryScheduledQueryJobTest {

Expand All @@ -40,6 +45,8 @@ public class AsyncQueryScheduledQueryJobTest {

@Mock private JobExecutionContext context;

@Mock private AsyncQueryExecutorService asyncQueryExecutorService;

private AsyncQueryScheduledQueryJob jobRunner;

private AsyncQueryScheduledQueryJob spyJobRunner;
Expand All @@ -51,6 +58,7 @@ public void setup() {
jobRunner.setClient(null);
jobRunner.setClusterService(null);
jobRunner.setThreadPool(null);
jobRunner.setAsyncQueryExecutorService(null);
}

@Test
Expand All @@ -59,23 +67,28 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.setClusterService(clusterService);
spyJobRunner.setThreadPool(threadPool);
spyJobRunner.setClient(client);
spyJobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);

OpenSearchScheduleQueryJobRequest jobParameter =
OpenSearchScheduleQueryJobRequest request =
OpenSearchScheduleQueryJobRequest.builder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.lockDurationSeconds(10L)
.scheduledQuery("REFRESH INDEX testIndex")
.dataSource("testDataSource")
.queryLang(LangType.SQL)
.build();

spyJobRunner.runJob(jobParameter, context);
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.generic()).submit(captor.capture());

Runnable runnable = captor.getValue();
runnable.run();

verify(spyJobRunner).doRefresh(eq(jobParameter.getName()));
verify(spyJobRunner).doRefresh(eq(request));
verify(asyncQueryExecutorService).createAsyncQuery(any(CreateAsyncQueryRequest.class), any(NullAsyncQueryRequestContext.class));
}

@Test
Expand Down Expand Up @@ -131,6 +144,15 @@ public void testRunJobWithUninitializedServices() {
() -> jobRunner.runJob(jobParameter, context),
"Expected IllegalStateException but no exception was thrown");
assertEquals("Client is not initialized.", exception.getMessage());

jobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);

exception =
assertThrows(
IllegalStateException.class,
() -> jobRunner.runJob(jobParameter, context),
"Expected IllegalStateException but no exception was thrown");
assertEquals("AsyncQueryExecutorService is not initialized.", exception.getMessage());
}

@Test
Expand All @@ -142,4 +164,4 @@ public void testGetJobRunnerInstanceMultipleCalls() {
assertSame(instance1, instance2);
assertSame(instance2, instance3);
}
}
}
4 changes: 2 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser;
import org.opensearch.sql.spark.scheduler.job.ScheduledQueryJob;
import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
Expand Down Expand Up @@ -266,7 +266,7 @@ public String getJobIndex() {

@Override
public ScheduledJobRunner getJobRunner() {
return ScheduledQueryJob.getJobRunnerInstance();
return AsyncQueryScheduledQueryJob.getJobRunnerInstance();
}

@Override
Expand Down

0 comments on commit 07a53d9

Please sign in to comment.