Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pit for multi query #2753

Merged
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
e225977
Add search after for join
rupal-bq May 29, 2024
d69feda
Enable search after by default
rupal-bq May 29, 2024
b75d782
Add pit
rupal-bq Jun 2, 2024
a336e3b
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 2, 2024
afd24d8
nit
rupal-bq Jun 3, 2024
8f840ae
Fix tests
rupal-bq Jun 4, 2024
070b40f
ignore joinWithGeoIntersectNL
rupal-bq Jun 4, 2024
58c96f5
Rerun CI with scroll
rupal-bq Jun 4, 2024
3dad30d
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
b5035ac
draft
rupal-bq Jun 5, 2024
1404cd5
Remove unused code and retrigger CI with search_after true
rupal-bq Jun 5, 2024
fd09367
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 5, 2024
fcb584a
Address comments
rupal-bq Jun 7, 2024
12f5abb
Merge branch 'join-search-after' of github.com:rupal-bq/opensearch-sq…
rupal-bq Jun 7, 2024
08a6a29
Remove unused code change
rupal-bq Jun 7, 2024
030f4b5
Merge branch 'main' into join-search-after
rupal-bq Jun 10, 2024
39f727a
Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE
rupal-bq Jun 10, 2024
be8d986
Fix scroll condition
rupal-bq Jun 12, 2024
7bb89d8
nit
rupal-bq Jun 12, 2024
60bd8fe
Add pit before query execution
rupal-bq Jun 13, 2024
17d1d0e
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 13, 2024
3c511b2
Merge branch 'join-search-after' into multi-query-search-after-pit
rupal-bq Jun 13, 2024
281153f
Refactor get response with pit method
rupal-bq Jun 13, 2024
5bcd134
Update remaining scroll search calls
rupal-bq Jun 13, 2024
87f611b
Fix integ test failures
rupal-bq Jun 14, 2024
3af87c4
nit
rupal-bq Jun 14, 2024
bee4863
Move pit from join request builder to executor
rupal-bq Jun 16, 2024
0857a6b
Remove unused methods
rupal-bq Jun 16, 2024
bcd38b0
Merge branch 'join-search-after' into multi-query-search-after-pit
rupal-bq Jun 16, 2024
8e1c101
Merge branch 'opensearch-project:main' into join-search-after
rupal-bq Jun 16, 2024
1d0eaa1
Move pit from request to executor
rupal-bq Jun 16, 2024
7d9da79
Merge branch 'join-search-after' into multi-query-search-after-pit
rupal-bq Jun 16, 2024
9ef90cd
Merge remote-tracking branch 'upstream/feature/pit' into multi-query-…
rupal-bq Jun 26, 2024
1629bdb
Fix pit.delete call missed while merge
rupal-bq Jun 26, 2024
b74233b
Move getResponseWithHits method to util class
rupal-bq Jun 26, 2024
6aed0f4
add try catch for create delete pit in minus executor
rupal-bq Jun 26, 2024
783c24f
move all common fields to ElasticHitsExecutor
rupal-bq Jul 8, 2024
c60a396
Merge branch 'main' into multi-query-search-after-pit
rupal-bq Jul 8, 2024
ff3cae4
Merge branch 'feature/pit' into multi-query-search-after-pit
rupal-bq Jul 8, 2024
bed6a13
Merge remote-tracking branch 'upstream/feature/pit' into multi-query-…
rupal-bq Jul 8, 2024
4fbc0ec
add javadoc for ElasticHitsExecutor
rupal-bq Jul 8, 2024
037b146
Add missing javadoc
rupal-bq Jul 9, 2024
81e2af9
Forcing an empty commit as last commit is stuck processing updates
rupal-bq Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,77 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;

/** Created by Eliran on 21/8/2016. */
public interface ElasticHitsExecutor {
void run() throws IOException, SqlParseException;
public abstract class ElasticHitsExecutor {
rupal-bq marked this conversation as resolved.
Show resolved Hide resolved

protected static final Logger LOG = LogManager.getLogger();
protected PointInTimeHandler pit;
protected Client client;

protected abstract void run() throws IOException, SqlParseException;

protected abstract SearchHits getHits();

public SearchResponse getResponseWithHits(
Client client,
SearchRequestBuilder request,
Select select,
int size,
SearchResponse previousResponse,
PointInTimeHandler pit) {
// Set Size
request.setSize(size);
SearchResponse responseWithHits;

if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
// from and size is alternate method to paginate result.
// If select has from clause, search after is not required.
if (previousResponse != null && select.getFrom().isEmpty()) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}

SearchHits getHits();
return responseWithHits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.io.IOException;
Expand All @@ -16,30 +15,22 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.SqlElasticRequestBuilder;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
Expand All @@ -49,17 +40,14 @@
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;

/** Created by Eliran on 15/9/2015. */
public abstract class ElasticJoinExecutor implements ElasticHitsExecutor {
private static final Logger LOG = LogManager.getLogger();
public abstract class ElasticJoinExecutor extends ElasticHitsExecutor {

protected List<SearchHit> results; // Keep list to avoid copy to new array in SearchHits
protected MetaSearchResult metaResults;
protected final int MAX_RESULTS_ON_ONE_FETCH = 10000;
private Set<String> aliasesOnReturn;
private boolean allFieldsReturn;
protected Client client;
protected String[] indices;
protected PointInTimeHandler pit;

protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
Expand Down Expand Up @@ -283,38 +271,14 @@ protected void updateMetaSearchResults(SearchResponse searchResponse) {

public SearchResponse getResponseWithHits(
TableInJoinRequestBuilder tableRequest, int size, SearchResponse previousResponse) {
// Set Size
SearchRequestBuilder request = tableRequest.getRequestBuilder().setSize(size);
SearchResponse responseWithHits;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = tableRequest.getOriginalSelect().isOrderdSelect();
if (!ordered) {
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
if (previousResponse != null) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
}

return responseWithHits;
return getResponseWithHits(
client,
tableRequest.getRequestBuilder(),
tableRequest.getOriginalSelect(),
size,
previousResponse,
pit);
}

public String[] getIndices(JoinRequestBuilder joinRequestBuilder) {
Expand Down
Loading
Loading