Skip to content

Commit

Permalink
Revert "Bug Fix, support cancel query in running state (opensearch-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
vmmusings committed Nov 13, 2023
1 parent d420e73 commit 4a1566d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,9 @@ public void open() {

/** Cancel a statement. */
public void cancel() {
StatementState statementState = statementModel.getStatementState();

if (statementState.equals(StatementState.SUCCESS)
|| statementState.equals(StatementState.FAILED)
|| statementState.equals(StatementState.CANCELLED)) {
if (statementModel.getStatementState().equals(StatementState.RUNNING)) {
String errorMsg =
String.format(
"can't cancel statement in %s state. statement: %s.",
statementState.getState(), statementId);
String.format("can't cancel statement in waiting state. statement: %s.", statementId);
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static org.opensearch.sql.spark.execution.session.InteractiveSessionTest.createSessionRequest;
import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting;
import static org.opensearch.sql.spark.execution.statement.StatementState.CANCELLED;
import static org.opensearch.sql.spark.execution.statement.StatementState.RUNNING;
import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING;
import static org.opensearch.sql.spark.execution.statement.StatementTest.TestStatement.testStatement;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
Expand Down Expand Up @@ -169,91 +168,36 @@ public void cancelFailedBecauseOfConflict() {
}

@Test
public void cancelSuccessStatementFailed() {
public void cancelRunningStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(),
StatementState.SUCCESS,
model.getSeqNo(),
model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in success state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelFailedStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(),
StatementState.FAILED,
model.getSeqNo(),
model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in failed state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelCancelledStatementFailed() {
StatementId stId = new StatementId("statementId");
Statement st = createStatement(stId);

// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(), CANCELLED, model.getSeqNo(), model.getPrimaryTerm()));

// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in cancelled state. statement: %s.", stId),
exception.getMessage());
}

@Test
public void cancelRunningStatementSuccess() {
Statement st =
Statement.builder()
.sessionId(new SessionId("sessionId"))
.applicationId("appId")
.jobId("jobId")
.statementId(new StatementId("statementId"))
.statementId(stId)
.langType(LangType.SQL)
.datasourceName(DS_NAME)
.query("query")
.queryId("statementId")
.stateStore(stateStore)
.build();
st.open();

// submit statement
TestStatement testStatement = testStatement(st, stateStore);
testStatement
.open()
.assertSessionState(WAITING)
.assertStatementId(new StatementId("statementId"));

testStatement.run();
// update to running state
StatementModel model = st.getStatementModel();
st.setStatementModel(
StatementModel.copyWithState(
st.getStatementModel(),
StatementState.RUNNING,
model.getSeqNo(),
model.getPrimaryTerm()));

// close statement
testStatement.cancel().assertSessionState(CANCELLED);
// cancel conflict
IllegalStateException exception = assertThrows(IllegalStateException.class, st::cancel);
assertEquals(
String.format("can't cancel statement in waiting state. statement: %s.", stId),
exception.getMessage());
}

@Test
Expand Down Expand Up @@ -411,33 +355,9 @@ public TestStatement cancel() {
st.cancel();
return this;
}

public TestStatement run() {
StatementModel model =
updateStatementState(stateStore, DS_NAME).apply(st.getStatementModel(), RUNNING);
st.setStatementModel(model);
return this;
}
}

private QueryRequest queryRequest() {
return new QueryRequest(AsyncQueryId.newAsyncQueryId(DS_NAME), LangType.SQL, "select 1");
}

private Statement createStatement(StatementId stId) {
Statement st =
Statement.builder()
.sessionId(new SessionId("sessionId"))
.applicationId("appId")
.jobId("jobId")
.statementId(stId)
.langType(LangType.SQL)
.datasourceName(DS_NAME)
.query("query")
.queryId("statementId")
.stateStore(stateStore)
.build();
st.open();
return st;
}
}

0 comments on commit 4a1566d

Please sign in to comment.