Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.16] SQL: fix use of requestTimeout and pageTimeout query parameters (#79360) #79915

Merged
merged 1 commit into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/reference/sql/apis/sql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ searches>> if you also specify the `wait_for_completion_timeout` parameter. If
the `wait_for_completion_timeout`. Defaults to `false`.

`page_timeout`::
(Optional, <<time-units,time value>>) Timeout before a
<<sql-pagination,pagination request>> fails. Defaults to `45s` (45 seconds).
(Optional, <<time-units,time value>>) Minimum retention period for the scroll
cursor. After this time period, a <<sql-pagination,pagination request>> might
fail because the scroll cursor is no longer available. Subsequent scroll requests
prolong the lifetime of the scroll cursor by the duration of `page_timeout` in
the scroll request. Defaults to `45s` (45 seconds).

`params`::
(Optional, array) Values for parameters in the `query`. For syntax, see
Expand Down
8 changes: 5 additions & 3 deletions docs/reference/sql/endpoints/jdbc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ Connection timeout (in milliseconds). That is the maximum amount of time waiting
`network.timeout` (default `60000`)::
Network timeout (in milliseconds). That is the maximum amount of time waiting for the network.

`page.timeout` (default `45000`)::
Page timeout (in milliseconds). That is the maximum amount of time waiting for a page.

`page.size` (default `1000`)::
Page size (in entries). The number of results returned per page by the server.

`page.timeout` (default `45000`)::
Page timeout (in milliseconds). Minimum retention period for the scroll cursor on the server. Queries that require
a stateful scroll cursor on the server side might fail after this timeout. Hence, when scrolling through large result sets,
processing `page.size` records should not take longer than `page.timeout` milliseconds.

`query.timeout` (default `90000`)::
Query timeout (in milliseconds). That is the maximum amount of time waiting for a query to return.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.sql.action;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchService;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;

public class SqlSearchPageTimeoutIT extends AbstractSqlIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
// use static low keepAlive interval to ensure obsolete search contexts are pruned soon enough
settings.put(SearchService.KEEPALIVE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200));
return settings.build();
}

public void testSearchContextIsCleanedUpAfterPageTimeoutForHitsQueries() throws Exception {
setupTestIndex();

SqlQueryResponse response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query("SELECT field FROM test")
.fetchSize(1)
.pageTimeout(TimeValue.timeValueMillis(100))
.get();

assertEquals(1, response.size());
assertTrue(response.hasCursor());
assertEquals(1, getNumberOfSearchContexts());

assertBusy(() -> assertEquals(0, getNumberOfSearchContexts()), 3, TimeUnit.SECONDS);

SearchPhaseExecutionException exception = expectThrows(
SearchPhaseExecutionException.class,
() -> new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(response.cursor()).get()
);

assertThat(Arrays.asList(exception.guessRootCauses()), contains(instanceOf(SearchContextMissingException.class)));
}

public void testNoSearchContextForAggregationQueries() throws InterruptedException {
setupTestIndex();

SqlQueryResponse response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).query(
"SELECT COUNT(*) FROM test GROUP BY field"
).fetchSize(1).pageTimeout(TimeValue.timeValueMillis(100)).get();

assertEquals(1, response.size());
assertTrue(response.hasCursor());
assertEquals(0, getNumberOfSearchContexts());

Thread.sleep(1000);

// since aggregation queries do not have a stateful search context, scrolling is still possible after page_timeout
response = new SqlQueryRequestBuilder(client(), SqlQueryAction.INSTANCE).cursor(response.cursor()).get();

assertEquals(1, response.size());
}

private void setupTestIndex() {
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareBulk()
.add(new IndexRequest("test").id("1").source("field", "bar"))
.add(new IndexRequest("test").id("2").source("field", "baz"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureYellow("test");
}

private long getNumberOfSearchContexts() {
return client().admin()
.indices()
.prepareStats("test")
.clear()
.setSearch(true)
.get()
.getIndex("test")
.getTotal()
.getSearch()
.getOpenContexts();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void nextPage(SqlConfiguration cfg, Cursor cursor, ActionListener<Page> l
}));
}

public void cleanCursor(SqlConfiguration cfg, Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(cfg, client, listener);
public void cleanCursor(Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(client, listener);
}

public Client client() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
log.trace("About to execute composite query {} on {}", StringUtils.toString(query), indices);
}

SearchRequest request = Querier.prepareRequest(client, query, cfg.pageTimeout(), includeFrozen, indices);
SearchRequest request = Querier.prepareRequest(query, cfg.requestTimeout(), includeFrozen, indices);

client.search(request, new ActionListener.Delegating<SearchResponse, Cursor.Page>(listener) {
@Override
Expand Down Expand Up @@ -267,7 +267,7 @@ private static byte[] serializeQuery(SearchSourceBuilder source) throws IOExcept


@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
listener.onResponse(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand All @@ -28,6 +27,7 @@
import org.elasticsearch.search.aggregations.bucket.filter.Filters;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ql.execution.search.FieldExtraction;
import org.elasticsearch.xpack.ql.execution.search.extractor.BucketExtractor;
import org.elasticsearch.xpack.ql.execution.search.extractor.ComputingExtractor;
Expand Down Expand Up @@ -93,7 +93,6 @@ public class Querier {

private final PlanExecutor planExecutor;
private final SqlConfiguration cfg;
private final TimeValue keepAlive, timeout;
private final int size;
private final Client client;
@Nullable
Expand All @@ -103,21 +102,14 @@ public Querier(SqlSession sqlSession) {
this.planExecutor = sqlSession.planExecutor();
this.client = sqlSession.client();
this.cfg = sqlSession.configuration();
this.keepAlive = cfg.requestTimeout();
this.timeout = cfg.pageTimeout();
this.filter = cfg.filter();
this.size = cfg.pageSize();
}

public void query(List<Attribute> output, QueryContainer query, String index, ActionListener<Page> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(query, filter, size);
// set query timeout
if (timeout.getSeconds() > 0) {
sourceBuilder.timeout(timeout);
}

// set runtime mappings
if (this.cfg.runtimeMappings() != null) {
sourceBuilder.runtimeMappings(this.cfg.runtimeMappings());
}
Expand All @@ -126,7 +118,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
}

SearchRequest search = prepareRequest(client, sourceBuilder, timeout, query.shouldIncludeFrozen(),
SearchRequest search = prepareRequest(sourceBuilder, cfg.requestTimeout(), query.shouldIncludeFrozen(),
Strings.commaDelimitedListToStringArray(index));

@SuppressWarnings("rawtypes")
Expand All @@ -141,7 +133,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
l = new CompositeActionListener(listener, client, cfg, output, query, search);
}
} else {
search.scroll(keepAlive);
search.scroll(cfg.pageTimeout());
l = new ScrollActionListener(listener, client, cfg, output, query);
}

Expand All @@ -152,7 +144,7 @@ public void query(List<Attribute> output, QueryContainer query, String index, Ac
client.search(search, l);
}

public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
public static SearchRequest prepareRequest(SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
source.timeout(timeout);

Expand Down Expand Up @@ -533,15 +525,13 @@ abstract static class BaseActionListener extends ActionListener.Delegating<Searc

final Client client;
final SqlConfiguration cfg;
final TimeValue keepAlive;
final Schema schema;

BaseActionListener(ActionListener<Page> listener, Client client, SqlConfiguration cfg, List<Attribute> output) {
super(listener);

this.client = client;
this.cfg = cfg;
this.keepAlive = cfg.requestTimeout();
this.schema = Rows.schema(output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.ql.type.Schema;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Rows;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.BitSet;
Expand Down Expand Up @@ -100,13 +100,13 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
client.searchScroll(request, wrap(response -> {
handle(response, () -> new SearchHitRowSet(extractors, mask, limit, response),
p -> listener.onResponse(p),
p -> clear(cfg, client, wrap(success -> listener.onResponse(p), listener::onFailure)),
p -> clear(client, wrap(success -> listener.onResponse(p), listener::onFailure)),
Schema.EMPTY);
}, listener::onFailure));
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
cleanCursor(client, scrollId, wrap(
clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()),
listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.sql.action.BasicFormatter;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -59,8 +59,8 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
delegate.clear(cfg, client, listener);
public void clear(Client client, ActionListener<Boolean> listener) {
delegate.clear(client, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ql.util.StringUtils;
import org.elasticsearch.xpack.sql.action.SqlClearCursorRequest;
import org.elasticsearch.xpack.sql.action.SqlClearCursorResponse;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.proto.Protocol;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.SqlConfiguration;
import org.elasticsearch.xpack.sql.util.DateUtils;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.sql.action.SqlClearCursorAction.NAME;

public class TransportSqlClearCursorAction extends HandledTransportAction<SqlClearCursorRequest, SqlClearCursorResponse> {
Expand All @@ -47,11 +42,9 @@ public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest re
ActionListener<SqlClearCursorResponse> listener) {
Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1();
planExecutor.cleanCursor(
new SqlConfiguration(DateUtils.UTC, null, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null,
emptyMap(), request.mode(), StringUtils.EMPTY, request.version(), StringUtils.EMPTY, StringUtils.EMPTY,
Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN),
cursor, ActionListener.wrap(
success -> listener.onResponse(new SqlClearCursorResponse(success)), listener::onFailure));
cursor,
ActionListener.<Boolean>wrap(success -> listener.onResponse(new SqlClearCursorResponse(success)), listener::onFailure)
);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,8 +112,7 @@ public static void operation(PlanExecutor planExecutor, SqlQueryTask task, SqlQu
SqlConfiguration cfg = new SqlConfiguration(request.zoneId(), request.catalog(), request.fetchSize(), request.requestTimeout(),
request.pageTimeout(), request.filter(), request.runtimeMappings(), request.mode(), request.clientId(), request.version(),
username, clusterName(clusterService), request.fieldMultiValueLeniency(), request.indexIncludeFrozen(),
new TaskId(clusterService.localNode().getId(), task.getId()), task,
request.waitForCompletionTimeout(), request.keepOnCompletion(), request.keepAlive());
new TaskId(clusterService.localNode().getId(), task.getId()), task);

if (Strings.hasText(request.cursor()) == false) {
executeRequestWithRetryAttempt(clusterService, listener::onFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void doExecute(Task task, SqlTranslateRequest request, ActionListener<
request.requestTimeout(), request.pageTimeout(), request.filter(), request.runtimeMappings(),
request.mode(), request.clientId(), request.version(),
username(securityContext), clusterName(clusterService), Protocol.FIELD_MULTI_VALUE_LENIENCY,
Protocol.INDEX_INCLUDE_FROZEN);
Protocol.INDEX_INCLUDE_FROZEN, null, null);

planExecutor.searchSource(cfg, request.query(), request.params(), ActionListener.wrap(
searchSourceBuilder -> listener.onResponse(new SqlTranslateResponse(searchSourceBuilder)), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ public static Page last(RowSet rowSet) {
/**
* Cleans the resources associated with the cursor
*/
void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener);
void clear(Client client, ActionListener<Boolean> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void nextPage(SqlConfiguration cfg, Client client, NamedWriteableRegistry
}

@Override
public void clear(SqlConfiguration cfg, Client client, ActionListener<Boolean> listener) {
public void clear(Client client, ActionListener<Boolean> listener) {
// There is nothing to clean
listener.onResponse(false);
}
Expand Down
Loading