From c8e99e3e07e7fa8c0491bdca8743e7592ea073c3 Mon Sep 17 00:00:00 2001 From: skrzypo987 Date: Fri, 26 Mar 2021 12:00:15 +0100 Subject: [PATCH] Add query planning time limit When the query state changes from PLANNING to FAILED, due to error or explicit cancel, the planning thread will get interrupted possibly freeing some of the resources. --- .../io/trino/SystemSessionProperties.java | 11 ++ .../trino/dispatcher/FailedDispatchQuery.java | 6 + .../trino/dispatcher/LocalDispatchQuery.java | 6 + .../execution/DataDefinitionExecution.java | 6 + .../trino/execution/QueryManagerConfig.java | 14 +++ .../io/trino/execution/QueryStateMachine.java | 7 ++ .../io/trino/execution/QueryStateTimer.java | 5 + .../java/io/trino/execution/QueryTracker.java | 8 ++ .../io/trino/execution/SqlQueryExecution.java | 42 ++++++- .../execution/TestQueryManagerConfig.java | 3 + .../admin/properties-query-management.rst | 12 ++ .../io/trino/execution/TestQueryTracker.java | 103 ++++++++++++++++++ 12 files changed, 218 insertions(+), 5 deletions(-) create mode 100644 testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 8934bc074ff59..b10aee25723de 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -63,6 +63,7 @@ public final class SystemSessionProperties public static final String QUERY_MAX_MEMORY = "query_max_memory"; public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory"; public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time"; + public static final String QUERY_MAX_PLANNING_TIME = "query_max_planning_time"; public static final String QUERY_MAX_RUN_TIME = "query_max_run_time"; public static final String RESOURCE_OVERCOMMIT = "resource_overcommit"; public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time"; @@ -261,6 +262,11 @@ public SystemSessionProperties( "Maximum execution time of a query", queryManagerConfig.getQueryMaxExecutionTime(), false), + durationProperty( + QUERY_MAX_PLANNING_TIME, + "Maximum planning time of a query", + queryManagerConfig.getQueryMaxPlanningTime(), + false), durationProperty( QUERY_MAX_CPU_TIME, "Maximum CPU time of a query", @@ -737,6 +743,11 @@ public static Duration getQueryMaxExecutionTime(Session session) return session.getSystemProperty(QUERY_MAX_EXECUTION_TIME, Duration.class); } + public static Duration getQueryMaxPlanningTime(Session session) + { + return session.getSystemProperty(QUERY_MAX_PLANNING_TIME, Duration.class); + } + public static boolean resourceOvercommit(Session session) { return session.getSystemProperty(RESOURCE_OVERCOMMIT, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 0a1aa71a55883..215a64ae58479 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -171,6 +171,12 @@ public Optional getExecutionStartTime() return getEndTime(); } + @Override + public Optional getPlanningTime() + { + return Optional.empty(); + } + @Override public Optional getEndTime() { diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java index 559d6ec006038..04db83f8d2a44 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java @@ -218,6 +218,12 @@ public Optional getExecutionStartTime() return stateMachine.getExecutionStartTime(); } + @Override + public Optional getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + @Override public Optional getEndTime() { diff --git a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java index b6ca104dea457..0045ca29aef4f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java @@ -279,6 +279,12 @@ public QueryState getState() return stateMachine.getQueryState(); } + @Override + public Optional getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + public List getParameters() { return parameters; diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 67bd1abe871fe..7a3468663fd27 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -61,6 +61,7 @@ public class QueryManagerConfig private String queryExecutionPolicy = "all-at-once"; private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS); private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS); + private Duration queryMaxPlanningTime = new Duration(10, TimeUnit.MINUTES); private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS); private Optional queryMaxScanPhysicalBytes = Optional.empty(); @@ -284,6 +285,19 @@ public QueryManagerConfig setQueryMaxExecutionTime(Duration queryMaxExecutionTim return this; } + @NotNull + public Duration getQueryMaxPlanningTime() + { + return queryMaxPlanningTime; + } + + @Config("query.max-planning-time") + public QueryManagerConfig setQueryMaxPlanningTime(Duration queryMaxPlanningTime) + { + this.queryMaxPlanningTime = queryMaxPlanningTime; + return this; + } + @NotNull @MinDuration("1ns") public Duration getQueryMaxCpuTime() diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 0008670464f4b..95e741f3004b5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -1009,6 +1009,13 @@ public Optional getExecutionStartTime() return queryStateTimer.getExecutionStartTime(); } + public Optional getPlanningTime() + { + // Execution start time is empty if planning has not started + return queryStateTimer.getExecutionStartTime() + .map(ignored -> queryStateTimer.getPlanningTime()); + } + public DateTime getLastHeartbeat() { return queryStateTimer.getLastHeartbeat(); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java index 8a6b96c0ac88e..31860e579e45c 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java @@ -189,6 +189,11 @@ public Optional getExecutionStartTime() return toDateTime(beginPlanningNanos); } + public Optional getPlanningStartTime() + { + return toDateTime(beginPlanningNanos); + } + public Duration getElapsedTime() { if (endNanos.get() != null) { diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java b/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java index 270a32774f3bd..84b41314fb292 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java @@ -38,6 +38,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.trino.SystemSessionProperties.getQueryMaxExecutionTime; +import static io.trino.SystemSessionProperties.getQueryMaxPlanningTime; import static io.trino.SystemSessionProperties.getQueryMaxRunTime; import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY; import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; @@ -178,11 +179,16 @@ private void enforceTimeLimits() } Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession()); Duration queryMaxExecutionTime = getQueryMaxExecutionTime(query.getSession()); + Duration queryMaxPlanningTime = getQueryMaxPlanningTime(query.getSession()); Optional executionStartTime = query.getExecutionStartTime(); + Optional planningTime = query.getPlanningTime(); DateTime createTime = query.getCreateTime(); if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.toMillis()).isBeforeNow()) { query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum execution time limit of " + queryMaxExecutionTime)); } + planningTime + .filter(duration -> duration.compareTo(queryMaxPlanningTime) > 0) + .ifPresent(ignored -> query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum planning time limit of " + queryMaxPlanningTime))); if (createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) { query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime)); } @@ -288,6 +294,8 @@ public interface TrackedQuery Optional getExecutionStartTime(); + Optional getPlanningTime(); + DateTime getLastHeartbeat(); Optional getEndTime(); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index d6f842c0a7abe..8a7ed2291f9ee 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -84,14 +84,18 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.SystemSessionProperties.isEnableDynamicFiltering; +import static io.trino.execution.QueryState.FAILED; +import static io.trino.execution.QueryState.PLANNING; import static io.trino.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static io.trino.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler; import static io.trino.server.DynamicFilterService.DynamicFiltersStats; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.sql.ParameterUtils.parameterExtractor; +import static java.lang.Thread.currentThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; @@ -335,6 +339,12 @@ public Optional getExecutionStartTime() return stateMachine.getExecutionStartTime(); } + @Override + public Optional getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + @Override public DateTime getLastHeartbeat() { @@ -379,11 +389,33 @@ public void start() return; } - PlanRoot plan = planQuery(); - // DynamicFilterService needs plan for query to be registered. - // Query should be registered before dynamic filter suppliers are requested in distribution planning. - registerDynamicFilteringQuery(plan); - planDistribution(plan); + AtomicReference planningThread = new AtomicReference<>(currentThread()); + stateMachine.getStateChange(PLANNING).addListener(() -> { + if (stateMachine.getQueryState() == FAILED) { + synchronized (this) { + Thread thread = planningThread.get(); + if (thread != null) { + thread.interrupt(); + } + } + } + }, directExecutor()); + + try { + PlanRoot plan = planQuery(); + // DynamicFilterService needs plan for query to be registered. + // Query should be registered before dynamic filter suppliers are requested in distribution planning. + registerDynamicFilteringQuery(plan); + planDistribution(plan); + } + finally { + synchronized (this) { + planningThread.set(null); + // Clear the interrupted flag in case there was a race condition where + // the planning thread was interrupted right after planning completes above + Thread.interrupted(); + } + } if (!stateMachine.transitionToStarting()) { // query already started or finished diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index b292499f05440..15a26cee44848 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -53,6 +53,7 @@ public void testDefaults() .setQueryExecutionPolicy("all-at-once") .setQueryMaxRunTime(new Duration(100, DAYS)) .setQueryMaxExecutionTime(new Duration(100, DAYS)) + .setQueryMaxPlanningTime(new Duration(10, MINUTES)) .setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS)) .setQueryMaxScanPhysicalBytes(null) .setRequiredWorkers(1) @@ -81,6 +82,7 @@ public void testExplicitPropertyMappings() .put("query.execution-policy", "phased") .put("query.max-run-time", "2h") .put("query.max-execution-time", "3h") + .put("query.max-planning-time", "1h") .put("query.max-cpu-time", "2d") .put("query.max-scan-physical-bytes", "1kB") .put("query-manager.required-workers", "333") @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings() .setQueryExecutionPolicy("phased") .setQueryMaxRunTime(new Duration(2, HOURS)) .setQueryMaxExecutionTime(new Duration(3, HOURS)) + .setQueryMaxPlanningTime(new Duration(1, HOURS)) .setQueryMaxCpuTime(new Duration(2, DAYS)) .setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE)) .setRequiredWorkers(333) diff --git a/docs/src/main/sphinx/admin/properties-query-management.rst b/docs/src/main/sphinx/admin/properties-query-management.rst index c5c78d038bce2..441334f926ce5 100644 --- a/docs/src/main/sphinx/admin/properties-query-management.rst +++ b/docs/src/main/sphinx/admin/properties-query-management.rst @@ -13,6 +13,18 @@ The maximum allowed time for a query to be actively executing on the cluster, before it is terminated. Compared to the run time below, execution time does not include analysis, query planning or wait times in a queue. +``query.max-planning-time`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``duration`` +* **Default value:** ``10 minutes`` +* **Session property:** ``query_max_planning_time`` + +The maximum allowed time for a query to be actively planning the execution. +After this period the coordinator will make its best effort to stop the +query. Note that some operations in planning phase are not easily cancellable +and may not terminate immediately. + ``query.max-run-time`` ^^^^^^^^^^^^^^^^^^^^^^ diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java b/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java new file mode 100644 index 0000000000000..3277f628afc50 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableList; +import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; + +import static io.trino.SystemSessionProperties.QUERY_MAX_PLANNING_TIME; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +// Tests need to finish before strict timeouts. Any background work +// may make them flaky +@Test(singleThreaded = true) +public class TestQueryTracker + extends AbstractTestQueryFramework +{ + private final CountDownLatch freeze = new CountDownLatch(1); + private final CountDownLatch interrupted = new CountDownLatch(1); + + @Override + @AfterClass + public void close() + { + freeze.countDown(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session defaultSession = testSessionBuilder() + .setCatalog("mock") + .setSchema("default") + .setSystemProperty(QUERY_MAX_PLANNING_TIME, "1s") + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner + .builder(defaultSession) + .build(); + queryRunner.installPlugin(new Plugin() + { + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(MockConnectorFactory.builder() + .withGetColumns(ignored -> ImmutableList.of(new ColumnMetadata("col", VARCHAR))) + // Apply filter happens inside optimizer so this should model most blocking tasks in planning phase + .withApplyFilter((ignored1, ignored2, ignored3) -> freeze()) + .build()); + } + }); + queryRunner.createCatalog("mock", "mock"); + + return queryRunner; + } + + @Test(timeOut = 5_000) + public void testInterruptApplyFilter() + throws InterruptedException + { + assertThatThrownBy(() -> getQueryRunner().execute("SELECT * FROM t1 WHERE col = 'abc'")) + .hasMessageContaining("Query exceeded the maximum planning time limit of 1.00s"); + + interrupted.await(); + } + + private T freeze() + { + try { + freeze.await(); + } + catch (InterruptedException e) { + interrupted.countDown(); + throw new RuntimeException(e); + } + + return null; + } +}