Skip to content

Commit

Permalink
Add query planning time limit
Browse files Browse the repository at this point in the history
When the query state changes from PLANNING to FAILED,
due to an error or an explicit cancel, the planning thread is
interrupted, possibly freeing some of the resources.
  • Loading branch information
skrzypo987 committed Apr 21, 2021
1 parent 6998462 commit 8bbdb6c
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_MEMORY = "query_max_memory";
public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory";
public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time";
public static final String QUERY_MAX_PLANNING_TIME = "query_max_planning_time";
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time";
Expand Down Expand Up @@ -261,6 +262,11 @@ public SystemSessionProperties(
"Maximum execution time of a query",
queryManagerConfig.getQueryMaxExecutionTime(),
false),
durationProperty(
QUERY_MAX_PLANNING_TIME,
"Maximum planning time of a query",
queryManagerConfig.getQueryMaxPlanningTime(),
false),
durationProperty(
QUERY_MAX_CPU_TIME,
"Maximum CPU time of a query",
Expand Down Expand Up @@ -737,6 +743,11 @@ public static Duration getQueryMaxExecutionTime(Session session)
return session.getSystemProperty(QUERY_MAX_EXECUTION_TIME, Duration.class);
}

/**
 * Looks up the {@link #QUERY_MAX_PLANNING_TIME} system property on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a {@code Duration}
 */
public static Duration getQueryMaxPlanningTime(Session session)
{
    Duration maxPlanningTime = session.getSystemProperty(QUERY_MAX_PLANNING_TIME, Duration.class);
    return maxPlanningTime;
}

public static boolean resourceOvercommit(Session session)
{
return session.getSystemProperty(RESOURCE_OVERCOMMIT, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public Optional<DateTime> getExecutionStartTime()
return getEndTime();
}

// This implementation never reports a planning duration: the result is always empty.
@Override
public Optional<Duration> getPlanningTime()
{
    Optional<Duration> noPlanningTime = Optional.empty();
    return noPlanningTime;
}

@Override
public Optional<DateTime> getEndTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ public Optional<DateTime> getExecutionStartTime()
return stateMachine.getExecutionStartTime();
}

// Planning time is tracked by the query state machine; this simply forwards its answer.
@Override
public Optional<Duration> getPlanningTime()
{
    Optional<Duration> planningTime = stateMachine.getPlanningTime();
    return planningTime;
}

@Override
public Optional<DateTime> getEndTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ public QueryState getState()
return stateMachine.getQueryState();
}

// Forwards to the state machine, which owns the planning-time bookkeeping.
@Override
public Optional<Duration> getPlanningTime()
{
    Optional<Duration> planningTime = stateMachine.getPlanningTime();
    return planningTime;
}

public List<Expression> getParameters()
{
return parameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class QueryManagerConfig
private String queryExecutionPolicy = "all-at-once";
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxPlanningTime = new Duration(10, TimeUnit.MINUTES);
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);
private Optional<DataSize> queryMaxScanPhysicalBytes = Optional.empty();

Expand Down Expand Up @@ -284,6 +285,19 @@ public QueryManagerConfig setQueryMaxExecutionTime(Duration queryMaxExecutionTim
return this;
}

/**
 * Returns the currently configured maximum planning time.
 */
@NotNull
public Duration getQueryMaxPlanningTime()
{
    return this.queryMaxPlanningTime;
}

/**
 * Stores the maximum planning time, bound to the {@code query.max-planning-time} config key.
 *
 * @param queryMaxPlanningTime new limit to store
 * @return this config object, so setters can be chained
 */
@Config("query.max-planning-time")
public QueryManagerConfig setQueryMaxPlanningTime(Duration queryMaxPlanningTime)
{
    QueryManagerConfig self = this;
    self.queryMaxPlanningTime = queryMaxPlanningTime;
    return self;
}

@NotNull
@MinDuration("1ns")
public Duration getQueryMaxCpuTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,13 @@ public Optional<DateTime> getExecutionStartTime()
return queryStateTimer.getExecutionStartTime();
}

/**
 * Returns the planning time reported by the query state timer, or empty when
 * the timer has no execution start time yet.
 */
public Optional<Duration> getPlanningTime()
{
    // Execution start time is empty if planning has not started
    if (!queryStateTimer.getExecutionStartTime().isPresent()) {
        return Optional.empty();
    }
    // ofNullable keeps the null-to-empty behavior of Optional.map
    return Optional.ofNullable(queryStateTimer.getPlanningTime());
}

public DateTime getLastHeartbeat()
{
return queryStateTimer.getLastHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public Optional<DateTime> getExecutionStartTime()
return toDateTime(beginPlanningNanos);
}

/**
 * Returns {@code beginPlanningNanos} converted through {@code toDateTime}.
 */
public Optional<DateTime> getPlanningStartTime()
{
    Optional<DateTime> planningStart = toDateTime(beginPlanningNanos);
    return planningStart;
}

public Duration getElapsedTime()
{
if (endNanos.get() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import static com.google.common.base.Preconditions.checkState;
import static io.trino.SystemSessionProperties.getQueryMaxExecutionTime;
import static io.trino.SystemSessionProperties.getQueryMaxPlanningTime;
import static io.trino.SystemSessionProperties.getQueryMaxRunTime;
import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY;
import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
Expand Down Expand Up @@ -178,11 +179,16 @@ private void enforceTimeLimits()
}
Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession());
Duration queryMaxExecutionTime = getQueryMaxExecutionTime(query.getSession());
Duration queryMaxPlanningTime = getQueryMaxPlanningTime(query.getSession());
Optional<DateTime> executionStartTime = query.getExecutionStartTime();
Optional<Duration> planningTime = query.getPlanningTime();
DateTime createTime = query.getCreateTime();
if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.toMillis()).isBeforeNow()) {
query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum execution time limit of " + queryMaxExecutionTime));
}
planningTime
.filter(duration -> duration.compareTo(queryMaxPlanningTime) > 0)
.ifPresent(ignored -> query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum planning time limit of " + queryMaxPlanningTime)));
if (createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) {
query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime));
}
Expand Down Expand Up @@ -288,6 +294,8 @@ public interface TrackedQuery

Optional<DateTime> getExecutionStartTime();

Optional<Duration> getPlanningTime();

DateTime getLastHeartbeat();

Optional<DateTime> getEndTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,18 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.SystemSessionProperties.isEnableDynamicFiltering;
import static io.trino.execution.QueryState.FAILED;
import static io.trino.execution.QueryState.PLANNING;
import static io.trino.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.trino.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler;
import static io.trino.server.DynamicFilterService.DynamicFiltersStats;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.sql.ParameterUtils.parameterExtractor;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -335,6 +339,12 @@ public Optional<DateTime> getExecutionStartTime()
return stateMachine.getExecutionStartTime();
}

// The state machine keeps the planning-time record; expose it unchanged.
@Override
public Optional<Duration> getPlanningTime()
{
    Optional<Duration> planningTime = stateMachine.getPlanningTime();
    return planningTime;
}

@Override
public DateTime getLastHeartbeat()
{
Expand Down Expand Up @@ -379,11 +389,33 @@ public void start()
return;
}

PlanRoot plan = planQuery();
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
registerDynamicFilteringQuery(plan);
planDistribution(plan);
AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
stateMachine.getStateChange(PLANNING).addListener(() -> {
if (stateMachine.getQueryState() == FAILED) {
synchronized (this) {
Thread thread = planningThread.get();
if (thread != null) {
thread.interrupt();
}
}
}
}, directExecutor());

try {
PlanRoot plan = planQuery();
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
registerDynamicFilteringQuery(plan);
planDistribution(plan);
}
finally {
synchronized (this) {
planningThread.set(null);
// Clear the interrupted flag in case there was a race condition where
// the planning thread was interrupted right after planning completes above
Thread.interrupted();
}
}

if (!stateMachine.transitionToStarting()) {
// query already started or finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testDefaults()
.setQueryExecutionPolicy("all-at-once")
.setQueryMaxRunTime(new Duration(100, DAYS))
.setQueryMaxExecutionTime(new Duration(100, DAYS))
.setQueryMaxPlanningTime(new Duration(10, MINUTES))
.setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS))
.setQueryMaxScanPhysicalBytes(null)
.setRequiredWorkers(1)
Expand Down Expand Up @@ -81,6 +82,7 @@ public void testExplicitPropertyMappings()
.put("query.execution-policy", "phased")
.put("query.max-run-time", "2h")
.put("query.max-execution-time", "3h")
.put("query.max-planning-time", "1h")
.put("query.max-cpu-time", "2d")
.put("query.max-scan-physical-bytes", "1kB")
.put("query-manager.required-workers", "333")
Expand All @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings()
.setQueryExecutionPolicy("phased")
.setQueryMaxRunTime(new Duration(2, HOURS))
.setQueryMaxExecutionTime(new Duration(3, HOURS))
.setQueryMaxPlanningTime(new Duration(1, HOURS))
.setQueryMaxCpuTime(new Duration(2, DAYS))
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
.setRequiredWorkers(333)
Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/admin/properties-query-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ The maximum allowed time for a query to be actively executing on the
cluster, before it is terminated. Compared to the run time below, execution
time does not include analysis, query planning or wait times in a queue.

``query.max-planning-time``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``duration``
* **Default value:** ``10 minutes``
* **Session property:** ``query_max_planning_time``

The maximum allowed time for a query to be actively planning the execution.
After this period, the coordinator will make its best effort to stop the
query. Note that some operations in the planning phase are not easily
cancellable and may not terminate immediately.

``query.max-run-time``
^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.connector.MockConnectorFactory;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.util.concurrent.CountDownLatch;

import static io.trino.SystemSessionProperties.QUERY_MAX_PLANNING_TIME;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

// Tests need to finish before strict timeouts. Any background work
// may make them flaky
/**
 * Verifies that a query exceeding {@code query_max_planning_time} is failed and that
 * the thread blocked in planning is interrupted.
 *
 * <p>The mock connector's {@code applyFilter} callback blocks on {@link #freeze}, which
 * stands in for a long-running planning step.
 */
@Test(singleThreaded = true)
public class TestQueryTracker
        extends AbstractTestQueryFramework
{
    // Stays closed for the whole test so the mocked applyFilter call blocks during planning
    private final CountDownLatch freeze = new CountDownLatch(1);
    // Released by freeze() once the blocked thread has observed an interrupt
    private final CountDownLatch interrupted = new CountDownLatch(1);

    /**
     * Unblocks any thread still parked in {@link #freeze()} and then runs the
     * framework teardown.
     *
     * <p>{@code alwaysRun = true} keeps a failed or skipped test from leaving the
     * latch closed. The superclass {@code close()} is invoked explicitly because
     * this override replaces it; without the call the query runner would never be
     * released.
     */
    @Override
    @AfterClass(alwaysRun = true)
    public void close()
    {
        // Release the latch first so teardown cannot wait on a frozen planning thread
        freeze.countDown();
        try {
            super.close();
        }
        catch (Exception e) {
            // The override declares no checked exceptions, so surface the failure unchecked
            throw new RuntimeException("Failed to close query runner", e);
        }
    }

    /**
     * Builds a distributed query runner with a 1s planning time limit and a mock
     * catalog whose {@code applyFilter} blocks until interrupted or released.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session defaultSession = testSessionBuilder()
                .setCatalog("mock")
                .setSchema("default")
                .setSystemProperty(QUERY_MAX_PLANNING_TIME, "1s")
                .build();

        DistributedQueryRunner queryRunner = DistributedQueryRunner
                .builder(defaultSession)
                .build();
        queryRunner.installPlugin(new Plugin()
        {
            @Override
            public Iterable<ConnectorFactory> getConnectorFactories()
            {
                return ImmutableList.of(MockConnectorFactory.builder()
                        .withGetColumns(ignored -> ImmutableList.of(new ColumnMetadata("col", VARCHAR)))
                        // Apply filter happens inside optimizer so this should model most blocking tasks in planning phase
                        .withApplyFilter((ignored1, ignored2, ignored3) -> freeze())
                        .build());
            }
        });
        queryRunner.createCatalog("mock", "mock");

        return queryRunner;
    }

    /**
     * Runs a query whose planning blocks in {@code applyFilter}, expects the planning
     * time limit error, and then waits for the blocked thread to report the interrupt.
     */
    @Test(timeOut = 5_000)
    public void testInterruptApplyFilter()
            throws InterruptedException
    {
        assertThatThrownBy(() -> getQueryRunner().execute("SELECT * FROM t1 WHERE col = 'abc'"))
                .hasMessageContaining("Query exceeded the maximum planning time limit of 1.00s");

        interrupted.await();
    }

    /**
     * Blocks the calling thread until {@link #freeze} is released.
     *
     * @return always {@code null} when the latch is released normally
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread
     *         is interrupted while waiting
     */
    private <T> T freeze()
    {
        try {
            freeze.await();
        }
        catch (InterruptedException e) {
            interrupted.countDown();
            // Restore the interrupt status so code further up the stack can still see it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        return null;
    }
}

0 comments on commit 8bbdb6c

Please sign in to comment.