Skip to content

Commit

Permalink
Support query retry for HBO
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavmuk04 committed Aug 2, 2024
1 parent 27a3384 commit 106cc4b
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZE_HASH_GENERATION = "optimize_hash_generation";
public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type";
public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size";
public static final String RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION = "retry_query_with_history_based_optimization";
public static final String SIZE_BASED_JOIN_DISTRIBUTION_TYPE = "size_based_join_distribution_type";
public static final String DISTRIBUTED_JOIN = "distributed_join";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
Expand Down Expand Up @@ -432,6 +433,10 @@ public SystemSessionProperties(
"Enable confidence based broadcasting when enabled",
featuresConfig.isConfidenceBasedBroadcastEnabled(),
false),
booleanProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION,
"Automatically retry a query if it fails and HBO can change the query plan",
featuresConfig.isRetryQueryWithHistoryBasedOptimizationEnabled(),
false),
booleanProperty(
TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
"Treat low confidence zero estimations as unknowns during joins when enabled",
Expand Down Expand Up @@ -2057,6 +2062,11 @@ public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session s
return session.getSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, Boolean.class);
}

/**
 * Reads the {@code retry_query_with_history_based_optimization} system property
 * from the given session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value of the property for this session
 */
public static boolean retryQueryWithHistoryBasedOptimizationEnabled(Session session)
{
    Boolean retryEnabled = session.getSystemProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION, Boolean.class);
    return retryEnabled;
}

public static int getHashPartitionCount(Session session)
{
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
Expand Down Expand Up @@ -79,6 +80,9 @@
import static com.facebook.presto.SystemSessionProperties.getTargetResultSize;
import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled;
import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled;
import static com.facebook.presto.SystemSessionProperties.retryQueryWithHistoryBasedOptimizationEnabled;
import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats;
Expand All @@ -98,6 +102,9 @@ class Query
{
private static final Logger log = Logger.get(Query.class);
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
// State carried from a failed query to its retry: the id of the query that failed first,
// the hash of its top-level canonical plan, and the error it failed with.
// These are written when a retry is first considered and reset to empty once the
// retried query has been inspected (see getNextResultWithRetry).
// NOTE(review): the fields are static and mutable, so they are shared by every Query
// instance in the JVM, while getNextResultWithRetry is synchronized per instance only.
// It looks like two queries failing at the same time could overwrite each other's
// values - confirm, and consider keying this state by query id instead.
private static Optional<QueryId> originalBeforeRetryQueryId = Optional.empty();
private static Optional<Integer> previousQueryTopLevelPlanHash = Optional.empty();
private static Optional<QueryError> previousQueryFailureError = Optional.empty();

private final QueryManager queryManager;
private final TransactionManager transactionManager;
Expand Down Expand Up @@ -383,27 +390,40 @@ private synchronized Optional<QueryResults> getCachedResult(long token)
private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
{
QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize, binaryResults);
if (queryResults.getError() == null || !queryResults.getError().isRetriable()) {
return queryResults;
}

// check if we have exceeded the global limit
retryCircuitBreaker.incrementFailure();
if (!retryCircuitBreaker.isRetryAllowed() || hasProducedResult) {
if (queryResults.getError() == null) {
return queryResults;
}

// check if we have exceeded the local limit
if (queryManager.getQueryRetryCount(queryId) >= getQueryRetryLimit(session) ||
queryManager.getQueryInfo(queryId).getQueryStats().getExecutionTime().toMillis() > getQueryRetryMaxExecutionTime(session).toMillis()) {
return queryResults;
boolean historyBasedOptimizationEnabled = useHistoryBasedPlanStatisticsEnabled(session) && trackHistoryBasedPlanStatisticsEnabled(session);
boolean hasNotRetried = queryManager.getQueryRetryCount(queryId) < 1;

if (historyBasedOptimizationEnabled && hasNotRetried && retryConditionsMet(queryResults) && retryQueryWithHistoryBasedOptimizationEnabled(session)) {
originalBeforeRetryQueryId = Optional.of(queryId);
previousQueryTopLevelPlanHash = getCurrentTopLevelPlanHash();
previousQueryFailureError = Optional.of(queryResults.getError());
}
else if (queryManager.getQueryRetryCount(queryId) == 1 && retryQueryWithHistoryBasedOptimizationEnabled(session) && retryConditionsMet(queryResults) && historyBasedOptimizationEnabled) {
Optional<Integer> currentTopLevelPlanHash = getCurrentTopLevelPlanHash();

if (previousQueryTopLevelPlanHash.isPresent() && currentTopLevelPlanHash.isPresent() && currentTopLevelPlanHash.equals(previousQueryTopLevelPlanHash)
|| (!previousQueryTopLevelPlanHash.isPresent() && !currentTopLevelPlanHash.isPresent())) {
queryManager.failQuery(queryId, new PrestoException(GENERIC_INTERNAL_ERROR, "Since the plan hashes did not change, your retry query will not execute." +
"Your original error was " + previousQueryFailureError.get() + ". Original QueryId: " + originalBeforeRetryQueryId +
". Retry QueryId: " + queryId));
}

originalBeforeRetryQueryId = Optional.empty();
previousQueryTopLevelPlanHash = Optional.empty();
previousQueryFailureError = Optional.empty();

// no support for transactions
if (session.getTransactionId().isPresent() &&
!transactionManager.getOptionalTransactionInfo(session.getRequiredTransactionId()).map(TransactionInfo::isAutoCommitContext).orElse(true)) {
return queryResults;
}
else {
if (!retryConditionsMet(queryResults)) {
return queryResults;
}
}

// build a new query with next uri
// we expect failed nodes have been removed from discovery server upon query failure
Expand Down Expand Up @@ -687,6 +707,54 @@ private static URI findCancelableLeafStage(StageInfo stage)
return stage.getSelf();
}

/**
 * Decides whether the failure reported in {@code queryResults} qualifies for a retry.
 *
 * <p>When the HBO retry session property is disabled, the error must be retriable,
 * the retry circuit breaker must still allow retries, and the query's retry count
 * must be below the session retry limit. When the property is enabled, those three
 * checks are skipped. In both modes the query must not have produced results, must
 * not have run longer than the session's maximum retry execution time, and must not
 * be running inside a non-auto-commit transaction.
 *
 * <p>This method is not a pure predicate: when the HBO retry property is disabled
 * and the error is retriable, every call invokes
 * {@code retryCircuitBreaker.incrementFailure()}.
 *
 * @param queryResults results of the query attempt being examined
 * @return {@code true} if all applicable retry conditions hold
 */
private boolean retryConditionsMet(QueryResults queryResults)
{
    QueryError failure = queryResults.getError();
    if (failure == null) {
        return false;
    }

    boolean hboRetryEnabled = retryQueryWithHistoryBasedOptimizationEnabled(session);
    if (!hboRetryEnabled) {
        if (!failure.isRetriable()) {
            return false;
        }

        // record this failure against the global limit before consulting it
        retryCircuitBreaker.incrementFailure();
        boolean globalLimitExceeded = !retryCircuitBreaker.isRetryAllowed();
        if (globalLimitExceeded || queryManager.getQueryRetryCount(queryId) >= getQueryRetryLimit(session)) {
            return false;
        }
    }

    // results already handed to the client cannot be taken back
    if (hasProducedResult) {
        return false;
    }

    // the attempt must not have run longer than the per-session retry time budget
    if (queryManager.getQueryInfo(queryId).getQueryStats().getExecutionTime().toMillis() > getQueryRetryMaxExecutionTime(session).toMillis()) {
        return false;
    }

    // retries are not supported inside a non-auto-commit transaction
    boolean inExplicitTransaction = session.getTransactionId().isPresent() &&
            !transactionManager.getOptionalTransactionInfo(session.getRequiredTransactionId()).map(TransactionInfo::isAutoCommitContext).orElse(true);
    return !inExplicitTransaction;
}

/**
 * Returns the hash code of the first canonical plan recorded for this query.
 *
 * <p>The full query info is fetched exactly once, so the emptiness check and the
 * read of the first element always observe the same list.
 *
 * @return the {@code hashCode()} of the first canonical plan, or empty when the
 *         query has no canonical plan info
 */
private Optional<Integer> getCurrentTopLevelPlanHash()
{
    return queryManager.getFullQueryInfo(queryId).getPlanCanonicalInfo().stream()
            .findFirst()
            .map(planInfo -> planInfo.getCanonicalPlan().getPlan().hashCode());
}

private static QueryError toQueryError(QueryInfo queryInfo)
{
QueryState state = queryInfo.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class FeaturesConfig
private boolean optimizeNullsInJoin;
private boolean optimizePayloadJoins;
private boolean confidenceBasedBroadcastEnabled;
private boolean retryQueryWithHistoryBasedOptimizationEnabled;
private boolean treatLowConfidenceZeroEstimationAsUnknownEnabled;
private boolean pushdownDereferenceEnabled;
private boolean inlineSqlFunctions = true;
Expand Down Expand Up @@ -1266,6 +1267,18 @@ public FeaturesConfig setConfidenceBasedBroadcastEnabled(boolean confidenceBased
return this;
}

/**
 * @return the configured value of the retry-query-with-history-based-optimization flag
 */
public boolean isRetryQueryWithHistoryBasedOptimizationEnabled()
{
    return this.retryQueryWithHistoryBasedOptimizationEnabled;
}

/**
 * Sets the retry-query-with-history-based-optimization flag, bound to the
 * {@code optimizer.retry-query-with-history-based-optimization} config key.
 *
 * @param retryQueryWithHistoryBasedOptimizationEnabled new value of the flag
 * @return this config object, to allow call chaining
 */
@Config("optimizer.retry-query-with-history-based-optimization")
public FeaturesConfig setRetryQueryWithHistoryBasedOptimizationEnabled(boolean retryQueryWithHistoryBasedOptimizationEnabled)
{
this.retryQueryWithHistoryBasedOptimizationEnabled = retryQueryWithHistoryBasedOptimizationEnabled;
return this;
}

public boolean isTreatLowConfidenceZeroEstimationAsUnknownEnabled()
{
return treatLowConfidenceZeroEstimationAsUnknownEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testDefaults()
.setPushTableWriteThroughUnion(true)
.setDictionaryAggregation(false)
.setConfidenceBasedBroadcastEnabled(false)
.setRetryQueryWithHistoryBasedOptimizationEnabled(false)
.setTreatLowConfidenceZeroEstimationAsUnknownEnabled(false)
.setAggregationPartitioningMergingStrategy(LEGACY)
.setLegacyArrayAgg(false)
Expand Down Expand Up @@ -343,6 +344,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-table-write-through-union", "false")
.put("optimizer.dictionary-aggregation", "true")
.put("optimizer.confidence-based-broadcast", "true")
.put("optimizer.retry-query-with-history-based-optimization", "true")
.put("optimizer.treat-low-confidence-zero-estimation-as-unknown", "true")
.put("optimizer.push-aggregation-through-join", "false")
.put("optimizer.aggregation-partition-merging", "top_down")
Expand Down Expand Up @@ -553,6 +555,7 @@ public void testExplicitPropertyMappings()
.setPushTableWriteThroughUnion(false)
.setDictionaryAggregation(true)
.setConfidenceBasedBroadcastEnabled(true)
.setRetryQueryWithHistoryBasedOptimizationEnabled(true)
.setTreatLowConfidenceZeroEstimationAsUnknownEnabled(true)
.setAggregationPartitioningMergingStrategy(TOP_DOWN)
.setPushAggregationThroughJoin(false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.Session;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.testing.InMemoryHistoryBasedPlanStatisticsProvider;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.facebook.presto.SystemSessionProperties.QUERY_RETRY_LIMIT;
import static com.facebook.presto.SystemSessionProperties.RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY;
import static com.facebook.presto.SystemSessionProperties.RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_STATS_FROM_FAILED_QUERIES;
import static com.facebook.presto.SystemSessionProperties.USE_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;

/**
 * Tests for retrying a failed query when history-based optimization (HBO) is enabled.
 *
 * <p>Each test starts a single-node {@link DistributedQueryRunner} with the TPCH catalog
 * and an in-memory HBO statistics provider, runs one query that is expected to fail,
 * and then counts how many queries known to the coordinator carry the
 * {@code -- retry query} marker.
 */
public class TestHistoryBasedRetry
{
    private ListeningExecutorService executor;

    @BeforeClass(alwaysRun = true)
    public void setUp()
    {
        executor = MoreExecutors.listeningDecorator(newCachedThreadPool());
    }

    @AfterClass(alwaysRun = true)
    public void shutdown()
    {
        // alwaysRun means this is invoked even when setUp() failed, so the executor may be unset
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    @Test
    public void testQueryRetryWithHBONaturalFail()
            throws Exception
    {
        int retryLimit = 3;

        String sql = "SELECT o.orderkey, l.partkey, l.mapcol[o.orderkey] FROM (select orderkey, partkey, mapcol " +
                "FROM (SELECT *, map(array[1], array[2]) mapcol FROM lineitem)) l " +
                "JOIN orders o ON l.partkey = o.custkey WHERE length(comment) > 10";

        Session session = Session.builder(createSession())
                .setSystemProperty(QUERY_RETRY_LIMIT, String.valueOf(retryLimit))
                .build();

        assertEquals(runAndCountRetries(session, sql), 1, "Retry count should be one as the query plan has changed.");
    }

    @Test
    public void testQueryRetryWithHBOForceFail()
            throws Exception
    {
        String query = "SELECT if(COUNT(*)=1,1,fail(1, 'failed')) FROM part p LEFT JOIN lineitem l ON p.partkey = l.partkey WHERE l.comment like '%a%'";

        Session session = createSession();

        assertEquals(runAndCountRetries(session, query), 1, "Retry count should be one as the query plan has changed and is the max retry limit.");
    }

    /**
     * Runs {@code sql} on a fresh query runner, waits for it to finish (failure is
     * expected and tolerated), and returns the number of retry queries observed.
     */
    private int runAndCountRetries(Session session, String sql)
            throws Exception
    {
        try (QueryRunner queryRunner = createQueryRunner(session)) {
            List<ListenableFuture<?>> queryFutures = new ArrayList<>();

            ListenableFuture<?> future = executor.submit(() -> queryRunner.execute(session, sql));
            queryFutures.add(future);

            waitForQueryToFinish(queryFutures);

            return getRetryCount(queryRunner);
        }
    }

    /**
     * Counts the queries known to the coordinator whose text contains the
     * {@code -- retry query} marker.
     *
     * @param queryRunner must be the {@link DistributedQueryRunner} built by {@link #createQueryRunner}
     */
    private int getRetryCount(QueryRunner queryRunner)
    {
        DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
        List<BasicQueryInfo> queryInfos = distributedQueryRunner.getCoordinator().getQueryManager().getQueries();

        return (int) queryInfos.stream()
                .filter(info -> info.getQuery().contains("-- retry query"))
                .count();
    }

    /**
     * Blocks until every future has completed. Query failures are ignored on purpose.
     */
    private void waitForQueryToFinish(List<ListenableFuture<?>> queryFutures)
            throws Exception
    {
        for (ListenableFuture<?> future : queryFutures) {
            try {
                future.get();
            }
            catch (ExecutionException ignored) {
                // it is okay to fail, we are forcing it to test the retry mechanism
            }
        }
    }

    /**
     * Builds a single-node query runner with the TPCH catalog and an in-memory HBO
     * statistics provider. The runner is closed if any setup step fails, so a failed
     * setup does not leak the embedded server.
     */
    private QueryRunner createQueryRunner(Session session)
            throws Exception
    {
        QueryRunner queryRunner = new DistributedQueryRunner(session, 1);
        try {
            queryRunner.installPlugin(new TpchPlugin());
            queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of("tpch.splits-per-node", "3"));

            queryRunner.installPlugin(new Plugin()
            {
                @Override
                public Iterable<HistoryBasedPlanStatisticsProvider> getHistoryBasedPlanStatisticsProviders()
                {
                    return ImmutableList.of(new InMemoryHistoryBasedPlanStatisticsProvider());
                }
            });
            return queryRunner;
        }
        catch (Throwable e) {
            try {
                queryRunner.close();
            }
            catch (Throwable closeFailure) {
                // keep the setup failure as the primary error
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Base session: TPCH "tiny" schema with HBO tracking, HBO usage, and HBO-driven
     * query retry all switched on.
     */
    private static Session createSession()
    {
        return testSessionBuilder()
                .setCatalog("tpch")
                .setSchema("tiny")
                .setSystemProperty(USE_HISTORY_BASED_PLAN_STATISTICS, "true")
                .setSystemProperty(TRACK_HISTORY_BASED_PLAN_STATISTICS, "true")
                .setSystemProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION, "true")
                .setSystemProperty(RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY, "false")
                .setSystemProperty(TRACK_HISTORY_STATS_FROM_FAILED_QUERIES, "true")
                .build();
    }
}

0 comments on commit 106cc4b

Please sign in to comment.