From bf4810dc26cc1782ab433bc97d63a6a0b326da11 Mon Sep 17 00:00:00 2001 From: Abbas Hussain Date: Tue, 14 Apr 2020 09:58:46 -0700 Subject: [PATCH] Simple Query Cursor support (#390) * Add integration tests to be passed * Add cluster settings for cursor - enabled, fetch_size, keep_alive * Add fetch_size and cursor params. fetch_size validation * new SqlRequest constructor for cursor * Add logic to open scroll based on settings, fetch_size and limit values * Add cursor close endpoint * Remove date formatting changes * Fix unit and integ tests, Ignored date format tests for a while, synced previous cursor changes * Add cursor generation * Add test helper methods * Cursor close API * Remove commented code and add partial date formatting change * Add error metrics when not able to close cursor * Add indexname and fieldAliasMap to cursor context * Remove ignored test cases affected by date formatting changes * Remove unneeded interface, refactor CursorType enum * Remove logs, unneeded fields, comments, refactor * Disable cursor by default * Fix cursor for parameterized request, add integration test for same * LIMIT changes * Changes to handle different LIMIT cases * Add default cursor metrics * Add integration test on explain cursor * Update monitoring, settings and endpoint docs * Refactor cursor classes to separate package * Add Lombok for DefaultCursor * Add unit test for DefaultCursor * Update doc * Unit tests, bug fix , refactoring --- docs/user/admin/monitoring.rst | 34 +- docs/user/admin/settings.rst | 126 +++++ docs/user/interfaces/endpoint.rst | 59 +++ .../sql/cursor/Cursor.java | 30 ++ .../sql/cursor/CursorType.java | 53 ++ .../sql/cursor/DefaultCursor.java | 170 ++++++ .../sql/cursor/NullCursor.java | 38 ++ .../sql/domain/Select.java | 8 +- ...ursorActionRequestRestExecutorFactory.java | 35 ++ .../cursor/CursorAsyncRestExecutor.java | 119 +++++ .../executor/cursor/CursorCloseExecutor.java | 100 ++++ .../executor/cursor/CursorRestExecutor.java | 33 ++ .../executor/cursor/CursorResultExecutor.java | 142 +++++ .../format/PrettyFormatRestExecutor.java | 38 +- .../sql/executor/format/Protocol.java | 67 ++- .../sql/executor/format/Schema.java | 6 + .../sql/executor/format/SelectResultSet.java | 112 +++- .../sql/metrics/MetricFactory.java | 2 + .../sql/metrics/MetricName.java | 5 +- .../sql/plugin/RestSqlAction.java | 21 + .../sql/plugin/SqlSettings.java | 13 + .../sql/query/AggregationQueryAction.java | 6 +- .../sql/query/DefaultQueryAction.java | 77 +-- .../sql/query/QueryAction.java | 10 + .../sql/request/PreparedStatementRequest.java | 6 + .../sql/request/SqlRequest.java | 20 +- .../sql/request/SqlRequestFactory.java | 32 +- .../sql/doctest/admin/MonitoringIT.java | 5 + .../sql/doctest/admin/PluginSettingIT.java | 32 ++ .../sql/doctest/interfaces/EndpointIT.java | 19 + .../sql/esintgtest/CursorIT.java | 484 ++++++++++++++++++ .../sql/esintgtest/PluginIT.java | 2 - .../sql/esintgtest/QueryIT.java | 49 -- .../sql/esintgtest/SQLIntegTestCase.java | 93 +++- .../sql/esintgtest/TestUtils.java | 58 +++ .../sql/esintgtest/TestsConstants.java | 3 + .../sql/unittest/DateFormatTest.java | 6 +- .../unittest/cursor/DefaultCursorTest.java | 68 +++ .../query/DefaultQueryActionTest.java | 145 ++++++ src/test/resources/datetime.json | 8 + src/test/resources/nested_simple.json | 10 + 41 files changed, 2222 insertions(+), 122 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/Cursor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/CursorType.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/DefaultCursor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/NullCursor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorActionRequestRestExecutorFactory.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorAsyncRestExecutor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorCloseExecutor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorRestExecutor.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorResultExecutor.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/CursorIT.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/cursor/DefaultCursorTest.java create mode 100644 src/test/resources/datetime.json create mode 100644 src/test/resources/nested_simple.json diff --git a/docs/user/admin/monitoring.rst b/docs/user/admin/monitoring.rst index 32d588d70b..2098c20446 100644 --- a/docs/user/admin/monitoring.rst +++ b/docs/user/admin/monitoring.rst @@ -24,19 +24,23 @@ Description The meaning of fields in the response is as follows: -+---------------------------+---------------------------------------------------------------+ -| Field name| Description| -+===========================+===============================================================+ -| request_total| Total count of request| -+---------------------------+---------------------------------------------------------------+ -| request_count| Total count of request within the interval| -+---------------------------+---------------------------------------------------------------+ -|failed_request_count_syserr|Count of failed request due to system error within the interval| -+---------------------------+---------------------------------------------------------------+ -|failed_request_count_cuserr| Count of failed request due to bad request within the interval| -+---------------------------+---------------------------------------------------------------+ -| failed_request_count_cb| Indicate if plugin is being circuit broken within the interval| -+---------------------------+---------------------------------------------------------------+ ++----------------------------+---------------------------------------------------------------+ +| Field name| Description| ++============================+===============================================================+ +| request_total| Total count of request| ++----------------------------+---------------------------------------------------------------+ +| request_count| Total count of request within the interval| ++----------------------------+---------------------------------------------------------------+ +|default_cursor_request_total| Total count of simple cursor request| ++----------------------------+---------------------------------------------------------------+ +|default_cursor_request_count| Total count of simple cursor request within the interval| ++----------------------------+---------------------------------------------------------------+ +| failed_request_count_syserr|Count of failed request due to system error within the interval| ++----------------------------+---------------------------------------------------------------+ +| failed_request_count_cuserr| Count of failed request due to bad request within the interval| ++----------------------------+---------------------------------------------------------------+ +| failed_request_count_cb| Indicate if plugin is being circuit broken within the interval| ++----------------------------+---------------------------------------------------------------+ Example @@ -50,9 +54,11 @@ Result set:: { "failed_request_count_cb" : 0, + "default_cursor_request_count" : 10, + "default_cursor_request_total" : 3, "failed_request_count_cuserr" : 0, "circuit_breaker" : 0, - "request_total" : 49, + "request_total" : 70, "request_count" : 0, "failed_request_count_syserr" : 0 } diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 89d93e1fb5..82ba48a10f 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -384,3 +384,129 @@ Result set:: "timed_out" : false } +opendistro.sql.cursor.enabled +============================= + +Description +----------- + +User can enable/disable pagination for all queries that are supported. + +1. The default value is false. +2. This setting is node scope. +3. This setting can be updated dynamically. + + +Example +------- + +You can update the setting with a new value like this. + +SQL query:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_opendistro/_sql/settings -d '{ + "transient" : { + "opendistro.sql.cursor.enabled" : "true" + } + }' + +Result set:: + + { + "acknowledged" : true, + "persistent" : { }, + "transient" : { + "opendistro" : { + "sql" : { + "cursor" : { + "enabled" : "true" + } + } + } + } + } + +opendistro.sql.cursor.fetch_size +================================ + +Description +----------- + +User can set the default fetch_size for all queries that are supported by pagination. Explicit `fetch_size` passed in request will override this value + +1. The default value is 1000. +2. This setting is node scope. +3. This setting can be updated dynamically. + + +Example +------- + +You can update the setting with a new value like this. + +SQL query:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_opendistro/_sql/settings -d '{ + "transient" : { + "opendistro.sql.cursor.fetch_size" : "50" + } + }' + +Result set:: + + { + "acknowledged" : true, + "persistent" : { }, + "transient" : { + "opendistro" : { + "sql" : { + "cursor" : { + "fetch_size" : "50" + } + } + } + } + } + +opendistro.sql.cursor.keep_alive +================================ + +Description +----------- + +User can set this value to indicate how long the cursor context should be kept open. Cursor contexts are resource heavy, and a lower value should be used if possible. + +1. The default value is 1m. +2. This setting is node scope. +3. This setting can be updated dynamically. + + +Example +------- + +You can update the setting with a new value like this. + +SQL query:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_opendistro/_sql/settings -d '{ + "transient" : { + "opendistro.sql.cursor.keep_alive" : "5m" + } + }' + +Result set:: + + { + "acknowledged" : true, + "persistent" : { }, + "transient" : { + "opendistro" : { + "sql" : { + "cursor" : { + "keep_alive" : "5m" + } + } + } + } + } + diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index b8923289a5..dafbe5808b 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -91,3 +91,62 @@ Explain:: } } +Cursor +====== + +Description +----------- + +To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size`, default value of 1000 is used. A value of `0` will fallback to non-paginated response. This feature is only available over `jdbc` format for now. + +Example +------- + +SQL query:: + + >> curl -H 'Content-Type: application/json' -X POST localhost:9200/_opendistro/_sql -d '{ + "fetch_size" : 5, + "query" : "SELECT firstname, lastname FROM accounts WHERE age > 20 ORDER BY state ASC" + }' + +Result set:: + + { + "schema": [ + { + "name": "firstname", + "type": "text" + }, + { + "name": "lastname", + "type": "text" + } + ], + "cursor": "d:eyJhIjp7fSwicyI6IkRYRjFaWEo1UVc1a1JtVjBZMmdCQUFBQUFBQUFBQU1XZWpkdFRFRkZUMlpTZEZkeFdsWnJkRlZoYnpaeVVRPT0iLCJjIjpbeyJuYW1lIjoiZmlyc3RuYW1lIiwidHlwZSI6InRleHQifSx7Im5hbWUiOiJsYXN0bmFtZSIsInR5cGUiOiJ0ZXh0In1dLCJmIjo1LCJpIjoiYWNjb3VudHMiLCJsIjo5NTF9", + "total": 956, + "datarows": [ + [ + "Cherry", + "Carey" + ], + [ + "Lindsey", + "Hawkins" + ], + [ + "Sargent", + "Powers" + ], + [ + "Campos", + "Olsen" + ], + [ + "Savannah", + "Kirby" + ] + ], + "size": 5, + "status": 200 + } + diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/Cursor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/Cursor.java new file mode 100644 index 0000000000..54f738bb21 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/Cursor.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.cursor; + + +public interface Cursor { + + NullCursor NULL_CURSOR = new NullCursor(); + + /** + * All cursor's are of the form : + * The serialized form before encoding is upto Cursor implementation + */ + String generateCursorId(); + + CursorType getType(); +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/CursorType.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/CursorType.java new file mode 100644 index 0000000000..13a3d86e9f --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/CursorType.java @@ -0,0 +1,53 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.cursor; + +import java.util.HashMap; +import java.util.Map; + +/** + * Different types queries for which cursor is supported. + * The result execution, and cursor genreation/parsing will depend on the cursor type. + * NullCursor is the placeholder implementation in case of non-cursor query. + */ +public enum CursorType { + NULL(null), + DEFAULT("d"), + AGGREGATION("a"), + JOIN("j"); + + public String id; + + CursorType(String id) { + this.id = id; + } + + public String getId() { + return this.id; + } + + public static final Map LOOKUP = new HashMap<>(); + + static { + for (CursorType type : CursorType.values()) { + LOOKUP.put(type.getId(), type); + } + } + + public static CursorType getById(String id) { + return LOOKUP.getOrDefault(id, NULL); + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/DefaultCursor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/DefaultCursor.java new file mode 100644 index 0000000000..1f97ec6369 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/DefaultCursor.java @@ -0,0 +1,170 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.cursor; + +import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; +import org.json.JSONArray; +import org.json.JSONObject; + +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +/** + * Minimum metdata that will be serialized for generating cursorId for + * SELECT .... FROM .. ORDER BY .... queries + */ +@Getter +@Setter +@NoArgsConstructor +public class DefaultCursor implements Cursor { + + /** Make sure all keys are unique to prevent overriding + * and as small as possible to make cursor compact + */ + private static final String FETCH_SIZE = "f"; + private static final String ROWS_LEFT = "l"; + private static final String INDEX_PATTERN = "i"; + private static final String SCROLL_ID = "s"; + private static final String SCHEMA_COLUMNS = "c"; + private static final String FIELD_ALIAS_MAP = "a"; + + /** To get mappings for index to check if type is date needed for + * @see com.amazon.opendistroforelasticsearch.sql.executor.format.DateFieldFormatter */ + @NonNull + private String indexPattern; + + /** List of Schema.Column for maintaining field order and generating null values of missing fields */ + @NonNull + private List columns; + + /** To delegate to correct cursor handler to get next page*/ + private final CursorType type = CursorType.DEFAULT; + + /** + * Truncate the @see DataRows to respect LIMIT clause and/or to identify last page to close scroll context. + * docsLeft is decremented by fetch_size for call to get page of result. + */ + private long rowsLeft; + + /** @see com.amazon.opendistroforelasticsearch.sql.executor.format.SelectResultSet */ + @NonNull + private Map fieldAliasMap; + + /** To get next batch of result */ + private String scrollId; + + /** To reduce the number of rows left by fetchSize */ + @NonNull + private Integer fetchSize; + + private Integer limit; + + @Override + public CursorType getType() { + return type; + } + + @Override + public String generateCursorId() { + if (rowsLeft <=0 || Strings.isNullOrEmpty(scrollId)) { + return null; + } + JSONObject json = new JSONObject(); + json.put(FETCH_SIZE, fetchSize); + json.put(ROWS_LEFT, rowsLeft); + json.put(INDEX_PATTERN, indexPattern); + json.put(SCROLL_ID, scrollId); + json.put(SCHEMA_COLUMNS, getSchemaAsJson()); + json.put(FIELD_ALIAS_MAP, fieldAliasMap); + return String.format("%s:%s", type.getId(), encodeCursor(json)); + } + + public static DefaultCursor from(String cursorId) { + /** + * It is assumed that cursorId here is the second part of the original cursor passed + * by the client after removing first part which identifies cursor type + */ + JSONObject json = decodeCursor(cursorId); + DefaultCursor cursor = new DefaultCursor(); + cursor.setFetchSize(json.getInt(FETCH_SIZE)); + cursor.setRowsLeft(json.getLong(ROWS_LEFT)); + cursor.setIndexPattern(json.getString(INDEX_PATTERN)); + cursor.setScrollId(json.getString(SCROLL_ID)); + cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS))); + cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP))); + + return cursor; + } + + private JSONArray getSchemaAsJson() { + JSONArray schemaJson = new JSONArray(); + + for (Schema.Column column : columns) { + schemaJson.put(schemaEntry(column.getName(), column.getAlias(), column.getType())); + } + + return schemaJson; + } + + private JSONObject schemaEntry(String name, String alias, String type) { + JSONObject entry = new JSONObject(); + entry.put("name", name); + if (alias != null) { + entry.put("alias", alias); + } + entry.put("type", type); + return entry; + } + + private static String encodeCursor(JSONObject cursorJson) { + return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes()); + } + + private static JSONObject decodeCursor(String cursorId) { + return new JSONObject(new String(Base64.getDecoder().decode(cursorId))); + } + + private static Map fieldAliasMap(JSONObject json) { + Map fieldToAliasMap = new HashMap<>(); + json.keySet().forEach(key -> fieldToAliasMap.put(key, json.get(key).toString())); + return fieldToAliasMap; + } + + private static List getColumnsFromSchema(JSONArray schema) { + List columns = IntStream. + range(0, schema.length()). + mapToObj(i -> { + JSONObject jsonColumn = schema.getJSONObject(i); + return new Schema.Column( + jsonColumn.getString("name"), + jsonColumn.optString("alias", null), + Schema.Type.valueOf(jsonColumn.getString("type").toUpperCase()) + ); + } + ).collect(Collectors.toList()); + return columns; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/NullCursor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/NullCursor.java new file mode 100644 index 0000000000..cc74b5b191 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/cursor/NullCursor.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.cursor; + +/** + * A placeholder Cursor implementation to work with non-paginated queries. + */ +public class NullCursor implements Cursor { + + private final CursorType type = CursorType.NULL; + + @Override + public String generateCursorId() { + return null; + } + + @Override + public CursorType getType() { + return type; + } + + public NullCursor from(String cursorId) { + return NULL_CURSOR; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/domain/Select.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/domain/Select.java index 4d0b9e9e9f..a9b9a4b6c6 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/domain/Select.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/domain/Select.java @@ -50,7 +50,7 @@ public class Select extends Query { private Having having; private List orderBys = new ArrayList<>(); private int offset; - private int rowCount = 200; + private Integer rowCount; private boolean containsSubQueries; private List subQueries; private boolean selectAll = false; @@ -59,6 +59,8 @@ public class Select extends Query { public boolean isQuery = false; public boolean isAggregate = false; + public static final int DEFAULT_LIMIT = 200; + public Select() { } @@ -70,7 +72,7 @@ public void setOffset(int offset) { this.offset = offset; } - public void setRowCount(int rowCount) { + public void setRowCount(Integer rowCount) { this.rowCount = rowCount; } @@ -106,7 +108,7 @@ public int getOffset() { return offset; } - public int getRowCount() { + public Integer getRowCount() { return rowCount; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorActionRequestRestExecutorFactory.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorActionRequestRestExecutorFactory.java new file mode 100644 index 0000000000..83ac6ee594 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorActionRequestRestExecutorFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.executor.cursor; + +import com.amazon.opendistroforelasticsearch.sql.executor.Format; +import org.elasticsearch.rest.RestRequest; + +public class CursorActionRequestRestExecutorFactory { + + public static CursorAsyncRestExecutor createExecutor(RestRequest request, String cursorId, Format format) { + + if (isCursorCloseRequest(request)) { + return new CursorAsyncRestExecutor(new CursorCloseExecutor(cursorId)); + } else { + return new CursorAsyncRestExecutor(new CursorResultExecutor(cursorId, format)); + } + } + + private static boolean isCursorCloseRequest(final RestRequest request) { + return request.path().endsWith("/_sql/close"); + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorAsyncRestExecutor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorAsyncRestExecutor.java new file mode 100644 index 0000000000..84d19e945e --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorAsyncRestExecutor.java @@ -0,0 +1,119 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.executor.cursor; + +import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState; +import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; +import com.amazon.opendistroforelasticsearch.sql.query.join.BackOffRetryStrategy; +import com.amazon.opendistroforelasticsearch.sql.utils.LogUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; + +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_SLOWLOG; + +public class CursorAsyncRestExecutor { + /** + * Custom thread pool name managed by ES + */ + public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + + private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class); + + /** + * Delegated rest executor to async + */ + private final CursorRestExecutor executor; + + + CursorAsyncRestExecutor(CursorRestExecutor executor) { + this.executor = executor; + } + + public void execute(Client client, Map params, RestChannel channel) { + async(client, params, channel); + } + + /** + * Run given task in thread pool asynchronously + */ + private void async(Client client, Map params, RestChannel channel) { + + ThreadPool threadPool = client.threadPool(); + Runnable runnable = () -> { + try { + doExecuteWithTimeMeasured(client, params, channel); + } catch (IOException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(), + e.getMessage()); + e.printStackTrace(); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } catch (IllegalStateException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(), + e.getMessage()); + e.printStackTrace(); + channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE, + "Memory circuit is broken.")); + } catch (Throwable t) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(), + t.getMessage()); + t.printStackTrace(); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, + String.valueOf(t.getMessage()))); + } finally { + BackOffRetryStrategy.releaseMem(executor); + } + }; + + // Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions + threadPool.schedule( + threadPool.preserveContext(LogUtils.withCurrentContext(runnable)), + new TimeValue(0L), + SQL_WORKER_THREAD_POOL_NAME + ); + } + + /** + * Time the real execution of Executor and log slow query for troubleshooting + */ + private void doExecuteWithTimeMeasured(Client client, + Map params, + RestChannel channel) throws Exception { + long startTime = System.nanoTime(); + try { + executor.execute(client, params, channel); + } finally { + Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime); + int slowLogThreshold = LocalClusterState.state().getSettingValue(QUERY_SLOWLOG); + if (elapsed.getSeconds() >= slowLogThreshold) { + LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorCloseExecutor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorCloseExecutor.java new file mode 100644 index 0000000000..43d2498d58 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorCloseExecutor.java @@ -0,0 +1,100 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.executor.cursor; + +import com.amazon.opendistroforelasticsearch.sql.cursor.CursorType; +import com.amazon.opendistroforelasticsearch.sql.cursor.DefaultCursor; +import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; +import com.amazon.opendistroforelasticsearch.sql.rewriter.matchtoterm.VerificationException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.json.JSONException; + +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.OK; + +public class CursorCloseExecutor implements CursorRestExecutor { + + private static final Logger LOG = LogManager.getLogger(CursorCloseExecutor.class); + + private static final String SUCCEEDED_TRUE = "{\"succeeded\":true}"; + private static final String SUCCEEDED_FALSE = "{\"succeeded\":false}"; + + private String cursorId; + + public CursorCloseExecutor(String cursorId) { + this.cursorId = cursorId; + } + + public void execute(Client client, Map params, RestChannel channel) throws Exception { + try { + String formattedResponse = execute(client, params); + channel.sendResponse(new BytesRestResponse(OK, "application/json; charset=UTF-8", formattedResponse)); + } catch (IllegalArgumentException | JSONException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment(); + LOG.error("Error parsing the cursor", e); + channel.sendResponse(new BytesRestResponse(channel, e)); + } catch (ElasticsearchException e) { + int status = (e.status().getStatus()); + if (status > 399 && status < 500) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment(); + } else if (status > 499) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } + LOG.error("Error completing cursor request", e); + channel.sendResponse(new BytesRestResponse(channel, e)); + } + } + + public String execute(Client client, Map params) throws Exception { + String[] splittedCursor = cursorId.split(":"); + + if (splittedCursor.length!=2) { + throw new VerificationException("Not able to parse invalid cursor"); + } + + String type = splittedCursor[0]; + CursorType cursorType = CursorType.getById(type); + + switch(cursorType) { + case DEFAULT: + DefaultCursor defaultCursor = DefaultCursor.from(splittedCursor[1]); + return handleDefaultCursorCloseRequest(client, defaultCursor); + case AGGREGATION: + case JOIN: + default: throw new VerificationException("Unsupported cursor type [" + type + "]"); + } + + } + + private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) { + String scrollId = cursor.getScrollId(); + ClearScrollResponse clearScrollResponse = client.prepareClearScroll().addScrollId(scrollId).get(); + if (clearScrollResponse.isSucceeded()) { + return SUCCEEDED_TRUE; + } else { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + return SUCCEEDED_FALSE; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorRestExecutor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorRestExecutor.java new file mode 100644 index 0000000000..418841b967 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorRestExecutor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + + +package com.amazon.opendistroforelasticsearch.sql.executor.cursor; + +import org.elasticsearch.client.Client; +import org.elasticsearch.rest.RestChannel; + +import java.util.Map; + +/** + * Interface to execute cursor request. + */ +public interface CursorRestExecutor { + + void execute(Client client, Map params, RestChannel channel) + throws Exception; + + String execute(Client client, Map params) throws Exception; +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorResultExecutor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorResultExecutor.java new file mode 100644 index 0000000000..131646ff1d --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/cursor/CursorResultExecutor.java @@ -0,0 +1,142 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.executor.cursor; + +import com.amazon.opendistroforelasticsearch.sql.cursor.CursorType; +import com.amazon.opendistroforelasticsearch.sql.cursor.DefaultCursor; +import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState; +import com.amazon.opendistroforelasticsearch.sql.executor.Format; +import com.amazon.opendistroforelasticsearch.sql.executor.format.Protocol; +import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; +import com.amazon.opendistroforelasticsearch.sql.rewriter.matchtoterm.VerificationException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.json.JSONException; + +import java.util.Arrays; +import java.util.Map; + +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_KEEPALIVE; +import static org.elasticsearch.rest.RestStatus.OK; + +public class CursorResultExecutor implements CursorRestExecutor { + + private String cursorId; + private Format format; + + private static final Logger LOG = LogManager.getLogger(CursorResultExecutor.class); + + public CursorResultExecutor(String cursorId, Format format) { + this.cursorId = cursorId; + this.format = format; + } + + public void execute(Client client, Map params, RestChannel channel) throws Exception { + try { + String formattedResponse = execute(client, params); + channel.sendResponse(new BytesRestResponse(OK, "application/json; charset=UTF-8", formattedResponse)); + } catch (IllegalArgumentException | JSONException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment(); + LOG.error("Error parsing the cursor", e); + channel.sendResponse(new BytesRestResponse(channel, e)); + } catch (ElasticsearchException e) { + int status = (e.status().getStatus()); + if (status > 399 && status < 500) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment(); + } else if (status > 499) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } + LOG.error("Error completing cursor request", e); + channel.sendResponse(new BytesRestResponse(channel, e)); + } + } + + public String execute(Client client, Map params) throws Exception { + /** + * All cursor's are of the form : + * The serialized form before encoding is upto Cursor implementation + */ + String[] splittedCursor = cursorId.split(":", 2); + + if (splittedCursor.length!=2) { + throw new VerificationException("Not able to parse invalid cursor"); + } + + String type = splittedCursor[0]; + CursorType cursorType = CursorType.getById(type); + + switch(cursorType) { + case DEFAULT: + DefaultCursor defaultCursor = DefaultCursor.from(splittedCursor[1]); + return handleDefaultCursorRequest(client, defaultCursor); + case AGGREGATION: + case JOIN: + default: throw new VerificationException("Unsupported cursor type [" + type + "]"); + } + } + + private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) { + String previousScrollId = cursor.getScrollId(); + LocalClusterState clusterState = LocalClusterState.state(); + TimeValue scrollTimeout = clusterState.getSettingValue(CURSOR_KEEPALIVE); + SearchResponse scrollResponse = client.prepareSearchScroll(previousScrollId).setScroll(scrollTimeout).get(); + SearchHits searchHits = scrollResponse.getHits(); + SearchHit[] searchHitArray = searchHits.getHits(); + String newScrollId = scrollResponse.getScrollId(); + + int rowsLeft = (int) cursor.getRowsLeft(); + int fetch = cursor.getFetchSize(); + + if (rowsLeft < fetch && rowsLeft < searchHitArray.length) { + /** + * This condition implies we are on the last page, and we might need to truncate the result from SearchHit[] + * Avoid truncating in following two scenarios + * 1. number of rows to be sent equals fetchSize + * 2. size of SearchHit[] is already less that rows that needs to be sent + * + * Else truncate to desired number of rows + */ + SearchHit[] newSearchHits = Arrays.copyOf(searchHitArray, rowsLeft); + searchHits = new SearchHits(newSearchHits, searchHits.getTotalHits(), searchHits.getMaxScore()); + } + + rowsLeft = rowsLeft - fetch; + + if (rowsLeft <=0) { + /** Clear the scroll context on last page */ + ClearScrollResponse clearScrollResponse = client.prepareClearScroll().addScrollId(newScrollId).get(); + if (!clearScrollResponse.isSucceeded()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error closing the cursor context {} ", newScrollId); + } + } + + cursor.setRowsLeft(rowsLeft); + cursor.setScrollId(newScrollId); + Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor); + return protocol.cursorFormat(); + } +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/PrettyFormatRestExecutor.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/PrettyFormatRestExecutor.java index 247f4c534c..8b82ea4640 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/PrettyFormatRestExecutor.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/PrettyFormatRestExecutor.java @@ -15,14 +15,20 @@ package com.amazon.opendistroforelasticsearch.sql.executor.format; +import com.amazon.opendistroforelasticsearch.sql.cursor.Cursor; +import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException; import com.amazon.opendistroforelasticsearch.sql.executor.QueryActionElasticExecutor; import com.amazon.opendistroforelasticsearch.sql.executor.RestExecutor; +import com.amazon.opendistroforelasticsearch.sql.cursor.DefaultCursor; +import com.amazon.opendistroforelasticsearch.sql.query.DefaultQueryAction; import com.amazon.opendistroforelasticsearch.sql.query.QueryAction; import com.amazon.opendistroforelasticsearch.sql.query.join.BackOffRetryStrategy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestStatus; @@ -67,8 +73,12 @@ public String execute(Client client, Map params, QueryAction que Protocol protocol; try { - Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction); - protocol = new Protocol(client, queryAction, queryResult, format); + if (queryAction instanceof DefaultQueryAction) { + protocol = buildProtocolForDefaultQuery(client, (DefaultQueryAction) queryAction); + } else { + Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction); + protocol = new Protocol(client, queryAction, queryResult, format, Cursor.NULL_CURSOR); + } } catch (Exception e) { if (e instanceof ElasticsearchException) { LOG.warn("An error occurred in Elasticsearch engine: " @@ -81,4 +91,28 @@ public String execute(Client client, Map params, QueryAction que return protocol.format(); } + + /** + * QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. + * In order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for SearchResponse. + */ + private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction) + throws SqlParseException { + + SearchResponse response = (SearchResponse) queryAction.explain().get(); + String scrollId = response.getScrollId(); + + Protocol protocol; + if (!Strings.isNullOrEmpty(scrollId)) { + DefaultCursor defaultCursor = new DefaultCursor(); + defaultCursor.setScrollId(scrollId); + defaultCursor.setLimit(queryAction.getSelect().getRowCount()); + defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize()); + protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); + } else { + protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + } + + return protocol; + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java index aa89b418a9..9b03474964 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java @@ -20,18 +20,23 @@ import com.amazon.opendistroforelasticsearch.sql.domain.IndexStatement; import com.amazon.opendistroforelasticsearch.sql.domain.Query; import com.amazon.opendistroforelasticsearch.sql.domain.QueryStatement; -import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanQueryAction; -import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanRequestBuilder; +import com.amazon.opendistroforelasticsearch.sql.cursor.Cursor; +import com.amazon.opendistroforelasticsearch.sql.cursor.NullCursor; import com.amazon.opendistroforelasticsearch.sql.executor.format.DataRows.Row; import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema.Column; +import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanQueryAction; +import com.amazon.opendistroforelasticsearch.sql.executor.adapter.QueryPlanRequestBuilder; import com.amazon.opendistroforelasticsearch.sql.expression.domain.BindingTuple; import com.amazon.opendistroforelasticsearch.sql.query.DefaultQueryAction; import com.amazon.opendistroforelasticsearch.sql.query.QueryAction; + +import com.google.common.base.Strings; + import com.amazon.opendistroforelasticsearch.sql.query.planner.core.ColumnNode; + import org.elasticsearch.client.Client; import org.json.JSONArray; import org.json.JSONObject; - import java.util.List; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -50,15 +55,19 @@ public class Protocol { private ResultSet resultSet; private ErrorMessage error; private List columnNodeList; + private Cursor cursor = new NullCursor(); private ColumnTypeProvider scriptColumnType = new ColumnTypeProvider(); - public Protocol(Client client, QueryAction queryAction, Object queryResult, String formatType) { + public Protocol(Client client, QueryAction queryAction, Object queryResult, String formatType, Cursor cursor) { + this.cursor = cursor; + if (queryAction instanceof QueryPlanQueryAction) { this.columnNodeList = ((QueryPlanRequestBuilder) (((QueryPlanQueryAction) queryAction).explain())).outputColumns(); } else if (queryAction instanceof DefaultQueryAction) { scriptColumnType = queryAction.getScriptColumnType(); } + this.formatType = formatType; QueryStatement query = queryAction.getQueryStatement(); this.status = OK_STATUS; @@ -67,12 +76,24 @@ public Protocol(Client client, QueryAction queryAction, Object queryResult, Stri this.total = resultSet.getDataRows().getTotalHits(); } + + public Protocol(Client client, Object queryResult, String formatType, Cursor cursor) { + this.cursor = cursor; + this.status = OK_STATUS; + this.formatType = formatType; + this.resultSet = loadResultSetForCursor(client, queryResult); + } + public Protocol(Exception e) { this.formatType = null; this.status = ERROR_STATUS; this.error = ErrorMessageFactory.createErrorMessage(e, status); } + private ResultSet loadResultSetForCursor(Client client, Object queryResult) { + return new SelectResultSet(client, queryResult, formatType, cursor); + } + private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Object queryResult) { if (queryResult instanceof List) { return new BindingTupleResultSet(columnNodeList, (List) queryResult); @@ -80,7 +101,8 @@ private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Ob if (queryStatement instanceof Delete) { return new DeleteResultSet(client, (Delete) queryStatement, queryResult); } else if (queryStatement instanceof Query) { - return new SelectResultSet(client, (Query) queryStatement, queryResult, scriptColumnType, formatType); + return new SelectResultSet(client, (Query) queryStatement, queryResult, + scriptColumnType, formatType, cursor); } else if (queryStatement instanceof IndexStatement) { IndexStatement statement = (IndexStatement) queryStatement; StatementType statementType = statement.getStatementType(); @@ -131,9 +153,16 @@ private String outputInJdbcFormat() { formattedOutput.put("size", size); formattedOutput.put("total", total); - formattedOutput.put("schema", getSchemaAsJson()); + JSONArray schema = getSchemaAsJson(); + + formattedOutput.put("schema", schema); formattedOutput.put("datarows", getDataRowsAsJson()); + String cursorId = cursor.generateCursorId(); + if (!Strings.isNullOrEmpty(cursorId)) { + formattedOutput.put("cursor", cursorId); + } + return formattedOutput.toString(2); } @@ -153,6 +182,30 @@ private String outputInTableFormat() { return null; } + public String cursorFormat() { + if (status == OK_STATUS) { + switch (formatType) { + case "jdbc": + return cursorOutputInJDBCFormat(); + default: + throw new UnsupportedOperationException(String.format( + "The following response format is not supported for cursor: [%s]", formatType)); + } + } + return error.toString(); + } + + private String cursorOutputInJDBCFormat() { + JSONObject formattedOutput = new JSONObject(); + formattedOutput.put("datarows", getDataRowsAsJson()); + + String cursorId = cursor.generateCursorId(); + if (!Strings.isNullOrEmpty(cursorId)) { + formattedOutput.put("cursor", cursorId); + } + return formattedOutput.toString(2); + } + private String rawEntry(Row row, Schema schema) { // TODO String separator is being kept to "|" for the time being as using "\t" will require formatting since // TODO tabs are occurring in multiple of 4 (one option is Guava's Strings.padEnd() method) @@ -203,4 +256,4 @@ private JSONArray dataEntry(Row dataRow, Schema schema) { } return entry; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java index 2ac042fa3a..552f1e172b 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java @@ -23,6 +23,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static java.util.Collections.unmodifiableList; + public class Schema implements Iterable { private String indexName; @@ -64,6 +66,10 @@ public List getHeaders() { .collect(Collectors.toList()); } + public List getColumns() { + return unmodifiableList(columns); + } + private static Set getTypes() { HashSet types = new HashSet<>(); for (Type type : Type.values()) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/SelectResultSet.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/SelectResultSet.java index a94aed514c..ece165a951 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/SelectResultSet.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/SelectResultSet.java @@ -29,9 +29,16 @@ import com.amazon.opendistroforelasticsearch.sql.esdomain.mapping.FieldMapping; import com.amazon.opendistroforelasticsearch.sql.exception.SqlFeatureNotImplementedException; import com.amazon.opendistroforelasticsearch.sql.executor.Format; +import com.amazon.opendistroforelasticsearch.sql.cursor.Cursor; +import com.amazon.opendistroforelasticsearch.sql.cursor.DefaultCursor; +import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; import com.amazon.opendistroforelasticsearch.sql.utils.SQLFunctions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; +import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.document.DocumentField; @@ -56,11 +63,14 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static java.util.Collections.unmodifiableMap; import static java.util.stream.Collectors.toSet; import static org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData; public class SelectResultSet extends ResultSet { + private static final Logger LOG = LogManager.getLogger(SelectResultSet.class); + public static final String SCORE = "_score"; private final String formatType; @@ -76,7 +86,9 @@ public class SelectResultSet extends ResultSet { private List head; private long size; private long totalHits; + private long internalTotalHits; private List rows; + private Cursor cursor; private DateFieldFormatter dateFieldFormatter; // alias -> base field name @@ -86,13 +98,15 @@ public SelectResultSet(Client client, Query query, Object queryResult, ColumnTypeProvider outputColumnType, - String formatType) { + String formatType, + Cursor cursor) { this.client = client; this.query = query; this.queryResult = queryResult; this.selectAll = false; this.formatType = formatType; this.outputColumnType = outputColumnType; + this.cursor = cursor; if (isJoinQuery()) { JoinSelect joinQuery = (JoinSelect) query; @@ -105,6 +119,46 @@ public SelectResultSet(Client client, this.head = schema.getHeaders(); this.dateFieldFormatter = new DateFieldFormatter(indexName, columns, fieldAliasMap); + extractData(); + populateCursor(); + this.dataRows = new DataRows(size, totalHits, rows); + } + + public SelectResultSet(Client client, Object queryResult, String formatType, Cursor cursor) { + this.cursor = cursor; + this.client = client; + this.queryResult = queryResult; + this.selectAll = false; + this.formatType = formatType; + populateResultSetFromCursor(cursor); + } + + public String indexName(){ + return this.indexName; + } + + public Map fieldAliasMap() { + return unmodifiableMap(this.fieldAliasMap); + } + + public void populateResultSetFromCursor(Cursor cursor) { + switch (cursor.getType()) { + case DEFAULT: + populateResultSetFromDefaultCursor((DefaultCursor) cursor); + default: + return; + } + } + + private void populateResultSetFromDefaultCursor(DefaultCursor cursor) { + this.columns = cursor.getColumns(); + this.schema = new Schema(null, null, columns); + this.head = schema.getHeaders(); + this.dateFieldFormatter = new DateFieldFormatter( + cursor.getIndexPattern(), + columns, + cursor.getFieldAliasMap() + ); extractData(); this.dataRows = new DataRows(size, totalHits, rows); } @@ -525,20 +579,66 @@ private void extractData() { this.rows = populateRows(searchHits); this.size = rows.size(); - this.totalHits = Math.max(size, // size may be greater than totalHits after nested rows be flatten - Optional.ofNullable(searchHits.getTotalHits()).map(th -> th.value) - .orElse(0L)); - + this.internalTotalHits = Optional.ofNullable(searchHits.getTotalHits()).map(th -> th.value).orElse(0L); + // size may be greater than totalHits after nested rows be flatten + this.totalHits = Math.max(size, internalTotalHits); } else if (queryResult instanceof Aggregations) { Aggregations aggregations = (Aggregations) queryResult; this.rows = populateRows(aggregations); this.size = rows.size(); + this.internalTotalHits = size; // Total hits is not available from Aggregations so 'size' is used this.totalHits = size; } } + private void populateCursor() { + switch(cursor.getType()) { + case DEFAULT: + populateDefaultCursor((DefaultCursor) cursor); + default: + return; + } + } + + private void populateDefaultCursor(DefaultCursor cursor) { + /** + * Assumption: scrollId, fetchSize, limit already being set in + * @see PrettyFormatRestExecutor.buildProtocolForDefaultQuery() + */ + + Integer limit = cursor.getLimit(); + long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit()); + if (rowsLeft <= 0) { + // close the cursor + String scrollId = cursor.getScrollId(); + ClearScrollResponse clearScrollResponse = client.prepareClearScroll().addScrollId(scrollId).get(); + if (!clearScrollResponse.isSucceeded()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.error("Error closing the cursor context {} ", scrollId); + } + return; + } + + cursor.setRowsLeft(rowsLeft); + cursor.setIndexPattern(indexName); + cursor.setFieldAliasMap(fieldAliasMap()); + cursor.setColumns(columns); + this.totalHits = limit != null && limit < internalTotalHits ? limit : internalTotalHits; + } + + private long rowsLeft(Integer fetchSize, Integer limit) { + long rowsLeft = 0; + long totalHits = internalTotalHits; + if (limit != null && limit < totalHits) { + rowsLeft = limit - fetchSize; + } else { + rowsLeft = totalHits - fetchSize; + } + return rowsLeft; + } + private List populateRows(SearchHits searchHits) { List rows = new ArrayList<>(); Set newKeys = new HashSet<>(head); @@ -811,4 +911,4 @@ private Map addMap(String field, Object term) { private boolean isJoinQuery() { return query instanceof JoinSelect; } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricFactory.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricFactory.java index c484c6caaf..606838317f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricFactory.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricFactory.java @@ -23,11 +23,13 @@ public static Metric createMetric(MetricName name) { switch (name) { case REQ_TOTAL: + case DEFAULT_CURSOR_REQUEST_TOTAL: case DEFAULT: return new NumericMetric<>(name.getName(), new BasicCounter()); case CIRCUIT_BREAKER: return new GaugeMetric<>(name.getName(), BackOffRetryStrategy.GET_CB_STATE); case REQ_COUNT_TOTAL: + case DEFAULT_CURSOR_REQUEST_COUNT_TOTAL: case FAILED_REQ_COUNT_CUS: case FAILED_REQ_COUNT_SYS: case FAILED_REQ_COUNT_CB: diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricName.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricName.java index fb47b79cf2..843a082be6 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/metrics/MetricName.java @@ -26,6 +26,8 @@ public enum MetricName { FAILED_REQ_COUNT_SYS("failed_request_count_syserr"), FAILED_REQ_COUNT_CUS("failed_request_count_cuserr"), FAILED_REQ_COUNT_CB("failed_request_count_cb"), + DEFAULT_CURSOR_REQUEST_TOTAL("default_cursor_request_total"), + DEFAULT_CURSOR_REQUEST_COUNT_TOTAL("default_cursor_request_count"), CIRCUIT_BREAKER("circuit_breaker"), DEFAULT("default"); @@ -45,7 +47,8 @@ public static List getNames() { public boolean isNumerical() { return this == REQ_TOTAL || this == REQ_COUNT_TOTAL || this == FAILED_REQ_COUNT_SYS - || this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT; + || this == FAILED_REQ_COUNT_CUS || this == FAILED_REQ_COUNT_CB || this == DEFAULT + || this == DEFAULT_CURSOR_REQUEST_TOTAL || this == DEFAULT_CURSOR_REQUEST_COUNT_TOTAL; } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/RestSqlAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/RestSqlAction.java index 14207a906f..4423a27083 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/RestSqlAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/RestSqlAction.java @@ -28,6 +28,8 @@ import com.amazon.opendistroforelasticsearch.sql.executor.ActionRequestRestExecutorFactory; import com.amazon.opendistroforelasticsearch.sql.executor.Format; import com.amazon.opendistroforelasticsearch.sql.executor.RestExecutor; +import com.amazon.opendistroforelasticsearch.sql.executor.cursor.CursorActionRequestRestExecutorFactory; +import com.amazon.opendistroforelasticsearch.sql.executor.cursor.CursorAsyncRestExecutor; import com.amazon.opendistroforelasticsearch.sql.executor.format.ErrorMessageFactory; import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; @@ -83,12 +85,14 @@ public class RestSqlAction extends BaseRestHandler { */ public static final String QUERY_API_ENDPOINT = "/_opendistro/_sql"; public static final String EXPLAIN_API_ENDPOINT = QUERY_API_ENDPOINT + "/_explain"; + public static final String CURSOR_CLOSE_ENDPOINT = QUERY_API_ENDPOINT + "/close"; RestSqlAction(Settings settings, RestController restController) { super(); restController.registerHandler(RestRequest.Method.POST, QUERY_API_ENDPOINT, this); restController.registerHandler(RestRequest.Method.POST, EXPLAIN_API_ENDPOINT, this); + restController.registerHandler(RestRequest.Method.POST, CURSOR_CLOSE_ENDPOINT, this); this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); } @@ -113,6 +117,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } final SqlRequest sqlRequest = SqlRequestFactory.getSqlRequest(request); + if (sqlRequest.cursor() != null) { + if (isExplainRequest(request)) { + throw new IllegalArgumentException("Invalid request. Cannot explain cursor"); + } else { + LOG.info("[{}] Cursor request {}: {}", LogUtils.getRequestId(), request.uri(), sqlRequest.cursor()); + return channel -> handleCursorRequest(request, sqlRequest.cursor(), client, channel); + } + } + LOG.info("[{}] Incoming request {}: {}", LogUtils.getRequestId(), request.uri(), QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); @@ -132,6 +145,13 @@ protected Set responseParams() { return responseParams; } + private void handleCursorRequest(final RestRequest request, final String cursor, final Client client, + final RestChannel channel) throws Exception { + CursorAsyncRestExecutor cursorRestExecutor = CursorActionRequestRestExecutorFactory.createExecutor( + request, cursor, SqlRequestParam.getFormat(request.params())); + cursorRestExecutor.execute(client, request.params(), channel); + } + private static void logAndPublishMetrics(final Exception e) { if (isClientError(e)) { LOG.error(LogUtils.getRequestId() + " Client side error during query execution", e); @@ -150,6 +170,7 @@ private static QueryAction explainRequest(final NodeClient client, final SqlRequ final QueryAction queryAction = new SearchDao(client) .explain(new QueryActionRequest(sqlRequest.getSql(), typeProvider, format)); queryAction.setSqlRequest(sqlRequest); + queryAction.setFormat(format); queryAction.setColumnTypeProvider(typeProvider); return queryAction; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SqlSettings.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SqlSettings.java index a3e8700538..795f4fe918 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SqlSettings.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/plugin/SqlSettings.java @@ -26,6 +26,7 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.settings.Setting.Property.Dynamic; import static org.elasticsearch.common.settings.Setting.Property.NodeScope; +import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; /** * SQL plugin settings @@ -46,6 +47,10 @@ public class SqlSettings { public static final String METRICS_ROLLING_WINDOW = "opendistro.sql.metrics.rollingwindow"; public static final String METRICS_ROLLING_INTERVAL = "opendistro.sql.metrics.rollinginterval"; + public static final String CURSOR_ENABLED= "opendistro.sql.cursor.enabled"; + public static final String CURSOR_FETCH_SIZE = "opendistro.sql.cursor.fetch_size"; + public static final String CURSOR_KEEPALIVE= "opendistro.sql.cursor.keep_alive"; + private final Map> settings; public SqlSettings() { @@ -68,6 +73,13 @@ public SqlSettings() { settings.put(METRICS_ROLLING_INTERVAL, Setting.longSetting(METRICS_ROLLING_INTERVAL, 60L, 1L, NodeScope, Dynamic)); + // Settings for cursor + settings.put(CURSOR_ENABLED, Setting.boolSetting(CURSOR_ENABLED, false, NodeScope, Dynamic)); + settings.put(CURSOR_FETCH_SIZE, Setting.intSetting(CURSOR_FETCH_SIZE, 1000, + 1, NodeScope, Dynamic)); + settings.put(CURSOR_KEEPALIVE, Setting.positiveTimeSetting(CURSOR_KEEPALIVE, timeValueMinutes(1), + NodeScope, Dynamic)); + this.settings = unmodifiableMap(settings); } @@ -85,4 +97,5 @@ public Setting getSetting(String key) { public List> getSettings() { return new ArrayList<>(settings.values()); } + } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/AggregationQueryAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/AggregationQueryAction.java index 160e2defba..6bc3d85b28 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/AggregationQueryAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/AggregationQueryAction.java @@ -66,6 +66,10 @@ public AggregationQueryAction(Client client, Select select) { public SqlElasticSearchRequestBuilder explain() throws SqlParseException { this.request = new SearchRequestBuilder(client, SearchAction.INSTANCE); + if (select.getRowCount() == null) { + select.setRowCount(Select.DEFAULT_LIMIT); + } + setIndicesAndTypes(); setWhere(select.getWhere()); @@ -81,7 +85,7 @@ public SqlElasticSearchRequestBuilder explain() throws SqlParseException { if (lastAgg instanceof TermsAggregationBuilder) { // TODO: Consider removing that condition - // in theory we should be able to apply this for all types of fiels, but + // in theory we should be able to apply this for all types of fields, but // this change requires too much of related integration tests (e.g. there are comparisons against // raw javascript dsl, so I'd like to scope the changes as of now to one particular fix for // scripted functions diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/DefaultQueryAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/DefaultQueryAction.java index 35cc55d590..da92490ec0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/DefaultQueryAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/DefaultQueryAction.java @@ -25,17 +25,19 @@ import com.amazon.opendistroforelasticsearch.sql.domain.Order; import com.amazon.opendistroforelasticsearch.sql.domain.Select; import com.amazon.opendistroforelasticsearch.sql.domain.Where; -import com.amazon.opendistroforelasticsearch.sql.domain.hints.Hint; -import com.amazon.opendistroforelasticsearch.sql.domain.hints.HintType; +import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState; import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException; +import com.amazon.opendistroforelasticsearch.sql.executor.Format; import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema; +import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; import com.amazon.opendistroforelasticsearch.sql.query.maker.QueryMaker; import com.amazon.opendistroforelasticsearch.sql.rewriter.nestedfield.NestedFieldProjection; import com.amazon.opendistroforelasticsearch.sql.utils.SQLFunctions; + +import com.google.common.annotations.VisibleForTesting; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchScrollAction; -import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; @@ -55,8 +57,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_ENABLED; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_KEEPALIVE; + /** * Transform SQL query to standard Elasticsearch search query */ @@ -78,51 +84,62 @@ public void initialize(SearchRequestBuilder request) { @Override public SqlElasticSearchRequestBuilder explain() throws SqlParseException { - Hint scrollHint = null; - for (Hint hint : select.getHints()) { - if (hint.getType() == HintType.USE_SCROLL) { - scrollHint = hint; - break; - } - } - if (scrollHint != null && scrollHint.getParams()[0] instanceof String) { - return new SqlElasticSearchRequestBuilder(new SearchScrollRequestBuilder(client, - SearchScrollAction.INSTANCE, (String) scrollHint.getParams()[0]) - .setScroll(new TimeValue((Integer) scrollHint.getParams()[1]))); - } + Objects.requireNonNull(this.sqlRequest, "SqlRequest is required for ES request build"); + buildRequest(); + checkAndSetScroll(); + return new SqlElasticSearchRequestBuilder(request); + } + private void buildRequest() throws SqlParseException { this.request = new SearchRequestBuilder(client, SearchAction.INSTANCE); setIndicesAndTypes(); - setFields(select.getFields()); setWhere(select.getWhere()); setSorts(select.getOrderBys()); - setLimit(select.getOffset(), select.getRowCount()); - - if (scrollHint != null) { - if (!select.isOrderdSelect()) { - request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); - } - request.setSize((Integer) scrollHint.getParams()[0]) - .setScroll(new TimeValue((Integer) scrollHint.getParams()[1])); - } else { - request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); - } updateRequestWithIndexAndRoutingOptions(select, request); updateRequestWithHighlight(select, request); updateRequestWithCollapse(select, request); updateRequestWithPostFilter(select, request); updateRequestWithInnerHits(select, request); + } - return new SqlElasticSearchRequestBuilder(request); + @VisibleForTesting + public void checkAndSetScroll() { + LocalClusterState clusterState = LocalClusterState.state(); + + Integer fetchSize = sqlRequest.fetchSize(); + TimeValue timeValue = clusterState.getSettingValue(CURSOR_KEEPALIVE); + Boolean cursorEnabled = clusterState.getSettingValue(CURSOR_ENABLED); + Integer rowCount = select.getRowCount(); + + if (checkIfScrollNeeded(cursorEnabled, fetchSize, rowCount)) { + Metrics.getInstance().getNumericalMetric(MetricName.DEFAULT_CURSOR_REQUEST_COUNT_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.DEFAULT_CURSOR_REQUEST_TOTAL).increment(); + request.setSize(fetchSize).setScroll(timeValue); + } else { + request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + setLimit(select.getOffset(), rowCount != null ? rowCount : Select.DEFAULT_LIMIT); + } + } + + + private boolean checkIfScrollNeeded(boolean cursorEnabled, Integer fetchSize, Integer rowCount) { + return cursorEnabled + && (format !=null && format.equals(Format.JDBC)) + && fetchSize > 0 + && (rowCount == null || (rowCount > fetchSize)); } @Override public Optional> getFieldNames() { - return Optional.of(fieldNames); } + + public Select getSelect() { + return select; + } + /** * Set indices and types to the search request. */ diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/QueryAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/QueryAction.java index ed1c02cb5e..ee7e9ceaf7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/QueryAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/query/QueryAction.java @@ -22,6 +22,7 @@ import com.amazon.opendistroforelasticsearch.sql.domain.hints.Hint; import com.amazon.opendistroforelasticsearch.sql.domain.hints.HintType; import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException; +import com.amazon.opendistroforelasticsearch.sql.executor.Format; import com.amazon.opendistroforelasticsearch.sql.request.SqlRequest; import com.fasterxml.jackson.core.JsonFactory; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -50,6 +51,7 @@ public abstract class QueryAction { protected Client client; protected SqlRequest sqlRequest = SqlRequest.NULL; protected ColumnTypeProvider scriptColumnType; + protected Format format; public QueryAction(Client client, Query query) { this.client = client; @@ -76,6 +78,14 @@ public SqlRequest getSqlRequest() { return sqlRequest; } + public void setFormat(Format format) { + this.format = format; + } + + public Format getFormat() { + return this.format; + } + public ColumnTypeProvider getScriptColumnType() { return scriptColumnType; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/PreparedStatementRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/PreparedStatementRequest.java index 686d414dd9..e110b1d1f3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/PreparedStatementRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/PreparedStatementRequest.java @@ -31,6 +31,12 @@ public PreparedStatementRequest(String sql, JSONObject payloadJson, List parameters) { + this(sql, payloadJson, parameters); + this.fetchSize = fetchSize; + } + public List getParameters() { return this.parameters; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequest.java index 14b5a5c8b3..7e2e3fce7e 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequest.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequest.java @@ -36,11 +36,21 @@ public class SqlRequest { String sql; JSONObject jsonContent; - + String cursor; + Integer fetchSize; public SqlRequest(final String sql, final JSONObject jsonContent) { + this.sql = sql; + this.jsonContent = jsonContent; + } + + public SqlRequest(final String cursor) { + this.cursor = cursor; + } + public SqlRequest(final String sql, final Integer fetchSize, final JSONObject jsonContent) { this.sql = sql; + this.fetchSize = fetchSize; this.jsonContent = jsonContent; } @@ -57,6 +67,14 @@ public String getSql() { return this.sql; } + public String cursor() { + return this.cursor; + } + + public Integer fetchSize() { + return this.fetchSize; + } + public JSONObject getJsonContent() { return this.jsonContent; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequestFactory.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequestFactory.java index cd348a244f..d285939cab 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequestFactory.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/request/SqlRequestFactory.java @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.sql.request; +import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState; import org.elasticsearch.rest.RestRequest; import org.json.JSONArray; import org.json.JSONException; @@ -22,16 +23,21 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; + +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_FETCH_SIZE; public class SqlRequestFactory { private static final String SQL_URL_PARAM_KEY = "sql"; private static final String SQL_FIELD_NAME = "query"; private static final String PARAM_FIELD_NAME = "parameters"; - private static final String PARAM_TYPE_FIELD_NAME = "type"; private static final String PARAM_VALUE_FIELD_NAME = "value"; + public static final String SQL_CURSOR_FIELD_NAME = "cursor"; + public static final String SQL_FETCH_FIELD_NAME = "fetch_size"; + public static SqlRequest getSqlRequest(RestRequest request) { switch (request.method()) { case POST: @@ -57,16 +63,36 @@ private static SqlRequest parseSqlRequestFromPayload(RestRequest restRequest) { JSONObject jsonContent; try { jsonContent = new JSONObject(content); + if (jsonContent.has(SQL_CURSOR_FIELD_NAME)) { + return new SqlRequest(jsonContent.getString(SQL_CURSOR_FIELD_NAME)); + } } catch (JSONException e) { throw new IllegalArgumentException("Failed to parse request payload", e); } String sql = jsonContent.getString(SQL_FIELD_NAME); + if (jsonContent.has(PARAM_FIELD_NAME)) { // is a PreparedStatement JSONArray paramArray = jsonContent.getJSONArray(PARAM_FIELD_NAME); List parameters = parseParameters(paramArray); - return new PreparedStatementRequest(sql, jsonContent, parameters); + return new PreparedStatementRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent, parameters); + } + return new SqlRequest(sql, validateAndGetFetchSize(jsonContent), jsonContent); + } + + + private static Integer validateAndGetFetchSize(JSONObject jsonContent) { + Optional fetchSize = Optional.empty(); + try { + if (jsonContent.has(SQL_FETCH_FIELD_NAME)) { + fetchSize = Optional.of(jsonContent.getInt(SQL_FETCH_FIELD_NAME)); + if (fetchSize.get() < 0) { + throw new IllegalArgumentException("Fetch_size must be greater or equal to 0"); + } + } + } catch (JSONException e) { + throw new IllegalArgumentException("Failed to parse field [" + SQL_FETCH_FIELD_NAME +"]", e); } - return new SqlRequest(sql, jsonContent); + return fetchSize.orElse(LocalClusterState.state().getSettingValue(CURSOR_FETCH_SIZE)); } private static List parseParameters( diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/MonitoringIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/MonitoringIT.java index 81815e9174..e7e835f0da 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/MonitoringIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/MonitoringIT.java @@ -27,6 +27,8 @@ import static com.amazon.opendistroforelasticsearch.sql.doctest.core.request.SqlRequestFormat.IGNORE_REQUEST; import static com.amazon.opendistroforelasticsearch.sql.doctest.core.response.SqlResponseFormat.IGNORE_RESPONSE; import static com.amazon.opendistroforelasticsearch.sql.doctest.core.response.SqlResponseFormat.PRETTY_JSON_RESPONSE; +import static com.amazon.opendistroforelasticsearch.sql.metrics.MetricName.DEFAULT_CURSOR_REQUEST_COUNT_TOTAL; +import static com.amazon.opendistroforelasticsearch.sql.metrics.MetricName.DEFAULT_CURSOR_REQUEST_TOTAL; import static com.amazon.opendistroforelasticsearch.sql.metrics.MetricName.FAILED_REQ_COUNT_CB; import static com.amazon.opendistroforelasticsearch.sql.metrics.MetricName.FAILED_REQ_COUNT_CUS; import static com.amazon.opendistroforelasticsearch.sql.metrics.MetricName.FAILED_REQ_COUNT_SYS; @@ -58,9 +60,12 @@ private String fieldDescriptions() { DataTable table = new DataTable(new String[]{ "Field name", "Description" }); table.addRow(row(REQ_TOTAL, "Total count of request")); table.addRow(row(REQ_COUNT_TOTAL, "Total count of request within the interval")); + table.addRow(row(DEFAULT_CURSOR_REQUEST_TOTAL, "Total count of simple cursor request")); + table.addRow(row(DEFAULT_CURSOR_REQUEST_COUNT_TOTAL, "Total count of simple cursor request within the interval")); table.addRow(row(FAILED_REQ_COUNT_SYS, "Count of failed request due to system error within the interval")); table.addRow(row(FAILED_REQ_COUNT_CUS, "Count of failed request due to bad request within the interval")); table.addRow(row(FAILED_REQ_COUNT_CB, "Indicate if plugin is being circuit broken within the interval")); + return table.toString(); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/PluginSettingIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/PluginSettingIT.java index f43e29a9b3..af348310d0 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/PluginSettingIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/admin/PluginSettingIT.java @@ -33,6 +33,9 @@ import static com.amazon.opendistroforelasticsearch.sql.doctest.core.request.SqlRequestFormat.IGNORE_REQUEST; import static com.amazon.opendistroforelasticsearch.sql.doctest.core.response.SqlResponseFormat.IGNORE_RESPONSE; import static com.amazon.opendistroforelasticsearch.sql.doctest.core.response.SqlResponseFormat.PRETTY_JSON_RESPONSE; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_ENABLED; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_FETCH_SIZE; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_KEEPALIVE; import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_ANALYSIS_ENABLED; import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_ANALYSIS_SEMANTIC_SUGGESTION; import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_ANALYSIS_SEMANTIC_THRESHOLD; @@ -116,6 +119,35 @@ public void responseFormatSetting() { ); } + @Section(7) + public void cursorEnabledSetting() { + docSetting( + CURSOR_ENABLED, + "User can enable/disable pagination for all queries that are supported.", + true + ); + } + + @Section(8) + public void cursorDefaultFetchSizeSetting() { + docSetting( + CURSOR_FETCH_SIZE, + "User can set the default fetch_size for all queries that are supported by pagination. " + + "Explicit `fetch_size` passed in request will override this value", + 50 + ); + } + + @Section(9) + public void cursorDefaultContextKeepAliveSetting() { + docSetting( + CURSOR_KEEPALIVE, + "User can set this value to indicate how long the cursor context should be kept open. " + + "Cursor contexts are resource heavy, and a lower value should be used if possible.", + "5m" + ); + } + /** * Generate content for sample queries with setting changed to new value. * Finally setting will be reverted to avoid potential impact on other test cases. diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/interfaces/EndpointIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/interfaces/EndpointIT.java index 1a8578a163..c86963eb78 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/interfaces/EndpointIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/doctest/interfaces/EndpointIT.java @@ -62,4 +62,23 @@ public void explainQuery() { ); } + @Section(3) + public void cursorQuery() { + section( + title("Cursor"), + description( + "To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query.", + "The value of `fetch_size` should be greater than `0`. In absence of `fetch_size`, default value of 1000 is used.", + "A value of `0` will fallback to non-paginated response.", + "This feature is only available over `jdbc` format for now." + ), + example( + description(), + post("SELECT firstname, lastname FROM accounts WHERE age > 20 ORDER BY state ASC"), + queryFormat(CURL_REQUEST, PRETTY_JSON_RESPONSE), + explainFormat(IGNORE_REQUEST, IGNORE_RESPONSE) + ) + ); + } + } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/CursorIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/CursorIT.java new file mode 100644 index 0000000000..c2d3733202 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/CursorIT.java @@ -0,0 +1,484 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.esintgtest; + +import com.amazon.opendistroforelasticsearch.sql.utils.StringUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getResponseBody; +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestsConstants.TEST_INDEX_ACCOUNT; +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestsConstants.TEST_INDEX_DATE_TIME; +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestsConstants.TEST_INDEX_NESTED_SIMPLE; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.containsString; + +public class CursorIT extends SQLIntegTestCase { + + private static final String CURSOR = "cursor"; + private static final String DATAROWS = "datarows"; + private static final String SCHEMA = "schema"; + private static final String JDBC = "jdbc"; + private static final String NEW_LINE = "\n"; + + @Override + protected void init() throws Exception { + loadIndex(Index.ACCOUNT); + enableCursorClusterSetting(); + } + + /** + * Acceptable fetch_size are positive numbers. + * For example 0, 24, 53.0, "110" (parsable string) , "786.23" + * Negative values should throw 400 + */ + @Test + public void invalidNegativeFetchSize() throws IOException { + String query = StringUtils.format("SELECT firstname, state FROM %s", TestsConstants.TEST_INDEX_ACCOUNT); + Response response = null; + try { + String queryResult = executeFetchQuery(query, -2, JDBC); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject resp = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(resp.getInt("status"), equalTo(400)); + assertThat(resp.query("/error/reason"), equalTo("Invalid SQL query")); + assertThat(resp.query("/error/details"), equalTo("Fetch_size must be greater or equal to 0")); + assertThat(resp.query("/error/type"), equalTo("IllegalArgumentException")); + } + + /** + * Non-numeric fetch_size value should throw 400 + */ + @Test + public void invalidNonNumericFetchSize() throws IOException { + String query = StringUtils.format("SELECT firstname, state FROM %s", TestsConstants.TEST_INDEX_ACCOUNT); + Response response = null; + try { + String queryResult = executeFetchAsStringQuery(query, "hello world", JDBC); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject resp = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(resp.getInt("status"), equalTo(400)); + assertThat(resp.query("/error/reason"), equalTo("Invalid SQL query")); + assertThat(resp.query("/error/details"), equalTo("Failed to parse field [fetch_size]")); + assertThat(resp.query("/error/type"), equalTo("IllegalArgumentException")); + } + + @Test + public void testExceptionOnCursorExplain() throws IOException { + String cursorRequest = "{\"cursor\":\"d:eyJhIjp7fSwicyI6IkRYRjFaWEo1\"}"; + Request sqlRequest = getSqlRequest(cursorRequest, true); + Response response = null; + try { + String queryResult = executeRequest(sqlRequest); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject resp = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(resp.getInt("status"), equalTo(400)); + assertThat(resp.query("/error/reason"), equalTo("Invalid SQL query")); + assertThat(resp.query("/error/details"), equalTo("Invalid request. Cannot explain cursor")); + assertThat(resp.query("/error/type"), equalTo("IllegalArgumentException")); + } + + /** + * For fetch_size = 0, default to non-pagination behaviour for simple queries + * This can be verified by checking that cursor is not present, and old default limit applies + */ + @Test + public void noPaginationWhenFetchSizeZero() throws IOException { + String selectQuery = StringUtils.format("SELECT firstname, state FROM %s", TEST_INDEX_ACCOUNT); + JSONObject response = new JSONObject(executeFetchQuery(selectQuery, 0, JDBC)); + assertFalse(response.has(CURSOR)); + assertThat(response.getJSONArray(DATAROWS).length(), equalTo(200)); + } + + /** + * The index has 1000 records, with fetch size of 50 we should get 20 pages with no cursor on last page + */ + @Test + public void validNumberOfPages() throws IOException { + String selectQuery = StringUtils.format("SELECT firstname, state FROM %s", TEST_INDEX_ACCOUNT); + JSONObject response = new JSONObject(executeFetchQuery(selectQuery, 50, JDBC)); + String cursor = response.getString(CURSOR); + int pageCount = 1; + + while (!cursor.isEmpty()) { //this condition also checks that there is no cursor on last page + response = executeCursorQuery(cursor); + cursor = response.optString(CURSOR); + pageCount++; + } + + assertThat(pageCount, equalTo(20)); + + // using random value here, with fetch size of 28 we should get 36 pages (ceil of 1000/28) + response = new JSONObject(executeFetchQuery(selectQuery, 28, JDBC)); + cursor = response.getString(CURSOR); + System.out.println(response); + pageCount = 1; + + while (!cursor.isEmpty()) { + response = executeCursorQuery(cursor); + cursor = response.optString(CURSOR); + pageCount++; + } + assertThat(pageCount, equalTo(36)); + } + + + @Test + public void validTotalResultWithAndWithoutPagination() throws IOException { + // simple query - accounts index has 1000 docs, using higher limit to get all docs + String selectQuery = StringUtils.format("SELECT firstname, state FROM %s ", TEST_INDEX_ACCOUNT ); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000" , selectQuery , 80); + } + + @Test + public void validTotalResultWithAndWithoutPaginationWhereClause() throws IOException { + String selectQuery = StringUtils.format( + "SELECT firstname, state FROM %s WHERE balance < 25000 AND age > 32", TEST_INDEX_ACCOUNT + ); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000" , selectQuery , 17); + } + + @Test + public void validTotalResultWithAndWithoutPaginationOrderBy() throws IOException { + String selectQuery = StringUtils.format( + "SELECT firstname, state FROM %s ORDER BY balance DESC ", TEST_INDEX_ACCOUNT + ); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000" , selectQuery , 26); + } + + @Test + public void validTotalResultWithAndWithoutPaginationWhereAndOrderBy() throws IOException { + String selectQuery = StringUtils.format( + "SELECT firstname, state FROM %s WHERE balance < 25000 ORDER BY balance ASC ", TEST_INDEX_ACCOUNT + ); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000" , selectQuery , 80); + + } + + @Test + public void validTotalResultWithAndWithoutPaginationNested() throws IOException { + loadIndex(Index.NESTED_SIMPLE); + String selectQuery = StringUtils.format( + "SELECT name, a.city, a.state FROM %s m , m.address as a ", TEST_INDEX_NESTED_SIMPLE + ); + verifyWithAndWithoutPaginationResponse(selectQuery + " LIMIT 2000" , selectQuery , 1); + } + + @Test + public void noCursorWhenResultsLessThanFetchSize() throws IOException { + // fetch_size is 100, but actual number of rows returned from ElasticSearch is 97 + // a scroll context will be opened but will be closed after first page as all records are fetched + String selectQuery = StringUtils.format( + "SELECT * FROM %s WHERE balance < 25000 AND age > 36 LIMIT 2000", TEST_INDEX_ACCOUNT + ); + JSONObject response = new JSONObject(executeFetchQuery(selectQuery, 100, JDBC)); + assertFalse(response.has(CURSOR)); + } + + @Test + public void testCursorWithPreparedStatement() throws IOException { + JSONObject response = executeJDBCRequest(String.format("{" + + " \"fetch_size\": 200," + + " \"query\": \" SELECT age, state FROM %s WHERE age > ? OR state IN (?, ?)\"," + + " \"parameters\": [" + + " {" + + " \"type\": \"integer\"," + + " \"value\": 25" + + " }," + + " {" + + " \"type\": \"string\"," + + " \"value\": \"WA\"" + + " }," + + " {" + + " \"type\": \"string\"," + + " \"value\": \"UT\"" + + " }" + + " ]" + + "}", TestsConstants.TEST_INDEX_ACCOUNT)); + + assertTrue(response.has(CURSOR)); + } + + @Test + public void testRegressionOnDateFormatChange() throws IOException { + loadIndex(Index.DATETIME); + /** + * With pagination, the field should be date formatted to MySQL format as in + * @see PR #367 actualDateList = new ArrayList<>(); + String selectQuery = StringUtils.format("SELECT login_time FROM %s LIMIT 500", TEST_INDEX_DATE_TIME); + JSONObject response = new JSONObject(executeFetchQuery(selectQuery, 1, JDBC)); + String cursor = response.getString(CURSOR); + actualDateList.add(response.getJSONArray(DATAROWS).getJSONArray(0).getString(0)); + + while (!cursor.isEmpty()) { + response = executeCursorQuery(cursor); + cursor = response.optString(CURSOR); + actualDateList.add(response.getJSONArray(DATAROWS).getJSONArray(0).getString(0)); + } + + List expectedDateList = Arrays.asList( + "2015-01-01 00:00:00.000", + "2015-01-01 12:10:30.000", + "1585882955", // by existing design, this is not formatted in MySQL standard format + "2020-04-08 06:10:30.000"); + + assertThat(actualDateList, equalTo(expectedDateList)); + } + + + @Test + public void defaultBehaviorWhenCursorSettingIsDisabled() throws IOException { + updateClusterSettings(new ClusterSetting(PERSISTENT, "opendistro.sql.cursor.enabled", "false")); + String query = StringUtils.format("SELECT firstname, email, state FROM %s", TEST_INDEX_ACCOUNT); + JSONObject response = new JSONObject(executeFetchQuery(query, 100, JDBC)); + assertFalse(response.has(CURSOR)); + + updateClusterSettings(new ClusterSetting(PERSISTENT, "opendistro.sql.cursor.enabled", "true")); + query = StringUtils.format("SELECT firstname, email, state FROM %s", TEST_INDEX_ACCOUNT); + response = new JSONObject(executeFetchQuery(query, 100, JDBC)); + assertTrue(response.has(CURSOR)); + + wipeAllClusterSettings(); + } + + + @Test + public void testCursorSettings() throws IOException { + // reverting enableCursorClusterSetting() in init() method before checking defaults + updateClusterSettings(new ClusterSetting(PERSISTENT, "opendistro.sql.cursor.enabled", null)); + + // Assert default cursor settings + JSONObject clusterSettings = getAllClusterSettings(); + assertThat(clusterSettings.query("/defaults/opendistro.sql.cursor.enabled"), equalTo("false")); + assertThat(clusterSettings.query("/defaults/opendistro.sql.cursor.fetch_size"), equalTo("1000")); + assertThat(clusterSettings.query("/defaults/opendistro.sql.cursor.keep_alive"), equalTo("1m")); + + updateClusterSettings(new ClusterSetting(PERSISTENT, "opendistro.sql.cursor.enabled", "true")); + updateClusterSettings(new ClusterSetting(TRANSIENT, "opendistro.sql.cursor.fetch_size", "400")); + updateClusterSettings(new ClusterSetting(PERSISTENT, "opendistro.sql.cursor.keep_alive", "200s")); + + clusterSettings = getAllClusterSettings(); + assertThat(clusterSettings.query("/persistent/opendistro.sql.cursor.enabled"), equalTo("true")); + assertThat(clusterSettings.query("/transient/opendistro.sql.cursor.fetch_size"), equalTo("400")); + assertThat(clusterSettings.query("/persistent/opendistro.sql.cursor.keep_alive"), equalTo("200s")); + + wipeAllClusterSettings(); + } + + + @Test + public void testDefaultFetchSizeFromClusterSettings() throws IOException { + // the default fetch size is 1000 + // using non-nested query here as page will have more rows on flattening + String query = StringUtils.format("SELECT firstname, email, state FROM %s", TEST_INDEX_ACCOUNT); + JSONObject response = new JSONObject(executeFetchLessQuery(query, JDBC)); + JSONArray datawRows = response.optJSONArray(DATAROWS); + assertThat(datawRows.length(), equalTo(1000)); + + updateClusterSettings(new ClusterSetting(TRANSIENT, "opendistro.sql.cursor.fetch_size", "786")); + response = new JSONObject(executeFetchLessQuery(query, JDBC)); + datawRows = response.optJSONArray(DATAROWS); + assertThat(datawRows.length(), equalTo(786)); + assertTrue(response.has(CURSOR)); + + wipeAllClusterSettings(); + } + + @Test + public void testCursorCloseAPI() throws IOException { + // multiple invocation of closing cursor should return success + // fetch page using old cursor should throw error + String selectQuery = StringUtils.format( + "SELECT firstname, state FROM %s WHERE balance > 100 and age < 40", TEST_INDEX_ACCOUNT); + JSONObject result = new JSONObject(executeFetchQuery(selectQuery, 50, JDBC)); + String cursor = result.getString(CURSOR); + + // Retrieving next 10 pages out of remaining 19 pages + for(int i =0 ; i < 10 ; i++) { + result = executeCursorQuery(cursor); + cursor = result.optString(CURSOR); + } + //Closing the cursor + JSONObject closeResp = executeCursorCloseQuery(cursor); + assertThat(closeResp.getBoolean("succeeded"), equalTo(true)); + + //Closing the cursor multiple times is idempotent + for(int i =0 ; i < 5 ; i++) { + closeResp = executeCursorCloseQuery(cursor); + assertThat(closeResp.getBoolean("succeeded"), equalTo(true)); + } + + // using the cursor after its cleared, will throw exception + Response response = null; + try { + JSONObject queryResult = executeCursorQuery(cursor); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject resp = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(resp.getInt("status"), equalTo(404)); + assertThat(resp.query("/error/reason"), equalTo("all shards failed")); + assertThat(resp.query("/error/caused_by/reason").toString(), containsString("No search context found")); + assertThat(resp.query("/error/type"), equalTo("search_phase_execution_exception")); + } + + + @Test + public void invalidCursorIdNotDecodable() throws IOException { + // could be either not decode-able + String randomCursor = "d:eyJzY2hlbWEiOlt7Im5hbWUiOiJmaXJzdG5hbWUiLCJ0eXBlIjoidGV4dCJ9LHsibmFtZSI6InN0Y"; + + Response response = null; + try { + JSONObject resp = executeCursorQuery(randomCursor); + } catch (ResponseException ex) { + response = ex.getResponse(); + } + + JSONObject resp = new JSONObject(TestUtils.getResponseBody(response)); + assertThat(resp.getInt("status"), equalTo(400)); + assertThat(resp.query("/error/type"), equalTo("illegal_argument_exception")); + } + + /** + * The index has 1000 records, with fetch size of 50 and LIMIT in place + * we should get Math.ceil(limit/fetchSize) pages and LIMIT number of rows. + * Basically it should not retrieve all records in presence of a smaller LIMIT value. + */ + @Test + public void respectLimitPassedInSelectClause() throws IOException { + int limit = 234; + String selectQuery = StringUtils.format("SELECT age, balance FROM %s LIMIT %s", TEST_INDEX_ACCOUNT, limit); + JSONObject response = new JSONObject(executeFetchQuery(selectQuery, 50, JDBC)); + String cursor = response.getString(CURSOR); + int actualDataRowCount = response.getJSONArray(DATAROWS).length(); + int pageCount = 1; + + while (!cursor.isEmpty()) { + response = executeCursorQuery(cursor); + cursor = response.optString(CURSOR); + actualDataRowCount += response.getJSONArray(DATAROWS).length(); + pageCount++; + } + + assertThat(pageCount, equalTo(5)); + assertThat(actualDataRowCount, equalTo(limit)); + } + + + @Test + public void noPaginationWithNonJDBCFormat() throws IOException { + // checking for CSV, RAW format + String query = StringUtils.format("SELECT firstname, email, state FROM %s LIMIT 2000", TEST_INDEX_ACCOUNT); + String csvResult = executeFetchQuery(query, 100, "csv"); + String[] rows = csvResult.split(NEW_LINE); + // all the 1001 records (+1 for header) are retrieved instead of fetch_size number of records + assertThat(rows.length, equalTo(1001)); + + String rawResult = executeFetchQuery(query, 100, "raw"); + rows = rawResult.split(NEW_LINE); + // all the 1000 records (NO headers) are retrieved instead of fetch_size number of records + assertThat(rows.length, equalTo(1000)); + } + + + public void verifyWithAndWithoutPaginationResponse(String sqlQuery, String cursorQuery, int fetch_size) throws IOException { + // we are only checking here for schema and datarows + JSONObject withoutCursorResponse = new JSONObject(executeFetchQuery(sqlQuery, 0, JDBC)); + + JSONObject withCursorResponse = new JSONObject("{\"schema\":[],\"datarows\":[]}"); + JSONArray schema = withCursorResponse.getJSONArray(SCHEMA); + JSONArray dataRows = withCursorResponse.getJSONArray(DATAROWS); + + JSONObject response = new JSONObject(executeFetchQuery(cursorQuery, fetch_size, JDBC)); + response.optJSONArray(SCHEMA).forEach(schema::put); + response.optJSONArray(DATAROWS).forEach(dataRows::put); + + String cursor = response.getString(CURSOR); + while (!cursor.isEmpty()) { + response = executeCursorQuery(cursor); + response.optJSONArray(DATAROWS).forEach(dataRows::put); + cursor = response.optString(CURSOR); + } + + verifySchema(withoutCursorResponse.optJSONArray(SCHEMA), withCursorResponse.optJSONArray(SCHEMA)); + verifyDataRows(withoutCursorResponse.optJSONArray(DATAROWS), withCursorResponse.optJSONArray(DATAROWS)); + } + + public void verifySchema(JSONArray schemaOne, JSONArray schemaTwo) { + assertTrue(schemaOne.similar(schemaTwo)); + } + + public void verifyDataRows(JSONArray dataRowsOne, JSONArray dataRowsTwo) { + assertTrue(dataRowsOne.similar(dataRowsTwo)); + } + + private void enableCursorClusterSetting() throws IOException{ + updateClusterSettings(new ClusterSetting("persistent", "opendistro.sql.cursor.enabled", "true")); + } + + public String executeFetchAsStringQuery(String query, String fetchSize, String requestType) throws IOException { + String endpoint = "/_opendistro/_sql?format=" + requestType; + String requestBody = makeRequest(query, fetchSize); + + Request sqlRequest = new Request("POST", endpoint); + sqlRequest.setJsonEntity(requestBody); + + Response response = client().performRequest(sqlRequest); + String responseString = getResponseBody(response, true); + return responseString; + } + + private String makeRequest(String query, String fetch_size) { + return String.format("{" + + " \"fetch_size\": \"%s\"," + + " \"query\": \"%s\"" + + "}", fetch_size, query); + } + + private JSONObject executeJDBCRequest(String requestBody) throws IOException { + Request sqlRequest = getSqlRequest(requestBody, false, JDBC); + return new JSONObject(executeRequest(sqlRequest)); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/PluginIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/PluginIT.java index 93d9fe305c..561072cbed 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/PluginIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/PluginIT.java @@ -31,8 +31,6 @@ public class PluginIT extends SQLIntegTestCase { - private static final String PERSISTENT = "persistent"; - @Override protected void init() throws Exception { wipeAllClusterSettings(); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/QueryIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/QueryIT.java index 2a19268f1e..3663794320 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/QueryIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/QueryIT.java @@ -1296,55 +1296,6 @@ public void isNotNullTest() throws IOException { Assert.assertEquals(1, getTotalHits(response)); } - @Test - public void useScrollWithoutParams() throws IOException { - JSONObject response = executeQuery( - String.format(Locale.ROOT, "SELECT /*! USE_SCROLL*/ age, gender, firstname, balance " + - "FROM %s " + - "LIMIT 2000", - TEST_INDEX_ACCOUNT)); - - Assert.assertNotNull(getScrollId(response)); - JSONArray hits = getHits(response); - // By default, 50 results are returned - Assert.assertEquals(50, hits.length()); - Assert.assertEquals(1000, getTotalHits(response)); - } - - @Test - public void useScrollWithParams() throws IOException { - JSONObject response = executeQuery( - String.format(Locale.ROOT, - "SELECT /*! USE_SCROLL(10, 5000) */ age, gender, firstname, balance FROM %s", - TEST_INDEX_ACCOUNT)); - - Assert.assertNotNull(getScrollId(response)); - JSONArray hits = getHits(response); - Assert.assertEquals(10, hits.length()); - Assert.assertEquals(1000, getTotalHits(response)); - } - - @Test - public void useScrollWithOrderByAndParams() throws IOException { - JSONObject response = executeQuery( - String.format(Locale.ROOT, - "SELECT /*! USE_SCROLL(5, 50000) */ age, gender, firstname, balance " + - "FROM %s " + - "ORDER BY age", - TEST_INDEX_ACCOUNT)); - - Assert.assertNotNull(getScrollId(response)); - JSONArray hits = getHits(response); - Assert.assertEquals(5, hits.length()); - Assert.assertEquals(1000, getTotalHits(response)); - for (int i = 0; i < hits.length(); i++) { - JSONObject hit = hits.getJSONObject(i); - JSONObject source = getSource(hit); - - Assert.assertEquals(20, source.getInt("age")); - } - } - @Test public void innerQueryTest() throws IOException { JSONObject response = executeQuery( diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/SQLIntegTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/SQLIntegTestCase.java index fb5b5f777b..d395053e16 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/SQLIntegTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/SQLIntegTestCase.java @@ -45,6 +45,8 @@ import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getBankIndexMapping; import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getBankWithNullValuesIndexMapping; import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getDateIndexMapping; +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getDateTimeIndexMapping; +import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getNestedSimpleIndexMapping; import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getDogIndexMapping; import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getDogs2IndexMapping; import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.getDogs3IndexMapping; @@ -63,6 +65,7 @@ import static com.amazon.opendistroforelasticsearch.sql.esintgtest.TestUtils.loadDataByRestClient; import static com.amazon.opendistroforelasticsearch.sql.plugin.RestSqlAction.EXPLAIN_API_ENDPOINT; import static com.amazon.opendistroforelasticsearch.sql.plugin.RestSqlAction.QUERY_API_ENDPOINT; +import static com.amazon.opendistroforelasticsearch.sql.plugin.RestSqlAction.CURSOR_CLOSE_ENDPOINT; /** * SQL plugin integration test base class. @@ -77,6 +80,9 @@ */ public abstract class SQLIntegTestCase extends ESRestTestCase { + public static final String PERSISTENT = "persistent"; + public static final String TRANSIENT = "transient"; + @Before public void setUpIndices() throws Exception { if (client() == null) { @@ -173,7 +179,11 @@ protected synchronized void loadIndex(Index index) throws IOException { } protected Request getSqlRequest(String request, boolean explain) { - String queryEndpoint = String.format("%s?format=%s", QUERY_API_ENDPOINT, "json"); + return getSqlRequest(request, explain, "json"); + } + + protected Request getSqlRequest(String request, boolean explain, String requestType) { + String queryEndpoint = String.format("%s?format=%s", QUERY_API_ENDPOINT, requestType); Request sqlRequest = new Request("POST", explain ? EXPLAIN_API_ENDPOINT : queryEndpoint); sqlRequest.setJsonEntity(request); RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); @@ -183,6 +193,17 @@ protected Request getSqlRequest(String request, boolean explain) { return sqlRequest; } + protected Request getSqlCursorCloseRequest(String cursorRequest) { + String queryEndpoint = String.format("%s?format=%s", CURSOR_CLOSE_ENDPOINT, "jdbc"); + Request sqlRequest = new Request("POST", queryEndpoint); + sqlRequest.setJsonEntity(cursorRequest); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + sqlRequest.setOptions(restOptionsBuilder); + + return sqlRequest; + } + protected String executeQuery(String query, String requestType) { try { String endpoint = "/_opendistro/_sql?format=" + requestType; @@ -201,6 +222,31 @@ protected String executeQuery(String query, String requestType) { } } + protected String executeFetchQuery(String query, int fetchSize, String requestType) throws IOException { + String endpoint = "/_opendistro/_sql?format=" + requestType; + String requestBody = makeRequest(query, fetchSize); + + Request sqlRequest = new Request("POST", endpoint); + sqlRequest.setJsonEntity(requestBody); + + Response response = client().performRequest(sqlRequest); + String responseString = getResponseBody(response, true); + return responseString; + } + + protected String executeFetchLessQuery(String query, String requestType) throws IOException { + + String endpoint = "/_opendistro/_sql?format=" + requestType; + String requestBody = makeFetchLessRequest(query); + + Request sqlRequest = new Request("POST", endpoint); + sqlRequest.setJsonEntity(requestBody); + + Response response = client().performRequest(sqlRequest); + String responseString = getResponseBody(response, true); + return responseString; + } + protected Request buildGetEndpointRequest(final String sqlQuery) { final String utf8CharsetName = StandardCharsets.UTF_8.name(); @@ -266,6 +312,18 @@ protected JSONObject executeQueryWithGetRequest(final String sqlQuery) throws IO return new JSONObject(result); } + protected JSONObject executeCursorQuery(final String cursor) throws IOException { + final String requestBody = makeCursorRequest(cursor); + Request sqlRequest = getSqlRequest(requestBody, false, "jdbc"); + return new JSONObject(executeRequest(sqlRequest)); + } + + protected JSONObject executeCursorCloseQuery(final String cursor) throws IOException { + final String requestBody = makeCursorRequest(cursor); + Request sqlRequest = getSqlCursorCloseRequest(requestBody); + return new JSONObject(executeRequest(sqlRequest)); + } + protected static JSONObject updateClusterSettings(ClusterSetting setting) throws IOException { Request request = new Request("PUT", "/_cluster/settings"); String persistentSetting = String.format(Locale.ROOT, @@ -277,6 +335,14 @@ protected static JSONObject updateClusterSettings(ClusterSetting setting) throws return new JSONObject(executeRequest(request)); } + protected static JSONObject getAllClusterSettings() throws IOException { + Request request = new Request("GET", "/_cluster/settings?flat_settings&include_defaults"); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + return new JSONObject(executeRequest(request)); + } + protected static class ClusterSetting { private final String type; private final String name; @@ -303,11 +369,26 @@ public String toString() { } protected String makeRequest(String query) { + return makeRequest(query, 0); + } + + protected String makeRequest(String query, int fetch_size) { + return String.format("{\n" + + " \"fetch_size\": \"%s\",\n" + + " \"query\": \"%s\"\n" + + "}", fetch_size, query); + } + + protected String makeFetchLessRequest(String query) { return String.format("{\n" + " \"query\": \"%s\"\n" + "}", query); } + protected String makeCursorRequest(String cursor) { + return String.format("{\"cursor\":\"%s\"}" , cursor); + } + protected JSONArray getHits(JSONObject response) { Assert.assertTrue(response.getJSONObject("hits").has("hits")); @@ -424,7 +505,15 @@ public enum Index { DATE(TestsConstants.TEST_INDEX_DATE, "dates", getDateIndexMapping(), - "src/test/resources/dates.json"); + "src/test/resources/dates.json"), + DATETIME(TestsConstants.TEST_INDEX_DATE_TIME, + "_doc", + getDateTimeIndexMapping(), + "src/test/resources/datetime.json"), + NESTED_SIMPLE(TestsConstants.TEST_INDEX_NESTED_SIMPLE, + "_doc", + getNestedSimpleIndexMapping(), + "src/test/resources/nested_simple.json"); private final String name; private final String type; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestUtils.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestUtils.java index 9aa9f0126b..000576020e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestUtils.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestUtils.java @@ -608,6 +608,64 @@ public static String getDateIndexMapping() { "}"; } + public static String getDateTimeIndexMapping() { + return "{" + + " \"mappings\": {" + + " \"properties\": {" + + " \"birthday\": {" + + " \"type\": \"date\"" + + " }" + + " }" + + " }" + + "}"; + } + + public static String getNestedSimpleIndexMapping() { + return "{" + + " \"mappings\": {" + + " \"properties\": {" + + " \"address\": {" + + " \"type\": \"nested\"," + + " \"properties\": {" + + " \"city\": {" + + " \"type\": \"text\"," + + " \"fields\": {" + + " \"keyword\": {" + + " \"type\": \"keyword\"," + + " \"ignore_above\": 256" + + " }" + + " }" + + " }," + + " \"state\": {" + + " \"type\": \"text\"," + + " \"fields\": {" + + " \"keyword\": {" + + " \"type\": \"keyword\"," + + " \"ignore_above\": 256" + + " }" + + " }" + + " }" + + " }" + + " }," + + " \"age\": {" + + " \"type\": \"long\"" + + " }," + + " \"id\": {" + + " \"type\": \"long\"" + + " }," + + " \"name\": {" + + " \"type\": \"text\"," + + " \"fields\": {" + + " \"keyword\": {" + + " \"type\": \"keyword\"," + + " \"ignore_above\": 256" + + " }" + + " }" + + " }" + + " }" + + " }" + + "}"; + } public static void loadBulk(Client client, String jsonPath, String defaultIndex) throws Exception { System.out.println(String.format("Loading file %s into elasticsearch cluster", jsonPath)); String absJsonPath = getResourceFilePath(jsonPath); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestsConstants.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestsConstants.java index 89330cbd5b..66c01ad244 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestsConstants.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/TestsConstants.java @@ -37,6 +37,7 @@ public class TestsConstants { public final static String TEST_INDEX_LOCATION = TEST_INDEX + "_location"; public final static String TEST_INDEX_LOCATION2 = TEST_INDEX + "_location2"; public final static String TEST_INDEX_NESTED_TYPE = TEST_INDEX + "_nested_type"; + public final static String TEST_INDEX_NESTED_SIMPLE = TEST_INDEX + "_nested_simple"; public final static String TEST_INDEX_NESTED_WITH_QUOTES = TEST_INDEX + "_nested_type_with_quotes"; public final static String TEST_INDEX_EMPLOYEE_NESTED = TEST_INDEX + "_employee_nested"; public final static String TEST_INDEX_JOIN_TYPE = TEST_INDEX + "_join_type"; @@ -46,6 +47,8 @@ public class TestsConstants { public final static String TEST_INDEX_ORDER = TEST_INDEX + "_order"; public final static String TEST_INDEX_WEBLOG = TEST_INDEX + "_weblog"; public final static String TEST_INDEX_DATE = TEST_INDEX + "_date"; + public final static String TEST_INDEX_DATE_TIME = TEST_INDEX + "_datetime"; + public final static String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public final static String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/DateFormatTest.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/DateFormatTest.java index 7426a8e37c..ce6c76671d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/DateFormatTest.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/DateFormatTest.java @@ -214,7 +214,11 @@ private SQLQueryExpr parseSql(String sql) { private Select getSelect(String query) { try { - return new SqlParser().parseSelect(parseSql(query)); + Select select = new SqlParser().parseSelect(parseSql(query)); + if (select.getRowCount() == null){ + select.setRowCount(Select.DEFAULT_LIMIT); + } + return select; } catch (SqlParseException e) { throw new RuntimeException(e); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/cursor/DefaultCursorTest.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/cursor/DefaultCursorTest.java new file mode 100644 index 0000000000..5fd1626a12 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/cursor/DefaultCursorTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.unittest.cursor; + +import com.amazon.opendistroforelasticsearch.sql.cursor.CursorType; +import com.amazon.opendistroforelasticsearch.sql.cursor.DefaultCursor; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.hamcrest.Matchers.emptyOrNullString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.hamcrest.Matchers.startsWith; + +public class DefaultCursorTest { + + @Test + public void checkCursorType() { + DefaultCursor cursor = new DefaultCursor(); + assertEquals(cursor.getType(), CursorType.DEFAULT); + } + + + @Test + public void cursorShouldStartWithCursorTypeID() { + DefaultCursor cursor = new DefaultCursor(); + cursor.setRowsLeft(50); + cursor.setScrollId("dbdskbcdjksbcjkdsbcjk+//"); + cursor.setIndexPattern("myIndex"); + cursor.setFetchSize(500); + cursor.setFieldAliasMap(Collections.emptyMap()); + cursor.setColumns(new ArrayList<>()); + assertThat(cursor.generateCursorId(), startsWith(cursor.getType().getId()+ ":") ); + } + + @Test + public void nullCursorWhenRowLeftIsLessThanEqualZero() { + DefaultCursor cursor = new DefaultCursor(); + assertThat(cursor.generateCursorId(), emptyOrNullString()); + + cursor.setRowsLeft(-10); + assertThat(cursor.generateCursorId(), emptyOrNullString()); + } + + @Test + public void nullCursorWhenScrollIDIsNullOrEmpty() { + DefaultCursor cursor = new DefaultCursor(); + assertThat(cursor.generateCursorId(), emptyOrNullString()); + + cursor.setScrollId(""); + assertThat(cursor.generateCursorId(), emptyOrNullString()); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/query/DefaultQueryActionTest.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/query/DefaultQueryActionTest.java index e28aa8b5c7..e9873df0d5 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/query/DefaultQueryActionTest.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/query/DefaultQueryActionTest.java @@ -19,11 +19,17 @@ import com.amazon.opendistroforelasticsearch.sql.domain.KVValue; import com.amazon.opendistroforelasticsearch.sql.domain.MethodField; import com.amazon.opendistroforelasticsearch.sql.domain.Select; +import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState; import com.amazon.opendistroforelasticsearch.sql.exception.SqlParseException; +import com.amazon.opendistroforelasticsearch.sql.executor.Format; +import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics; import com.amazon.opendistroforelasticsearch.sql.query.DefaultQueryAction; +import com.amazon.opendistroforelasticsearch.sql.request.SqlRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -34,12 +40,19 @@ import java.util.List; import java.util.Optional; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_ENABLED; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_FETCH_SIZE; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.CURSOR_KEEPALIVE; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.METRICS_ROLLING_WINDOW; +import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.METRICS_ROLLING_INTERVAL; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; public class DefaultQueryActionTest { @@ -69,6 +82,11 @@ public void initDefaultQueryAction() { queryAction.initialize(mockRequestBuilder); } + @After + public void cleanup() { + LocalClusterState.state(null); + } + @Test public void scriptFieldWithTwoParams() throws SqlParseException { @@ -123,6 +141,133 @@ public void scriptFieldWithMoreThanThreeParams() throws SqlParseException { queryAction.setFields(fields); } + @Test + public void testIfScrollShouldBeOpenWithDifferentFormats() { + int settingFetchSize = 500; + TimeValue timeValue = new TimeValue(120000); + int limit = 2300; + mockLocalClusterStateAndInitializeMetrics(true, settingFetchSize, timeValue); + + doReturn(limit).when(mockSelect).getRowCount(); + doReturn(mockRequestBuilder).when(mockRequestBuilder).setSize(settingFetchSize); + SqlRequest mockSqlRequest = mock(SqlRequest.class); + doReturn(settingFetchSize).when(mockSqlRequest).fetchSize(); + queryAction.setSqlRequest(mockSqlRequest); + + Format[] formats = new Format[] {Format.CSV, Format.RAW, Format.JSON, Format.TABLE}; + for (Format format : formats) { + queryAction.setFormat(format); + queryAction.checkAndSetScroll(); + } + + Mockito.verify(mockRequestBuilder, times(4)).setSize(limit); + Mockito.verify(mockRequestBuilder, never()).setScroll(any(TimeValue.class)); + + queryAction.setFormat(Format.JDBC); + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(settingFetchSize); + Mockito.verify(mockRequestBuilder).setScroll(timeValue); + + } + + @Test + public void testIfScrollShouldBeOpenWithCursorEnabled() { + int settingFetchSize = 500; + TimeValue timeValue = new TimeValue(120000); + int limit = 2300; + + doReturn(limit).when(mockSelect).getRowCount(); + doReturn(mockRequestBuilder).when(mockRequestBuilder).setSize(settingFetchSize); + SqlRequest mockSqlRequest = mock(SqlRequest.class); + doReturn(settingFetchSize).when(mockSqlRequest).fetchSize(); + queryAction.setSqlRequest(mockSqlRequest); + queryAction.setFormat(Format.JDBC); + + mockLocalClusterStateAndInitializeMetrics(false, settingFetchSize, timeValue); + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(limit); + Mockito.verify(mockRequestBuilder, never()).setScroll(any(TimeValue.class)); + + mockLocalClusterStateAndInitializeMetrics(true, settingFetchSize, timeValue); + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(settingFetchSize); + Mockito.verify(mockRequestBuilder).setScroll(timeValue); + + } + + @Test + public void testIfScrollShouldBeOpenWithDifferentFetchSize() { + int fetchSize = 500; + TimeValue timeValue = new TimeValue(120000); + int limit = 2300; + mockLocalClusterStateAndInitializeMetrics(true, fetchSize, timeValue); + + doReturn(limit).when(mockSelect).getRowCount(); + SqlRequest mockSqlRequest = mock(SqlRequest.class); + queryAction.setSqlRequest(mockSqlRequest); + queryAction.setFormat(Format.JDBC); + + int[] fetchSizes = new int[] {0, -10}; + for (int fetch : fetchSizes) { + doReturn(fetch).when(mockSqlRequest).fetchSize(); + queryAction.checkAndSetScroll(); + } + Mockito.verify(mockRequestBuilder, times(2)).setSize(limit); + Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); + + int userFetchSize = 20; + doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); + doReturn(mockRequestBuilder).when(mockRequestBuilder).setSize(userFetchSize); + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(20); + Mockito.verify(mockRequestBuilder).setScroll(timeValue); + } + + + @Test + public void testIfScrollShouldBeOpenWithDifferentValidFetchSizeAndLimit() { + int fetchSize = 1000; + TimeValue timeValue = new TimeValue(120000); + mockLocalClusterStateAndInitializeMetrics(true, fetchSize, timeValue); + + int limit = 2300; + doReturn(limit).when(mockSelect).getRowCount(); + SqlRequest mockSqlRequest = mock(SqlRequest.class); + + /** fetchSize <= LIMIT - open scroll*/ + int userFetchSize = 1500; + doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); + doReturn(mockRequestBuilder).when(mockRequestBuilder).setSize(userFetchSize); + queryAction.setSqlRequest(mockSqlRequest); + queryAction.setFormat(Format.JDBC); + + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(userFetchSize); + Mockito.verify(mockRequestBuilder).setScroll(timeValue); + + /** fetchSize > LIMIT - no scroll */ + userFetchSize = 5000; + doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); + mockRequestBuilder = mock(SearchRequestBuilder.class); + queryAction.initialize(mockRequestBuilder); + queryAction.checkAndSetScroll(); + Mockito.verify(mockRequestBuilder).setSize(limit); + Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); + } + + private void mockLocalClusterStateAndInitializeMetrics(boolean cursorEnabled, Integer fetchSize, TimeValue time) { + LocalClusterState mockLocalClusterState = mock(LocalClusterState.class); + LocalClusterState.state(mockLocalClusterState); + doReturn(cursorEnabled).when(mockLocalClusterState).getSettingValue(CURSOR_ENABLED); + doReturn(fetchSize).when(mockLocalClusterState).getSettingValue(CURSOR_FETCH_SIZE); + doReturn(time).when(mockLocalClusterState).getSettingValue(CURSOR_KEEPALIVE); + doReturn(3600L).when(mockLocalClusterState).getSettingValue(METRICS_ROLLING_WINDOW); + doReturn(2L).when(mockLocalClusterState).getSettingValue(METRICS_ROLLING_INTERVAL); + + Metrics.getInstance().registerDefaultMetrics(); + + } + private Field createScriptField(final String name, final String script, final boolean addScriptLanguage, final boolean addScriptParam, final boolean addRedundantParam) { diff --git a/src/test/resources/datetime.json b/src/test/resources/datetime.json new file mode 100644 index 0000000000..3898da61a7 --- /dev/null +++ b/src/test/resources/datetime.json @@ -0,0 +1,8 @@ +{"index":{"_id":"1"}} +{"login_time":"2015-01-01"} +{"index":{"_id":"2"}} +{"login_time":"2015-01-01T12:10:30Z"} +{"index":{"_id":"3"}} +{"login_time":"1585882955"} +{"index":{"_id":"4"}} +{"login_time":"2020-04-08T11:10:30+05:00"} diff --git a/src/test/resources/nested_simple.json b/src/test/resources/nested_simple.json new file mode 100644 index 0000000000..d42cc667df --- /dev/null +++ b/src/test/resources/nested_simple.json @@ -0,0 +1,10 @@ +{"index":{"_id":"1"}} +{"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY"},{"city":"bellevue","state":"WA"},{"city":"seattle","state":"WA"},{"city":"chicago","state":"IL"}]} +{"index":{"_id":"2"}} +{"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida"},{"city":"los angeles","state":"CA"}]} +{"index":{"_id":"3"}} +{"name":"peng","age":26,"address":[{"city":"san diego","state":"CA"},{"city":"austin","state":"TX"}]} +{"index":{"_id":"4"}} +{"name":"andy","age":19,"id":4,"address":[{"city":"houston","state":"TX"}]} +{"index":{"_id":"5"}} +{"name":"david","age":25,"address":[{"city":"raleigh","state":"NC"},{"city":"charlotte","state":"SC"}]}