Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Manual Backport 2.x] Extend scheduler interface for multitenancy #3034

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/integ-tests-with-security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:

- name: Upload test reports
if: ${{ always() }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.os }}-${{ matrix.java }}
Expand Down Expand Up @@ -79,7 +79,7 @@ jobs:

- name: Upload test reports
if: ${{ always() }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.os }}-${{ matrix.java }}
Expand Down
29 changes: 25 additions & 4 deletions .github/workflows/sql-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,15 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload Artifacts
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: opensearch-sql-ubuntu-latest
path: opensearch-sql-builds

- name: Upload test reports
if: always()
uses: actions/upload-artifact@v2
if: ${{ always() }}
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports
Expand Down Expand Up @@ -130,7 +131,27 @@ jobs:
cp -r ./plugin/build/distributions/*.zip opensearch-sql-builds/

- name: Upload Artifacts
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: opensearch-sql-${{ matrix.entry.os }}
path: opensearch-sql-builds

- name: Upload test reports
if: ${{ always() && matrix.entry.os == 'ubuntu-latest' }}
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.entry.os }}-${{ matrix.entry.java }}
path: |
sql/build/reports/**
ppl/build/reports/**
core/build/reports/**
common/build/reports/**
opensearch/build/reports/**
integ-test/build/reports/**
protocol/build/reports/**
legacy/build/reports/**
plugin/build/reports/**
doctest/build/testclusters/docTestCluster-0/logs/*
integ-test/build/testclusters/*/logs/*
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void runOp(
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
Expand All @@ -13,10 +19,13 @@ public interface AsyncQueryScheduler {
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
Expand All @@ -26,10 +35,13 @@ public interface AsyncQueryScheduler {
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
Expand All @@ -41,8 +53,11 @@ public interface AsyncQueryScheduler {
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
void unscheduleJob(String jobId);
void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
Expand All @@ -52,6 +67,9 @@ public interface AsyncQueryScheduler {
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
void removeJob(String jobId);
void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);

verify(asyncQueryScheduler).unscheduleJob(indexName);
verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
}

@Test
Expand Down Expand Up @@ -318,8 +318,7 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

verify(asyncQueryScheduler).unscheduleJob(indexName);

verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
Expand All @@ -55,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {

@Override
/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
Expand Down Expand Up @@ -87,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Unschedules a job by marking it as disabled and updating its last update time. */
@Override
public void unscheduleJob(String jobId) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
try {
updateJob(request);
AsyncQuerySchedulerRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request, asyncQueryRequestContext);
LOG.info("Unscheduled job for jobId: {}", jobId);
} catch (IllegalStateException | DocumentMissingException e) {
LOG.error("Failed to unschedule job: {}", jobId, e);
Expand All @@ -105,7 +112,9 @@ public void unscheduleJob(String jobId) {
/** Updates an existing job with new parameters. */
@Override
@SneakyThrows
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
assertIndexExists();
Expand Down Expand Up @@ -134,8 +143,11 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Removes a job by deleting its document from the index. */
@Override
public void removeJob(String jobId) {
public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
assertIndexExists();
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest
public static final String ENABLED_FIELD = "enabled";
private final Schedule schedule;

@Builder
@Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder")
public ScheduledAsyncQueryJobRequest(
String accountId,
String jobId,
Expand Down Expand Up @@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
AsyncQuerySchedulerRequest request) {
Instant updateTime =
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
return ScheduledAsyncQueryJobRequest.builder()
return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId(request.getAccountId())
.jobId(request.getJobId())
.dataSource(request.getDataSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti
public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder =
ScheduledAsyncQueryJobRequest.builder();
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down
Loading
Loading