-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature] Flint query scheduler part1 - integrate job scheduler plugin
Signed-off-by: Louis Chu <[email protected]>
- Loading branch information
Showing
20 changed files
with
855 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
248 changes: 248 additions & 0 deletions
248
...query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,248 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.scheduler; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Instant; | ||
import org.apache.commons.io.IOUtils; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.DocWriteResponse; | ||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.opensearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.opensearch.action.delete.DeleteRequest; | ||
import org.opensearch.action.delete.DeleteResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.support.WriteRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.action.ActionFuture; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.common.xcontent.json.JsonXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.core.xcontent.XContentParserUtils; | ||
import org.opensearch.jobscheduler.spi.ScheduledJobParser; | ||
import org.opensearch.jobscheduler.spi.ScheduledJobRunner; | ||
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; | ||
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; | ||
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
public class OpenSearchAsyncQueryScheduler { | ||
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; | ||
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler"; | ||
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = | ||
"async-query-scheduler-index-mapping.yml"; | ||
private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME = | ||
"async-query-scheduler-index-settings.yml"; | ||
private static final Logger LOG = LogManager.getLogger(); | ||
private Client client; | ||
private ClusterService clusterService; | ||
|
||
public OpenSearchAsyncQueryScheduler() { | ||
LOG.info("OpenSearchAsyncQueryScheduler initialized"); | ||
} | ||
|
||
public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) { | ||
this.client = client; | ||
this.clusterService = clusterService; | ||
OpenSearchRefreshIndexJob openSearchRefreshIndexJob = | ||
OpenSearchRefreshIndexJob.getJobRunnerInstance(); | ||
openSearchRefreshIndexJob.setClusterService(clusterService); | ||
openSearchRefreshIndexJob.setThreadPool(threadPool); | ||
openSearchRefreshIndexJob.setClient(client); | ||
} | ||
|
||
public static ScheduledJobRunner getJobRunner() { | ||
return OpenSearchRefreshIndexJob.getJobRunnerInstance(); | ||
} | ||
|
||
public void scheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException { | ||
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { | ||
createAsyncQuerySchedulerIndex(); | ||
} | ||
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME); | ||
indexRequest.id(request.getName()); | ||
indexRequest.opType(DocWriteRequest.OpType.CREATE); | ||
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
ActionFuture<IndexResponse> indexResponseActionFuture; | ||
IndexResponse indexResponse; | ||
|
||
indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), null)); | ||
indexResponseActionFuture = client.index(indexRequest); | ||
indexResponse = indexResponseActionFuture.actionGet(); | ||
|
||
if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { | ||
LOG.info("Job : {} successfully created", request.getName()); | ||
} else { | ||
throw new RuntimeException( | ||
"Schedule job failed with result : " + indexResponse.getResult().getLowercase()); | ||
} | ||
} | ||
|
||
public void unscheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException { | ||
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { | ||
throw new RuntimeException("Index does not exist."); | ||
} | ||
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName()); | ||
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
try (XContentBuilder builder = JsonXContent.contentBuilder()) { | ||
builder.startObject(); | ||
builder.field("enabled", false); | ||
builder.endObject(); | ||
indexRequest.source(builder); | ||
} | ||
|
||
ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest); | ||
IndexResponse indexResponse = indexResponseActionFuture.actionGet(); | ||
|
||
if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { | ||
LOG.info("Job : {} successfully unscheduled", request.getName()); | ||
} else { | ||
throw new RuntimeException( | ||
"Unschedule job failed with result : " + indexResponse.getResult().getLowercase()); | ||
} | ||
} | ||
|
||
public void removeJob(OpenSearchRefreshIndexJobRequest request) { | ||
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { | ||
throw new RuntimeException("Index does not exist."); | ||
} | ||
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, request.getName()); | ||
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest); | ||
DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet(); | ||
|
||
if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) { | ||
LOG.info("Job : {} successfully deleted", request.getName()); | ||
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) { | ||
throw new RuntimeException("Job : " + request.getName() + " doesn't exist"); | ||
} else { | ||
throw new RuntimeException( | ||
"Remove job failed with result : " + deleteResponse.getResult().getLowercase()); | ||
} | ||
} | ||
|
||
public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException { | ||
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { | ||
throw new RuntimeException("Index does not exist."); | ||
} | ||
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName()); | ||
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
try (XContentBuilder builder = JsonXContent.contentBuilder()) { | ||
builder.startObject(); | ||
if (request.getSchedule() != null) { | ||
builder.field("schedule", request.getSchedule()); | ||
} | ||
builder.endObject(); | ||
indexRequest.source(builder); | ||
} | ||
|
||
ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest); | ||
IndexResponse indexResponse = indexResponseActionFuture.actionGet(); | ||
|
||
if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { | ||
LOG.info("Job : {} successfully updated", request.getName()); | ||
} else { | ||
throw new RuntimeException( | ||
"Update job failed with result : " + indexResponse.getResult().getLowercase()); | ||
} | ||
} | ||
|
||
private void createAsyncQuerySchedulerIndex() { | ||
try { | ||
InputStream mappingFileStream = | ||
OpenSearchAsyncQueryScheduler.class | ||
.getClassLoader() | ||
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME); | ||
InputStream settingsFileStream = | ||
OpenSearchAsyncQueryScheduler.class | ||
.getClassLoader() | ||
.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME); | ||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME); | ||
createIndexRequest.mapping( | ||
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML); | ||
createIndexRequest.settings( | ||
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML); | ||
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture = | ||
client.admin().indices().create(createIndexRequest); | ||
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); | ||
|
||
if (createIndexResponse.isAcknowledged()) { | ||
LOG.info("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME); | ||
} else { | ||
throw new RuntimeException("Index creation is not acknowledged."); | ||
} | ||
} catch (Throwable e) { | ||
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e); | ||
throw new RuntimeException( | ||
"Internal server error while creating " | ||
+ SCHEDULER_INDEX_NAME | ||
+ " index: " | ||
+ e.getMessage(), | ||
e); | ||
} | ||
} | ||
|
||
public static ScheduledJobParser getJobParser() { | ||
return (parser, id, jobDocVersion) -> { | ||
OpenSearchRefreshIndexJobRequest.Builder builder = | ||
new OpenSearchRefreshIndexJobRequest.Builder(); | ||
XContentParserUtils.ensureExpectedToken( | ||
XContentParser.Token.START_OBJECT, parser.nextToken(), parser); | ||
|
||
while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
switch (fieldName) { | ||
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD: | ||
builder.withJobName(parser.text()); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD: | ||
builder.withJobType(parser.text()); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD: | ||
builder.withEnabled(parser.booleanValue()); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD: | ||
builder.withEnabledTime(parseInstantValue(parser)); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD: | ||
builder.withLastUpdateTime(parseInstantValue(parser)); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD: | ||
builder.withSchedule(ScheduleParser.parse(parser)); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS: | ||
builder.withLockDurationSeconds(parser.longValue()); | ||
break; | ||
case OpenSearchRefreshIndexJobRequest.JITTER: | ||
builder.withJitter(parser.doubleValue()); | ||
break; | ||
default: | ||
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); | ||
} | ||
} | ||
return builder.build(); | ||
}; | ||
} | ||
|
||
private static Instant parseInstantValue(XContentParser parser) throws IOException { | ||
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { | ||
return null; | ||
} | ||
if (parser.currentToken().isValue()) { | ||
return Instant.ofEpochMilli(parser.longValue()); | ||
} | ||
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); | ||
return null; | ||
} | ||
} |
Oops, something went wrong.