Projections #17481

Open

clintropolis opened this issue Nov 14, 2024 · 0 comments
Motivation

Today, Druid rollup tables can greatly improve performance by pre-aggregating data at ingest time, significantly reducing the overall number of rows that queries must scan. However, this comes at the cost of forcing operators to make difficult decisions about which columns to include and at what granularity to perform rollup.

If the detailed table is still needed, the only current way to handle this is to maintain totally separate datasources with different ingestion configurations, which forces users to know which datasources contain which columns at what level of rollup. This quickly gets complicated if multiple views are needed, and it is a large burden to place on users to 'get it right' at query time, not to mention the operational overhead and any potential differences in how 'up to date' each individual view is.

Proposed changes

This proposal intends to introduce the concept of projections to Druid: for the initial implementation, these are materialized transformations that pre-aggregate data within Druid segments as part of the same datasource. By building projections for commonly used queries, especially those grouping on columns with lower value cardinality, we can significantly reduce the number of rows those queries must process, dramatically lowering both compute and IO cost. This gives us all of the benefits of rollup tables while still retaining a fine level of detail when required, all within the same table, in exchange for some extra storage space.

dataSchema JSON spec

The initial implementation is focused on the low level mechanics of building projections for realtime segments and persisting them to immutable segments, with query time support for both states. The implication here is that at first, projections can only be defined by classic JSON based ingestion (but they can still be used by queries using MSQ or the new Dart engine). Future development will allow projections to be created as part of MSQ based ingestion as well.

This approach was chosen to prove the viability of the implementation before we design a more user friendly (and modern) way to define and manage projections, which will be detailed in a follow-up proposal once many of the lower level pieces are in place. Ultimately, MSQ based ingestion goes through the same code-paths to actually generate segments, and so the work done so far will be re-usable for SQL inserts and replaces once we finish designing and implementing the higher level interfaces for defining projections for a table.

To define a projection, a new projections section has been added to the dataSchema of JSON based ingestion specs. Since the initial implementation focuses on grouping and pre-aggregating, the spec looks similar to what you might find in the JSON spec for a groupBy query.

  • type (required): Must be 'aggregate'.
  • name (required): The internal name of the projection. Users do not typically need to know this value, but it can optionally be specified with the useProjection context flag.
  • virtualColumns (optional): Virtual columns used to compute the projection. The inputs to these virtual columns must exist on the base table. Virtual columns on time can be used to define a query granularity for the projection. During ingestion the processing logic finds the 'finest' granularity virtual column that is a timestamp_floor expression and uses it as the __time column for the projection if it is specified in the grouping columns. Projections do not need to have a time column defined, in which case they can still match queries that are not grouping on time. At query time, the virtual columns can be matched to the virtual columns of a query, regardless of their internal name in the projection.
  • groupingColumns (required): The grouping columns of the projection. These columns must exist on the base table or be defined in the virtual columns. If grouping on a virtual time column, the 'finest' granularity time column is used as the __time column for the projection. If no time column is defined, all rows of the projection have the same __time value, controlled by the start timestamp of the segment granularity. The order of the grouping columns defines the order in which data is sorted in the projection, always ascending.
  • aggregators (optional): Pre-aggregated columns. The inputs to these must exist on the base table or be defined in the virtualColumns of the projection. At query time the aggregators of the projection can be matched to equivalent aggregators defined in a query, regardless of their name or any options that do not impact the 'intermediary' aggregate processing, such as finalization options.

For example, using the standard wikipedia quickstart data, we could define a projection that contains the channel and page columns, an HLL sketch of the user column for a 'distinct count' approximation of how many users modified things for a given channel and page, and the sums of lines added and deleted for those channels and pages. The projection spec would look like this:

    "dataSchema": {
      "granularitySpec": {
        ...
      },
      "dataSource": ...,
      "timestampSpec": {
        ...
      },
      "dimensionsSpec": {
        ...
      },
      "projections": [
        {
          "type": "aggregate",
          "name": "channel_page_hourly_distinct_user_added_deleted",
          "groupingColumns": [
            {
              "type": "long",
              "name": "__gran"
            },
            {
              "type": "string",
              "name": "channel"
            },
            {
              "type": "string",
              "name": "page"
            }
          ],
          "virtualColumns": [
            {
              "type": "expression",
              "expression": "timestamp_floor(__time, 'PT1H')",
              "name": "__gran",
              "outputType": "LONG"
            }
          ],
          "aggregators": [
            {
              "type": "HLLSketchBuild",
              "name": "distinct_users",
              "fieldName": "user"
            },
            {
              "type": "longSum",
              "name": "sum_added",
              "fieldName": "added"
            },
            {
              "type": "longSum",
              "name": "sum_deleted",
              "fieldName": "deleted"
            }
          ]
        },
        ...
      ]
    },
    ...

Context flags

A few new query context flags have been added to aid in experimentation with projections.

  • useProjection accepts a specific projection name and instructs the query engine that it must use that projection, failing the query if the projection does not match the query
  • forceProjections accepts true or false and instructs the query engine that it must use a projection, failing the query if it cannot find a matching projection
  • noProjections accepts true or false and instructs the query engines to not use any projections

These were primarily added to aid in testing, but could potentially be used by advanced users.
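For example, the following is a sketch of a native timeseries query pinned to the projection defined earlier via useProjection; with that flag set, the query fails rather than falling back to the base table if the projection cannot serve it (the datasource name and interval are assumptions carried over from the quickstart example).

    {
      "queryType": "timeseries",
      "dataSource": "wikipedia",
      "intervals": ["2015-09-12/2015-09-13"],
      "granularity": "hour",
      "aggregations": [
        { "type": "longSum", "name": "added", "fieldName": "added" }
      ],
      "context": {
        "useProjection": "channel_page_hourly_distinct_user_added_deleted"
      }
    }

Swapping the context for "forceProjections": true would let the engine pick any matching projection, while "noProjections": true disables projections entirely for the query.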

Query metrics

To help track how effective projections are, a projection dimension has been added to query metrics which contains the name of the projection that was used. Additionally, methods such as reportSegmentRows and reportPreFilteredRows will reflect the values from the projection rather than the base table, since the projection is substituted at query processing time.

How it works

Projections required a number of internal changes to set the stage for implementing the feature. #16533 rather heavily refactored how Cursor and VectorCursor are created at query time by pushing in a new CursorBuildSpec object which contains a much richer set of data describing the 'shape' of the data the cursor needs to read, which was key to implementing projections at query time. #17058 added some hooks into the CursorHolder interface so it can report whether data is available in a pre-aggregated state for a given CursorBuildSpec, allowing query engines to switch to 'combining' aggregators if such pre-aggregated data exists. #17064 prepared IncrementalIndex by introducing an abstraction that allows different views of the same data to be the backing store for cursors and column selector factories, and #17084 added the means of finding 'equivalent' virtual columns, so that a projection can precompute a virtual column and still match it at query time.

Projections are built on the fly by the IncrementalIndex. As a row is processed for the base schema, it is also processed by each defined projection. Each grouping column defined on the projection either hooks into the 'parent' column indexer if it exists as a physical column on the base table, or uses a projection-only indexer if the column is a virtual column that only exists on the projection. Each projection has its own 'facts table', and all of them add to the estimated memory usage so that the extra overhead of projections is accounted for.

When persisted, projections are stored as additional internal columns in a segment, which together effectively form an embedded table for the projection. Grouping columns which have a 'parent' column on the base table share some parts, such as the value dictionary, but have their own dictionary id column file and their own bitmap indexes.

Querying currently works quite similarly for both realtime and historical segments: projections are ordered by 'best', where 'best' is defined as having the lowest number of rows to scan (or the estimated lowest for realtime, based on the fewest grouping columns). CursorFactory implementations iterate over this sorted list to check whether a projection matches a given CursorBuildSpec, where matching means having all required grouping columns, virtual columns, and aggregators, and a query granularity that is fine enough. If a match is found, that projection is substituted as the CursorHolder, and the query engine can rewrite the query to process pre-aggregated data.

Rationale

The more traditional way to achieve this is with materialized views; however, we decided to implement projections instead for a number of reasons. Materialized views are typically implemented as separate physical tables, and in a database like Druid they would bring with them a lot of very complicated synchronization problems to solve. Projections, on the other hand, live inside a regular Druid segment, and as such are constrained to the same interval as the segment.

Users will not typically query a projection directly, or even need to know they exist; rather, they will be used automatically at query time if the projection 'fits' the query being issued. If the projection does not exist in some of the segments, such as when it was added after those segments were created, the query can always fall back to the base table to compute everything the projection would have provided. This model is what makes projections dramatically easier to implement than traditional materialized views, and frankly, a lot easier for users to take advantage of, because the burden of knowing which view to query simply goes away.

Operational impact

This feature was designed to be as isolated as possible: projections should not impact ingestion unless they are specified, they do not affect query performance unless they exist within segments, and they do not affect segment load unless they are defined.

The ingest time impact, if projections are defined, is likely more frequent persists when using byte based incremental persist limits, since projections add to the total tracked byte usage. Row count based incremental persist limits will be quite tough to use effectively with projections, as those limits only consider the base table, so there is an increased likelihood of running out of heap memory while building incremental segments.
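As a purely illustrative sketch (the numbers are placeholders, not recommendations), a JSON based ingestion spec that defines projections would likely want to lean on the byte based limit, maxBytesInMemory, rather than the row based limit, maxRowsInMemory, in its tuningConfig:

    "tuningConfig": {
      "type": "index_parallel",
      "maxBytesInMemory": 100000000,
      "maxRowsInMemory": 1000000,
      ...
    }

Because only the byte based limit accounts for the extra projection overhead, relying on it keeps incremental persists triggering before heap pressure builds up.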

On the historical side, projections defined in segments will cause a slight increase in heap pressure from the additional column suppliers associated with the projections, though it is approximately the same as the overhead of additional columns on the base table. The more projections (and the more columns) that are defined, the more pronounced this becomes, so to be conservative, operators should monitor the heap overhead of loaded segments 'at rest'.

Projections will also obviously require additional disk space, though less than if the equivalent data were stored in a totally separate segment, since value dictionaries are shared between the base table and the projection tables within the same segment.

Test plan

I have access to some clusters where I plan to begin doing larger scale tests to determine the effect of projections on cluster performance. Other than that, the feature is pretty self contained, so it is low risk to existing deployments.

Future work

Projection introspection

The initial prototype of projections has very low visibility for users. Basically, there is no way to determine whether projections are defined at all, nor which segments have them and which do not. I would like to add a friendly way to expose this, such as via INFORMATION_SCHEMA or sys tables. Internally this will likely be done by expanding the native segmentMetadata query to include information about projections. I am unsure if there is work to do to integrate this with the 'centralized' schema work that has been happening; I need to catch up on the state of that project to determine what (if anything) needs to be done to play nicely with it.
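For context, a native segmentMetadata query today looks roughly like the sketch below; the idea would be to extend the results of a query like this to also report which projections a segment contains (the exact shape of that output is not yet designed, so nothing here should be read as committed):

    {
      "queryType": "segmentMetadata",
      "dataSource": "wikipedia",
      "intervals": ["2015-09-12/2015-09-13"],
      "analysisTypes": ["aggregators", "rollup"]
    }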

MSQ support for creating projections

This part is still being designed and will be the subject of a subsequent proposal. We have a couple of options here. My preference would be to build out a catalog and make the management of projection schemas part of this. I think ideally we would probably inject some catalog supplier into all kinds of ingestion tasks so that the set of projections could be fetched and applied when building segments. Eventually this feels like it should

The other option is to design some SQL syntax for adding projections as part of INSERT/REPLACE statements. This would work, but it feels a bit clunky to me as well when compared to the catalog based solution.

Auto-compaction (or some other mechanism?) to apply projection changes to existing segments

In the prototype, auto-compaction is not yet aware of projections, so the first step will be updating auto-compaction to preserve existing projections. Beyond this, I imagine either auto-compaction or something very similar will be used to apply projections to existing segments.

Filters

Adding support for filtered projections to reduce row counts even further is likely near the top of the list of follow-up work. The primary reason filters are not yet included in the design is that I am still deciding exactly how I'd like them to work.

We could just make it work the same way as this initial implementation, but the downside is that even if the filtering dramatically reduces the value dictionary sizes of the projection columns, we would still have to do things like binary search against the parent dictionary in order to locate values.

Another option, at the cost of a bit of extra complexity, is to do something like we do with JSON columns: projection columns would have an integer-to-integer mapping dictionary, so that a smaller dictionary could be used for operations, with a second lookup to substitute the projection dictionary id with the parent dictionary id when the actual value needs to be looked up. This would speed up the binary search used to find values for filtering, and since it could re-use a strategy similar to the one used by JSON columns, I think it is worth considering.

More flexible projection row ordering

The initial design and prototype behave a lot like top level Druid rollup tables for ordering; that is, the ordering is defined by the order in which the grouping columns appear in the spec, always ascending. I believe it should be possible to open this up a bit to allow defining different orderings without a lot of extra work. However, this was not done for the initial design because there is a larger external piece required to actually take advantage of more flexible orderings: making the query engines' handling of segment ordering much more sophisticated than the current uses (e.g. timeseries checking that the segment is time ordered). In the future, query engines should be able to specify a preferred ordering based on the query being issued, and if the segment ordering matches, take advantage of that fact to reduce the amount of work required at the segment level. This work isn't specific to projections, and leads nicely to the next piece of 'future work' I'd like to talk about:

Better projection matching

The comparator does OK and has worked well enough to get this feature off the ground, but it is obviously not the best long term solution for matching projections. I do not yet know what the best solution is, however, so this remains an area ripe for exploration so that we can eventually drop the current big for loop over the ordered set that is used to find a matching projection at query time.

Larger scale refactors to continue reducing the prominence of granularity and the time column

There are a number of what I would consider "ugly" things which were done in the initial prototype of projections in order to work well with the current query engines; a big one revolves around the special Druid time column. We have been doing some recent work to downplay the prominence of the special time column; for example, we now allow segments to no longer be ordered by time first. Projections carry this idea a bit further by making the time column completely optional on projections, though this is currently done by substituting a constant time column, since in many places we still expect a __time column to exist.

New segment format

This PR is very conservative in terms of segment format changes, going to pretty great lengths to work within the constraints of the current Druid V9 segment format; however, a number of pretty ugly things had to be done that I think could be done a lot better. I will have a proposal for what I have in mind here soon.
