From 6fb3fac98f5917d13eb5cb37e10acc009f1cbee4 Mon Sep 17 00:00:00 2001
From: Chen Dai <46505291+dai-chen@users.noreply.github.com>
Date: Thu, 11 Aug 2022 12:33:20 -0700
Subject: [PATCH 1/3] Bump Jackson dependency version (#745)
* Bump jackson version and add version config in root gradle script
Signed-off-by: Chen Dai
* Bump jackson version in JDBC driver
Signed-off-by: Chen Dai
Signed-off-by: Chen Dai
---
build.gradle | 2 ++
core/build.gradle | 6 +++---
integ-test/build.gradle | 6 +++---
opensearch/build.gradle | 6 +++---
plugin/build.gradle | 8 ++++----
ppl/build.gradle | 4 ++--
protocol/build.gradle | 8 ++++----
sql-jdbc/build.gradle | 2 +-
sql/build.gradle | 4 ++--
9 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/build.gradle b/build.gradle
index 28ecdc9dbe..8a2f9046bf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,6 +7,8 @@
buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.2.0-SNAPSHOT")
+ spring_version = "5.3.22"
+ jackson_version = "2.13.3"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
version_tokens = opensearch_version.tokenize('-')
diff --git a/core/build.gradle b/core/build.gradle
index 342d5673cd..1fa3e19e26 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -40,8 +40,8 @@ repositories {
dependencies {
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
- api group: 'org.springframework', name: 'spring-context', version: '5.3.22'
- api group: 'org.springframework', name: 'spring-beans', version: '5.3.22'
+ api group: 'org.springframework', name: 'spring-context', version: "${spring_version}"
+ api group: 'org.springframework', name: 'spring-beans', version: "${spring_version}"
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240'
api group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
@@ -49,7 +49,7 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
- testImplementation group: 'org.springframework', name: 'spring-test', version: '5.3.22'
+ testImplementation group: 'org.springframework', name: 'spring-test', version: "${spring_version}"
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}
diff --git a/integ-test/build.gradle b/integ-test/build.gradle
index 864b4df097..429c360a1b 100644
--- a/integ-test/build.gradle
+++ b/integ-test/build.gradle
@@ -53,9 +53,9 @@ configurations.all {
// enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
resolutionStrategy.force 'commons-codec:commons-codec:1.13'
resolutionStrategy.force 'com.google.guava:guava:31.0.1-jre'
- resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-core:2.13.3'
- resolutionStrategy.force 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.3'
- resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-databind:2.13.3'
+ resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${jackson_version}"
+ resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jackson_version}"
+ resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}"
}
dependencies {
diff --git a/opensearch/build.gradle b/opensearch/build.gradle
index 2b26943d5b..8b5f917dff 100644
--- a/opensearch/build.gradle
+++ b/opensearch/build.gradle
@@ -32,9 +32,9 @@ dependencies {
api project(':core')
api group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation "io.github.resilience4j:resilience4j-retry:1.5.0"
- implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.13.3'
- implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.3'
- implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: '2.13.3'
+ implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${jackson_version}"
+ implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
+ implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${jackson_version}"
implementation group: 'org.json', name: 'json', version:'20180813'
compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
diff --git a/plugin/build.gradle b/plugin/build.gradle
index 5c4d635ef0..5c3b3974ef 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -82,12 +82,12 @@ configurations.all {
// conflict with spring-jcl
exclude group: "commons-logging", module: "commons-logging"
// enforce 2.12.6, https://github.com/opensearch-project/sql/issues/424
- resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-core:2.13.3'
+ resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${jackson_version}"
// enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
resolutionStrategy.force 'commons-codec:commons-codec:1.13'
resolutionStrategy.force 'com.google.guava:guava:31.0.1-jre'
- resolutionStrategy.force 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.3'
- resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-databind:2.13.3'
+ resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jackson_version}"
+ resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}"
}
compileJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
@@ -98,7 +98,7 @@ compileTestJava {
}
dependencies {
- api group: 'org.springframework', name: 'spring-beans', version: '5.3.22'
+ api group: 'org.springframework', name: 'spring-beans', version: "${spring_version}"
api project(":ppl")
api project(':legacy')
api project(':opensearch')
diff --git a/ppl/build.gradle b/ppl/build.gradle
index 1fb4c77642..12b0787efc 100644
--- a/ppl/build.gradle
+++ b/ppl/build.gradle
@@ -48,8 +48,8 @@ dependencies {
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
api group: 'org.opensearch', name: 'opensearch-x-content', version: "${opensearch_version}"
api group: 'org.json', name: 'json', version: '20180813'
- implementation group: 'org.springframework', name: 'spring-context', version: '5.3.22'
- implementation group: 'org.springframework', name: 'spring-beans', version: '5.3.22'
+ implementation group: 'org.springframework', name: 'spring-context', version: "${spring_version}"
+ implementation group: 'org.springframework', name: 'spring-beans', version: "${spring_version}"
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.17.1'
api project(':common')
api project(':core')
diff --git a/protocol/build.gradle b/protocol/build.gradle
index 7cca4aa0a9..fc35b94d34 100644
--- a/protocol/build.gradle
+++ b/protocol/build.gradle
@@ -30,9 +30,9 @@ plugins {
dependencies {
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
- implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.13.2'
- implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.2.2'
- implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: '2.13.2'
+ implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${jackson_version}"
+ implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jackson_version}"
+ implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${jackson_version}"
implementation 'com.google.code.gson:gson:2.8.9'
implementation project(':core')
implementation project(':opensearch')
@@ -44,7 +44,7 @@ dependencies {
}
configurations.all {
- resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-databind:2.13.2.2'
+ resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_version}"
}
test {
diff --git a/sql-jdbc/build.gradle b/sql-jdbc/build.gradle
index 3dea2b49a7..dd629e438f 100644
--- a/sql-jdbc/build.gradle
+++ b/sql-jdbc/build.gradle
@@ -46,7 +46,7 @@ repositories {
dependencies {
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.13'
- implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.2.2'
+ implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "2.13.3"
implementation group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.11.452'
testImplementation('org.junit.jupiter:junit-jupiter-api:5.3.1')
diff --git a/sql/build.gradle b/sql/build.gradle
index e6d1a65a53..222ad92ac6 100644
--- a/sql/build.gradle
+++ b/sql/build.gradle
@@ -47,8 +47,8 @@ dependencies {
implementation "org.antlr:antlr4-runtime:4.7.1"
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
implementation group: 'org.json', name: 'json', version:'20180813'
- implementation group: 'org.springframework', name: 'spring-context', version: '5.3.22'
- implementation group: 'org.springframework', name: 'spring-beans', version: '5.3.22'
+ implementation group: 'org.springframework', name: 'spring-context', version: "${spring_version}"
+ implementation group: 'org.springframework', name: 'spring-beans', version: "${spring_version}"
implementation project(':common')
implementation project(':core')
api project(':protocol')
From f3c9c29538abdf93780e156edba3558fd43479da Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Aug 2022 09:54:31 -0700
Subject: [PATCH 2/3] Extend query size limit using scroll (#716)
* add maxResultWindow to LogicalRelation
Signed-off-by: Sean Kao
* add maxResultWindow to OpenSearchLogicalIndexScan
Signed-off-by: Sean Kao
* OpenSearchRequestBuilder init
Signed-off-by: Sean Kao
* request builder: push down and build
Signed-off-by: Sean Kao
* plan.build() for building request
Signed-off-by: Sean Kao
* maxResultWindow for test utils
Signed-off-by: Sean Kao
* fix style
Signed-off-by: Sean Kao
* remove plan.build()
Signed-off-by: Sean Kao
* fetch result in batches
Signed-off-by: Sean Kao
* get index.max_result_window settings
Signed-off-by: Sean Kao
* use index.max_result_window to decide scroll
Signed-off-by: Sean Kao
* maxResultWindow for aggregation
Signed-off-by: Sean Kao
* fix fetch size & for aggregation query
Signed-off-by: Sean Kao
* fix rest client get max result window
Signed-off-by: Sean Kao
* remove maxResultWindow from logical plan
Signed-off-by: Sean Kao
* get max result window when building physical plan
Signed-off-by: Sean Kao
* move source builder init to request builder
Signed-off-by: Sean Kao
* fix max result window for test & rest client
Signed-off-by: Sean Kao
* include request builder in equal comparison
Signed-off-by: Sean Kao
* rename getIndexMaxResultWindows
Signed-off-by: Sean Kao
* open search rest client test
Signed-off-by: Sean Kao
* test: request builder, scroll index scan
Signed-off-by: Sean Kao
* fix style
Signed-off-by: Sean Kao
* remove getMaxResultWindow from base Table
Signed-off-by: Sean Kao
* remove unused import from OpenSearchIndexScan
Signed-off-by: Sean Kao
* test index scan
Signed-off-by: Sean Kao
* integ test for head command
Signed-off-by: Sean Kao
* keep request query size for aggregation
Signed-off-by: Sean Kao
* fix rest client test coverage
Signed-off-by: Sean Kao
* test for node client
Signed-off-by: Sean Kao
* test node client default settings
Signed-off-by: Sean Kao
* change Elasticsearch to OpenSearch in comment
Signed-off-by: Sean Kao
* fix comments
Signed-off-by: Sean Kao
* more test for Head IT
Signed-off-by: Sean Kao
* ignore some head IT
Signed-off-by: Sean Kao
Signed-off-by: Sean Kao
---
.../sql/planner/logical/LogicalPlanDSL.java | 1 -
.../sql/legacy/SQLIntegTestCase.java | 24 +++
.../org/opensearch/sql/ppl/HeadCommandIT.java | 72 +++++++
.../opensearch/client/OpenSearchClient.java | 8 +
.../client/OpenSearchNodeClient.java | 26 +++
.../client/OpenSearchRestClient.java | 34 +++
.../request/OpenSearchQueryRequest.java | 16 +-
.../request/OpenSearchRequestBuilder.java | 202 ++++++++++++++++++
.../request/OpenSearchScrollRequest.java | 14 +-
.../OpenSearchDescribeIndexRequest.java | 10 +
.../opensearch/storage/OpenSearchIndex.java | 34 ++-
.../storage/OpenSearchIndexScan.java | 171 ++++-----------
.../client/OpenSearchNodeClientTest.java | 59 +++++
.../client/OpenSearchRestClientTest.java | 64 ++++++
.../OpenSearchExecutionEngineTest.java | 3 +-
.../OpenSearchExecutionProtectorTest.java | 5 +-
.../request/OpenSearchRequestBuilderTest.java | 76 +++++++
.../OpenSearchDefaultImplementorTest.java | 5 +-
.../storage/OpenSearchIndexScanTest.java | 93 +++++++-
.../storage/OpenSearchIndexTest.java | 32 ++-
.../sql/opensearch/utils/Utils.java | 4 +-
.../test/resources/mappings/accounts2.json | 93 ++++++++
22 files changed, 880 insertions(+), 166 deletions(-)
create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
create mode 100644 opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java
create mode 100644 opensearch/src/test/resources/mappings/accounts2.json
diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java
index 4185d55c55..cdd3d3a103 100644
--- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java
+++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanDSL.java
@@ -18,7 +18,6 @@
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.LiteralExpression;
import org.opensearch.sql.expression.NamedExpression;
-import org.opensearch.sql.expression.ParseExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.NamedAggregator;
import org.opensearch.sql.expression.window.WindowDefinition;
diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java
index 15f0261f0d..5c339cc7bb 100644
--- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java
+++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java
@@ -31,6 +31,7 @@
import java.nio.file.Paths;
import java.util.Locale;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
import static org.opensearch.sql.legacy.TestUtils.getAccountIndexMapping;
import static org.opensearch.sql.legacy.TestUtils.getBankIndexMapping;
@@ -71,6 +72,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase {
public static final String TRANSIENT = "transient";
public static final Integer DEFAULT_QUERY_SIZE_LIMIT =
Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200"));
+ public static final Integer DEFAULT_MAX_RESULT_WINDOW =
+ Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000"));
public boolean shouldResetQuerySizeLimit() {
return true;
@@ -161,6 +164,15 @@ protected static void wipeAllClusterSettings() throws IOException {
updateClusterSettings(new ClusterSetting("transient", "*", null));
}
+ protected void setMaxResultWindow(String indexName, Integer window) throws IOException {
+ updateIndexSettings(indexName, "{ \"index\": { \"max_result_window\":" + window + " } }");
+ }
+
+ protected void resetMaxResultWindow(String indexName) throws IOException {
+ updateIndexSettings(indexName,
+ "{ \"index\": { \"max_result_window\": " + DEFAULT_MAX_RESULT_WINDOW + " } }");
+ }
+
/**
* Provide for each test to load test index, data and other setup work
*/
@@ -378,6 +390,18 @@ public String toString() {
}
}
+ protected static JSONObject updateIndexSettings(String indexName, String setting)
+ throws IOException {
+ Request request = new Request("PUT", "/" + indexName + "/_settings");
+ if (!isNullOrEmpty(setting)) {
+ request.setJsonEntity(setting);
+ }
+ RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
+ restOptionsBuilder.addHeader("Content-Type", "application/json");
+ request.setOptions(restOptionsBuilder);
+ return new JSONObject(executeRequest(request));
+ }
+
protected String makeRequest(String query) {
return makeRequest(query, 0);
}
diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java
index 1ae45ab469..48c489ce10 100644
--- a/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/ppl/HeadCommandIT.java
@@ -14,6 +14,7 @@
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.jupiter.api.Test;
public class HeadCommandIT extends PPLIntegTestCase {
@@ -26,6 +27,7 @@ public void beforeTest() throws IOException {
@After
public void afterTest() throws IOException {
resetQuerySizeLimit();
+ resetMaxResultWindow(TEST_INDEX_ACCOUNT);
}
@Override
@@ -60,6 +62,76 @@ public void testHeadWithNumber() throws IOException {
rows("Nanette", 28));
}
+ @Ignore("Fix https://github.com/opensearch-project/sql/issues/703#issuecomment-1211422130")
+ @Test
+ public void testHeadWithNumberLargerThanQuerySizeLimit() throws IOException {
+ setQuerySizeLimit(5);
+ JSONObject result =
+ executeQuery(String.format(
+ "source=%s | fields firstname, age | head 10", TEST_INDEX_ACCOUNT));
+ verifyDataRows(result,
+ rows("Amber", 32),
+ rows("Hattie", 36),
+ rows("Nanette", 28),
+ rows("Dale", 33),
+ rows("Elinor", 36),
+ rows("Virginia", 39),
+ rows("Dillard", 34),
+ rows("Mcgee", 39),
+ rows("Aurelia", 37),
+ rows("Fulton", 23));
+ }
+
+ @Test
+ public void testHeadWithNumberLargerThanMaxResultWindow() throws IOException {
+ setMaxResultWindow(TEST_INDEX_ACCOUNT, 10);
+ JSONObject result =
+ executeQuery(String.format(
+ "source=%s | fields firstname, age | head 15", TEST_INDEX_ACCOUNT));
+ verifyDataRows(result,
+ rows("Amber", 32),
+ rows("Hattie", 36),
+ rows("Nanette", 28),
+ rows("Dale", 33),
+ rows("Elinor", 36),
+ rows("Virginia", 39),
+ rows("Dillard", 34),
+ rows("Mcgee", 39),
+ rows("Aurelia", 37),
+ rows("Fulton", 23),
+ rows("Burton", 31),
+ rows("Josie", 32),
+ rows("Hughes", 30),
+ rows("Hall", 25),
+ rows("Deidre", 33));
+ }
+
+ @Ignore("Fix https://github.com/opensearch-project/sql/issues/703#issuecomment-1211422130")
+ @Test
+ public void testHeadWithLargeNumber() throws IOException {
+ setQuerySizeLimit(5);
+ setMaxResultWindow(TEST_INDEX_ACCOUNT, 10);
+ JSONObject result =
+ executeQuery(String.format(
+ "source=%s | fields firstname, age | head 15", TEST_INDEX_ACCOUNT));
+ verifyDataRows(result,
+ rows("Amber", 32),
+ rows("Hattie", 36),
+ rows("Nanette", 28),
+ rows("Dale", 33),
+ rows("Elinor", 36),
+ rows("Virginia", 39),
+ rows("Dillard", 34),
+ rows("Mcgee", 39),
+ rows("Aurelia", 37),
+ rows("Fulton", 23),
+ rows("Burton", 31),
+ rows("Josie", 32),
+ rows("Hughes", 30),
+ rows("Hall", 25),
+ rows("Deidre", 33));
+ }
+
@Test
public void testHeadWithNumberAndFrom() throws IOException {
JSONObject result =
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java
index c1b7d782d2..09a83f65a5 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java
@@ -30,6 +30,14 @@ public interface OpenSearchClient {
*/
Map getIndexMappings(String... indexExpression);
+ /**
+ * Fetch index.max_result_window settings according to index expression given.
+ *
+ * @param indexExpression index expression
+ * @return map from index name to its max result window
+ */
+ Map getIndexMaxResultWindows(String... indexExpression);
+
/**
* Perform search query in the search request.
*
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java
index fe26280812..db35f3580c 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java
@@ -24,11 +24,14 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
+import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
+import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
+import org.opensearch.index.IndexSettings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
@@ -86,6 +89,29 @@ public Map getIndexMappings(String... indexExpression) {
}
}
+ /**
+ * Fetch index.max_result_window settings according to index expression given.
+ *
+ * @param indexExpression index expression
+ * @return map from index name to its max result window
+ */
+ @Override
+ public Map getIndexMaxResultWindows(String... indexExpression) {
+ ClusterState state = clusterService.state();
+ ImmutableOpenMap indicesMetadata = state.metadata().getIndices();
+ String[] concreteIndices = resolveIndexExpression(state, indexExpression);
+
+ ImmutableMap.Builder result = ImmutableMap.builder();
+ for (String index : concreteIndices) {
+ Settings settings = indicesMetadata.get(index).getSettings();
+ Integer maxResultWindow = settings.getAsInt("index.max_result_window",
+ IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(settings));
+ result.put(index, maxResultWindow);
+ }
+
+ return result.build();
+ }
+
/**
* TODO: Scroll doesn't work for aggregation. Support aggregation later.
*/
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java
index 9da8c442e0..f354215e05 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java
@@ -11,12 +11,15 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
+import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
@@ -26,6 +29,7 @@
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.AliasMetadata;
+import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
@@ -54,6 +58,36 @@ public Map getIndexMappings(String... indexExpression) {
}
}
+ @Override
+ public Map getIndexMaxResultWindows(String... indexExpression) {
+ GetSettingsRequest request = new GetSettingsRequest()
+ .indices(indexExpression).includeDefaults(true);
+ try {
+ GetSettingsResponse response = client.indices().getSettings(request, RequestOptions.DEFAULT);
+ ImmutableOpenMap settings = response.getIndexToSettings();
+ ImmutableOpenMap defaultSettings = response.getIndexToDefaultSettings();
+ Map result = new HashMap<>();
+
+ defaultSettings.forEach(entry -> {
+ Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
+ if (maxResultWindow != null) {
+ result.put(entry.key, maxResultWindow);
+ }
+ });
+
+ settings.forEach(entry -> {
+ Integer maxResultWindow = entry.value.getAsInt("index.max_result_window", null);
+ if (maxResultWindow != null) {
+ result.put(entry.key, maxResultWindow);
+ }
+ });
+
+ return result;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to get max result window for " + indexExpression, e);
+ }
+ }
+
@Override
public OpenSearchResponse search(OpenSearchRequest request) {
return request.search(
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java
index 5ca3670ca1..6f6fea841b 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchQueryRequest.java
@@ -49,7 +49,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
/**
- * ElasticsearchExprValueFactory.
+ * OpenSearchExprValueFactory.
*/
@EqualsAndHashCode.Exclude
@ToString.Exclude
@@ -61,7 +61,7 @@ public class OpenSearchQueryRequest implements OpenSearchRequest {
private boolean searchDone = false;
/**
- * Constructor of ElasticsearchQueryRequest.
+ * Constructor of OpenSearchQueryRequest.
*/
public OpenSearchQueryRequest(String indexName, int size,
OpenSearchExprValueFactory factory) {
@@ -69,7 +69,7 @@ public OpenSearchQueryRequest(String indexName, int size,
}
/**
- * Constructor of ElasticsearchQueryRequest.
+ * Constructor of OpenSearchQueryRequest.
*/
public OpenSearchQueryRequest(IndexName indexName, int size,
OpenSearchExprValueFactory factory) {
@@ -81,6 +81,16 @@ public OpenSearchQueryRequest(IndexName indexName, int size,
this.exprValueFactory = factory;
}
+ /**
+ * Constructor of OpenSearchQueryRequest.
+ */
+ public OpenSearchQueryRequest(IndexName indexName, SearchSourceBuilder sourceBuilder,
+ OpenSearchExprValueFactory factory) {
+ this.indexName = indexName;
+ this.sourceBuilder = sourceBuilder;
+ this.exprValueFactory = factory;
+ }
+
@Override
public OpenSearchResponse search(Function searchAction,
Function scrollAction) {
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
new file mode 100644
index 0000000000..646395d790
--- /dev/null
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+package org.opensearch.sql.opensearch.request;
+
+import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
+import static org.opensearch.search.sort.SortOrder.ASC;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.commons.lang3.tuple.Pair;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.index.query.BoolQueryBuilder;
+import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.search.aggregations.AggregationBuilder;
+import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
+import org.opensearch.search.sort.SortBuilder;
+import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.common.utils.StringUtils;
+import org.opensearch.sql.data.type.ExprType;
+import org.opensearch.sql.expression.ReferenceExpression;
+import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
+import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;
+
+/**
+ * OpenSearch search request builder.
+ */
+@EqualsAndHashCode
+@Getter
+@ToString
+public class OpenSearchRequestBuilder {
+
+ /**
+ * Default query timeout in minutes.
+ */
+ public static final TimeValue DEFAULT_QUERY_TIMEOUT = TimeValue.timeValueMinutes(1L);
+
+ /**
+ * {@link OpenSearchRequest.IndexName}.
+ */
+ private final OpenSearchRequest.IndexName indexName;
+
+ /**
+ * Index max result window.
+ */
+ private final Integer maxResultWindow;
+
+ /**
+ * Search request source builder.
+ */
+ private final SearchSourceBuilder sourceBuilder;
+
+ /**
+ * OpenSearchExprValueFactory.
+ */
+ @EqualsAndHashCode.Exclude
+ @ToString.Exclude
+ private final OpenSearchExprValueFactory exprValueFactory;
+
+ /**
+ * Query size of the request.
+ */
+ private Integer querySize;
+
+ public OpenSearchRequestBuilder(String indexName,
+ Integer maxResultWindow,
+ Settings settings,
+ OpenSearchExprValueFactory exprValueFactory) {
+ this(new OpenSearchRequest.IndexName(indexName), maxResultWindow, settings, exprValueFactory);
+ }
+
+ /**
+ * Constructor.
+ */
+ public OpenSearchRequestBuilder(OpenSearchRequest.IndexName indexName,
+ Integer maxResultWindow,
+ Settings settings,
+ OpenSearchExprValueFactory exprValueFactory) {
+ this.indexName = indexName;
+ this.maxResultWindow = maxResultWindow;
+ this.sourceBuilder = new SearchSourceBuilder();
+ this.exprValueFactory = exprValueFactory;
+ this.querySize = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);
+ sourceBuilder.from(0);
+ sourceBuilder.size(querySize);
+ sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT);
+ }
+
+ /**
+ * Build DSL request.
+ *
+ * @return query request or scroll request
+ */
+ public OpenSearchRequest build() {
+ Integer from = sourceBuilder.from();
+ Integer size = sourceBuilder.size();
+
+ if (from + size <= maxResultWindow) {
+ return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory);
+ } else {
+ sourceBuilder.size(maxResultWindow - from);
+ return new OpenSearchScrollRequest(indexName, sourceBuilder, exprValueFactory);
+ }
+ }
+
+ /**
+ * Push down query to DSL request.
+ *
+ * @param query query request
+ */
+ public void pushDown(QueryBuilder query) {
+ QueryBuilder current = sourceBuilder.query();
+
+ if (current == null) {
+ sourceBuilder.query(query);
+ } else {
+ if (isBoolFilterQuery(current)) {
+ ((BoolQueryBuilder) current).filter(query);
+ } else {
+ sourceBuilder.query(QueryBuilders.boolQuery()
+ .filter(current)
+ .filter(query));
+ }
+ }
+
+ if (sourceBuilder.sorts() == null) {
+ sourceBuilder.sort(DOC_FIELD_NAME, ASC); // Make sure consistent order
+ }
+ }
+
+ /**
+ * Push down aggregation to DSL request.
+ *
+ * @param aggregationBuilder pair of aggregation query and aggregation parser.
+ */
+ public void pushDownAggregation(
+ Pair, OpenSearchAggregationResponseParser> aggregationBuilder) {
+ aggregationBuilder.getLeft().forEach(builder -> sourceBuilder.aggregation(builder));
+ sourceBuilder.size(0);
+ exprValueFactory.setParser(aggregationBuilder.getRight());
+ }
+
+ /**
+ * Push down sort to DSL request.
+ *
+ * @param sortBuilders sortBuilders.
+ */
+ public void pushDownSort(List> sortBuilders) {
+ for (SortBuilder> sortBuilder : sortBuilders) {
+ sourceBuilder.sort(sortBuilder);
+ }
+ }
+
+ /**
+ * Push down size (limit) and from (offset) to DSL request.
+ */
+ public void pushDownLimit(Integer limit, Integer offset) {
+ querySize = limit;
+ sourceBuilder.from(offset).size(limit);
+ }
+
+ /**
+ * Add highlight to DSL requests.
+ * @param field name of the field to highlight
+ */
+ public void pushDownHighlight(String field) {
+ if (sourceBuilder.highlighter() != null) {
+ sourceBuilder.highlighter().field(StringUtils.unquoteText(field));
+ } else {
+ HighlightBuilder highlightBuilder =
+ new HighlightBuilder().field(StringUtils.unquoteText(field));
+ sourceBuilder.highlighter(highlightBuilder);
+ }
+ }
+
+ /**
+ * Push down project list to DSL requets.
+ */
+ public void pushDownProjects(Set projects) {
+ final Set projectsSet =
+ projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet());
+ sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]);
+ }
+
+ public void pushTypeMapping(Map typeMapping) {
+ exprValueFactory.setTypeMapping(typeMapping);
+ }
+
+ private boolean isBoolFilterQuery(QueryBuilder current) {
+ return (current instanceof BoolQueryBuilder);
+ }
+}
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java
index ebbebcd8eb..4509e443c0 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java
@@ -54,10 +54,12 @@ public class OpenSearchScrollRequest implements OpenSearchRequest {
private String scrollId;
/** Search request source builder. */
- private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+ private final SearchSourceBuilder sourceBuilder;
+ /** Constructor. */
public OpenSearchScrollRequest(IndexName indexName, OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
+ this.sourceBuilder = new SearchSourceBuilder();
this.exprValueFactory = exprValueFactory;
}
@@ -65,6 +67,16 @@ public OpenSearchScrollRequest(String indexName, OpenSearchExprValueFactory expr
this(new IndexName(indexName), exprValueFactory);
}
+ /** Constructor. */
+ public OpenSearchScrollRequest(IndexName indexName,
+ SearchSourceBuilder sourceBuilder,
+ OpenSearchExprValueFactory exprValueFactory) {
+ this.indexName = indexName;
+ this.sourceBuilder = sourceBuilder;
+ this.exprValueFactory = exprValueFactory;
+ }
+
+ /** Constructor. */
@Override
public OpenSearchResponse search(Function searchAction,
Function scrollAction) {
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java
index 5c6d3687c6..f321497099 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/system/OpenSearchDescribeIndexRequest.java
@@ -121,6 +121,16 @@ public Map getFieldTypes() {
return fieldTypes;
}
+ /**
+ * Get the minimum of the max result windows of the indices.
+ *
+ * @return max result window
+ */
+ public Integer getMaxResultWindow() {
+ return client.getIndexMaxResultWindows(indexName.getIndexNames())
+ .values().stream().min(Integer::compare).get();
+ }
+
private ExprType transformESTypeToExprType(String openSearchType) {
return OPENSEARCH_TYPE_TO_EXPR_TYPE_MAPPING.getOrDefault(openSearchType, ExprCoreType.UNKNOWN);
}
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java
index c028f283a2..ef6159020f 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java
@@ -58,6 +58,11 @@ public class OpenSearchIndex implements Table {
*/
private Map cachedFieldTypes = null;
+ /**
+ * The cached max result window setting of index.
+ */
+ private Integer cachedMaxResultWindow = null;
+
/**
* Constructor.
*/
@@ -80,13 +85,24 @@ public Map getFieldTypes() {
return cachedFieldTypes;
}
+ /**
+ * Get the max result window setting of the table.
+ */
+ public Integer getMaxResultWindow() {
+ if (cachedMaxResultWindow == null) {
+ cachedMaxResultWindow =
+ new OpenSearchDescribeIndexRequest(client, indexName).getMaxResultWindow();
+ }
+ return cachedMaxResultWindow;
+ }
+
/**
* TODO: Push down operations to index scan operator as much as possible in future.
*/
@Override
public PhysicalPlan implement(LogicalPlan plan) {
OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName,
- new OpenSearchExprValueFactory(getFieldTypes()));
+ getMaxResultWindow(), new OpenSearchExprValueFactory(getFieldTypes()));
/*
* Visit logical plan with index scan as context so logical operators visited, such as
@@ -128,7 +144,7 @@ public PhysicalPlan visitIndexScan(OpenSearchLogicalIndexScan node,
OpenSearchIndexScan context) {
if (null != node.getSortList()) {
final SortQueryBuilder builder = new SortQueryBuilder();
- context.pushDownSort(node.getSortList().stream()
+ context.getRequestBuilder().pushDownSort(node.getSortList().stream()
.map(sort -> builder.build(sort.getValue(), sort.getKey()))
.collect(Collectors.toList()));
}
@@ -136,15 +152,15 @@ public PhysicalPlan visitIndexScan(OpenSearchLogicalIndexScan node,
if (null != node.getFilter()) {
FilterQueryBuilder queryBuilder = new FilterQueryBuilder(new DefaultExpressionSerializer());
QueryBuilder query = queryBuilder.build(node.getFilter());
- context.pushDown(query);
+ context.getRequestBuilder().pushDown(query);
}
if (node.getLimit() != null) {
- context.pushDownLimit(node.getLimit(), node.getOffset());
+ context.getRequestBuilder().pushDownLimit(node.getLimit(), node.getOffset());
}
if (node.hasProjects()) {
- context.pushDownProjects(node.getProjectList());
+ context.getRequestBuilder().pushDownProjects(node.getProjectList());
}
return indexScan;
}
@@ -158,15 +174,15 @@ public PhysicalPlan visitIndexAggregation(OpenSearchLogicalIndexAgg node,
FilterQueryBuilder queryBuilder = new FilterQueryBuilder(
new DefaultExpressionSerializer());
QueryBuilder query = queryBuilder.build(node.getFilter());
- context.pushDown(query);
+ context.getRequestBuilder().pushDown(query);
}
AggregationQueryBuilder builder =
new AggregationQueryBuilder(new DefaultExpressionSerializer());
Pair, OpenSearchAggregationResponseParser> aggregationBuilder =
builder.buildAggregationBuilder(node.getAggregatorList(),
node.getGroupByList(), node.getSortList());
- context.pushDownAggregation(aggregationBuilder);
- context.pushTypeMapping(
+ context.getRequestBuilder().pushDownAggregation(aggregationBuilder);
+ context.getRequestBuilder().pushTypeMapping(
builder.buildTypeMapping(node.getAggregatorList(),
node.getGroupByList()));
return indexScan;
@@ -191,7 +207,7 @@ public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) {
@Override
public PhysicalPlan visitHighlight(LogicalHighlight node, OpenSearchIndexScan context) {
- context.pushDownHighlight(node.getHighlightField().toString());
+ context.getRequestBuilder().pushDownHighlight(node.getHighlightField().toString());
return visitChild(node, context);
}
}
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java
index 6e88f3de89..e9746e1fae 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java
@@ -6,38 +6,18 @@
package org.opensearch.sql.opensearch.storage;
-import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
-import static org.opensearch.search.sort.SortOrder.ASC;
-
-import com.google.common.collect.Iterables;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
-import org.apache.commons.lang3.tuple.Pair;
-import org.opensearch.index.query.BoolQueryBuilder;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.aggregations.AggregationBuilder;
-import org.opensearch.search.builder.SearchSourceBuilder;
-import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
-import org.opensearch.search.sort.SortBuilder;
import org.opensearch.sql.common.setting.Settings;
-import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.data.model.ExprValue;
-import org.opensearch.sql.data.type.ExprType;
-import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
-import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
+import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
-import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;
import org.opensearch.sql.storage.TableScanOperator;
/**
@@ -50,11 +30,24 @@ public class OpenSearchIndexScan extends TableScanOperator {
/** OpenSearch client. */
private final OpenSearchClient client;
- /** Search request. */
+ /** Search request builder. */
@EqualsAndHashCode.Include
@Getter
@ToString.Include
- private final OpenSearchRequest request;
+ private final OpenSearchRequestBuilder requestBuilder;
+
+ /** Search request. */
+ @EqualsAndHashCode.Include
+ @ToString.Include
+ private OpenSearchRequest request;
+
+ /** Total query size. */
+ @EqualsAndHashCode.Include
+ @ToString.Include
+ private Integer querySize;
+
+ /** Number of rows returned. */
+ private Integer queryCount;
/** Search response for current batch. */
private Iterator iterator;
@@ -62,133 +55,57 @@ public class OpenSearchIndexScan extends TableScanOperator {
/**
* Constructor.
*/
- public OpenSearchIndexScan(OpenSearchClient client,
- Settings settings, String indexName,
+ public OpenSearchIndexScan(OpenSearchClient client, Settings settings,
+ String indexName, Integer maxResultWindow,
OpenSearchExprValueFactory exprValueFactory) {
- this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory);
+ this(client, settings,
+ new OpenSearchRequest.IndexName(indexName),maxResultWindow, exprValueFactory);
}
/**
* Constructor.
*/
- public OpenSearchIndexScan(OpenSearchClient client,
- Settings settings, OpenSearchRequest.IndexName indexName,
- OpenSearchExprValueFactory exprValueFactory) {
+ public OpenSearchIndexScan(OpenSearchClient client, Settings settings,
+ OpenSearchRequest.IndexName indexName, Integer maxResultWindow,
+ OpenSearchExprValueFactory exprValueFactory) {
this.client = client;
- this.request = new OpenSearchQueryRequest(indexName,
- settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory);
+ this.requestBuilder = new OpenSearchRequestBuilder(
+ indexName, maxResultWindow, settings,exprValueFactory);
}
@Override
public void open() {
super.open();
-
- // For now pull all results immediately once open
- List responses = new ArrayList<>();
- OpenSearchResponse response = client.search(request);
- while (!response.isEmpty()) {
- responses.add(response);
- response = client.search(request);
- }
- iterator = Iterables.concat(responses.toArray(new OpenSearchResponse[0])).iterator();
+ querySize = requestBuilder.getQuerySize();
+ request = requestBuilder.build();
+ iterator = Collections.emptyIterator();
+ queryCount = 0;
+ fetchNextBatch();
}
@Override
public boolean hasNext() {
+ if (queryCount >= querySize) {
+ iterator = Collections.emptyIterator();
+ } else if (!iterator.hasNext()) {
+ fetchNextBatch();
+ }
return iterator.hasNext();
}
@Override
public ExprValue next() {
+ queryCount++;
return iterator.next();
}
- /**
- * Push down query to DSL request.
- * @param query query request
- */
- public void pushDown(QueryBuilder query) {
- SearchSourceBuilder source = request.getSourceBuilder();
- QueryBuilder current = source.query();
-
- if (current == null) {
- source.query(query);
- } else {
- if (isBoolFilterQuery(current)) {
- ((BoolQueryBuilder) current).filter(query);
- } else {
- source.query(QueryBuilders.boolQuery()
- .filter(current)
- .filter(query));
- }
- }
-
- if (source.sorts() == null) {
- source.sort(DOC_FIELD_NAME, ASC); // Make sure consistent order
- }
- }
-
- /**
- * Push down aggregation to DSL request.
- * @param aggregationBuilder pair of aggregation query and aggregation parser.
- */
- public void pushDownAggregation(
- Pair, OpenSearchAggregationResponseParser> aggregationBuilder) {
- SearchSourceBuilder source = request.getSourceBuilder();
- aggregationBuilder.getLeft().forEach(builder -> source.aggregation(builder));
- source.size(0);
- request.getExprValueFactory().setParser(aggregationBuilder.getRight());
- }
-
- /**
- * Push down sort to DSL request.
- *
- * @param sortBuilders sortBuilders.
- */
- public void pushDownSort(List> sortBuilders) {
- SearchSourceBuilder source = request.getSourceBuilder();
- for (SortBuilder> sortBuilder : sortBuilders) {
- source.sort(sortBuilder);
- }
- }
-
- /**
- * Push down size (limit) and from (offset) to DSL request.
- */
- public void pushDownLimit(Integer limit, Integer offset) {
- SearchSourceBuilder sourceBuilder = request.getSourceBuilder();
- sourceBuilder.from(offset).size(limit);
- }
-
- /**
- * Add highlight to DSL requests.
- * @param field name of the field to highlight
- */
- public void pushDownHighlight(String field) {
- SearchSourceBuilder sourceBuilder = request.getSourceBuilder();
- if (sourceBuilder.highlighter() != null) {
- sourceBuilder.highlighter().field(StringUtils.unquoteText(field));
- } else {
- HighlightBuilder highlightBuilder =
- new HighlightBuilder().field(StringUtils.unquoteText(field));
- sourceBuilder.highlighter(highlightBuilder);
+ private void fetchNextBatch() {
+ OpenSearchResponse response = client.search(request);
+ if (!response.isEmpty()) {
+ iterator = response.iterator();
}
}
- /**
- * Push down project list to DSL requets.
- */
- public void pushDownProjects(Set projects) {
- SearchSourceBuilder sourceBuilder = request.getSourceBuilder();
- final Set projectsSet =
- projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet());
- sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]);
- }
-
- public void pushTypeMapping(Map typeMapping) {
- request.getExprValueFactory().setTypeMapping(typeMapping);
- }
-
@Override
public void close() {
super.close();
@@ -196,12 +113,8 @@ public void close() {
client.cleanup(request);
}
- private boolean isBoolFilterQuery(QueryBuilder current) {
- return (current instanceof BoolQueryBuilder);
- }
-
@Override
public String explain() {
- return getRequest().toString();
+ return getRequestBuilder().build().toString();
}
}
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java
index 50410e07cc..8fdb93427b 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java
@@ -27,6 +27,7 @@
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -72,6 +73,7 @@
class OpenSearchNodeClientTest {
private static final String TEST_MAPPING_FILE = "mappings/accounts.json";
+ private static final String TEST_MAPPING_SETTINGS_FILE = "mappings/accounts2.json";
@Mock(answer = RETURNS_DEEP_STUBS)
private NodeClient nodeClient;
@@ -151,6 +153,36 @@ public void getIndexMappingsWithNonExistIndex() {
assertThrows(IndexNotFoundException.class, () -> client.getIndexMappings("non_exist_index"));
}
+ @Test
+ public void getIndexMaxResultWindows() throws IOException {
+ URL url = Resources.getResource(TEST_MAPPING_SETTINGS_FILE);
+ String mappings = Resources.toString(url, Charsets.UTF_8);
+ String indexName = "accounts";
+ ClusterService clusterService = mockClusterServiceForSettings(indexName, mappings);
+ OpenSearchNodeClient client = new OpenSearchNodeClient(clusterService, nodeClient);
+
+ Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName);
+ assertEquals(1, indexMaxResultWindows.size());
+
+ Integer indexMaxResultWindow = indexMaxResultWindows.values().iterator().next();
+ assertEquals(100, indexMaxResultWindow);
+ }
+
+ @Test
+ public void getIndexMaxResultWindowsWithDefaultSettings() throws IOException {
+ URL url = Resources.getResource(TEST_MAPPING_FILE);
+ String mappings = Resources.toString(url, Charsets.UTF_8);
+ String indexName = "accounts";
+ ClusterService clusterService = mockClusterServiceForSettings(indexName, mappings);
+ OpenSearchNodeClient client = new OpenSearchNodeClient(clusterService, nodeClient);
+
+ Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName);
+ assertEquals(1, indexMaxResultWindows.size());
+
+ Integer indexMaxResultWindow = indexMaxResultWindows.values().iterator().next();
+ assertEquals(10000, indexMaxResultWindow);
+ }
+
/** Jacoco enforce this constant lambda be tested. */
@Test
public void testAllFieldsPredicate() {
@@ -353,6 +385,33 @@ public ClusterService mockClusterService(String indexName, Throwable t) {
return mockService;
}
+ public ClusterService mockClusterServiceForSettings(String indexName, String mappings) {
+ ClusterService mockService = mock(ClusterService.class);
+ ClusterState mockState = mock(ClusterState.class);
+ Metadata mockMetaData = mock(Metadata.class);
+
+ when(mockService.state()).thenReturn(mockState);
+ when(mockState.metadata()).thenReturn(mockMetaData);
+ try {
+ ImmutableOpenMap.Builder indexBuilder =
+ ImmutableOpenMap.builder();
+ IndexMetadata indexMetadata = IndexMetadata.fromXContent(createParser(mappings));
+
+ indexBuilder.put(indexName, indexMetadata);
+ when(mockMetaData.getIndices()).thenReturn(indexBuilder.build());
+
+ // IndexNameExpressionResolver use this method to check if index exists. If not,
+ // IndexNotFoundException is thrown.
+ IndexAbstraction indexAbstraction = mock(IndexAbstraction.class);
+ when(indexAbstraction.getIndices()).thenReturn(Collections.singletonList(indexMetadata));
+ when(mockMetaData.getIndicesLookup())
+ .thenReturn(ImmutableSortedMap.of(indexName, indexAbstraction));
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to mock cluster service", e);
+ }
+ return mockService;
+ }
+
private XContentParser createParser(String mappings) throws IOException {
return XContentType.JSON
.xContent()
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java
index 0c2503ea57..bc334aaf39 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java
@@ -34,6 +34,8 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
+import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
@@ -43,6 +45,7 @@
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
+import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
@@ -128,6 +131,61 @@ void getIndexMappingsWithIOException() throws IOException {
assertThrows(IllegalStateException.class, () -> client.getIndexMappings("test"));
}
+ @Test
+ void getIndexMaxResultWindowsSettings() throws IOException {
+ String indexName = "test";
+ Integer maxResultWindow = 1000;
+
+ GetSettingsResponse response = mock(GetSettingsResponse.class);
+ Settings maxResultWindowSettings = Settings.builder()
+ .put("index.max_result_window", maxResultWindow)
+ .build();
+ Settings emptySettings = Settings.builder().build();
+ ImmutableOpenMap indexToSettings =
+ mockSettings(indexName, maxResultWindowSettings);
+ ImmutableOpenMap indexToDefaultSettings =
+ mockSettings(indexName, emptySettings);
+ when(response.getIndexToSettings()).thenReturn(indexToSettings);
+ when(response.getIndexToDefaultSettings()).thenReturn(indexToDefaultSettings);
+ when(restClient.indices().getSettings(any(GetSettingsRequest.class), any()))
+ .thenReturn(response);
+
+ Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName);
+ assertEquals(1, indexMaxResultWindows.size());
+ assertEquals(maxResultWindow, indexMaxResultWindows.values().iterator().next());
+ }
+
+ @Test
+ void getIndexMaxResultWindowsDefaultSettings() throws IOException {
+ String indexName = "test";
+ Integer maxResultWindow = 10000;
+
+ GetSettingsResponse response = mock(GetSettingsResponse.class);
+ Settings maxResultWindowSettings = Settings.builder()
+ .put("index.max_result_window", maxResultWindow)
+ .build();
+ Settings emptySettings = Settings.builder().build();
+ ImmutableOpenMap indexToSettings =
+ mockSettings(indexName, emptySettings);
+ ImmutableOpenMap indexToDefaultSettings =
+ mockSettings(indexName, maxResultWindowSettings);
+ when(response.getIndexToSettings()).thenReturn(indexToSettings);
+ when(response.getIndexToDefaultSettings()).thenReturn(indexToDefaultSettings);
+ when(restClient.indices().getSettings(any(GetSettingsRequest.class), any()))
+ .thenReturn(response);
+
+ Map indexMaxResultWindows = client.getIndexMaxResultWindows(indexName);
+ assertEquals(1, indexMaxResultWindows.size());
+ assertEquals(maxResultWindow, indexMaxResultWindows.values().iterator().next());
+ }
+
+ @Test
+ void getIndexMaxResultWindowsWithIOException() throws IOException {
+ when(restClient.indices().getSettings(any(GetSettingsRequest.class), any()))
+ .thenThrow(new IOException());
+ assertThrows(IllegalStateException.class, () -> client.getIndexMaxResultWindows("test"));
+ }
+
@Test
void search() throws IOException {
// Mock first scroll request
@@ -277,6 +335,12 @@ private Map mockFieldMappings(String indexName, String
return ImmutableMap.of(indexName, IndexMetadata.fromXContent(createParser(mappings)).mapping());
}
+ private ImmutableOpenMap mockSettings(String indexName, Settings settings) {
+ ImmutableOpenMap.Builder indexToSettingsBuilder = ImmutableOpenMap.builder();
+ indexToSettingsBuilder.put(indexName, settings);
+ return indexToSettingsBuilder.build();
+ }
+
private XContentParser createParser(String mappings) throws IOException {
return XContentType.JSON
.xContent()
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java
index 24c305a75e..f1a0a7d5d7 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.BeforeEach;
@@ -126,7 +127,7 @@ void explainSuccessfully() {
Settings settings = mock(Settings.class);
when(settings.getSettingValue(QUERY_SIZE_LIMIT)).thenReturn(100);
PhysicalPlan plan = new OpenSearchIndexScan(mock(OpenSearchClient.class),
- settings, "test", mock(OpenSearchExprValueFactory.class));
+ settings, "test", 10000, mock(OpenSearchExprValueFactory.class));
AtomicReference result = new AtomicReference<>();
executor.explain(plan, new ResponseListener() {
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java
index ee981a4abc..e5c5046b81 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java
@@ -88,6 +88,7 @@ public void testProtectIndexScan() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
String indexName = "test";
+ Integer maxResultWindow = 10000;
NamedExpression include = named("age", ref("age", INTEGER));
ReferenceExpression exclude = ref("name", STRING);
ReferenceExpression dedupeField = ref("name", STRING);
@@ -124,7 +125,7 @@ public void testProtectIndexScan() {
resourceMonitor(
new OpenSearchIndexScan(
client, settings, indexName,
- exprValueFactory)),
+ maxResultWindow, exprValueFactory)),
filterExpr),
aggregators,
groupByExprs),
@@ -152,7 +153,7 @@ public void testProtectIndexScan() {
filter(
new OpenSearchIndexScan(
client, settings, indexName,
- exprValueFactory),
+ maxResultWindow, exprValueFactory),
filterExpr),
aggregators,
groupByExprs),
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java
new file mode 100644
index 0000000000..43b9353190
--- /dev/null
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+package org.opensearch.sql.opensearch.request;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
+
+@ExtendWith(MockitoExtension.class)
+public class OpenSearchRequestBuilderTest {
+
+ public static final TimeValue DEFAULT_QUERY_TIMEOUT = TimeValue.timeValueMinutes(1L);
+ @Mock
+ private Settings settings;
+
+ @Mock
+ private OpenSearchExprValueFactory factory;
+
+ @BeforeEach
+ void setup() {
+ when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ }
+
+ @Test
+ void buildQueryRequest() {
+ Integer maxResultWindow = 500;
+ Integer limit = 200;
+ Integer offset = 0;
+ OpenSearchRequestBuilder builder =
+ new OpenSearchRequestBuilder("test", maxResultWindow, settings, factory);
+ builder.pushDownLimit(limit, offset);
+
+ assertEquals(
+ new OpenSearchQueryRequest(
+ new OpenSearchRequest.IndexName("test"),
+ new SearchSourceBuilder()
+ .from(offset)
+ .size(limit)
+ .timeout(DEFAULT_QUERY_TIMEOUT),
+ factory),
+ builder.build());
+ }
+
+ @Test
+ void buildScrollRequestWithCorrectSize() {
+ Integer maxResultWindow = 500;
+ Integer limit = 800;
+ Integer offset = 10;
+ OpenSearchRequestBuilder builder =
+ new OpenSearchRequestBuilder("test", maxResultWindow, settings, factory);
+ builder.pushDownLimit(limit, offset);
+
+ assertEquals(
+ new OpenSearchScrollRequest(
+ new OpenSearchRequest.IndexName("test"),
+ new SearchSourceBuilder()
+ .from(offset)
+ .size(maxResultWindow - offset)
+ .timeout(DEFAULT_QUERY_TIMEOUT),
+ factory),
+ builder.build());
+ }
+}
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java
index c83172955c..b85d60c1fb 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java
@@ -20,6 +20,7 @@
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
+import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
@@ -76,10 +77,12 @@ public void visitHighlight() {
LogicalHighlight node = Mockito.mock(LogicalHighlight.class,
Answers.RETURNS_DEEP_STUBS);
Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class));
+ OpenSearchRequestBuilder requestBuilder = Mockito.mock(OpenSearchRequestBuilder.class);
+ Mockito.when(indexScan.getRequestBuilder()).thenReturn(requestBuilder);
OpenSearchIndex.OpenSearchDefaultImplementor implementor =
new OpenSearchIndex.OpenSearchDefaultImplementor(indexScan, client);
implementor.visitHighlight(node, indexScan);
- verify(indexScan).pushDownHighlight(any());
+ verify(requestBuilder).pushDownHighlight(any());
}
}
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java
index 41769914d9..a1f2869ca5 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScanTest.java
@@ -62,7 +62,7 @@ void setup() {
void queryEmptyResult() {
mockResponse();
try (OpenSearchIndexScan indexScan =
- new OpenSearchIndexScan(client, settings, "test", exprValueFactory)) {
+ new OpenSearchIndexScan(client, settings, "test", 3, exprValueFactory)) {
indexScan.open();
assertFalse(indexScan.hasNext());
}
@@ -70,13 +70,90 @@ void queryEmptyResult() {
}
@Test
- void queryAllResults() {
+ void queryAllResultsWithQuery() {
+ mockResponse(new ExprValue[]{
+ employee(1, "John", "IT"),
+ employee(2, "Smith", "HR"),
+ employee(3, "Allen", "IT")});
+
+ try (OpenSearchIndexScan indexScan =
+ new OpenSearchIndexScan(client, settings, "employees", 10, exprValueFactory)) {
+ indexScan.open();
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(1, "John", "IT"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(2, "Smith", "HR"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(3, "Allen", "IT"), indexScan.next());
+
+ assertFalse(indexScan.hasNext());
+ }
+ verify(client).cleanup(any());
+ }
+
+ @Test
+ void queryAllResultsWithScroll() {
mockResponse(
new ExprValue[]{employee(1, "John", "IT"), employee(2, "Smith", "HR")},
new ExprValue[]{employee(3, "Allen", "IT")});
try (OpenSearchIndexScan indexScan =
- new OpenSearchIndexScan(client, settings, "employees", exprValueFactory)) {
+ new OpenSearchIndexScan(client, settings, "employees", 2, exprValueFactory)) {
+ indexScan.open();
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(1, "John", "IT"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(2, "Smith", "HR"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(3, "Allen", "IT"), indexScan.next());
+
+ assertFalse(indexScan.hasNext());
+ }
+ verify(client).cleanup(any());
+ }
+
+ @Test
+ void querySomeResultsWithQuery() {
+ mockResponse(new ExprValue[]{
+ employee(1, "John", "IT"),
+ employee(2, "Smith", "HR"),
+ employee(3, "Allen", "IT"),
+ employee(4, "Bob", "HR")});
+
+ try (OpenSearchIndexScan indexScan =
+ new OpenSearchIndexScan(client, settings, "employees", 10, exprValueFactory)) {
+ indexScan.getRequestBuilder().pushDownLimit(3, 0);
+ indexScan.open();
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(1, "John", "IT"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(2, "Smith", "HR"), indexScan.next());
+
+ assertTrue(indexScan.hasNext());
+ assertEquals(employee(3, "Allen", "IT"), indexScan.next());
+
+ assertFalse(indexScan.hasNext());
+ }
+ verify(client).cleanup(any());
+ }
+
+ @Test
+ void querySomeResultsWithScroll() {
+ mockResponse(
+ new ExprValue[]{employee(1, "John", "IT"), employee(2, "Smith", "HR")},
+ new ExprValue[]{employee(3, "Allen", "IT"), employee(4, "Bob", "HR")});
+
+ try (OpenSearchIndexScan indexScan =
+ new OpenSearchIndexScan(client, settings, "employees", 2, exprValueFactory)) {
+ indexScan.getRequestBuilder().pushDownLimit(3, 0);
indexScan.open();
assertTrue(indexScan.hasNext());
@@ -135,19 +212,19 @@ public PushDownAssertion(OpenSearchClient client,
OpenSearchExprValueFactory valueFactory,
Settings settings) {
this.client = client;
- this.indexScan = new OpenSearchIndexScan(client, settings, "test", valueFactory);
+ this.indexScan = new OpenSearchIndexScan(client, settings, "test", 10000, valueFactory);
this.response = mock(OpenSearchResponse.class);
this.factory = valueFactory;
when(response.isEmpty()).thenReturn(true);
}
PushDownAssertion pushDown(QueryBuilder query) {
- indexScan.pushDown(query);
+ indexScan.getRequestBuilder().pushDown(query);
return this;
}
PushDownAssertion pushDownHighlight(String query) {
- indexScan.pushDownHighlight(query);
+ indexScan.getRequestBuilder().pushDownHighlight(query);
return this;
}
@@ -187,10 +264,8 @@ public OpenSearchResponse answer(InvocationOnMock invocation) {
when(response.isEmpty()).thenReturn(false);
ExprValue[] searchHit = searchHitBatches[batchNum];
when(response.iterator()).thenReturn(Arrays.asList(searchHit).iterator());
- } else if (batchNum == totalBatch) {
- when(response.isEmpty()).thenReturn(true);
} else {
- fail("Search request after empty response returned already");
+ when(response.isEmpty()).thenReturn(true);
}
batchNum++;
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java
index 847ac8dfc0..f1754a455d 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java
@@ -70,7 +70,6 @@
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanDSL;
import org.opensearch.sql.planner.physical.ProjectOperator;
-import org.opensearch.sql.storage.Table;
@ExtendWith(MockitoExtension.class)
class OpenSearchIndexTest {
@@ -109,7 +108,7 @@ void getFieldTypes() {
.put("blob", "binary")
.build())));
- Table index = new OpenSearchIndex(client, settings, "test");
+ OpenSearchIndex index = new OpenSearchIndex(client, settings, "test");
Map fieldTypes = index.getFieldTypes();
assertThat(
fieldTypes,
@@ -134,30 +133,35 @@ void getFieldTypes() {
@Test
void implementRelationOperatorOnly() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
String indexName = "test";
LogicalPlan plan = relation(indexName);
- Table index = new OpenSearchIndex(client, settings, indexName);
+ OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName);
+ Integer maxResultWindow = index.getMaxResultWindow();
assertEquals(
- new OpenSearchIndexScan(client, settings, indexName, exprValueFactory),
+ new OpenSearchIndexScan(client, settings, indexName, maxResultWindow, exprValueFactory),
index.implement(plan));
}
@Test
void implementRelationOperatorWithOptimization() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
String indexName = "test";
LogicalPlan plan = relation(indexName);
- Table index = new OpenSearchIndex(client, settings, indexName);
+ OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName);
+ Integer maxResultWindow = index.getMaxResultWindow();
assertEquals(
- new OpenSearchIndexScan(client, settings, indexName, exprValueFactory),
+ new OpenSearchIndexScan(client, settings, indexName, maxResultWindow, exprValueFactory),
index.implement(index.optimize(plan)));
}
@Test
void implementOtherLogicalOperators() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
String indexName = "test";
NamedExpression include = named("age", ref("age", INTEGER));
@@ -191,7 +195,8 @@ void implementOtherLogicalOperators() {
dedupeField),
include);
- Table index = new OpenSearchIndex(client, settings, indexName);
+ OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName);
+ Integer maxResultWindow = index.getMaxResultWindow();
assertEquals(
PhysicalPlanDSL.project(
PhysicalPlanDSL.dedupe(
@@ -200,7 +205,7 @@ void implementOtherLogicalOperators() {
PhysicalPlanDSL.remove(
PhysicalPlanDSL.rename(
new OpenSearchIndexScan(client, settings, indexName,
- exprValueFactory),
+ maxResultWindow, exprValueFactory),
mappings),
exclude),
newEvalField),
@@ -213,6 +218,7 @@ void implementOtherLogicalOperators() {
@Test
void shouldImplLogicalIndexScan() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
NamedExpression named = named("n", field);
@@ -235,6 +241,7 @@ void shouldImplLogicalIndexScan() {
@Test
void shouldNotPushDownFilterFarFromRelation() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
Expression filterExpr = dsl.equal(field, literal("John"));
@@ -260,6 +267,7 @@ void shouldNotPushDownFilterFarFromRelation() {
@Test
void shouldImplLogicalIndexScanAgg() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
Expression filterExpr = dsl.equal(field, literal("John"));
@@ -296,6 +304,7 @@ void shouldImplLogicalIndexScanAgg() {
@Test
void shouldNotPushDownAggregationFarFromRelation() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
Expression filterExpr = dsl.equal(field, literal("John"));
@@ -320,6 +329,7 @@ void shouldNotPushDownAggregationFarFromRelation() {
@Test
void shouldImplIndexScanWithSort() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
NamedExpression named = named("n", field);
@@ -342,6 +352,7 @@ void shouldImplIndexScanWithSort() {
@Test
void shouldImplIndexScanWithLimit() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
NamedExpression named = named("n", field);
@@ -363,6 +374,7 @@ void shouldImplIndexScanWithLimit() {
@Test
void shouldImplIndexScanWithSortAndLimit() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
ReferenceExpression field = ref("name", STRING);
NamedExpression named = named("n", field);
@@ -387,6 +399,7 @@ void shouldImplIndexScanWithSortAndLimit() {
@Test
void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
String indexName = "test";
OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName);
@@ -411,6 +424,7 @@ void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() {
@Test
void shouldPushDownProjects() {
when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200);
+ when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000));
String indexName = "test";
OpenSearchIndex index = new OpenSearchIndex(client, settings, indexName);
@@ -425,7 +439,7 @@ indexName, projects(ref("intV", INTEGER))
assertTrue(((ProjectOperator) plan).getInput() instanceof OpenSearchIndexScan);
final FetchSourceContext fetchSource =
- ((OpenSearchIndexScan) ((ProjectOperator) plan).getInput()).getRequest()
+ ((OpenSearchIndexScan) ((ProjectOperator) plan).getInput()).getRequestBuilder()
.getSourceBuilder().fetchSource();
assertThat(fetchSource.includes(), arrayContaining("intV"));
assertThat(fetchSource.excludes(), emptyArray());
diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java
index 15ca9d491f..2ed9a16434 100644
--- a/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java
+++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/utils/Utils.java
@@ -31,7 +31,9 @@ public class Utils {
* Build ElasticsearchLogicalIndexScan.
*/
public static LogicalPlan indexScan(String tableName, Expression filter) {
- return OpenSearchLogicalIndexScan.builder().relationName(tableName).filter(filter).build();
+ return OpenSearchLogicalIndexScan.builder().relationName(tableName)
+ .filter(filter)
+ .build();
}
/**
diff --git a/opensearch/src/test/resources/mappings/accounts2.json b/opensearch/src/test/resources/mappings/accounts2.json
new file mode 100644
index 0000000000..d300b8c523
--- /dev/null
+++ b/opensearch/src/test/resources/mappings/accounts2.json
@@ -0,0 +1,93 @@
+{
+ "accounts": {
+ "mappings": {
+ "_doc": {
+ "properties": {
+ "address": {
+ "type": "text"
+ },
+ "age": {
+ "type": "integer"
+ },
+ "balance": {
+ "type": "double"
+ },
+ "city": {
+ "type": "keyword"
+ },
+ "birthday": {
+ "type": "date"
+ },
+ "location": {
+ "type": "geo_point"
+ },
+ "new_field": {
+ "type": "some_new_es_type_outside_type_system"
+ },
+ "field with spaces": {
+ "type": "text"
+ },
+ "employer": {
+ "type": "text",
+ "fields": {
+ "raw": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "projects": {
+ "type": "nested",
+ "properties": {
+ "members": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "text"
+ }
+ }
+ },
+ "active": {
+ "type": "boolean"
+ },
+ "release": {
+ "type": "date"
+ }
+ }
+ },
+ "manager": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "address": {
+ "type": "keyword"
+ },
+ "salary": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ }
+ },
+ "settings": {
+ "index": {
+ "number_of_shards": 5,
+ "number_of_replicas": 0,
+ "max_result_window": 100,
+ "version": {
+ "created": "6050399"
+ }
+ }
+ },
+ "mapping_version": "1",
+ "settings_version": "1"
+ }
+}
\ No newline at end of file
From deececbb03a9dcf9647bf8f2f651a4e3fe0a6c2e Mon Sep 17 00:00:00 2001
From: Yury-Fridlyand
Date: Fri, 12 Aug 2022 10:29:33 -0700
Subject: [PATCH 3/3] Replace 2x `LogUtils` by `QueryContext` (#747)
Signed-off-by: Yury Fridlyand
---
.../{LogUtils.java => QueryContext.java} | 20 ++++--
.../legacy/executor/AsyncRestExecutor.java | 16 ++---
.../cursor/CursorAsyncRestExecutor.java | 12 ++--
.../sql/legacy/plugin/RestSqlAction.java | 16 ++---
.../sql/legacy/plugin/RestSqlStatsAction.java | 4 +-
.../opensearch/sql/legacy/utils/LogUtils.java | 67 -------------------
...ogUtilsTest.java => QueryContextTest.java} | 18 ++---
.../sql/plugin/rest/RestPPLStatsAction.java | 4 +-
.../plugin/rest/RestQuerySettingsAction.java | 4 +-
.../transport/TransportPPLQueryAction.java | 4 +-
.../org/opensearch/sql/ppl/PPLService.java | 5 +-
11 files changed, 55 insertions(+), 115 deletions(-)
rename common/src/main/java/org/opensearch/sql/common/utils/{LogUtils.java => QueryContext.java} (78%)
delete mode 100644 legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java
rename legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/{LogUtilsTest.java => QueryContextTest.java} (79%)
diff --git a/common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java
similarity index 78%
rename from common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java
rename to common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java
index 2f8c22c059..372dbae387 100644
--- a/common/src/main/java/org/opensearch/sql/common/utils/LogUtils.java
+++ b/common/src/main/java/org/opensearch/sql/common/utils/QueryContext.java
@@ -7,13 +7,14 @@
package org.opensearch.sql.common.utils;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import org.apache.logging.log4j.ThreadContext;
/**
- * Utility class for generating/accessing the request id from logging context.
+ * Utility class for recording and accessing context for the query being executed.
*/
-public class LogUtils {
+public class QueryContext {
/**
* The key of the request id in the context map.
@@ -29,8 +30,10 @@ public class LogUtils {
* call this method twice on the same thread within the lifetime of the request.
*
*/
- public static void addRequestId() {
- ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString());
+ public static String addRequestId() {
+ var id = UUID.randomUUID().toString();
+ ThreadContext.put(REQUEST_ID_KEY, id);
+ return id;
}
/**
@@ -38,8 +41,11 @@ public static void addRequestId() {
* @return the current request id from {@link ThreadContext}.
*/
public static String getRequestId() {
- final String requestId = ThreadContext.get(REQUEST_ID_KEY);
- return requestId;
+ var id = ThreadContext.get(REQUEST_ID_KEY);
+ if (null == id) {
+ id = addRequestId();
+ }
+ return id;
}
/**
@@ -57,7 +63,7 @@ public static Runnable withCurrentContext(final Runnable task) {
};
}
- private LogUtils() {
+ private QueryContext() {
throw new AssertionError(
getClass().getCanonicalName() + " is a utility class and must not be initialized");
}
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java
index e6406e8b3e..4ad6e55777 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java
@@ -19,13 +19,13 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
-import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;
@@ -73,13 +73,13 @@ public void execute(Client client, Map params, QueryAction query
if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]",
- LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
+ QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
}
async(client, params, queryAction, channel);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]",
- LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
+ QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
}
doExecuteWithTimeMeasured(client, params, queryAction, channel);
}
@@ -110,18 +110,18 @@ private void async(Client client, Map params, QueryAction queryA
doExecuteWithTimeMeasured(client, params, queryAction, channel);
} catch (IOException | SqlParseException | OpenSearchException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
"Memory circuit is broken."));
} catch (Throwable t) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
t.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
String.valueOf(t.getMessage())));
@@ -132,7 +132,7 @@ private void async(Client client, Map params, QueryAction queryA
// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
threadPool.schedule(
- LogUtils.withCurrentContext(runnable),
+ QueryContext.withCurrentContext(runnable),
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME
);
@@ -152,7 +152,7 @@ private void doExecuteWithTimeMeasured(Client client,
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
- LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
+ LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
}
}
}
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java
index 0dc1fe301f..7bb6421502 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java
@@ -17,11 +17,11 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.setting.Settings;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
-import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.threadpool.ThreadPool;
public class CursorAsyncRestExecutor {
@@ -57,20 +57,20 @@ private void async(Client client, Map params, RestChannel channe
doExecuteWithTimeMeasured(client, params, channel);
} catch (IOException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
"Memory circuit is broken."));
} catch (Throwable t) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
- LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
+ LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
t.getMessage());
t.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
@@ -82,7 +82,7 @@ private void async(Client client, Map params, RestChannel channe
// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
threadPool.schedule(
- LogUtils.withCurrentContext(runnable),
+ QueryContext.withCurrentContext(runnable),
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME
);
@@ -101,7 +101,7 @@ private void doExecuteWithTimeMeasured(Client client,
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
- LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
+ LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
}
}
}
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
index 10d9dab0fa..6f7579c9c7 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
@@ -35,6 +35,7 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.legacy.antlr.OpenSearchLegacySqlAnalyzer;
@@ -60,7 +61,6 @@
import org.opensearch.sql.legacy.request.SqlRequestParam;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;
import org.opensearch.sql.legacy.utils.JsonPrettyFormatter;
-import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.sql.legacy.utils.QueryDataAnonymizer;
import org.opensearch.sql.sql.domain.SQLQueryRequest;
@@ -123,7 +123,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment();
Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment();
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
try {
if (!isSQLFeatureEnabled()) {
@@ -137,12 +137,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (isExplainRequest(request)) {
throw new IllegalArgumentException("Invalid request. Cannot explain cursor");
} else {
- LOG.info("[{}] Cursor request {}: {}", LogUtils.getRequestId(), request.uri(), sqlRequest.cursor());
+ LOG.info("[{}] Cursor request {}: {}", QueryContext.getRequestId(), request.uri(), sqlRequest.cursor());
return channel -> handleCursorRequest(request, sqlRequest.cursor(), client, channel);
}
}
- LOG.info("[{}] Incoming request {}: {}", LogUtils.getRequestId(), request.uri(),
+ LOG.info("[{}] Incoming request {}: {}", QueryContext.getRequestId(), request.uri(),
QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
Format format = SqlRequestParam.getFormat(request.params());
@@ -152,11 +152,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
sqlRequest.getSql(), request.path(), request.params());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
- LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
+ LOG.info("[{}] Request is handled by new SQL query engine", QueryContext.getRequestId());
return result;
}
LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine",
- LogUtils.getRequestId(), newSqlRequest);
+ QueryContext.getRequestId(), newSqlRequest);
final QueryAction queryAction = explainRequest(client, sqlRequest, format);
return channel -> executeSqlRequest(request, queryAction, client, channel);
@@ -182,10 +182,10 @@ private void handleCursorRequest(final RestRequest request, final String cursor,
private static void logAndPublishMetrics(final Exception e) {
if (isClientError(e)) {
- LOG.error(LogUtils.getRequestId() + " Client side error during query execution", e);
+ LOG.error(QueryContext.getRequestId() + " Client side error during query execution", e);
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment();
} else {
- LOG.error(LogUtils.getRequestId() + " Server side error during query execution", e);
+ LOG.error(QueryContext.getRequestId() + " Server side error during query execution", e);
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
}
}
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java
index 70ec21c3fa..5b48ef6710 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java
@@ -22,9 +22,9 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory;
import org.opensearch.sql.legacy.metrics.Metrics;
-import org.opensearch.sql.legacy.utils.LogUtils;
/**
* Currently this interface is for node level.
@@ -67,7 +67,7 @@ public List replacedRoutes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
try {
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java b/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java
deleted file mode 100644
index 4830dd4413..0000000000
--- a/legacy/src/main/java/org/opensearch/sql/legacy/utils/LogUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-
-package org.opensearch.sql.legacy.utils;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-import org.apache.logging.log4j.ThreadContext;
-
-/**
- * Utility class for generating/accessing the request id from logging context.
- */
-public class LogUtils {
-
- /**
- * The key of the request id in the context map
- */
- private static final String REQUEST_ID_KEY = "request_id";
-
- private static final String EMPTY_ID = "ID";
-
- /**
- * Generates a random UUID and adds to the {@link ThreadContext} as the request id.
- *
- * Note: If a request id already present, this method will overwrite it with a new
- * one. This is to pre-vent re-using the same request id for different requests in
- * case the same thread handles both of them. But this also means one should not
- * call this method twice on the same thread within the lifetime of the request.
- *
- */
- public static void addRequestId() {
-
- ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString());
- }
-
- /**
- * @return the current request id from {@link ThreadContext}
- */
- public static String getRequestId() {
- return Optional.ofNullable(ThreadContext.get(REQUEST_ID_KEY)).orElseGet(() -> EMPTY_ID);
- }
-
- /**
- * Wraps a given instance of {@link Runnable} into a new one which gets all the
- * entries from current ThreadContext map.
- *
- * @param task the instance of Runnable to wrap
- * @return the new task
- */
- public static Runnable withCurrentContext(final Runnable task) {
-
- final Map currentContext = ThreadContext.getImmutableContext();
- return () -> {
- ThreadContext.putAll(currentContext);
- task.run();
- };
- }
-
- private LogUtils() {
-
- throw new AssertionError(getClass().getCanonicalName() + " is a utility class and must not be initialized");
- }
-}
diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java
similarity index 79%
rename from legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java
rename to legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java
index 564ce8c9ea..55b78af0d7 100644
--- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/LogUtilsTest.java
+++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/utils/QueryContextTest.java
@@ -8,15 +8,15 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import org.apache.logging.log4j.ThreadContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import org.opensearch.sql.legacy.utils.LogUtils;
+import org.opensearch.sql.common.utils.QueryContext;
-public class LogUtilsTest {
+public class QueryContextTest {
private static final String REQUEST_ID_KEY = "request_id";
@@ -30,7 +30,7 @@ public void cleanUpContext() {
public void addRequestId() {
Assert.assertNull(ThreadContext.get(REQUEST_ID_KEY));
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
Assert.assertNotNull(requestId);
}
@@ -38,16 +38,16 @@ public void addRequestId() {
@Test
public void addRequestId_alreadyExists() {
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
final String requestId2 = ThreadContext.get(REQUEST_ID_KEY);
Assert.assertThat(requestId2, not(equalTo(requestId)));
}
@Test
public void getRequestId_doesNotExist() {
- assertEquals("ID", LogUtils.getRequestId());
+ assertNotNull(QueryContext.getRequestId());
}
@Test
@@ -55,7 +55,7 @@ public void getRequestId() {
final String test_request_id = "test_id_111";
ThreadContext.put(REQUEST_ID_KEY, test_request_id);
- final String requestId = LogUtils.getRequestId();
+ final String requestId = QueryContext.getRequestId();
Assert.assertThat(requestId, equalTo(test_request_id));
}
@@ -68,6 +68,6 @@ public void withCurrentContext() throws InterruptedException {
};
ThreadContext.put("test11", "value11");
ThreadContext.put("test22", "value11");
- new Thread(LogUtils.withCurrentContext(task)).join();
+ new Thread(QueryContext.withCurrentContext(task)).join();
}
}
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java
index 6b5cbd4135..5b9c792c7d 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLStatsAction.java
@@ -22,9 +22,9 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory;
import org.opensearch.sql.legacy.metrics.Metrics;
-import org.opensearch.sql.legacy.utils.LogUtils;
/**
* PPL Node level status.
@@ -67,7 +67,7 @@ public List replacedRoutes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
try {
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java
index 0d8ca66cc6..14d06dfc71 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestQuerySettingsAction.java
@@ -28,8 +28,8 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory;
-import org.opensearch.sql.legacy.utils.LogUtils;
public class RestQuerySettingsAction extends BaseRestHandler {
private static final Logger LOG = LogManager.getLogger(RestQuerySettingsAction.class);
@@ -74,7 +74,7 @@ public List replacedRoutes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client)
throws IOException {
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest =
Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.timeout(request.paramAsTime(
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
index e4699b6f9f..31317c1962 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
@@ -20,7 +20,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.common.setting.Settings;
-import org.opensearch.sql.common.utils.LogUtils;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
@@ -77,7 +77,7 @@ protected void doExecute(
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment();
Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment();
- LogUtils.addRequestId();
+ QueryContext.addRequestId();
PPLService pplService = createPPLService(client);
TransportPPLQueryRequest transportRequest = TransportPPLQueryRequest.fromActionRequest(request);
diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
index a1a831c7cd..866326f562 100644
--- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
+++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java
@@ -16,7 +16,7 @@
import org.opensearch.sql.analysis.Analyzer;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
-import org.opensearch.sql.common.utils.LogUtils;
+import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import org.opensearch.sql.expression.DSL;
@@ -84,7 +84,8 @@ private PhysicalPlan plan(PPLQueryRequest request) {
UnresolvedPlan ast = cst.accept(
new AstBuilder(new AstExpressionBuilder(), request.getRequest()));
- LOG.info("[{}] Incoming request {}", LogUtils.getRequestId(), anonymizer.anonymizeData(ast));
+ LOG.info("[{}] Incoming request {}", QueryContext.getRequestId(),
+ anonymizer.anonymizeData(ast));
// 2.Analyze abstract syntax to generate logical plan
LogicalPlan logicalPlan = analyzer.analyze(UnresolvedPlanHelper.addSelectAll(ast),