[Backport 2.x] Implement vacuum index operation #2562

Merged 1 commit on Mar 19, 2024
@@ -16,6 +16,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
@@ -32,6 +33,7 @@
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. Includes DROP and ALTER. */
@@ -51,6 +53,8 @@ public class IndexDMLHandler extends AsyncQueryHandler {

private final StateStore stateStore;

private final Client client;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}
@@ -127,6 +131,23 @@ private void executeIndexOp(
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
case VACUUM:
// Try to perform drop operation first
FlintIndexOp tryDropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
try {
tryDropOp.apply(indexMetadata);
} catch (IllegalStateException e) {
// Drop may fail due to an invalid initial state (e.g. index already dropped); ignore and continue
}

// Continue to physically delete the index data if the state is DELETED,
// which means the previous drop transaction succeeded
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
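For readers of the new VACUUM branch: it deliberately composes two existing operations rather than introducing a single monolithic one. A condensed restatement of that control flow, with the failure semantics spelled out, using the same variables that are in scope in executeIndexOp above:

// Phase 1: logical drop. FlintIndexOpDrop cancels any running refresh job and commits
// the state doc to DELETED. It throws IllegalStateException when the index is not in a
// droppable state -- e.g. an earlier DROP or VACUUM already moved it to DELETED -- and
// that is exactly the case we want to fall through, so the exception is swallowed.
try {
  new FlintIndexOpDrop(stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient)
      .apply(indexMetadata);
} catch (IllegalStateException e) {
  // ignored: FlintIndexOpVacuum.validate() below decides whether vacuuming may proceed
}
// Phase 2: physical delete. The vacuum op only accepts state DELETED, so it fails fast
// unless phase 1 (in this request or an earlier one) committed successfully.
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client)
    .apply(indexMetadata);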
@@ -112,6 +112,7 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)

private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
|| IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
|| (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
&& (indexQueryDetails
.getFlintIndexOptions()
@@ -161,7 +162,11 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {

private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessClient) {
return new IndexDMLHandler(
emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, stateStore);
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataService,
stateStore,
client);
}

// TODO: Revisit this logic.
@@ -12,5 +12,6 @@ public enum IndexQueryActionType {
DESCRIBE,
SHOW,
DROP,
VACUUM,
ALTER
}
@@ -24,6 +24,8 @@
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
@@ -167,6 +169,33 @@
}
}

/**
* Delete the index state document with the given ID.
*
* @param sid index state doc ID
* @param indexName index store index name
* @return true if the doc was deleted or the state index does not exist, otherwise false
*/
@VisibleForTesting
public boolean delete(String sid, String indexName) {
try {
// No action if the index doesn't exist
if (!this.clusterService.state().routingTable().hasIndex(indexName)) {
return true;

}

try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, sid);
DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
return deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
}
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to delete index state doc %s in index %s", sid, indexName), e);

}
}

private void createIndex(String indexName) {
try {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
@@ -328,6 +357,16 @@
st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

/**
* @param stateStore index state store
* @param datasourceName data source name
* @return function that accepts an index state doc ID and performs the deletion
*/
public static Function<String, Boolean> deleteFlintIndexState(
StateStore stateStore, String datasourceName) {
return (docId) -> stateStore.delete(docId, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
}

public static Function<IndexDMLResult, IndexDMLResult> createIndexDMLResult(
StateStore stateStore, String indexName) {
return (result) -> stateStore.create(result, IndexDMLResult::copy, indexName);
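The new deleteFlintIndexState follows the same curried-Function pattern as the existing getFlintIndexState and updateFlintIndexState helpers: bind the state-store index once, apply per doc ID. A minimal usage sketch; the "mys3" datasource and the doc ID value are fixtures borrowed from the tests further down:

import java.util.function.Function;

// Bind the deletion to the datasource's request index, then apply it to a state doc ID.
Function<String, Boolean> deleteState = StateStore.deleteFlintIndexState(stateStore, "mys3");
boolean purged = deleteState.apply("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==");
// true  -> the doc was deleted, or the state index itself does not exist;
// false -> the delete did not return Result.DELETED (e.g. the doc was already gone).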
@@ -37,7 +37,9 @@ public enum FlintIndexState {
// stable state
FAILED("failed"),
// unknown state, for state updates on the Spark side that are not yet reflected here
UNKNOWN("unknown");
UNKNOWN("unknown"),
// special state that instructs StateStore to purge the index state doc
NONE("none");

private final String state;

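Taken together with FlintIndexOpVacuum below, the vacuum lifecycle through this enum is:

// DELETED --(validate() passes)--> VACUUMING --(runOp() deletes the OS index)--> NONE
//
// NONE is a sentinel rather than a persisted value: FlintIndexOp.commit() (next hunk)
// deletes the state doc when the stable state is NONE instead of writing "none" into it.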
@@ -5,6 +5,7 @@

package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

@@ -120,7 +121,12 @@ private void commit(FlintIndexStateModel flintIndex) {
LOG.debug("Committing the transaction and moving to stable state.");
FlintIndexState stableState = stableState();
try {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
if (stableState == FlintIndexState.NONE) {
LOG.info("Deleting index state with docId: " + flintIndex.getLatestId());
deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId());
} else {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
}
} catch (Exception e) {
String errorMsg =
String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState);
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint.operation;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {

private static final Logger LOG = LogManager.getLogger();

/** OpenSearch client. */
private final Client client;

public FlintIndexOpVacuum(StateStore stateStore, String datasourceName, Client client) {
super(stateStore, datasourceName);
this.client = client;
}

@Override
boolean validate(FlintIndexState state) {
return state == FlintIndexState.DELETED;
}

@Override
FlintIndexState transitioningState() {
return FlintIndexState.VACUUMING;
}

@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
DeleteIndexRequest request =
new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
}

@Override
FlintIndexState stableState() {
// Instruct StateStore to purge the index state doc
return FlintIndexState.NONE;
}
}
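FlintIndexOpVacuum only supplies the four hooks; the sequencing implied by their names (validate the current state, persist the transitioning state, runOp, then commit to the stable state) lives in the FlintIndexOp base class, which is not part of this diff. Usage as wired up in IndexDMLHandler above, with a hypothetical "mys3" datasource name:

// Requires the index state doc to be in DELETED; ends with the OpenSearch index removed
// and the state doc purged (stableState() == NONE).
FlintIndexOp vacuumOp = new FlintIndexOpVacuum(stateStore, "mys3", client);
vacuumOp.apply(indexMetadata);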
@@ -190,6 +190,31 @@ public Void visitDropMaterializedViewStatement(
return super.visitDropMaterializedViewStatement(ctx);
}

@Override
public Void visitVacuumSkippingIndexStatement(
FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING);
return super.visitVacuumSkippingIndexStatement(ctx);
}

@Override
public Void visitVacuumCoveringIndexStatement(
FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING);
return super.visitVacuumCoveringIndexStatement(ctx);
}

@Override
public Void visitVacuumMaterializedViewStatement(
FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) {
indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM);
indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW);
indexQueryDetailsBuilder.mvName(ctx.mvName.getText());
return super.visitVacuumMaterializedViewStatement(ctx);
}

@Override
public Void visitDescribeCoveringIndexStatement(
FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) {
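The three overrides mirror the existing DROP visitors one-for-one, so the statements they recognize presumably take the same shapes with VACUUM as the verb. Hypothetical examples, patterned on the DROP strings in IndexQuerySpecTest below (the authoritative grammar lives in FlintSparkSqlExtensionsParser):

// Assumed Flint SQL shapes routed to IndexQueryActionType.VACUUM:
String vacuumSkipping = "VACUUM SKIPPING INDEX ON mys3.default.http_logs";
String vacuumCovering = "VACUUM INDEX covering ON mys3.default.http_logs";
String vacuumMv = "VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics";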
@@ -47,7 +47,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec {
"REFRESH SKIPPING INDEX ON mys3.default.http_logs",
FlintIndexType.SKIPPING,
"flint_mys3_default_http_logs_skipping_index")
.latestId("skippingindexid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==");

private MockFlintSparkJob mockIndexState;

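The replaced latestId fixtures here and in IndexQuerySpecTest below are not arbitrary: each new value is the Base64 encoding of the corresponding Flint index name, unlike the old "skippingindexid"-style placeholders. A quick verification:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

String id = Base64.getEncoder().encodeToString(
    "flint_mys3_default_http_logs_skipping_index".getBytes(StandardCharsets.UTF_8));
// id.equals("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==") == true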
@@ -29,7 +29,7 @@
public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs";
public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mv";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics";

public final FlintDatasetMock LEGACY_SKIPPING =
new FlintDatasetMock(
@@ -47,7 +47,10 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
.isLegacy(true);
public final FlintDatasetMock LEGACY_MV =
new FlintDatasetMock(
"DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv")
"DROP MATERIALIZED VIEW mys3.default.http_logs_metrics",
REFRESH_MV,
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.isLegacy(true);

public final FlintDatasetMock SKIPPING =
@@ -56,18 +59,21 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
REFRESH_SI,
FlintIndexType.SKIPPING,
"flint_mys3_default_http_logs_skipping_index")
.latestId("skippingindexid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==");
public final FlintDatasetMock COVERING =
new FlintDatasetMock(
"DROP INDEX covering ON mys3.default.http_logs",
REFRESH_CI,
FlintIndexType.COVERING,
"flint_mys3_default_http_logs_covering_index")
.latestId("coveringid");
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19jb3ZlcmluZ19pbmRleA==");
public final FlintDatasetMock MV =
new FlintDatasetMock(
"DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv")
.latestId("mvid");
"DROP MATERIALIZED VIEW mys3.default.http_logs_metrics",
REFRESH_MV,
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz");
public final String CREATE_SI_AUTO =
"CREATE SKIPPING INDEX ON mys3.default.http_logs"
+ "(l_orderkey VALUE_SET) WITH (auto_refresh = true)";
@@ -77,7 +83,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
+ "(l_orderkey, l_quantity) WITH (auto_refresh = true)";

public final String CREATE_MV_AUTO =
"CREATE MATERIALIZED VIEW mv AS select * "
"CREATE MATERIALIZED VIEW mys3.default.http_logs_metrics AS select * "
+ "from mys3.default.https WITH (auto_refresh = true)";

/**
Expand Down
Loading
Loading