Skip to content

Commit

Permalink
Remove unneeded datasourceName parameters (#2683)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored May 20, 2024
1 parent 2c849f7 commit a64fcb1
Show file tree
Hide file tree
Showing 17 changed files with 38 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private AsyncQueryId storeIndexDMLResult(
dispatchQueryRequest.getDatasource(),
queryRunTime,
System.currentTimeMillis());
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName());
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult);
return asyncQueryId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStorageService.createSession(sessionModel, sessionModel.getDatasourceName());
sessionStorageService.createSession(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void open() {
datasourceName,
query,
queryId);
statementModel = statementStorageService.createStatement(statementModel, datasourceName);
statementModel = statementStorageService.createStatement(statementModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "statement already exist. " + statementId;
LOG.error(errorMsg);
Expand All @@ -73,8 +73,7 @@ public void cancel() {
}
try {
this.statementModel =
statementStorageService.updateStatementState(
statementModel, StatementState.CANCELLED, statementModel.getDatasourceName());
statementStorageService.updateStatementState(statementModel, StatementState.CANCELLED);
} catch (DocumentMissingException e) {
String errorMsg =
String.format("cancel statement failed. no statement found. statement: %s.", statementId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ public class OpenSearchSessionStorageService implements SessionStorageService {
private final SessionModelXContentSerializer serializer;

@Override
public SessionModel createSession(SessionModel sessionModel, String datasourceName) {
public SessionModel createSession(SessionModel sessionModel) {
return stateStore.create(
sessionModel, SessionModel::of, OpenSearchStateStoreUtil.getIndexName(datasourceName));
sessionModel,
SessionModel::of,
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
}

@Override
Expand All @@ -30,12 +32,11 @@ public Optional<SessionModel> getSession(String id, String datasourceName) {
}

@Override
public SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName) {
public SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState) {
return stateStore.updateState(
sessionModel,
sessionState,
SessionModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(sessionModel.getDatasourceName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public class OpenSearchStatementStorageService implements StatementStorageServic
private final StatementModelXContentSerializer serializer;

@Override
public StatementModel createStatement(StatementModel statementModel, String datasourceName) {
public StatementModel createStatement(StatementModel statementModel) {
return stateStore.create(
statementModel,
StatementModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(statementModel.getDatasourceName()));
}

@Override
Expand All @@ -33,11 +33,11 @@ public Optional<StatementModel> getStatement(String id, String datasourceName) {

@Override
public StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName) {
StatementModel oldStatementModel, StatementState statementState) {
return stateStore.updateState(
oldStatementModel,
statementState,
StatementModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(oldStatementModel.getDatasourceName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
/** Interface for accessing {@link SessionModel} data storage. */
public interface SessionStorageService {

SessionModel createSession(SessionModel sessionModel, String datasourceName);
SessionModel createSession(SessionModel sessionModel);

Optional<SessionModel> getSession(String id, String datasourceName);

SessionModel updateSessionState(
SessionModel sessionModel, SessionState sessionState, String datasourceName);
SessionModel updateSessionState(SessionModel sessionModel, SessionState sessionState);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
public interface StatementStorageService {

StatementModel createStatement(StatementModel statementModel, String datasourceName);
StatementModel createStatement(StatementModel statementModel);

StatementModel updateStatementState(
StatementModel oldStatementModel, StatementState statementState, String datasourceName);
StatementModel oldStatementModel, StatementState statementState);

Optional<StatementModel> getStatement(String id, String datasourceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
* flint index.
*/
public interface FlintIndexStateModelService {
FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName);
FlintIndexStateModel createFlintIndexStateModel(FlintIndexStateModel flintIndexStateModel);

Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
* Abstraction over the IndexDMLResult storage. It stores the result of IndexDML query execution.
*/
public interface IndexDMLResultStorageService {
IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName);
IndexDMLResult createIndexDMLResult(IndexDMLResult result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String

@Override
public FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName) {
FlintIndexStateModel flintIndexStateModel) {
return stateStore.create(
flintIndexStateModel,
FlintIndexStateModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
OpenSearchStateStoreUtil.getIndexName(flintIndexStateModel.getDatasourceName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultSto
private final StateStore stateStore;

@Override
public IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName) {
DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName);
public IndexDMLResult createIndexDMLResult(IndexDMLResult result) {
DataSourceMetadata dataSourceMetadata =
dataSourceService.getDataSourceMetadata(result.getDatasourceName());
return stateStore.create(result, IndexDMLResult::copy, dataSourceMetadata.getResultIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void withSessionCreateAsyncQueryFailed() {
.seqNo(submitted.getSeqNo())
.primaryTerm(submitted.getPrimaryTerm())
.build();
statementStorageService.updateStatementState(mocked, StatementState.FAILED, MYS3_DATASOURCE);
statementStorageService.updateStatementState(mocked, StatementState.FAILED);

AsyncQueryExecutionResponse asyncQueryResults =
asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,7 @@ int search(QueryBuilder query) {

void setSessionState(String sessionId, SessionState sessionState) {
Optional<SessionModel> model = sessionStorageService.getSession(sessionId, MYS3_DATASOURCE);
SessionModel updated =
sessionStorageService.updateSessionState(model.get(), sessionState, MYS3_DATASOURCE);
SessionModel updated = sessionStorageService.updateSessionState(model.get(), sessionState);
assertEquals(sessionState, updated.getSessionState());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ void emrJobWriteResultDoc(Map<String, Object> resultDoc) {
/** Simulate EMR-S updates query_execution_request with state */
void emrJobUpdateStatementState(StatementState newState) {
StatementModel stmt = statementStorageService.getStatement(queryId, MYS3_DATASOURCE).get();
statementStorageService.updateStatementState(stmt, newState, MYS3_DATASOURCE);
statementStorageService.updateStatementState(stmt, newState);
}

void emrJobUpdateJobState(JobRunState jobState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MockFlintSparkJob(
"",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel, datasource);
stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel);
}

public void transition(FlintIndexState newState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ public void cancelFailedBecauseOfConflict() {
st.open();

StatementModel running =
statementStorageService.updateStatementState(
st.getStatementModel(), CANCELLED, TEST_DATASOURCE_NAME);
statementStorageService.updateStatementState(st.getStatementModel(), CANCELLED);

assertEquals(StatementState.CANCELLED, running.getStatementState());

Expand Down Expand Up @@ -232,8 +231,7 @@ public void submitStatementInRunningSession() {
Session session = sessionManager.createSession(createSessionRequest());

// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);

StatementId statementId = session.submit(queryRequest());
assertFalse(statementId.getId().isEmpty());
Expand All @@ -251,8 +249,7 @@ public void submitStatementInNotStartedState() {
public void failToSubmitStatementInDeadState() {
Session session = sessionManager.createSession(createSessionRequest());

sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.DEAD, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.DEAD);

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> session.submit(queryRequest()));
Expand All @@ -266,8 +263,7 @@ public void failToSubmitStatementInDeadState() {
public void failToSubmitStatementInFailState() {
Session session = sessionManager.createSession(createSessionRequest());

sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.FAIL, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.FAIL);

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> session.submit(queryRequest()));
Expand Down Expand Up @@ -312,8 +308,7 @@ public void failToSubmitStatementInDeletedSession() {
public void getStatementSuccess() {
Session session = sessionManager.createSession(createSessionRequest());
// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);
StatementId statementId = session.submit(queryRequest());

Optional<Statement> statement = session.get(statementId);
Expand All @@ -326,8 +321,7 @@ public void getStatementSuccess() {
public void getStatementNotExist() {
Session session = sessionManager.createSession(createSessionRequest());
// App change state to running
sessionStorageService.updateSessionState(
session.getSessionModel(), SessionState.RUNNING, TEST_DATASOURCE_NAME);
sessionStorageService.updateSessionState(session.getSessionModel(), SessionState.RUNNING);

Optional<Statement> statement = session.get(StatementId.newStatementId("not-exist-id"));
assertFalse(statement.isPresent());
Expand Down Expand Up @@ -376,8 +370,7 @@ public TestStatement cancel() {

public TestStatement run() {
StatementModel model =
statementStorageService.updateStatementState(
st.getStatementModel(), RUNNING, TEST_DATASOURCE_NAME);
statementStorageService.updateStatementState(st.getStatementModel(), RUNNING);
st.setStatementModel(model);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ void getFlintIndexStateModel() {
@Test
void createFlintIndexStateModel() {
when(mockStateStore.create(any(), any(), any())).thenReturn(responseFlintIndexStateModel);
when(flintIndexStateModel.getDatasourceName()).thenReturn(DATASOURCE);

FlintIndexStateModel result =
openSearchFlintIndexStateModelService.createFlintIndexStateModel(
flintIndexStateModel, DATASOURCE);
openSearchFlintIndexStateModelService.createFlintIndexStateModel(flintIndexStateModel);

assertEquals(responseFlintIndexStateModel, result);
}
Expand Down

0 comments on commit a64fcb1

Please sign in to comment.