Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Mar 13, 2024
1 parent 4cc0d5e commit 24369b1
Showing 1 changed file with 4 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.sql.spark.dispatcher;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -24,7 +22,6 @@

/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {
private static final Logger LOG = LogManager.getLogger();

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final StateStore stateStore;
Expand All @@ -47,24 +44,16 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
try {
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
} catch (Exception e) {
LOG.error(e);
}
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
String indexName =
context.getIndexQueryDetails() == null
? null
: context.getIndexQueryDetails().openSearchIndexName();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
Expand All @@ -73,6 +62,6 @@ public DispatchQueryResponse submit(
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
indexName);
context.getIndexQueryDetails().openSearchIndexName());
}
}

0 comments on commit 24369b1

Please sign in to comment.