Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Join reordering stats11 speedup #605

Open
wants to merge 15 commits into
base: join-reordering-stats11
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,12 @@
<version>42.0.0</version>
</dependency>

<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>2.1.2</version>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ datasource: presto
query-names: presto/tpcds/${query}.sql
runs: 6
prewarm-runs: 2
before-execution: sleep-4s, presto/session_set_reorder_joins.sql
before-execution: sleep-4s, presto/session_set_join_reordering_strategy.sql
frequency: 7
database: hive
tpcds_small: tpcds_10gb_orc
Expand All @@ -12,22 +12,22 @@ variables:
1:
query: q01,q06,q14_1,q39_1,q39_2,q47,q57,q67,q81
schema: ${tpcds_small}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
2:
query: q02,q03,q04,q05,q07,q09,q10,q11,q13,q14_2,q16,q17,q19,q22,q23_1,q23_2,q24_1,q24_2,q25,q28,q29,q30,q31,q32,q33,q35,q37,q38,q42,q43,q44,q46,q48,q49,q50,q51,q52,q53,q54,q55,q56,q58,q59,q60,q61,q63,q65,q66,q68,q69,q70,q71,q72,q74,q75,q77,q78,q80,q82,q88,q89,q94,q95
schema: ${tpcds_medium}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
3:
# query not passing quick enough without reordering
query: q18,q64
schema: ${tpcds_medium}
reorder_joins: true
join_reordering_strategy: ELIMINATE_CROSS_JOINS
4:
query: q08,q12,q15,q20,q21,q26,q27,q34,q36,q40,q41,q45,q62,q73,q76,q79,q83,q84,q85,q86,q87,q90,q91,q92,q93,q96,q97,q98,q99
schema: ${tpcds_large}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
5:
# extra runs with reordering on 1tb schema (too slow without reordering on 1tb). For 100g we keep both runs, with and without reordering
query: q03,q37,q42,q43,q52,q53
schema: ${tpcds_large}
reorder_joins: true
join_reordering_strategy: ELIMINATE_CROSS_JOIN
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ datasource: presto
query-names: presto/tpch/${query}.sql
runs: 6
prewarm-runs: 2
before-execution: sleep-4s, presto/session_set_reorder_joins.sql
before-execution: sleep-4s, presto/session_set_join_reordering_strategy.sql
frequency: 7
database: hive
tpch_small: tpch_10gb_orc
Expand All @@ -14,28 +14,28 @@ variables:
# queries too slow to run on 100gb without reordering
query: q2, q8, q9
schema: ${tpch_small}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
2:
# queries too slow to run on 100gb without reordering
query: q8, q9
schema: ${tpch_medium}
reorder_joins: true
join_reordering_strategy: ELIMINATE_CROSS_JOINS
3:
# queries too slow to run on 100gb without reordering
query: q2
schema: ${tpch_large}
reorder_joins: true
join_reordering_strategy: ELIMINATE_CROSS_JOINS
4:
# queries too slow to run on 1tb
query: q3, q4, q5, q7, q17, q18, q21
schema: ${tpch_medium}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
5:
query: q10, q11, q12, q13, q14, q15, q16, q19, q20, q22
schema: ${tpch_large}
reorder_joins: true, false
join_reordering_strategy: ELIMINATE_CROSS_JOINS, NONE
6:
# queries without joins
query: q1, q6
schema: ${tpch_large}
reorder_joins: false
join_reordering_strategy: NONE
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SET SESSION join_reordering_strategy='${join_reordering_strategy}'

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -143,27 +143,27 @@ public void testUpdateSessionParameters()
ClientSession session = options.toClientSession();
SqlParser sqlParser = new SqlParser();

ImmutableMap<String, String> existingProperties = ImmutableMap.of("query_max_memory", "10GB", "distributed_join", "true");
ImmutableMap<String, String> existingProperties = ImmutableMap.of("query_max_memory", "10GB", "join_distribution_type", "repartitioned");
ImmutableMap<String, String> preparedStatements = ImmutableMap.of("my_query", "select * from foo");
session = Console.processSessionParameterChange(sqlParser.createStatement("USE test_catalog.test_schema"), session, existingProperties, preparedStatements);
assertEquals(session.getCatalog(), "test_catalog");
assertEquals(session.getSchema(), "test_schema");
assertEquals(session.getProperties().get("query_max_memory"), "10GB");
assertEquals(session.getProperties().get("distributed_join"), "true");
assertEquals(session.getProperties().get("join_distribution_type"), "repartitioned");
assertEquals(session.getPreparedStatements().get("my_query"), "select * from foo");

session = Console.processSessionParameterChange(sqlParser.createStatement("USE test_schema_b"), session, existingProperties, preparedStatements);
assertEquals(session.getCatalog(), "test_catalog");
assertEquals(session.getSchema(), "test_schema_b");
assertEquals(session.getProperties().get("query_max_memory"), "10GB");
assertEquals(session.getProperties().get("distributed_join"), "true");
assertEquals(session.getProperties().get("join_distribution_type"), "repartitioned");
assertEquals(session.getPreparedStatements().get("my_query"), "select * from foo");

session = Console.processSessionParameterChange(sqlParser.createStatement("USE test_catalog_2.test_schema"), session, existingProperties, preparedStatements);
assertEquals(session.getCatalog(), "test_catalog_2");
assertEquals(session.getSchema(), "test_schema");
assertEquals(session.getProperties().get("query_max_memory"), "10GB");
assertEquals(session.getProperties().get("distributed_join"), "true");
assertEquals(session.getProperties().get("join_distribution_type"), "repartitioned");
assertEquals(session.getPreparedStatements().get("my_query"), "select * from foo");
}
}
52 changes: 38 additions & 14 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,34 @@ This section describes the most important config properties that
may be used to tune Presto or alter its behavior when required.

.. contents::
:local:
:local:
:backlinks: none
:depth: 1
:depth: 1

General Properties
------------------

``distributed-joins-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
``join-distribution-type``
^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``true``

Use hash distributed joins instead of broadcast joins. Distributed joins
require redistributing both tables using a hash of the join key. This can
be slower (sometimes substantially) than broadcast joins, but allows much
larger joins. Broadcast joins require that the tables on the right side of
the join after filtering fit in memory on each node, whereas distributed joins
only need to fit in distributed memory across all nodes. This can also be
specified on a per-query basis using the ``distributed_join`` session property.
* **Type:** ``string``
* **Allowed values:** ``AUTOMATIC``, ``REPARTITIONED``, ``REPLICATED``
* **Default value:** ``REPARTITIONED``

The type of distributed join to use. When set to ``REPARTITIONED``, presto will
use hash distributed joins. When set to ``REPLICATED``, it will broadcast the
right table to all nodes in the cluster that have data from the left table.
Repartitioned joins require redistributing both tables using a hash of the join key.
This can be slower (sometimes substantially) than broadcast joins, but allows much
larger joins. In particular broadcast joins will be faster if the right table is
much smaller than the left. However, broadcast joins require that the tables on the right
side of the join after filtering fit in memory on each node, whereas distributed joins
only need to fit in distributed memory across all nodes. When set to ``AUTOMATIC``,
Presto will make a cost based decision as to which distribution type is optimal.
It will also consider switching the left and right inputs to the join. In ``AUTOMATIC``
mode, Presto will default to replicated joins if no cost could be computed, such as if
the tables do not have statistics. This can also be specified on a per-query basis using
the ``join_distribution_type`` session property.

``redistribute-writes``
^^^^^^^^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -368,6 +376,22 @@ Optimizer Properties
using the ``push_table_write_through_union`` session property.


``optimizer.join-reordering-strategy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``COST_BASED``, ``ELIMINATE_CROSS_JOINS``, ``NONE``
* **Default value:** ``ELIMINATE_CROSS_JOINS``

The join reordering strategy to use. ``NONE`` maintains the order the tables are listed in the
query. ``ELIMINATE_CROSS_JOINS`` reorders joins to eliminate cross joins where possible and
otherwise maintains the original query order. When reordering joins it also strives to maintain the
original table order as much as possible. ``COST_BASED`` enumerates possible orders and uses
statistics-based cost estimation to determine the least cost order. If stats are not available or if
for any reason a cost could not be computed, the ``ELIMINATE_CROSS_JOINS`` strategy is used. This can
also be specified on a per-query basis using the ``join_reordering_strategy`` session property.


Regular Expression Function Properties
--------------------------------------

Expand Down
1 change: 0 additions & 1 deletion presto-main/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@ plugin.bundles=\
../presto-postgresql/pom.xml

presto.version=testversion
distributed-joins-enabled=true
node-scheduler.include-coordinator=true
5 changes: 5 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@
<artifactId>jgrapht-core</artifactId>
</dependency>

<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot add something to pom.xml which is not used in the code. mvn validate will fail in such case.


<!-- used by tests but also needed transitively -->
<dependency>
<groupId>org.apache.bval</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.inject.Inject;

import java.util.List;
import java.util.stream.Stream;

import static com.facebook.presto.spi.session.PropertyMetadata.booleanSessionProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerSessionProperty;
Expand All @@ -37,11 +40,12 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
import static java.util.stream.Collectors.joining;

public final class SystemSessionProperties
{
public static final String OPTIMIZE_HASH_GENERATION = "optimize_hash_generation";
public static final String DISTRIBUTED_JOIN = "distributed_join";
public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators";
Expand All @@ -58,7 +62,7 @@ public final class SystemSessionProperties
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
public static final String PLAN_WITH_TABLE_NODE_PARTITIONING = "plan_with_table_node_partitioning";
public static final String COLOCATED_JOIN = "colocated_join";
public static final String REORDER_JOINS = "reorder_joins";
public static final String JOIN_REORDERING_STRATEGY = "join_reordering_strategy";
public static final String INITIAL_SPLITS_PER_NODE = "initial_splits_per_node";
public static final String SPLIT_CONCURRENCY_ADJUSTMENT_INTERVAL = "split_concurrency_adjustment_interval";
public static final String OPTIMIZE_METADATA_QUERIES = "optimize_metadata_queries";
Expand Down Expand Up @@ -101,11 +105,18 @@ public SystemSessionProperties(
"Compute hash codes for distribution, joins, and aggregations early in query plan",
featuresConfig.isOptimizeHashGeneration(),
false),
booleanSessionProperty(
DISTRIBUTED_JOIN,
"Use a distributed join instead of a broadcast join",
featuresConfig.isDistributedJoinsEnabled(),
false),
new PropertyMetadata<>(
JOIN_DISTRIBUTION_TYPE,
format("The join method to use. Options are %s",
Stream.of(JoinDistributionType.values())
.map(FeaturesConfig.JoinDistributionType::name)
.collect(joining(","))),
VARCHAR,
JoinDistributionType.class,
featuresConfig.getJoinDistributionType(),
false,
value -> JoinDistributionType.valueOf(((String) value).toUpperCase()),
JoinDistributionType::name),
booleanSessionProperty(
DISTRIBUTED_INDEX_JOIN,
"Distribute index joins on join keys instead of executing inline",
Expand Down Expand Up @@ -240,11 +251,18 @@ public SystemSessionProperties(
"Experimental: Adapt plan to pre-partitioned tables",
true,
false),
booleanSessionProperty(
REORDER_JOINS,
"Experimental: Reorder joins to optimize plan",
featuresConfig.isJoinReorderingEnabled(),
false),
new PropertyMetadata<>(
JOIN_REORDERING_STRATEGY,
format("The join reordering strategy to use. Options are %s",
Stream.of(JoinReorderingStrategy.values())
.map(FeaturesConfig.JoinReorderingStrategy::name)
.collect(joining(","))),
VARCHAR,
JoinReorderingStrategy.class,
featuresConfig.getJoinReorderingStrategy(),
false,
value -> JoinReorderingStrategy.valueOf(((String) value).toUpperCase()),
JoinReorderingStrategy::name),
booleanSessionProperty(
FAST_INEQUALITY_JOINS,
"Use faster handling of inequality join if it is possible",
Expand Down Expand Up @@ -347,11 +365,6 @@ public static boolean isOptimizeHashGenerationEnabled(Session session)
return session.getSystemProperty(OPTIMIZE_HASH_GENERATION, Boolean.class);
}

public static boolean isDistributedJoinEnabled(Session session)
{
return session.getSystemProperty(DISTRIBUTED_JOIN, Boolean.class);
}

public static boolean isDistributedIndexJoinEnabled(Session session)
{
return session.getSystemProperty(DISTRIBUTED_INDEX_JOIN, Boolean.class);
Expand Down Expand Up @@ -427,9 +440,9 @@ public static boolean isFastInequalityJoin(Session session)
return session.getSystemProperty(FAST_INEQUALITY_JOINS, Boolean.class);
}

public static boolean isJoinReorderingEnabled(Session session)
public static JoinReorderingStrategy getJoinReorderingStrategy(Session session)
{
return session.getSystemProperty(REORDER_JOINS, Boolean.class);
return session.getSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.class);
}

public static boolean isColocatedJoinEnabled(Session session)
Expand Down Expand Up @@ -515,4 +528,9 @@ public static boolean isUseNewStatsCalculator(Session session)
{
return session.getSystemProperty(USE_NEW_STATS_CALCULATOR, Boolean.class);
}

public static JoinDistributionType getJoinDistributionType(Session session)
{
return session.getSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.facebook.presto.sql.planner.plan.PlanNode;

import java.util.HashMap;
import java.util.Map;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class CachingCostCalculator
implements CostCalculator
{
private final CostCalculator costCalculator;
private final Map<PlanNode, PlanNodeCostEstimate> costs = new HashMap<>();

public CachingCostCalculator(CostCalculator costCalculator)
{
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
}

@Override
public PlanNodeCostEstimate calculateCost(PlanNode planNode, Lookup lookup, Session session, Map<Symbol, Type> types)
{
if (!costs.containsKey(planNode)) {
// cannot use Map.computeIfAbsent due to costs map modification in the mappingFunction callback
PlanNodeCostEstimate cost = costCalculator.calculateCumulativeCost(planNode, lookup, session, types);
requireNonNull(costs, "computed cost can not be null");
checkState(costs.put(planNode, cost) == null, "cost for " + planNode + " already computed");
}
return costs.get(planNode);
}
}
Loading