Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Push down OpenSearch specific exception handling #2782

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
Expand Down Expand Up @@ -52,29 +51,23 @@
public void open(
CreateSessionRequest createSessionRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
try {
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(

Check warning on line 57 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java#L55-L57

Added lines #L55 - L57 were not covered by tests
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();

Check warning on line 65 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java#L59-L65

Added lines #L59 - L65 were not covered by tests

sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);

Check warning on line 70 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java#L67-L70

Added lines #L67 - L70 were not covered by tests
}

/** todo. StatementSweeper will delete doc. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;
Expand Down Expand Up @@ -41,25 +39,19 @@

/** Open a statement. */
public void open() {
try {
statementModel =
submitStatement(
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
statementModel =
submitStatement(

Check warning on line 43 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java#L42-L43

Added lines #L42 - L43 were not covered by tests
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);

Check warning on line 54 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java#L53-L54

Added lines #L53 - L54 were not covered by tests
}

/** Cancel a statement. */
Expand All @@ -77,26 +69,8 @@
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
try {
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
statementStorageService
.getStatement(statementModel.getId(), statementModel.getDatasourceName())
.orElse(this.statementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
this.statementModel.getStatementState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);

Check warning on line 73 in async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java

View check run for this annotation

Codecov / codecov/patch

async-query-core/src/main/java/org/opensearch/sql/spark/execution/statement/Statement.java#L72-L73

Added lines #L72 - L73 were not covered by tests
}

public StatementState getStatementState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchSessionStorageService implements SessionStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final SessionModelXContentSerializer serializer;

@Override
public SessionModel createSession(
SessionModel sessionModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
try {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionModel.getSessionId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,36 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchStatementStorageService implements StatementStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final StatementModelXContentSerializer serializer;

@Override
public StatementModel createStatement(
StatementModel statementModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
try {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementModel.getStatementId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand All @@ -37,10 +48,29 @@ public Optional<StatementModel> getStatement(String id, String datasourceName) {
@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
try {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
} catch (DocumentMissingException e) {
String errorMsg =
String.format(
"cancel statement failed. no statement found. statement: %s.",
oldStatementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
StatementModel statementModel =
getStatement(oldStatementModel.getId(), oldStatementModel.getDatasourceName())
.orElse(oldStatementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
statementModel.getStatementState(), statementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
}
Loading