Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend query size limit using scroll #716

Merged
merged 36 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e2aac98
add maxResultWindow to LogicalRelation
seankao-az Aug 1, 2022
289ff47
add maxResultWindow to OpenSearchLogicalIndexScan
seankao-az Aug 1, 2022
f612a9e
OpenSearchRequestBuilder init
seankao-az Aug 1, 2022
2201cc2
request builder: push down and build
seankao-az Aug 2, 2022
a62b25d
plan.build() for building request
seankao-az Aug 2, 2022
e8abd9a
maxResultWindow for test utils
seankao-az Aug 2, 2022
6691803
fix style
seankao-az Aug 2, 2022
668b92b
remove plan.build()
seankao-az Aug 2, 2022
fd477b0
fetch result in batches
seankao-az Aug 2, 2022
80d213c
get index.max_result_window settings
seankao-az Aug 2, 2022
135c012
use index.max_result_window to decide scroll
seankao-az Aug 2, 2022
4ec69f1
maxResultWindow for aggregation
seankao-az Aug 2, 2022
919ccb6
fix fetch size & for aggregation query
seankao-az Aug 2, 2022
7b6b07f
fix rest client get max result window
seankao-az Aug 2, 2022
b1bee34
remove maxResultWindow from logical plan
seankao-az Aug 3, 2022
5b40987
get max result window when building physical plan
seankao-az Aug 3, 2022
4a585a9
move source builder init to request builder
seankao-az Aug 3, 2022
edb2eb3
fix max result window for test & rest client
seankao-az Aug 3, 2022
aa4034d
include request builder in equal comparison
seankao-az Aug 3, 2022
e2361ff
rename getIndexMaxResultWindows
seankao-az Aug 4, 2022
f2ddb87
open search rest client test
seankao-az Aug 4, 2022
d7d233c
test: request builder, scroll index scan
seankao-az Aug 4, 2022
1bc9037
fix style
seankao-az Aug 4, 2022
8c07640
remove getMaxResultWindow from base Table
seankao-az Aug 4, 2022
1a7983f
remove unused import from OpenSearchIndexScan
seankao-az Aug 4, 2022
17cf25c
test index scan
seankao-az Aug 4, 2022
fd47234
integ test for head command
seankao-az Aug 4, 2022
669d787
keep request query size for aggregation
seankao-az Aug 8, 2022
589a77b
fix rest client test coverage
seankao-az Aug 8, 2022
aa703b4
fix highlight conflict
seankao-az Aug 8, 2022
8aed5cc
test for node client
seankao-az Aug 9, 2022
1ace2de
test node client default settings
seankao-az Aug 9, 2022
6ec8da0
change Elasticsearch to OpenSearch in comment
seankao-az Aug 10, 2022
208921e
fix comments
seankao-az Aug 10, 2022
ff02665
more test for Head IT
seankao-az Aug 10, 2022
52ba001
ignore some head IT
seankao-az Aug 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.LiteralExpression;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ParseExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.window.WindowDefinition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.file.Paths;
import java.util.Locale;

import static com.google.common.base.Strings.isNullOrEmpty;
import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
import static org.opensearch.sql.legacy.TestUtils.getAccountIndexMapping;
import static org.opensearch.sql.legacy.TestUtils.getBankIndexMapping;
Expand Down Expand Up @@ -71,6 +72,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase {
public static final String TRANSIENT = "transient";
public static final Integer DEFAULT_QUERY_SIZE_LIMIT =
Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200"));
public static final Integer DEFAULT_MAX_RESULT_WINDOW =
Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000"));

public boolean shouldResetQuerySizeLimit() {
return true;
Expand Down Expand Up @@ -161,6 +164,15 @@ protected static void wipeAllClusterSettings() throws IOException {
updateClusterSettings(new ClusterSetting("transient", "*", null));
}

protected void setMaxResultWindow(String indexName, Integer window) throws IOException {
updateIndexSettings(indexName, "{ \"index\": { \"max_result_window\":" + window + " } }");
}

protected void resetMaxResultWindow(String indexName) throws IOException {
updateIndexSettings(indexName,
"{ \"index\": { \"max_result_window\": " + DEFAULT_MAX_RESULT_WINDOW + " } }");
}

/**
* Provide for each test to load test index, data and other setup work
*/
Expand Down Expand Up @@ -378,6 +390,18 @@ public String toString() {
}
}

protected static JSONObject updateIndexSettings(String indexName, String setting)
throws IOException {
Request request = new Request("PUT", "/" + indexName + "/_settings");
if (!isNullOrEmpty(setting)) {
request.setJsonEntity(setting);
}
RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
restOptionsBuilder.addHeader("Content-Type", "application/json");
request.setOptions(restOptionsBuilder);
return new JSONObject(executeRequest(request));
}

protected String makeRequest(String query) {
return makeRequest(query, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public void beforeTest() throws IOException {
@After
public void afterTest() throws IOException {
resetQuerySizeLimit();
resetMaxResultWindow(TEST_INDEX_ACCOUNT);
}

@Override
Expand Down Expand Up @@ -60,6 +61,30 @@ public void testHeadWithNumber() throws IOException {
rows("Nanette", 28));
}

@Test
public void testHeadWithLargeNumber() throws IOException {
setMaxResultWindow(TEST_INDEX_ACCOUNT, 10);
JSONObject result =
executeQuery(String.format(
"source=%s | fields firstname, age | head 15", TEST_INDEX_ACCOUNT));
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
verifyDataRows(result,
rows("Amber", 32),
rows("Hattie", 36),
rows("Nanette", 28),
rows("Dale", 33),
rows("Elinor", 36),
rows("Virginia", 39),
rows("Dillard", 34),
rows("Mcgee", 39),
rows("Aurelia", 37),
rows("Fulton", 23),
rows("Burton", 31),
rows("Josie", 32),
rows("Hughes", 30),
rows("Hall", 25),
rows("Deidre", 33));
}

@Test
public void testHeadWithNumberAndFrom() throws IOException {
JSONObject result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface OpenSearchClient {
*/
Map<String, IndexMapping> getIndexMappings(String... indexExpression);

/**
* Fetch index.max_result_window settings according to index expression given.
*
* @param indexExpression index expression
* @return map from index name to its max result window
*/
Map<String, Integer> getIndexMaxResultWindows(String... indexExpression);

/**
* Perform search query in the search request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
Expand Down Expand Up @@ -86,6 +89,29 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
}
}

/**
* Fetch index.max_result_window settings according to index expression given.
*
* @param indexExpression index expression
* @return map from index name to its max result window
*/
@Override
public Map<String, Integer> getIndexMaxResultWindows(String... indexExpression) {
ClusterState state = clusterService.state();
ImmutableOpenMap<String, IndexMetadata> indicesMetadata = state.metadata().getIndices();
String[] concreteIndices = resolveIndexExpression(state, indexExpression);

ImmutableMap.Builder<String, Integer> result = ImmutableMap.builder();
for (String index : concreteIndices) {
Settings settings = indicesMetadata.get(index).getSettings();
Integer maxResultWindow = settings.getAsInt("index.max_result_window",
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings));
result.put(index, maxResultWindow);
}

return result.build();
}

/**
* TODO: Scroll doesn't work for aggregation. Support aggregation later.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
Expand All @@ -26,6 +29,7 @@
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
Expand Down Expand Up @@ -54,6 +58,36 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
}
}

@Override
public Map<String, Integer> getIndexMaxResultWindows(String... indexExpression) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need mapping or just single maxResultWindow?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I approach this in a way similar to how getIndexMappings handles it.
For the client, getIndexMappings and getIndexMaxResultWindows return a Mapping from the index name to the corresponding result, without extra business logic.
In OpenSearchDescribeIndexRequest, getMaxResultWindow chooses the minimum of these values. This is specific to our need: if there's multiple indices, get the minimum max_result_window. Similarly, getFieldTypes unions the mappings of multiple indices here.

GetSettingsRequest request = new GetSettingsRequest()
.indices(indexExpression).includeDefaults(true);
try {
GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT);
ImmutableOpenMap<String, Settings> settings = response.getIndexToSettings();
ImmutableOpenMap<String, Settings> defaultSettings = response.getIndexToDefaultSettings();
Map<String, Integer> result = new HashMap<>();

defaultSettings.forEach(entry -> {
Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
if (maxResultWindow != null) {
result.put(entry.key, maxResultWindow);
}
});

settings.forEach(entry -> {
Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
if (maxResultWindow != null) {
result.put(entry.key, maxResultWindow);
}
});

return result;
} catch (IOException e) {
throw new IllegalStateException("Failed to get max result window for " + indexExpression, e);
}
}

@Override
public OpenSearchResponse search(OpenSearchRequest request) {
return request.search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public OpenSearchQueryRequest(IndexName indexName, int size,
this.exprValueFactory = factory;
}

/**
* Constructor of ElasticsearchQueryRequest.
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
*/
public OpenSearchQueryRequest(IndexName indexName, SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
}

@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
Expand Down
Loading