Skip to content

Commit

Permalink
Vacuum support
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 2, 2024
1 parent 5c117b3 commit dccc30e
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {
private final AsyncQueryScheduler asyncQueryScheduler;

private static final Logger LOG = LogManager.getLogger();

Expand All @@ -27,9 +29,11 @@ public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
FlintIndexClient flintIndexClient,
EMRServerlessClientFactory emrServerlessClientFactory) {
EMRServerlessClientFactory emrServerlessClientFactory,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexClient = flintIndexClient;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -48,6 +52,9 @@ public void runOp(
FlintIndexStateModel flintIndex,
AsyncQueryRequestContext asyncQueryRequestContext) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName());
}
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class FlintIndexOpVacuumTest {
@Mock FlintIndexStateModel flintIndexStateModel;
@Mock FlintIndexStateModel transitionedFlintIndexStateModel;
@Mock AsyncQueryRequestContext asyncQueryRequestContext;
@Mock AsyncQueryScheduler asyncQueryScheduler;

RuntimeException testException = new RuntimeException("Test Exception");

Expand All @@ -52,7 +53,8 @@ public void setUp() {
flintIndexStateModelService,
DATASOURCE_NAME,
flintIndexClient,
emrServerlessClientFactory);
emrServerlessClientFactory,
asyncQueryScheduler);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,6 +53,7 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
private final Client client;
private final ClusterService clusterService;

@Override
/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
OpenSearchScheduleQueryJobRequest request =
Expand Down Expand Up @@ -87,7 +88,7 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

@Override
/** Unschedules a job by marking it as disabled and updating its last update time. */
public void unscheduleJob(String jobId) throws IOException {
public void unscheduleJob(String jobId) {
OpenSearchScheduleQueryJobRequest request =
OpenSearchScheduleQueryJobRequest.builder()
.jobId(jobId)
Expand All @@ -102,8 +103,10 @@ public void unscheduleJob(String jobId) throws IOException {
}
}

@Override
@SneakyThrows
/** Updates an existing job with new parameters. */
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) throws IOException {
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
OpenSearchScheduleQueryJobRequest request =
OpenSearchScheduleQueryJobRequest.fromAsyncQuerySchedulerRequest(
asyncQuerySchedulerRequest);
Expand Down

0 comments on commit dccc30e

Please sign in to comment.