Skip to content

Commit

Permalink
Alter support
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 3, 2024
1 parent dccc30e commit 6059eda
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public boolean autoRefresh() {
}

public boolean isExternalScheduler() {
// Default is true, which means using external scheduler to refresh the index.
return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(true);
// Default is false, which means using internal scheduler to refresh the index.
return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false);
}

public Map<String, String> getProvidedOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand All @@ -25,16 +26,19 @@ public class FlintIndexOpAlter extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class);
private final FlintIndexMetadataService flintIndexMetadataService;
private final FlintIndexOptions flintIndexOptions;
private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpAlter(
FlintIndexOptions flintIndexOptions,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory,
FlintIndexMetadataService flintIndexMetadataService) {
FlintIndexMetadataService flintIndexMetadataService,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOptions = flintIndexOptions;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -57,7 +61,11 @@ void runOp(
"Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName());
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
cancelStreamingJob(flintIndexStateModel);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
// Scheduler jobid is the opensearch index name until we support multiple jobs per index
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {
private final AsyncQueryScheduler asyncQueryScheduler;

private static final Logger LOG = LogManager.getLogger();

private final AsyncQueryScheduler asyncQueryScheduler;

/** OpenSearch client. */
private final FlintIndexClient flintIndexClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
protected String accountId;
// Scheduler jobid is the opensearch index name until we support multiple jobs per index
protected String jobId;
protected String dataSource;
protected String scheduledQuery;
Expand Down

0 comments on commit 6059eda

Please sign in to comment.