Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend query size limit using scroll #716

Merged
merged 36 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e2aac98
add maxResultWindow to LogicalRelation
seankao-az Aug 1, 2022
289ff47
add maxResultWindow to OpenSearchLogicalIndexScan
seankao-az Aug 1, 2022
f612a9e
OpenSearchRequestBuilder init
seankao-az Aug 1, 2022
2201cc2
request builder: push down and build
seankao-az Aug 2, 2022
a62b25d
plan.build() for building request
seankao-az Aug 2, 2022
e8abd9a
maxResultWindow for test utils
seankao-az Aug 2, 2022
6691803
fix style
seankao-az Aug 2, 2022
668b92b
remove plan.build()
seankao-az Aug 2, 2022
fd477b0
fetch result in batches
seankao-az Aug 2, 2022
80d213c
get index.max_result_window settings
seankao-az Aug 2, 2022
135c012
use index.max_result_window to decide scroll
seankao-az Aug 2, 2022
4ec69f1
maxResultWindow for aggregation
seankao-az Aug 2, 2022
919ccb6
fix fetch size & for aggregation query
seankao-az Aug 2, 2022
7b6b07f
fix rest client get max result window
seankao-az Aug 2, 2022
b1bee34
remove maxResultWindow from logical plan
seankao-az Aug 3, 2022
5b40987
get max result window when building physical plan
seankao-az Aug 3, 2022
4a585a9
move source builder init to request builder
seankao-az Aug 3, 2022
edb2eb3
fix max result window for test & rest client
seankao-az Aug 3, 2022
aa4034d
include request builder in equal comparison
seankao-az Aug 3, 2022
e2361ff
rename getIndexMaxResultWindows
seankao-az Aug 4, 2022
f2ddb87
open search rest client test
seankao-az Aug 4, 2022
d7d233c
test: request builder, scroll index scan
seankao-az Aug 4, 2022
1bc9037
fix style
seankao-az Aug 4, 2022
8c07640
remove getMaxResultWindow from base Table
seankao-az Aug 4, 2022
1a7983f
remove unused import from OpenSearchIndexScan
seankao-az Aug 4, 2022
17cf25c
test index scan
seankao-az Aug 4, 2022
fd47234
integ test for head command
seankao-az Aug 4, 2022
669d787
keep request query size for aggregation
seankao-az Aug 8, 2022
589a77b
fix rest client test coverage
seankao-az Aug 8, 2022
aa703b4
fix highlight conflict
seankao-az Aug 8, 2022
8aed5cc
test for node client
seankao-az Aug 9, 2022
1ace2de
test node client default settings
seankao-az Aug 9, 2022
6ec8da0
change Elasticsearch to OpenSearch in comment
seankao-az Aug 10, 2022
208921e
fix comments
seankao-az Aug 10, 2022
ff02665
more test for Head IT
seankao-az Aug 10, 2022
52ba001
ignore some head IT
seankao-az Aug 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
// can be removed when analyzing qualified name. The value (expr type) here doesn't matter.
curEnv.define(new Symbol(Namespace.INDEX_NAME, node.getTableNameOrAlias()), STRUCT);

return new LogicalRelation(node.getTableName());
return new LogicalRelation(node.getTableName(), table.getMaxResultWindow());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static LogicalPlan filter(LogicalPlan input, Expression expression) {
}

public static LogicalPlan relation(String tableName) {
return new LogicalRelation(tableName);
return new LogicalRelation(tableName, 10000);
}

public static LogicalPlan rename(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ public class LogicalRelation extends LogicalPlan {
@Getter
private final String relationName;

@Getter
private final Integer maxResultWindow;

/**
* Constructor of LogicalRelation.
*/
public LogicalRelation(String relationName) {
public LogicalRelation(String relationName, Integer maxResultWindow) {
super(ImmutableList.of());
this.relationName = relationName;
this.maxResultWindow = maxResultWindow;
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ public interface Table {
*/
Map<String, ExprType> getFieldTypes();

/**
* Get the max result window setting of the table.
*/
default Integer getMaxResultWindow() {
return 10000;
}

/**
* Implement a {@link LogicalPlan} by {@link PhysicalPlan} in storage engine.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
class WindowExpressionAnalyzerTest extends AnalyzerTestBase {

private final LogicalPlan child = new LogicalRelation("test");
private final LogicalPlan child = new LogicalRelation("test", 10000);

private WindowExpressionAnalyzer analyzer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void visitShouldReturnDefaultPhysicalOperator() {
@Test
public void visitRelationShouldThrowException() {
assertThrows(UnsupportedOperationException.class,
() -> new LogicalRelation("test").accept(implementor, null));
() -> new LogicalRelation("test", 10000).accept(implementor, null));
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface OpenSearchClient {
*/
Map<String, IndexMapping> getIndexMappings(String... indexExpression);

/**
* Fetch index.max_result_window settings according to index expression given.
*
* @param indexExpression index expression
* @return map from index name to its max result window
*/
Map<String, Integer> getIndexMaxResultWindow(String... indexExpression);

/**
* Perform search query in the search request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
Expand Down Expand Up @@ -86,6 +89,29 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
}
}

/**
* Fetch index.max_result_window settings according to index expression given.
*
* @param indexExpression index expression
* @return map from index name to its max result window
*/
@Override
public Map<String, Integer> getIndexMaxResultWindow(String... indexExpression) {
ClusterState state = clusterService.state();
ImmutableOpenMap<String, IndexMetadata> indicesMetadata = state.metadata().getIndices();
String[] concreteIndices = resolveIndexExpression(state, indexExpression);

ImmutableMap.Builder<String, Integer> result = ImmutableMap.builder();
for (String index : concreteIndices) {
Settings settings = indicesMetadata.get(index).getSettings();
Integer maxResultWindow = settings.getAsInt("index.max_result_window",
IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings));
result.put(index, maxResultWindow);
}

return result.build();
}

/**
* TODO: Scroll doesn't work for aggregation. Support aggregation later.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
Expand All @@ -26,6 +29,7 @@
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
Expand Down Expand Up @@ -54,6 +58,35 @@ public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
}
}

@Override
public Map<String, Integer> getIndexMaxResultWindow(String... indexExpression) {
GetSettingsRequest request = new GetSettingsRequest().indices(indexExpression);
try {
GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT);
ImmutableOpenMap<String, Settings> settings = response.getIndexToSettings();
ImmutableOpenMap<String, Settings> defaultSettings = response.getIndexToDefaultSettings();
Map<String, Integer> result = new HashMap<>();

defaultSettings.forEach(entry -> {
Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
if (maxResultWindow != null) {
result.put(entry.key, maxResultWindow);
}
});

settings.forEach(entry -> {
Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
if (maxResultWindow != null) {
result.put(entry.key, maxResultWindow);
}
});

return result;
} catch (IOException e) {
throw new IllegalStateException("Failed to get max result window for " + indexExpression, e);
}
}

@Override
public OpenSearchResponse search(OpenSearchRequest request) {
return request.search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public class OpenSearchLogicalIndexAgg extends LogicalPlan {

private final String relationName;

/**
* Max Result Window.
*/
private final Integer maxResultWindow;

/**
* Filter Condition.
*/
Expand Down Expand Up @@ -61,12 +66,13 @@ public class OpenSearchLogicalIndexAgg extends LogicalPlan {
@Builder
public OpenSearchLogicalIndexAgg(
String relationName,
Expression filter,
Integer maxResultWindow, Expression filter,
List<NamedAggregator> aggregatorList,
List<NamedExpression> groupByList,
List<Pair<Sort.SortOption, Expression>> sortList) {
super(ImmutableList.of());
this.relationName = relationName;
this.maxResultWindow = maxResultWindow;
this.filter = filter;
this.aggregatorList = aggregatorList;
this.groupByList = groupByList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class OpenSearchLogicalIndexScan extends LogicalPlan {
*/
private final String relationName;

/**
* Max Result Window.
*/
private final Integer maxResultWindow;

/**
* Filter Condition.
*/
Expand Down Expand Up @@ -64,12 +69,13 @@ public class OpenSearchLogicalIndexScan extends LogicalPlan {
@Builder
public OpenSearchLogicalIndexScan(
String relationName,
Expression filter,
Integer maxResultWindow, Expression filter,
Set<ReferenceExpression> projectList,
List<Pair<Sort.SortOption, Expression>> sortList,
Integer limit, Integer offset) {
super(ImmutableList.of());
this.relationName = relationName;
this.maxResultWindow = maxResultWindow;
this.filter = filter;
this.projectList = projectList;
this.sortList = sortList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public LogicalPlan apply(LogicalAggregation aggregation,
return OpenSearchLogicalIndexAgg
.builder()
.relationName(indexScan.getRelationName())
.maxResultWindow(indexScan.getMaxResultWindow())
.filter(indexScan.getFilter())
.aggregatorList(aggregation.getAggregatorList())
.groupByList(aggregation.getGroupByList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public LogicalPlan apply(LogicalAggregation aggregation,
return OpenSearchLogicalIndexAgg
.builder()
.relationName(relation.getRelationName())
.maxResultWindow(relation.getMaxResultWindow())
.aggregatorList(aggregation.getAggregatorList())
.groupByList(aggregation.getGroupByList())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public LogicalPlan apply(LogicalFilter filter,
return OpenSearchLogicalIndexScan
.builder()
.relationName(relation.getRelationName())
.maxResultWindow(relation.getMaxResultWindow())
.filter(filter.getCondition())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public LogicalPlan apply(LogicalLimit plan, Captures captures) {
OpenSearchLogicalIndexScan.OpenSearchLogicalIndexScanBuilder builder =
OpenSearchLogicalIndexScan.builder();
builder.relationName(indexScan.getRelationName())
.maxResultWindow(indexScan.getMaxResultWindow())
.filter(indexScan.getFilter())
.offset(plan.getOffset())
.limit(plan.getLimit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public LogicalPlan apply(LogicalLimit plan, Captures captures) {
LogicalRelation relation = captures.get(relationCapture);
return OpenSearchLogicalIndexScan.builder()
.relationName(relation.getRelationName())
.maxResultWindow(relation.getMaxResultWindow())
.offset(plan.getOffset())
.limit(plan.getLimit())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public LogicalPlan apply(LogicalSort sort,
OpenSearchLogicalIndexAgg indexAgg = captures.get(indexAggCapture);
return OpenSearchLogicalIndexAgg.builder()
.relationName(indexAgg.getRelationName())
.maxResultWindow(indexAgg.getMaxResultWindow())
.filter(indexAgg.getFilter())
.groupByList(indexAgg.getGroupByList())
.aggregatorList(indexAgg.getAggregatorList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public LogicalPlan apply(LogicalSort sort,
return OpenSearchLogicalIndexScan
.builder()
.relationName(indexScan.getRelationName())
.maxResultWindow(indexScan.getMaxResultWindow())
.filter(indexScan.getFilter())
.sortList(mergeSortList(indexScan.getSortList(), sort.getSortList()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public LogicalPlan apply(LogicalSort sort,
return OpenSearchLogicalIndexScan
.builder()
.relationName(relation.getRelationName())
.maxResultWindow(relation.getMaxResultWindow())
.sortList(sort.getSortList())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public LogicalPlan apply(LogicalProject project,
OpenSearchLogicalIndexScan
.builder()
.relationName(relation.getRelationName())
.maxResultWindow(relation.getMaxResultWindow())
.projectList(findReferenceExpressions(project.getProjectList()))
.build(),
project.getProjectList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public OpenSearchQueryRequest(IndexName indexName, int size,
this.exprValueFactory = factory;
}

/**
* Constructor of ElasticsearchQueryRequest.
seankao-az marked this conversation as resolved.
Show resolved Hide resolved
*/
public OpenSearchQueryRequest(IndexName indexName, SearchSourceBuilder sourceBuilder,
OpenSearchExprValueFactory factory) {
this.indexName = indexName;
this.sourceBuilder = sourceBuilder;
this.exprValueFactory = factory;
}

@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
Expand Down
Loading