diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 8934bc074ff59..b10aee25723de 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -63,6 +63,7 @@ public final class SystemSessionProperties public static final String QUERY_MAX_MEMORY = "query_max_memory"; public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory"; public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time"; + public static final String QUERY_MAX_PLANNING_TIME = "query_max_planning_time"; public static final String QUERY_MAX_RUN_TIME = "query_max_run_time"; public static final String RESOURCE_OVERCOMMIT = "resource_overcommit"; public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time"; @@ -261,6 +262,11 @@ public SystemSessionProperties( "Maximum execution time of a query", queryManagerConfig.getQueryMaxExecutionTime(), false), + durationProperty( + QUERY_MAX_PLANNING_TIME, + "Maximum planning time of a query", + queryManagerConfig.getQueryMaxPlanningTime(), + false), durationProperty( QUERY_MAX_CPU_TIME, "Maximum CPU time of a query", @@ -737,6 +743,11 @@ public static Duration getQueryMaxExecutionTime(Session session) return session.getSystemProperty(QUERY_MAX_EXECUTION_TIME, Duration.class); } + public static Duration getQueryMaxPlanningTime(Session session) + { + return session.getSystemProperty(QUERY_MAX_PLANNING_TIME, Duration.class); + } + public static boolean resourceOvercommit(Session session) { return session.getSystemProperty(RESOURCE_OVERCOMMIT, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 0a1aa71a55883..215a64ae58479 100644 --- 
a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -171,6 +171,12 @@ public Optional getExecutionStartTime() return getEndTime(); } + @Override + public Optional getPlanningTime() + { + return Optional.empty(); + } + @Override public Optional getEndTime() { diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java index 559d6ec006038..04db83f8d2a44 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/LocalDispatchQuery.java @@ -218,6 +218,12 @@ public Optional getExecutionStartTime() return stateMachine.getExecutionStartTime(); } + @Override + public Optional getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + @Override public Optional getEndTime() { diff --git a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java index b6ca104dea457..0045ca29aef4f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java @@ -279,6 +279,12 @@ public QueryState getState() return stateMachine.getQueryState(); } + @Override + public Optional getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + public List getParameters() { return parameters; diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 67bd1abe871fe..7a3468663fd27 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -61,6 +61,7 @@ public class 
QueryManagerConfig private String queryExecutionPolicy = "all-at-once"; private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS); private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS); + private Duration queryMaxPlanningTime = new Duration(10, TimeUnit.MINUTES); private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS); private Optional queryMaxScanPhysicalBytes = Optional.empty(); @@ -284,6 +285,19 @@ public QueryManagerConfig setQueryMaxExecutionTime(Duration queryMaxExecutionTim return this; } + @NotNull + public Duration getQueryMaxPlanningTime() + { + return queryMaxPlanningTime; + } + + @Config("query.max-planning-time") + public QueryManagerConfig setQueryMaxPlanningTime(Duration queryMaxPlanningTime) + { + this.queryMaxPlanningTime = queryMaxPlanningTime; + return this; + } + @NotNull @MinDuration("1ns") public Duration getQueryMaxCpuTime() diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 0008670464f4b..95e741f3004b5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -1009,6 +1009,13 @@ public Optional getExecutionStartTime() return queryStateTimer.getExecutionStartTime(); } + public Optional getPlanningTime() + { + // Execution start time is empty if planning has not started + return queryStateTimer.getExecutionStartTime() + .map(ignored -> queryStateTimer.getPlanningTime()); + } + public DateTime getLastHeartbeat() { return queryStateTimer.getLastHeartbeat(); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java index 8a6b96c0ac88e..31860e579e45c 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java +++ 
b/core/trino-main/src/main/java/io/trino/execution/QueryStateTimer.java @@ -189,6 +189,11 @@ public Optional getExecutionStartTime() return toDateTime(beginPlanningNanos); } + public Optional getPlanningStartTime() + { + return toDateTime(beginPlanningNanos); + } + public Duration getElapsedTime() { if (endNanos.get() != null) { diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java b/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java index 270a32774f3bd..84b41314fb292 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryTracker.java @@ -38,6 +38,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.trino.SystemSessionProperties.getQueryMaxExecutionTime; +import static io.trino.SystemSessionProperties.getQueryMaxPlanningTime; import static io.trino.SystemSessionProperties.getQueryMaxRunTime; import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY; import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; @@ -178,11 +179,16 @@ private void enforceTimeLimits() } Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession()); Duration queryMaxExecutionTime = getQueryMaxExecutionTime(query.getSession()); + Duration queryMaxPlanningTime = getQueryMaxPlanningTime(query.getSession()); Optional executionStartTime = query.getExecutionStartTime(); + Optional planningTime = query.getPlanningTime(); DateTime createTime = query.getCreateTime(); if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.toMillis()).isBeforeNow()) { query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum execution time limit of " + queryMaxExecutionTime)); } + planningTime + .filter(duration -> duration.compareTo(queryMaxPlanningTime) > 0) + .ifPresent(ignored -> query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum planning time limit of " + 
queryMaxPlanningTime))); if (createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) { query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime)); } @@ -288,6 +294,8 @@ public interface TrackedQuery Optional getExecutionStartTime(); + Optional getPlanningTime(); + DateTime getLastHeartbeat(); Optional getEndTime(); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index d6f842c0a7abe..8a7ed2291f9ee 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -84,14 +84,18 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.SystemSessionProperties.isEnableDynamicFiltering; +import static io.trino.execution.QueryState.FAILED; +import static io.trino.execution.QueryState.PLANNING; import static io.trino.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static io.trino.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler; import static io.trino.server.DynamicFilterService.DynamicFiltersStats; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.sql.ParameterUtils.parameterExtractor; +import static java.lang.Thread.currentThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.SECONDS; @@ -335,6 +339,12 @@ public Optional getExecutionStartTime() return stateMachine.getExecutionStartTime(); } + @Override + public Optional 
getPlanningTime() + { + return stateMachine.getPlanningTime(); + } + @Override public DateTime getLastHeartbeat() { @@ -379,11 +389,33 @@ public void start() return; } - PlanRoot plan = planQuery(); - // DynamicFilterService needs plan for query to be registered. - // Query should be registered before dynamic filter suppliers are requested in distribution planning. - registerDynamicFilteringQuery(plan); - planDistribution(plan); + AtomicReference planningThread = new AtomicReference<>(currentThread()); + stateMachine.getStateChange(PLANNING).addListener(() -> { + if (stateMachine.getQueryState() == FAILED) { + synchronized (this) { + Thread thread = planningThread.get(); + if (thread != null) { + thread.interrupt(); + } + } + } + }, directExecutor()); + + try { + PlanRoot plan = planQuery(); + // DynamicFilterService needs plan for query to be registered. + // Query should be registered before dynamic filter suppliers are requested in distribution planning. + registerDynamicFilteringQuery(plan); + planDistribution(plan); + } + finally { + synchronized (this) { + planningThread.set(null); + // Clear the interrupted flag in case there was a race condition where + // the planning thread was interrupted right after planning completes above + Thread.interrupted(); + } + } if (!stateMachine.transitionToStarting()) { // query already started or finished diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index b292499f05440..15a26cee44848 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -53,6 +53,7 @@ public void testDefaults() .setQueryExecutionPolicy("all-at-once") .setQueryMaxRunTime(new Duration(100, DAYS)) .setQueryMaxExecutionTime(new Duration(100, DAYS)) + .setQueryMaxPlanningTime(new Duration(10, MINUTES)) 
.setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS)) .setQueryMaxScanPhysicalBytes(null) .setRequiredWorkers(1) @@ -81,6 +82,7 @@ public void testExplicitPropertyMappings() .put("query.execution-policy", "phased") .put("query.max-run-time", "2h") .put("query.max-execution-time", "3h") + .put("query.max-planning-time", "1h") .put("query.max-cpu-time", "2d") .put("query.max-scan-physical-bytes", "1kB") .put("query-manager.required-workers", "333") @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings() .setQueryExecutionPolicy("phased") .setQueryMaxRunTime(new Duration(2, HOURS)) .setQueryMaxExecutionTime(new Duration(3, HOURS)) + .setQueryMaxPlanningTime(new Duration(1, HOURS)) .setQueryMaxCpuTime(new Duration(2, DAYS)) .setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE)) .setRequiredWorkers(333) diff --git a/docs/src/main/sphinx/admin/properties-query-management.rst b/docs/src/main/sphinx/admin/properties-query-management.rst index c5c78d038bce2..441334f926ce5 100644 --- a/docs/src/main/sphinx/admin/properties-query-management.rst +++ b/docs/src/main/sphinx/admin/properties-query-management.rst @@ -13,6 +13,18 @@ The maximum allowed time for a query to be actively executing on the cluster, before it is terminated. Compared to the run time below, execution time does not include analysis, query planning or wait times in a queue. +``query.max-planning-time`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``duration`` +* **Default value:** ``10 minutes`` +* **Session property:** ``query_max_planning_time`` + +The maximum allowed time for a query to be actively planning the execution. +After this period the coordinator will make its best effort to stop the +query. Note that some operations in the planning phase are not easily cancellable +and may not terminate immediately. 
+ ``query.max-run-time`` ^^^^^^^^^^^^^^^^^^^^^^ diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java b/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java new file mode 100644 index 0000000000000..3277f628afc50 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestQueryTracker.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableList; +import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.util.concurrent.CountDownLatch; + +import static io.trino.SystemSessionProperties.QUERY_MAX_PLANNING_TIME; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +// Tests need to finish before strict timeouts. 
Any background work +// may make them flaky +@Test(singleThreaded = true) +public class TestQueryTracker + extends AbstractTestQueryFramework +{ + private final CountDownLatch freeze = new CountDownLatch(1); + private final CountDownLatch interrupted = new CountDownLatch(1); + + @Override + @AfterClass + public void close() + { + freeze.countDown(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + Session defaultSession = testSessionBuilder() + .setCatalog("mock") + .setSchema("default") + .setSystemProperty(QUERY_MAX_PLANNING_TIME, "1s") + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner + .builder(defaultSession) + .build(); + queryRunner.installPlugin(new Plugin() + { + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(MockConnectorFactory.builder() + .withGetColumns(ignored -> ImmutableList.of(new ColumnMetadata("col", VARCHAR))) + // Apply filter happens inside optimizer so this should model most blocking tasks in planning phase + .withApplyFilter((ignored1, ignored2, ignored3) -> freeze()) + .build()); + } + }); + queryRunner.createCatalog("mock", "mock"); + + return queryRunner; + } + + @Test(timeOut = 5_000) + public void testInterruptApplyFilter() + throws InterruptedException + { + assertThatThrownBy(() -> getQueryRunner().execute("SELECT * FROM t1 WHERE col = 'abc'")) + .hasMessageContaining("Query exceeded the maximum planning time limit of 1.00s"); + + interrupted.await(); + } + + private T freeze() + { + try { + freeze.await(); + } + catch (InterruptedException e) { + interrupted.countDown(); + throw new RuntimeException(e); + } + + return null; + } +}