Skip to content

Commit

Permalink
Delegate Flint index vacuum operation to Spark (#2985)
Browse files Browse the repository at this point in the history
* Remove vacuum dispatch and update UT

Signed-off-by: Chen Dai <[email protected]>

* Remove unused code and test

Signed-off-by: Chen Dai <[email protected]>

* Fix jacoco test

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Sep 6, 2024
1 parent 06c56e7 commit 83e89fb
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 890 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ private FlintIndexOp getIndexOp(
case ALTER:
return flintIndexOpFactory.getAlter(
indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
case VACUUM:
return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da
asyncQueryScheduler);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
return new FlintIndexOpCancel(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -267,31 +267,6 @@ public Void visitDropMaterializedViewStatement(
return super.visitDropMaterializedViewStatement(ctx);
}

@Override
public Void visitVacuumSkippingIndexStatement(
FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING);
return super.visitVacuumSkippingIndexStatement(ctx);
}

@Override
public Void visitVacuumCoveringIndexStatement(
FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING);
return super.visitVacuumCoveringIndexStatement(ctx);
}

@Override
public Void visitVacuumMaterializedViewStatement(
FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW);
indexQueryDetailsBuilder.mvName(ctx.mvName.getText());
return super.visitVacuumMaterializedViewStatement(ctx);
}

@Override
public Void visitDescribeCoveringIndexStatement(
FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,32 +236,12 @@ public void createDropIndexQueryWithScheduler() {
public void createVacuumIndexQuery() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
givenSessionExists();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);
String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExists(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);
}

@Test
public void createVacuumIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);
when(sessionIdProvider.getSessionId(any())).thenReturn(SESSION_ID);
givenSessionExists(); // called twice
when(awsemrServerless.startJobRun(any()))
.thenReturn(new StartJobRunResult().withApplicationId(APPLICATION_ID).withJobRunId(JOB_ID));

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
Expand All @@ -270,14 +250,12 @@ public void createVacuumIndexQueryWithScheduler() {
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
assertEquals(SESSION_ID, response.getSessionId());
verifyGetQueryIdCalled();

verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);

verify(asyncQueryScheduler).removeJob(indexName);
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE);
}

@Test
Expand Down
Loading

0 comments on commit 83e89fb

Please sign in to comment.