Skip to content

Commit

Permalink
Runner implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 1, 2024
1 parent 503b659 commit 0d8b0c0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -30,7 +34,6 @@
* plugin.
*/
public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner {

private static final Logger log = LogManager.getLogger(AsyncQueryScheduledQueryJob.class);

public static AsyncQueryScheduledQueryJob INSTANCE = new AsyncQueryScheduledQueryJob();
Expand All @@ -42,6 +45,7 @@ public static AsyncQueryScheduledQueryJob getJobRunnerInstance() {
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private AsyncQueryExecutorService asyncQueryExecutorService;

private AsyncQueryScheduledQueryJob() {
// Singleton class, use getJobRunnerInstance method instead of constructor
Expand All @@ -59,8 +63,13 @@ public void setClient(Client client) {
this.client = client;
}

public void setAsyncQueryExecutorService(AsyncQueryExecutorService asyncQueryExecutorService) {
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
// Parser will convert jobParameter to OpenSearchScheduleQueryJobRequest
if (!(jobParameter instanceof OpenSearchScheduleQueryJobRequest)) {
throw new IllegalStateException(
"Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
Expand All @@ -79,15 +88,26 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
throw new IllegalStateException("Client is not initialized.");
}

if (this.asyncQueryExecutorService == null) {
throw new IllegalStateException("AsyncQueryExecutorService is not initialized.");
}

Runnable runnable =
() -> {
doRefresh(jobParameter.getName());
doRefresh((OpenSearchScheduleQueryJobRequest) jobParameter);
};
threadPool.generic().submit(runnable);
}

void doRefresh(String refreshIndex) {
// TODO: add logic to refresh index
log.info("Scheduled refresh index job on : " + refreshIndex);
void doRefresh(OpenSearchScheduleQueryJobRequest request) {
// TODO: use internal logic to create refresh index query?
log.info("Scheduled refresh index job on : " + request.getName());
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest(
request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
CreateAsyncQueryResponse createAsyncQueryResponse =
asyncQueryExecutorService.createAsyncQuery(
createAsyncQueryRequest, new NullAsyncQueryRequestContext());
log.info("Created async query with queryId: " + createAsyncQueryResponse.getQueryId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -24,6 +25,10 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -38,6 +43,9 @@ public class AsyncQueryScheduledQueryJobTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Client client;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private AsyncQueryExecutorService asyncQueryExecutorService;

@Mock private JobExecutionContext context;

private AsyncQueryScheduledQueryJob jobRunner;
Expand All @@ -51,6 +59,7 @@ public void setup() {
jobRunner.setClient(null);
jobRunner.setClusterService(null);
jobRunner.setThreadPool(null);
jobRunner.setAsyncQueryExecutorService(null);
}

@Test
Expand All @@ -59,23 +68,32 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.setClusterService(clusterService);
spyJobRunner.setThreadPool(threadPool);
spyJobRunner.setClient(client);
spyJobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);

OpenSearchScheduleQueryJobRequest jobParameter =
OpenSearchScheduleQueryJobRequest request =
OpenSearchScheduleQueryJobRequest.builder()
.jobId("testJob")
.lastUpdateTime(Instant.now())
.lockDurationSeconds(10L)
.scheduledQuery("REFRESH INDEX testIndex")
.dataSource("testDataSource")
.queryLang(LangType.SQL)
.build();

spyJobRunner.runJob(jobParameter, context);
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest(
request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.generic()).submit(captor.capture());

Runnable runnable = captor.getValue();
runnable.run();

verify(spyJobRunner).doRefresh(eq(jobParameter.getName()));
verify(spyJobRunner).doRefresh(eq(request));
verify(asyncQueryExecutorService)
.createAsyncQuery(eq(createAsyncQueryRequest), any(NullAsyncQueryRequestContext.class));
}

@Test
Expand All @@ -84,6 +102,7 @@ public void testRunJobWithIncorrectParameter() {
jobRunner.setClusterService(clusterService);
jobRunner.setThreadPool(threadPool);
jobRunner.setClient(client);
jobRunner.setAsyncQueryExecutorService(asyncQueryExecutorService);

ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class);

Expand Down Expand Up @@ -131,6 +150,15 @@ public void testRunJobWithUninitializedServices() {
() -> jobRunner.runJob(jobParameter, context),
"Expected IllegalStateException but no exception was thrown");
assertEquals("Client is not initialized.", exception.getMessage());

jobRunner.setClient(client);

exception =
assertThrows(
IllegalStateException.class,
() -> jobRunner.runJob(jobParameter, context),
"Expected IllegalStateException but no exception was thrown");
assertEquals("AsyncQueryExecutorService is not initialized.", exception.getMessage());
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser;
import org.opensearch.sql.spark.scheduler.job.ScheduledQueryJob;
import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
Expand Down Expand Up @@ -266,7 +266,7 @@ public String getJobIndex() {

@Override
public ScheduledJobRunner getJobRunner() {
return ScheduledQueryJob.getJobRunnerInstance();
return AsyncQueryScheduledQueryJob.getJobRunnerInstance();
}

@Override
Expand Down

0 comments on commit 0d8b0c0

Please sign in to comment.