Skip to content

Commit

Permalink
Add pit for multi query (#2753)
Browse files Browse the repository at this point in the history
* Add search after for join

Signed-off-by: Rupal Mahajan <[email protected]>

* Enable search after by default

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix tests

Signed-off-by: Rupal Mahajan <[email protected]>

* ignore joinWithGeoIntersectNL

Signed-off-by: Rupal Mahajan <[email protected]>

* Rerun CI with scroll

Signed-off-by: Rupal Mahajan <[email protected]>

* draft

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code and retrigger CI with search_after true

Signed-off-by: Rupal Mahajan <[email protected]>

* Address comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code change

Signed-off-by: Rupal Mahajan <[email protected]>

* Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix scroll condition

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit before query execution

Signed-off-by: Rupal Mahajan <[email protected]>

* Refactor get response with pit method

Signed-off-by: Rupal Mahajan <[email protected]>

* Update remaining scroll search calls

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix integ test failures

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from join request builder to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused methods

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from request to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix pit.delete call missed while merge

Signed-off-by: Rupal Mahajan <[email protected]>

* Move getResponseWithHits method to util class

Signed-off-by: Rupal Mahajan <[email protected]>

* add try catch for create delete pit in minus executor

Signed-off-by: Rupal Mahajan <[email protected]>

* move all common fields to ElasticHitsExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* add javadoc for ElasticHitsExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* Add missing javadoc

Signed-off-by: Rupal Mahajan <[email protected]>

* Forcing an empty commit as last commit is stuck processing updates

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq authored Jul 10, 2024
1 parent 6f2aebb commit 8c6dc0c
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,96 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;

/** Executor for search requests with pagination. */
public abstract class ElasticHitsExecutor {
protected static final Logger LOG = LogManager.getLogger();
protected PointInTimeHandler pit;
protected Client client;

/**
* Executes search request
*
* @throws IOException If an input or output exception occurred
* @throws SqlParseException If parsing exception occurred
*/
protected abstract void run() throws IOException, SqlParseException;

/**
* Get search hits after execution
*
* @return Search hits
*/
protected abstract SearchHits getHits();

/**
* Get response for search request with pit/scroll
*
* @param request search request
* @param select sql select
* @param size fetch size
* @param previousResponse response for previous request
* @param pit point in time
* @return search response for subsequent request
*/
public SearchResponse getResponseWithHits(
SearchRequestBuilder request,
Select select,
int size,
SearchResponse previousResponse,
PointInTimeHandler pit) {
// Set Size
request.setSize(size);
SearchResponse responseWithHits;

/** Created by Eliran on 21/8/2016. */
public interface ElasticHitsExecutor {
void run() throws IOException, SqlParseException;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
// from and size is alternate method to paginate result.
// If select has from clause, search after is not required.
if (previousResponse != null && select.getFrom().isEmpty()) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}

SearchHits getHits();
return responseWithHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
Expand All @@ -16,30 +15,22 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
Expand All @@ -49,17 +40,14 @@
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;

/** Created by Eliran on 15/9/2015. */
public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
private static final Logger LOG = LogManager.getLogger();
public abstract class ElasticJoinExecutor extends ElasticHitsExecutor {

protected List<SearchHit> results; // Keep list to avoid copy to new array in SearchHits
protected MetaSearchResult metaResults;
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected Client client;
protected String[] indices;
protected PointInTimeHandler pit;

protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
Expand Down Expand Up @@ -283,38 +271,13 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {

public SearchResponse getResponseWithHits(
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);
SearchResponse responseWithHits;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
if (previousResponse != null) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}

return responseWithHits;
return getResponseWithHits(
tableRequest.getRequestBuilder(),
tableRequest.getOriginalSelect(),
size,
previousResponse,
pit);
}

public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
Expand Down
Loading

0 comments on commit 8c6dc0c

Please sign in to comment.