Skip to content

Commit

Permalink
Merge branch 'main' into dqs/add-query-status
Browse files Browse the repository at this point in the history
# Conflicts:
#	async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java
  • Loading branch information
ykmr1224 committed Sep 6, 2024
2 parents 965c449 + 83e89fb commit 74f36ad
Show file tree
Hide file tree
Showing 14 changed files with 102 additions and 894 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ private FlintIndexOp getIndexOp(
case ALTER:
return flintIndexOpFactory.getAlter(
indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
case VACUUM:
return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da
asyncQueryScheduler);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
return new FlintIndexOpCancel(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -267,31 +267,6 @@ public Void visitDropMaterializedViewStatement(
return super.visitDropMaterializedViewStatement(ctx);
}

@Override
public Void visitVacuumSkippingIndexStatement(
FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING);
return super.visitVacuumSkippingIndexStatement(ctx);
}

@Override
public Void visitVacuumCoveringIndexStatement(
FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING);
return super.visitVacuumCoveringIndexStatement(ctx);
}

@Override
public Void visitVacuumMaterializedViewStatement(
FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW);
indexQueryDetailsBuilder.mvName(ctx.mvName.getText());
return super.visitVacuumMaterializedViewStatement(ctx);
}

@Override
public Void visitDescribeCoveringIndexStatement(
FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,32 +237,12 @@ public void createDropIndexQueryWithScheduler() {
public void createVacuumIndexQuery() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
givenSessionExists();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);
String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExists(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
}

@Test
public void createVacuumIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);
when(sessionIdProvider.getSessionId(any())).thenReturn(SESSION_ID);
givenSessionExists(); // called twice
when(awsemrServerless.startJobRun(any()))
.thenReturn(new StartJobRunResult().withApplicationId(APPLICATION_ID).withJobRunId(JOB_ID));

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
Expand All @@ -271,14 +251,12 @@ public void createVacuumIndexQueryWithScheduler() {
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
assertEquals(SESSION_ID, response.getSessionId());
verifyGetQueryIdCalled();

verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);

verify(asyncQueryScheduler).removeJob(indexName);
}

@Test
Expand Down
Loading

0 comments on commit 74f36ad

Please sign in to comment.