Skip to content

Pushdown of complex operations

Martin Traverso edited this page Feb 12, 2019 · 12 revisions

The intent of this document is to capture high-level thoughts and ideas about how to add support for pushing down complex operations (filters, projections, aggregations, joins, etc) into connectors and allowing connectors to surface additional filters (for row-level security) or projections (for masking) during query planning.

Goals and non-goals:

  • This is explicitly a short-term solution. In the long term, we want connectors to be able to provide transformation rules that get evaluated during the optimization loop.
  • It's not a goal to build a generic mechanism to support every operation.
  • We need to be able to start simple and improve iteratively (near-term improvements should, ideally, be additive).
  • It should be implemented using the existing Rule/Iterative optimizer framework instead of visitor-based PlanOptimizers.

Approach

Pushdown

The general idea is to introduce a new set of transformation rules, each of which fires on patterns such as filter(tableScan(...)), project(tableScan(...)), etc. Each rule would be responsible for pushing down the corresponding type of operation into the table scan. Examples (names TBD) would be: PushFilterIntoConnector, PushProjectionIntoConnector, PushAggregationIntoConnector, PushJoinIntoConnector.

These rules interact with connectors via a set of specialized metadata calls. Below is a straw-man for what they might look like (names and concrete signatures TBD). If a connector doesn't support or understand the action, result of the call would indicate so. The language to express filters, projections, aggregates, join criteria, etc. is also TBD.

Filter pushdown

Metadata.pushFilter(TableHandle, Filter) => TableHandle + remaining Filter

Returns a new table handle and any part of the filter that the connector doesn't understand or support. The result TableHandle represents a table with the same schema as the input TableHandle.

The optimization rule uses this method to transform filter(f1, tablescan(th1)) into filter(f2, tablescan(th2)).

Example:

Let's say a connector knows how to handle LIKE expressions. Given the following plan fragment,

- Filter (a like 'xy%z%' AND f(b)) :: (a varchar, b bigint)
  - TableScan (TH(0)) :: (a varchar, b bigint)
    a = CH(0)
    b = CH(1)

PushFilterIntoTableScan would call:

Metadata.pushFilter(
  TH(0), 
  call("and", 
      call("like", CH(0), 'xy%z%'), 
      call("f", CH(1))))

which would return

new table handle: TH(0')
remaining filter: call("f", CH(1))

The rule would then replace the original fragment with:

- Filter (f(b)) :: (a varchar, b bigint)
  - TableScan (TH(0')) :: (a varchar, b bigint)
    a = CH(0)
    b = CH(1)

Projection pushdown

Metadata.pushProjection(TableHandle, Projections) -> TableHandle + new projections

Returns a new table handle and a new set of projections that include any projection that aren't supported by the connector.

Aggregation pushdown

TODO

Metadata.pushAggregation(TableHandle, Aggregates) -> TableHandle + ???

Join pushdown

TODO

Metadata.pushJoin(TableHandle, TableHandle, JoinType, Criteria) -> TableHandle + ???

Exposing filter/projections for security-based filtering and masking

During analysis and initial query planning, connectors can provide additional filters or projections to evaluate immediately after the data is produced during query execution. This can be used to implement row-level filtering based on security rules or masking for sensitive data.

Metadata.getAdditionalFilter(TableHandle) -> Filter

TODO: projections for masking

Miscellaneous

We should deprecate TableLayouts. They complicate the optimizer and can be replaced by a combination of "pushFilter" above plus a an API to obtain (physical) properties of a table during query optimization. E.g.,

Metadata.getProperties(TableHandle)

Open questions

  • How to handle a plan like this one when the filter cannot be pushed into the connector? The generalized predicate pushdown rules moves filters below projections, so adding a rule that moves a projection below a filter would result in non-converging optimization under the IterativeOptimizer. In the long term, it won't be an issue since the optimizer will be able to consider multiple plans simultaneously. One option in the short term is to have specialized rules that match "project(filter(tablescan)))" and handle that specific scenario.
- Project (a.x)
  - Filter (f(b))
    - TableScan (TH(0))