Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down OpenSearch specific exception handling #2778

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
Expand Down Expand Up @@ -53,29 +52,23 @@ public class InteractiveSession implements Session {
public void open(
CreateSessionRequest createSessionRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
try {
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();

sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);
}

/** todo. StatementSweeper will delete doc. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;
Expand Down Expand Up @@ -41,25 +39,19 @@ public class Statement {

/** Open a statement. */
public void open() {
try {
statementModel =
submitStatement(
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
statementModel =
submitStatement(
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);
}

/** Cancel a statement. */
Expand All @@ -77,26 +69,8 @@ public void cancel() {
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
try {
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
statementStorageService
.getStatement(statementModel.getId(), statementModel.getDatasourceName())
.orElse(this.statementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
this.statementModel.getStatementState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
}

public StatementState getStatementState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchSessionStorageService implements SessionStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final SessionModelXContentSerializer serializer;

@Override
public SessionModel createSession(
SessionModel sessionModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
try {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionModel.getSessionId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,36 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchStatementStorageService implements StatementStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final StatementModelXContentSerializer serializer;

@Override
public StatementModel createStatement(
StatementModel statementModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
try {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementModel.getStatementId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand All @@ -37,10 +48,29 @@ public Optional<StatementModel> getStatement(String id, String datasourceName) {
@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
try {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
} catch (DocumentMissingException e) {
String errorMsg =
String.format(
"cancel statement failed. no statement found. statement: %s.",
oldStatementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
StatementModel statementModel =
getStatement(oldStatementModel.getId(), oldStatementModel.getDatasourceName())
.orElse(oldStatementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
statementModel.getStatementState(), statementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
}
Loading