diff --git a/presto-docs/src/main/sphinx/admin.rst b/presto-docs/src/main/sphinx/admin.rst index d14da1570257e..4f685d2bbd1a2 100644 --- a/presto-docs/src/main/sphinx/admin.rst +++ b/presto-docs/src/main/sphinx/admin.rst @@ -13,3 +13,4 @@ Administration admin/resource-groups admin/session-property-managers admin/dist-sort + admin/dynamic-filtering diff --git a/presto-docs/src/main/sphinx/admin/dynamic-filtering.rst b/presto-docs/src/main/sphinx/admin/dynamic-filtering.rst new file mode 100644 index 0000000000000..c0cf1262971da --- /dev/null +++ b/presto-docs/src/main/sphinx/admin/dynamic-filtering.rst @@ -0,0 +1,199 @@ +================= +Dynamic Filtering +================= + +Dynamic filtering optimizations significantly improve the performance of queries +with selective joins by avoiding reading of data that would be filtered by join condition. + +Consider the following query which captures a common pattern of a fact table ``store_sales`` +joined with a filtered dimension table ``date_dim``: + + SELECT count(*) + FROM store_sales + JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk + WHERE d_following_holiday='Y' AND d_year = 2000; + +Without dynamic filtering, Presto pushes predicates for the dimension table to the +table scan on ``date_dim``, and it scans all the data in the fact table since there +are no filters on ``store_sales`` in the query. The join operator ends up throwing away +most of the probe-side rows as the join criteria is highly selective. + +When dynamic filtering is enabled, Presto collects candidate values for join condition +from the processed dimension table on the right side of join. In the case of broadcast joins, +the runtime predicates generated from this collection are pushed into the local table scan +on the left side of the join running on the same worker. + +Additionally, these runtime predicates are communicated to the coordinator over the network +so that dynamic filtering can also be performed on the coordinator during enumeration of +table scan splits. + +For example, in the case of the Hive connector, dynamic filters are used +to skip loading of partitions which don't match the join criteria. +This is known as **dynamic partition pruning**. + +The results of dynamic filtering optimization can include the following benefits: + +* improved overall query performance +* reduced network traffic between Presto and the data source +* reduced load on the remote data source + +Support for push down of dynamic filters is specific to each connector, +and the relevant underlying database or storage system. + +Analysis and confirmation +------------------------- + +Dynamic filtering depends on a number of factors: + +* Planner support for dynamic filtering for a given join operation in Presto. + Currently inner joins with equality join conditions and semi-joins with IN conditions are supported. +* Connector support for utilizing dynamic filters pushed into the table scan at runtime. + For example, the Hive connector can push dynamic filters into ORC and Parquet readers + to perform stripe or row-group pruning. +* Connector support for utilizing dynamic filters at the splits enumeration stage. +* Size of right (build) side of the join. + +You can take a closer look at the :doc:`EXPLAIN plan ` of the query +to analyze if the planner is adding dynamic filters to a specific query's plan. +For example, the explain plan for the above query can be obtained by running +the following statement:: + + EXPLAIN + SELECT count(*) + FROM store_sales + JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk + WHERE d_following_holiday='Y' AND d_year = 2000; + +The explain plan for this query shows ``dynamicFilterAssignments`` in the +``InnerJoin`` node with dynamic filter ``df_370`` collected from build symbol ``d_date_sk``. +You can also see the ``dynamicFilter`` predicate as part of the Hive ``ScanFilterProject`` +operator where ``df_370`` is associated with probe symbol ``ss_sold_date_sk``. +This shows you that the planner is successful in pushing dynamic filters +down to the connector in the query plan. + +.. code-block:: none + + ... + + Fragment 1 [SOURCE] + Output layout: [count_3] + Output partitioning: SINGLE [] + Stage Execution Strategy: UNGROUPED_EXECUTION + Aggregate(PARTIAL) + │ Layout: [count_3:bigint] + │ count_3 := count(*) + └─ InnerJoin[(""ss_sold_date_sk"" = ""d_date_sk"")][$hashvalue, $hashvalue_4] + │ Layout: [] + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} + │ Distribution: REPLICATED + │ dynamicFilterAssignments = {d_date_sk -> df_370} + ├─ ScanFilterProject[table = hive:default:store_sales, grouped = false, filterPredicate = true, dynamicFilter = {df_370 -> ""ss_sold_date_sk""}] + │ Layout: [ss_sold_date_sk:bigint, $hashvalue:bigint] + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} + │ $hashvalue := combine_hash(bigint '0', COALESCE(""$operator$hash_code""(""ss_sold_date_sk""), 0)) + │ ss_sold_date_sk := ss_sold_date_sk:bigint:REGULAR + └─ LocalExchange[HASH][$hashvalue_4] (""d_date_sk"") + │ Layout: [d_date_sk:bigint, $hashvalue_4:bigint] + │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} + └─ RemoteSource[2] + Layout: [d_date_sk:bigint, $hashvalue_5:bigint] + + Fragment 2 [SOURCE] + Output layout: [d_date_sk, $hashvalue_6] + Output partitioning: BROADCAST [] + Stage Execution Strategy: UNGROUPED_EXECUTION + ScanFilterProject[table = hive:default:date_dim, grouped = false, filterPredicate = ((""d_following_holiday"" = CAST('Y' AS char(1))) AND (""d_year"" = 2000))] + Layout: [d_date_sk:bigint, $hashvalue_6:bigint] + Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} + $hashvalue_6 := combine_hash(bigint '0', COALESCE(""$operator$hash_code""(""d_date_sk""), 0)) + d_following_holiday := d_following_holiday:char(1):REGULAR + d_date_sk := d_date_sk:bigint:REGULAR + d_year := d_year:int:REGULAR + + +During execution of a query with dynamic filters, Presto populates statistics +about dynamic filters in the QueryInfo JSON available through the +:doc:`/admin/web-interface`. +In the ``queryStats`` section, statistics about dynamic filters collected +by the coordinator can be found in the ``dynamicFiltersStats`` structure. + +.. code-block:: none + + "dynamicFiltersStats" : { + "dynamicFilterDomainStats" : [ { + "dynamicFilterId" : "df_370", + "simplifiedDomain" : "[ [[2451546, 2451905]] ]", + "rangeCount" : 3, + "discreteValuesCount" : 0 + } ], + "lazyDynamicFilters" : 1, + "replicatedDynamicFilters" : 1, + "totalDynamicFilters" : 1, + "dynamicFiltersCompleted" : 1 + } + +Push down of dynamic filters into a table scan on the worker nodes can be +verified by looking at the operator statistics for that table scan. +``dynamicFilterSplitsProcessed`` records the number of splits +processed after a dynamic filter is pushed down to the table scan. + +.. code-block:: none + + "operatorType" : "ScanFilterAndProjectOperator", + "totalDrivers" : 1, + "addInputCalls" : 762, + "addInputWall" : "0.00ns", + "addInputCpu" : "0.00ns", + "physicalInputDataSize" : "0B", + "physicalInputPositions" : 28800991, + "inputPositions" : 28800991, + "dynamicFilterSplitsProcessed" : 1, + +Dynamic filter collection thresholds +------------------------------------ + +In order for dynamic filtering to work, the smaller dimension table +needs to be chosen as a join’s build side. The cost-based optimizer can automatically +do this using table statistics provided by connectors. Therefore, it is recommended +to keep :doc:`table statistics ` up to date and rely on the +CBO to correctly choose the smaller table on the build side of join. + +Collection of values of the join key columns from the build side for +dynamic filtering may incur additional CPU overhead during query execution. +Therefore, to limit the overhead of collecting dynamic filters +to the cases where the join operator is likely to be selective, +Presto defines thresholds on the size of dynamic filters collected from build side tasks. +Collection of dynamic filters for joins with large build sides can be enabled +using the ``enable-large-dynamic-filters`` configuration property or the +``enable_large_dynamic_filters`` session property. + +When large dynamic filters are enabled, limits on the size of dynamic filters can +be configured for each join distribution type using the configuration properties +``dynamic-filtering.large-broadcast.max-distinct-values-per-driver``, +``dynamic-filtering.large-broadcast.max-size-per-driver`` and +``dynamic-filtering.large-broadcast.range-row-limit-per-driver`` and their +equivalents for partitioned join distribution type. + +Similarly, limits for dynamic filters when ``enable-large-dynamic-filters`` +is not enabled can be configured using configuration properties like +``dynamic-filtering.large-partitioned.max-distinct-values-per-driver``, +``dynamic-filtering.large-partitioned.max-size-per-driver`` and +``dynamic-filtering.large-partitioned.range-row-limit-per-driver`` and their +equivalent for broadcast join distribution type. + +The properties based on ``max-distinct-values-per-driver`` and ``max-size-per-driver`` +define thresholds for the size up to which dynamic filters are collected in a +distinct values data structure. When the build side exceeds these thresholds, +Presto switches to collecting min and max values per column to reduce overhead. +This min-max filter has much lower granularity than the distinct values filter. +However, it may still be beneficial in filtering some data from the probe side, +especially when a range of values is selected from the build side of the join. +The limits for min-max filters collection are defined by the properties +based on ``range-row-limit-per-driver``. + +Limitations +----------- + +* Dynamic filtering is currently implemented only for :doc:`/connector/hive` and :doc:`/connector/memory` connectors. +* Push down of dynamic filters into local table scan on worker nodes is limited to broadcast joins. +* Min-max dynamic filter collection is not supported for DOUBLE, REAL and unorderable data types. diff --git a/presto-docs/src/main/sphinx/connector/hive.rst b/presto-docs/src/main/sphinx/connector/hive.rst index 0e63d2ff9f057..4283c700fda58 100644 --- a/presto-docs/src/main/sphinx/connector/hive.rst +++ b/presto-docs/src/main/sphinx/connector/hive.rst @@ -526,6 +526,34 @@ You can also drop statistics for selected partitions only:: CALL system.drop_stats(schema_name, table_name, ARRAY[ARRAY['p2_value1', 'p2_value2']]) +Dynamic Filtering +----------------- + +The Hive connector supports the :doc:`dynamic filtering ` optimization. +Dynamic partition pruning is supported for partitioned tables stored in any file format +for broadcast as well as partitioned joins. + +For tables stored in ORC or Parquet file format, dynamic filters are also pushed into +local table scan on worker nodes for broadcast joins. Dynamic filter predicates +pushed into the ORC and Parquet readers are used to perform stripe or row-group pruning +and save on disk I/O. Sorting the data within ORC or Parquet files by the columns used in +join criteria significantly improves the effectiveness of stripe or row-group pruning. +This is because grouping similar data within the same stripe or row-group +greatly improves the selectivity of the min/max indexes maintained at stripe or +row-group level. + +Delaying execution for dynamic filters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +It can often be beneficial to wait for the collection of dynamic filters before starting +a table scan. This extra wait time can potentially result in significant overall savings +in query and CPU time, if dynamic filtering is able to reduce the amount of scanned data. + +For the Hive connector, a table scan can be delayed for a configured amount of +time until the collection of dynamic filters by using the configuration property +``hive.dynamic-filtering-probe-blocking-timeout`` in the catalog file or the catalog +session property ``.dynamic_filtering_probe_blocking_timeout``. + Schema Evolution ---------------- diff --git a/presto-docs/src/main/sphinx/connector/memory.rst b/presto-docs/src/main/sphinx/connector/memory.rst index 3997b230540c6..b93418ebb3a00 100644 --- a/presto-docs/src/main/sphinx/connector/memory.rst +++ b/presto-docs/src/main/sphinx/connector/memory.rst @@ -41,6 +41,18 @@ Drop table:: DROP TABLE memory.default.nation; +Dynamic Filtering +----------------- + +The Memory connector supports the :doc:`dynamic filtering ` optimization. +Dynamic filters are pushed into local table scan on worker nodes for broadcast joins. + +Delayed execution for dynamic filters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For the Memory connector, a table scan is delayed until the collection of dynamic filters. +This can be disabled by using the configuration property ``memory.enable-lazy-dynamic-filtering`` +in the catalog file. Limitations -----------