Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delegate Flint index vacuum operation to Spark #2985

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ private FlintIndexOp getIndexOp(
case ALTER:
return flintIndexOpFactory.getAlter(
indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
case VACUUM:
return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da
asyncQueryScheduler);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
return new FlintIndexOpCancel(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -267,31 +267,6 @@ public Void visitDropMaterializedViewStatement(
return super.visitDropMaterializedViewStatement(ctx);
}

@Override
public Void visitVacuumSkippingIndexStatement(
FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING);
return super.visitVacuumSkippingIndexStatement(ctx);
}

@Override
public Void visitVacuumCoveringIndexStatement(
FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING);
return super.visitVacuumCoveringIndexStatement(ctx);
}

@Override
public Void visitVacuumMaterializedViewStatement(
FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW);
indexQueryDetailsBuilder.mvName(ctx.mvName.getText());
return super.visitVacuumMaterializedViewStatement(ctx);
}

@Override
public Void visitDescribeCoveringIndexStatement(
FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,32 +236,12 @@ public void createDropIndexQueryWithScheduler() {
public void createVacuumIndexQuery() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
givenSessionExists();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);
String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExists(indexName);

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);
}

@Test
public void createVacuumIndexQueryWithScheduler() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);

String indexName = "flint_datasource_name_table_name_index_name_index";
givenFlintIndexMetadataExistsWithExternalScheduler(indexName);
when(sessionIdProvider.getSessionId(any())).thenReturn(SESSION_ID);
givenSessionExists(); // called twice
when(awsemrServerless.startJobRun(any()))
.thenReturn(new StartJobRunResult().withApplicationId(APPLICATION_ID).withJobRunId(JOB_ID));

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
Expand All @@ -270,14 +250,12 @@ public void createVacuumIndexQueryWithScheduler() {
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
assertEquals(SESSION_ID, response.getSessionId());
verifyGetQueryIdCalled();

verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);

verify(asyncQueryScheduler).removeJob(indexName);
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE);
}

@Test
Expand Down
Loading
Loading