Skip to content

Commit

Permalink
Runner implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 1, 2024
1 parent 503b659 commit 532c1bc
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.sql.spark.transport.model.AsyncQueryResult;
import org.opensearch.threadpool.ThreadPool;

/**
Expand All @@ -30,6 +33,7 @@
* plugin.
*/
public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner {
private final AsyncQueryExecutorService asyncQueryExecutorService;

private static final Logger log = LogManager.getLogger(AsyncQueryScheduledQueryJob.class);

Expand Down Expand Up @@ -59,6 +63,10 @@ public void setClient(Client client) {
this.client = client;
}

public void setAsyncQueryExecutorService(AsyncQueryExecutorService asyncQueryExecutorService) {
this.asyncQueryExecutorService = asyncQueryExecutorService;
}

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
if (!(jobParameter instanceof OpenSearchScheduleQueryJobRequest)) {
Expand All @@ -79,15 +87,25 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte
throw new IllegalStateException("Client is not initialized.");
}

if (this.asyncQueryExecutorService == null) {
throw new IllegalStateException("AsyncQueryExecutorService is not initialized.");
}

Runnable runnable =
() -> {
doRefresh(jobParameter.getName());
};
threadPool.generic().submit(runnable);
}

void doRefresh(String refreshIndex) {
// TODO: add logic to refresh index
log.info("Scheduled refresh index job on : " + refreshIndex);
void doRefresh(ScheduledJobParameter jobParameter) {
// TODO: use internal logic to create refresh index query?
log.info("Scheduled refresh index job on : " + jobParameter.getName());
CreateAsyncQueryRequest createAsyncQueryRequest = new CreateAsyncQueryRequest(
jobParameter.getScheduledQuery(),
jobParameter.getDataSource(),
jobParameter.getQueryLang());
CreateAsyncQueryResponse createAsyncQueryResponse = asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest, new NullAsyncQueryRequestContext());
log.info("Created async query with queryId: " + createAsyncQueryResponse.getQueryId());
}
}

0 comments on commit 532c1bc

Please sign in to comment.