Skip to content

Commit

Permalink
Implement vacuum index operation (#2557)
Browse files Browse the repository at this point in the history
* Add vacuum operation and IT

Signed-off-by: Chen Dai <[email protected]>

* Add index state doc delete and more IT

Signed-off-by: Chen Dai <[email protected]>

* Refactor IT

Signed-off-by: Chen Dai <[email protected]>

* Fix bytebuddy version conflict

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Fix jacoco failure with new IT

Signed-off-by: Chen Dai <[email protected]>

* Fix code format

Signed-off-by: Chen Dai <[email protected]>

* Fix jacoco test coverage

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit 8374cb6)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Mar 19, 2024
1 parent 8e25dd9 commit fd13a80
Show file tree
Hide file tree
Showing 16 changed files with 435 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
Expand All @@ -32,6 +33,7 @@
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. includes * DROP * ALT? */
Expand All @@ -51,6 +53,8 @@ public class IndexDMLHandler extends AsyncQueryHandler {

private final StateStore stateStore;

private final Client client;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}
Expand Down Expand Up @@ -127,6 +131,23 @@ private void executeIndexOp(
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
case VACUUM:
// Try to perform drop operation first
FlintIndexOp tryDropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
try {
tryDropOp.apply(indexMetadata);
} catch (IllegalStateException e) {
// Drop failed possibly due to invalid initial state
}

// Continue to delete index data physically if state is DELETED
// which means previous transaction succeeds
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
Expand Down Expand Up @@ -161,7 +162,11 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {

private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessClient) {
return new IndexDMLHandler(
emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, stateStore);
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataService,
stateStore,
client);
}

// TODO: Revisit this logic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ public enum IndexQueryActionType {
DESCRIBE,
SHOW,
DROP,
VACUUM,
ALTER
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -167,6 +169,33 @@ public <T extends StateModel, S> T updateState(
}
}

/**
* Delete the index state document with the given ID.
*
* @param sid index state doc ID
* @param indexName index store index name
* @return true if deleted, otherwise false
*/
@VisibleForTesting
public boolean delete(String sid, String indexName) {
try {
// No action if the index doesn't exist
if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
return true;

Check warning on line 184 in spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java#L184

Added line #L184 was not covered by tests
}

try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, sid);
DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
return deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
}
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to delete index state doc %s in index %s", sid, indexName), e);

Check warning on line 195 in spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java#L193-L195

Added lines #L193 - L195 were not covered by tests
}
}

private void createIndex(String indexName) {
try {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
Expand Down Expand Up @@ -328,6 +357,16 @@ public static Function<FlintIndexStateModel, FlintIndexStateModel> createFlintIn
st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

/**
* @param stateStore index state store
* @param datasourceName data source name
* @return function that accepts index state doc ID and perform the deletion
*/
public static Function<String, Boolean> deleteFlintIndexState(
StateStore stateStore, String datasourceName) {
return (docId) -> stateStore.delete(docId, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

public static Function<IndexDMLResult, IndexDMLResult> createIndexDMLResult(
StateStore stateStore, String indexName) {
return (result) -> stateStore.create(result, IndexDMLResult::copy, indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public enum FlintIndexState {
// stable state
FAILED("failed"),
// unknown state, if some state update in Spark side, not reflect in here.
UNKNOWN("unknown");
UNKNOWN("unknown"),
// special state that instructs StateStore to purge the index state doc
NONE("none");

private final String state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

Expand Down Expand Up @@ -120,7 +121,12 @@ private void commit(FlintIndexStateModel flintIndex) {
LOG.debug("Committing the transaction and moving to stable state.");
FlintIndexState stableState = stableState();
try {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
if (stableState == FlintIndexState.NONE) {
LOG.info("Deleting index state with docId: " + flintIndex.getLatestId());
deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId());
} else {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
}
} catch (Exception e) {
String errorMsg =
String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {

private static final Logger LOG = LogManager.getLogger();

/** OpenSearch client. */
private final Client client;

public FlintIndexOpVacuum(StateStore stateStore, String datasourceName, Client client) {
super(stateStore, datasourceName);
this.client = client;
}

@Override
boolean validate(FlintIndexState state) {
return state == FlintIndexState.DELETED;
}

@Override
FlintIndexState transitioningState() {
return FlintIndexState.VACUUMING;
}

@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
DeleteIndexRequest request =
new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
}

@Override
FlintIndexState stableState() {
// Instruct StateStore to purge the index state doc
return FlintIndexState.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,31 @@ public Void visitDropMaterializedViewStatement(
return super.visitDropMaterializedViewStatement(ctx);
}

@Override
public Void visitVacuumSkippingIndexStatement(
FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING);
return super.visitVacuumSkippingIndexStatement(ctx);
}

@Override
public Void visitVacuumCoveringIndexStatement(
FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING);
return super.visitVacuumCoveringIndexStatement(ctx);
}

@Override
public Void visitVacuumMaterializedViewStatement(
FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW);
indexQueryDetailsBuilder.mvName(ctx.mvName.getText());
return super.visitVacuumMaterializedViewStatement(ctx);
}

@Override
public Void visitDescribeCoveringIndexStatement(
FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec {
"REFRESH SKIPPING INDEX ON mys3.default.http_logs",
FlintIndexType.SKIPPING,
"flint_mys3_default_http_logs_skipping_index")
.latestId("skippingindexid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==");

private MockFlintSparkJob mockIndexState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs";
public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mv";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics";

public final FlintDatasetMock LEGACY_SKIPPING =
new FlintDatasetMock(
Expand All @@ -47,7 +47,10 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
.isLegacy(true);
public final FlintDatasetMock LEGACY_MV =
new FlintDatasetMock(
"DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv")
"DROP MATERIALIZED VIEW mys3.default.http_logs_metrics",
REFRESH_MV,
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.isLegacy(true);

public final FlintDatasetMock SKIPPING =
Expand All @@ -56,18 +59,21 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
REFRESH_SI,
FlintIndexType.SKIPPING,
"flint_mys3_default_http_logs_skipping_index")
.latestId("skippingindexid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==");
public final FlintDatasetMock COVERING =
new FlintDatasetMock(
"DROP INDEX covering ON mys3.default.http_logs",
REFRESH_CI,
FlintIndexType.COVERING,
"flint_mys3_default_http_logs_covering_index")
.latestId("coveringid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19jb3ZlcmluZ19pbmRleA==");
public final FlintDatasetMock MV =
new FlintDatasetMock(
"DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv")
.latestId("mvid");
"DROP MATERIALIZED VIEW mys3.default.http_logs_metrics",
REFRESH_MV,
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz");
public final String CREATE_SI_AUTO =
"CREATE SKIPPING INDEX ON mys3.default.http_logs"
+ "(l_orderkey VALUE_SET) WITH (auto_refresh = true)";
Expand All @@ -77,7 +83,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
+ "(l_orderkey, l_quantity) WITH (auto_refresh = true)";

public final String CREATE_MV_AUTO =
"CREATE MATERIALIZED VIEW mv AS select * "
"CREATE MATERIALIZED VIEW mys3.default.http_logs_metrics AS select * "
+ "from mys3.default.https WITH (auto_refresh = true)";

/**
Expand Down
Loading

0 comments on commit fd13a80

Please sign in to comment.