Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Simple Query Cursor support #390

Merged
merged 52 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
19499d6
Add integ tests to be passed
abbashus Mar 11, 2020
dd400f3
Add cluster settings for cursor - enabled, fetch_size, keep_alive
abbashus Mar 11, 2020
abf7d65
Add fetch_size and cursor params. fetch_size valisation
abbashus Mar 12, 2020
496335c
new SqlRequest constructor for cursor
abbashus Mar 12, 2020
68efc8b
Add logic to open scroll based on settings, fetch_size and limit values
abbashus Mar 12, 2020
e5bc18b
Add curosr close endpoint
abbashus Mar 12, 2020
6871b35
Some updates
abbashus Mar 16, 2020
7818bd6
Remove date formatting changes
abbashus Mar 18, 2020
9b82f75
Fix unit and integ tests, Ignored date format tests for a while, sync…
abbashus Mar 18, 2020
444fc06
Add cursor generation
abbashus Mar 18, 2020
d5a5263
Add test helper methods
abbashus Mar 18, 2020
fc1dc3a
Add test hepler method to get query with explicit fetch_size
abbashus Mar 18, 2020
1b46c33
Add some integ test cases
abbashus Mar 18, 2020
718a773
More integ tests, more test hepler methods
abbashus Mar 18, 2020
527f6f9
Add two more integ tests
abbashus Mar 18, 2020
c1a8642
Refactor
abbashus Mar 19, 2020
d6c623b
Cursor close API
abbashus Mar 19, 2020
dbaf8c1
Close cursor integ tests
abbashus Mar 19, 2020
3c0213e
Merge branch 'master' into cursor-one
abbashus Mar 19, 2020
b82f98f
Fix typo causing cursor close API to fail
abbashus Mar 19, 2020
f592e88
Remove commented code and add partial date formatting change
abbashus Mar 19, 2020
1d13a83
Remove unused import causing build failure
abbashus Mar 19, 2020
25a1749
Add error metrics when not able to close cursor
abbashus Mar 19, 2020
cbe4f93
Add indexname and fieldAliasMap to cursor context
abbashus Mar 19, 2020
c7df8df
Remove ignored test cases affected by date formatting changes
abbashus Mar 19, 2020
21f00ff
Remove unneeded interface, refactor CursorType enum
abbashus Mar 19, 2020
392defd
Remove logs, unneeded fields, comments, refactor
abbashus Mar 19, 2020
3cb9ffb
Add more assert and remove comments
abbashus Mar 19, 2020
edbb36c
Address comments
abbashus Mar 20, 2020
4d9a6e8
Merge branch 'master' into cursor-one
abbashus Mar 20, 2020
29941ad
Merge branch 'master' into cursor-one
abbashus Mar 24, 2020
afd251f
Disable cursor by default, remove comments, fixed tests
abbashus Mar 26, 2020
153d91a
Merge branch 'master' into cursor-one
abbashus Apr 3, 2020
16712b3
Fix cursor for parameterized request, add integration test for same
abbashus Apr 3, 2020
7c272ec
LIMIT changes
abbashus Apr 6, 2020
a285d10
Changes to handle different LIMIT cases
abbashus Apr 6, 2020
230f8e7
Add default cursor metrics
abbashus Apr 6, 2020
f48ab5f
Merge branch 'master' into cursor-one
abbashus Apr 7, 2020
0b7c5b6
Refactoring and integration tests
abbashus Apr 9, 2020
160754c
Merge branch 'master' into cursor-one
abbashus Apr 9, 2020
4bbbd5f
Address comments
abbashus Apr 9, 2020
641ed28
Add integration test on explain cursor
abbashus Apr 9, 2020
6bc00b1
Update monitoring, settings and endpoint docs
abbashus Apr 9, 2020
63868ec
Refactor cursor classes to separate package
abbashus Apr 9, 2020
579d12c
Add Lombok for DefaultCursor
abbashus Apr 9, 2020
3ef6d7a
Add unit test for DefaultCursor
abbashus Apr 9, 2020
cafa9d9
Update doc
abbashus Apr 10, 2020
06be5e5
Unit tests, bug fix , refactoring
abbashus Apr 11, 2020
72cbe34
Merge branch 'master' into cursor-one
abbashus Apr 11, 2020
14d2b7c
Updates
abbashus Apr 13, 2020
32bbfe5
Merge branch 'master' into cursor-one
abbashus Apr 13, 2020
e5f344b
Address comments
abbashus Apr 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Select extends Query {
private Having having;
private List<Order> orderBys = new ArrayList<>();
private int offset;
private int rowCount = 200;
private Integer rowCount;
private boolean containsSubQueries;
private List<SubQueryExpression> subQueries;
private boolean selectAll = false;
Expand All @@ -59,6 +59,8 @@ public class Select extends Query {
public boolean isQuery = false;
public boolean isAggregate = false;

public static final int DEFAULT_LIMIT = 200;

public Select() {
}

Expand All @@ -70,7 +72,7 @@ public void setOffset(int offset) {
this.offset = offset;
}

public void setRowCount(int rowCount) {
public void setRowCount(Integer rowCount) {
this.rowCount = rowCount;
}

Expand Down Expand Up @@ -106,7 +108,7 @@ public int getOffset() {
return offset;
}

public int getRowCount() {
public Integer getRowCount() {
return rowCount;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.cursor;

import com.amazon.opendistroforelasticsearch.sql.executor.Format;
import org.elasticsearch.rest.RestRequest;

public class CursorActionRequestRestExecutorFactory {
//TODO: add javadocs, see RestExecutor

public static CursorRestExecutor createExecutor(RestRequest request, String cursor, Format format) {
if (isCursorCloseRequest(request)) {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
return new CursorAsyncRestExecutor(new CursorCloseExecutor(cursor));
} else {
return new CursorAsyncRestExecutor(new CursorResultExecutor(cursor, format));
}
}

private static boolean isCursorCloseRequest(final RestRequest request) {
return request.path().endsWith("/_sql/close");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.cursor;

import com.amazon.opendistroforelasticsearch.sql.esdomain.LocalClusterState;
import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName;
import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.query.QueryAction;
import com.amazon.opendistroforelasticsearch.sql.query.join.BackOffRetryStrategy;
import com.amazon.opendistroforelasticsearch.sql.utils.LogUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.function.Predicate;

import static com.amazon.opendistroforelasticsearch.sql.plugin.SqlSettings.QUERY_SLOWLOG;

public class CursorAsyncRestExecutor implements CursorRestExecutor {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
/**
* Custom thread pool name managed by ES
*/
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);

private static final Predicate<QueryAction> ALL_ACTION_IS_BLOCKING = anyAction -> true;

/**
* Delegated rest executor to async
*/
private final CursorRestExecutor executor;

/**
* Request type that expect to async to avoid blocking
*/
private final Predicate<QueryAction> isBlocking;

CursorAsyncRestExecutor(CursorRestExecutor executor) {
this(executor, ALL_ACTION_IS_BLOCKING);
}

CursorAsyncRestExecutor(CursorRestExecutor executor, Predicate<QueryAction> isBlocking) {
this.executor = executor;
this.isBlocking = isBlocking;
}


public void execute(Client client, Map<String, String> params, RestChannel channel) throws Exception {
LOG.info("executing something inside CursorAsyncRestExecutor execute ");
async(client, params, channel);
}

public String execute(Client client, Map<String, String> params) throws Exception {
abbashus marked this conversation as resolved.
Show resolved Hide resolved
return "string from CursorAsyncRestExecutor execute()";
}

/**
* Run given task in thread pool asynchronously
*/
private void async(Client client, Map<String, String> params, RestChannel channel) {

ThreadPool threadPool = client.threadPool();
Runnable runnable = () -> {
try {
doExecuteWithTimeMeasured(client, params, channel);
} catch (IOException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
"Memory circuit is broken."));
} catch (Throwable t) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
t.getMessage());
t.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
String.valueOf(t.getMessage())));
} finally {
BackOffRetryStrategy.releaseMem(executor);
}
};

// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
threadPool.schedule(
threadPool.preserveContext(LogUtils.withCurrentContext(runnable)),
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME
);
}

/**
* Time the real execution of Executor and log slow query for troubleshooting
*/
private void doExecuteWithTimeMeasured(Client client,
Map<String, String> params,
RestChannel channel) throws Exception {
long startTime = System.nanoTime();
try {
executor.execute(client, params, channel);
} finally {
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(QUERY_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.cursor;

import com.amazon.opendistroforelasticsearch.sql.metrics.MetricName;
import com.amazon.opendistroforelasticsearch.sql.metrics.Metrics;
import com.amazon.opendistroforelasticsearch.sql.rewriter.matchtoterm.VerificationException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.json.JSONException;
import org.json.JSONObject;

import java.util.Base64;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.OK;

public class CursorCloseExecutor implements CursorRestExecutor {

private static final String SUCCEEDED_TRUE = "{\"succeeded\":true}";
private static final String SUCCEEDED_FALSE = "{\"succeeded\":false}";

private String cursorId;

public CursorCloseExecutor(String cursorId) {
this.cursorId = cursorId;
}

public void execute(Client client, Map<String, String> params, RestChannel channel) throws Exception {
try {
String formattedResponse = execute(client, params);
channel.sendResponse(new BytesRestResponse(OK, "application/json; charset=UTF-8", formattedResponse));
} catch (IllegalArgumentException | JSONException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment();
e.printStackTrace();
abbashus marked this conversation as resolved.
Show resolved Hide resolved
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (ElasticsearchException e) {
int status = (e.status().getStatus());
if (status > 399 && status < 500) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment();
} else if (status > 499) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
}
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(channel, e));
}
}

public String execute(Client client, Map<String, String> params) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two concerns.

  1. The execute method is almost as same as CursorResultExecutor.execute, only difference is the LOG.error. Is it possible to avoid the duplication.
  2. In case we already define the Curser, could we construct CursorCloseExecutor by using cursor and encapsulate the Cursor construct operation in CursorFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will give a shot to refactor as described.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping it as it now, little tricky to refactor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, the cursor format ":" which should be encapsulated to Cursor implementation and should not been exposed to caller.
We can leave it for now for time consideration, Please consider add UT to cover the logic in CursorCloseExecutor and CursorResultExecutor. If possible, add the issue to tracking the refactoring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#426.

Adding UTs.

String decodedCursorContext = new String(Base64.getDecoder().decode(cursorId));
abbashus marked this conversation as resolved.
Show resolved Hide resolved
JSONObject cursorJson = new JSONObject(decodedCursorContext);
abbashus marked this conversation as resolved.
Show resolved Hide resolved

String type = cursorJson.optString("type", null); // see if it is a good case to use Optionals
CursorType cursorType = null;

if (type != null) {
cursorType = CursorType.valueOf(type);
}

if (cursorType!=null) {
switch(cursorType) {
case DEFAULT:
return handleDefaultCursorCloseRequest(client, cursorJson);
case AGGREGATION:
return handleAggregationCursorCloseRequest(client, cursorJson);
case JOIN:
return handleJoinCursorCloseRequest(client, cursorJson);
default: throw new VerificationException("Unsupported cursor");
}
}

throw new VerificationException("Invalid cursor");
abbashus marked this conversation as resolved.
Show resolved Hide resolved
}

private String handleDefaultCursorCloseRequest(Client client, JSONObject cursorContext) {
String scrollId = cursorContext.getString("scrollId");
ClearScrollResponse clearScrollResponse = client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
}

private String handleAggregationCursorCloseRequest(Client client, JSONObject cursorContext) {
return SUCCEEDED_TRUE;
}

private String handleJoinCursorCloseRequest(Client client, JSONObject cursorContext) {
return SUCCEEDED_FALSE;
}
abbashus marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/


package com.amazon.opendistroforelasticsearch.sql.executor.cursor;

import org.elasticsearch.client.Client;
import org.elasticsearch.rest.RestChannel;

import java.util.Map;

public interface CursorRestExecutor {
abbashus marked this conversation as resolved.
Show resolved Hide resolved

void execute(Client client, Map<String, String> params, RestChannel channel)
throws Exception;

String execute(Client client, Map<String, String> params) throws Exception;
}
Loading