Skip to content

Commit

Permalink
Push down OpenSearch specific exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Jun 25, 2024
1 parent b2403ca commit bb5ba9e
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
Expand Down Expand Up @@ -53,29 +52,23 @@ public class InteractiveSession implements Session {
public void open(
CreateSessionRequest createSessionRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
try {
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();
// append session id;
createSessionRequest
.getSparkSubmitParameters()
.acceptModifier(
(parameters) -> {
parameters.sessionExecution(sessionId, createSessionRequest.getDatasourceName());
});
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId);
StartJobRequest startJobRequest = createSessionRequest.getStartJobRequest(sessionId);
String jobID = serverlessClient.startJobRun(startJobRequest);
String applicationId = startJobRequest.getApplicationId();
String accountId = createSessionRequest.getAccountId();

sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
sessionModel =
initInteractiveSession(
accountId, applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, asyncQueryRequestContext);
}

/** todo. StatementSweeper will delete doc. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
import org.opensearch.sql.spark.rest.model.LangType;
Expand Down Expand Up @@ -41,25 +39,19 @@ public class Statement {

/** Open a statement. */
public void open() {
try {
statementModel =
submitStatement(
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
statementModel =
submitStatement(
sessionId,
accountId,
applicationId,
jobId,
statementId,
langType,
datasourceName,
query,
queryId);
statementModel =
statementStorageService.createStatement(statementModel, asyncQueryRequestContext);
}

/** Cancel a statement. */
Expand All @@ -77,26 +69,8 @@ public void cancel() {
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
try {
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
this.statementModel =
statementStorageService
.getStatement(statementModel.getId(), statementModel.getDatasourceName())
.orElse(this.statementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
this.statementModel.getStatementState(), statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
this.statementModel =
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
}

public StatementState getStatementState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchSessionStorageService implements SessionStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final SessionModelXContentSerializer serializer;

@Override
public SessionModel createSession(
SessionModel sessionModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
try {
return stateStore.create(
sessionModel.getId(),
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionModel.getSessionId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,36 @@

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.execution.statement.StatementModel;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer;

@RequiredArgsConstructor
public class OpenSearchStatementStorageService implements StatementStorageService {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final StatementModelXContentSerializer serializer;

@Override
public StatementModel createStatement(
StatementModel statementModel, AsyncQueryRequestContext asyncQueryRequestContext) {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
try {
return stateStore.create(
statementModel.getId(),
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementModel.getStatementId();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
Expand All @@ -37,10 +48,29 @@ public Optional<StatementModel> getStatement(String id, String datasourceName) {
@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
try {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
} catch (DocumentMissingException e) {
String errorMsg =
String.format(
"cancel statement failed. no statement found. statement: %s.",
oldStatementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
} catch (VersionConflictEngineException e) {
StatementModel statementModel =
getStatement(oldStatementModel.getId(), oldStatementModel.getDatasourceName())
.orElse(oldStatementModel);
String errorMsg =
String.format(
"cancel statement failed. current statementState: %s " + "statement: %s.",
statementModel.getStatementState(), statementModel.getStatementId());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
}

0 comments on commit bb5ba9e

Please sign in to comment.