
datafusion-query-cache - caching intermediate results for faster repeated queries #12779

samuelcolvin opened this issue Oct 6, 2024 · 9 comments

samuelcolvin commented Oct 6, 2024

Last week on a call with @alamb and some of the Pydantic team (@adriangb @dmontagu @davidhewitt) I introduced the idea of caching intermediate results in DataFusion timeseries queries to make repeat queries much faster.

Frustrated by the ambivalence with which my idea was received, I've spent the weekend implementing a first draft (😉):

Meet datafusion-query-cache.

This is obviously a very early WIP, but I think it proves that the intermediate caching idea I proposed works.

Feedback and contributions are very welcome. I'd love to donate the project to datafusion-contrib or even DataFusion itself if there's willingness. My ulterior motive: the more widely used datafusion-query-cache is, the less likely it is that changes in DataFusion will break it.

From the README:

How it works (the very quick version)

Let's say you run the query:

SELECT max(price) FROM stock_prices WHERE symbol = 'AAPL' and timestamp > '2000-01-01'

Then 10 minutes later you run the same query — by default DataFusion will process every one of the millions of records in the stock_prices table again to calculate the result, even though only the last 10 minutes of data has changed.

Obviously we could save a lot of time and compute if we remembered the result of the first query, then combined it with a query over just the last 10 minutes of data.

That's what datafusion-query-cache does!

The key is that in timeseries data, new rows are typically inserted with a timestamp close to now(), so it's easy to know which results we can cache and which we must recompute.

datafusion-query-cache doesn't have opinions about where the cached data is stored; instead you implement the QueryCache trait to store the data wherever you like. A very simple MemoryQueryCache is provided for testing; we should add an ObjectStoreQueryCache too.
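To make that concrete: conceptually, the cache maps a query fingerprint to previously computed partial results plus the watermark timestamp they are valid up to. Here's a hypothetical sketch of that shape; the real QueryCache trait's names and signatures may well differ:

```rust
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

// Hypothetical sketch only: the real `QueryCache` trait in
// datafusion-query-cache may have different names and signatures.
#[async_trait]
pub trait QueryCacheSketch: Send + Sync {
    /// Fetch cached partial aggregate state and the watermark timestamp
    /// it is valid up to, if an entry exists for this fingerprint.
    async fn get(&self, fingerprint: &str) -> Result<Option<(i64, Vec<RecordBatch>)>>;

    /// Store fresh partial aggregate state together with its new watermark.
    async fn put(&self, fingerprint: &str, watermark: i64, partials: Vec<RecordBatch>)
        -> Result<()>;
}
```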

How it works (the longer version)

Some people reading the above example will already be asking:

> But combining max values is easy (you just take the max of the maxes), what about more complex queries?
> If we had used avg instead of max, you couldn't combine two averages by just averaging them.

The best bit is: DataFusion already has all the machinery to combine partial query results, so datafusion-query-cache doesn't need any special logic for different aggregations; indeed, it doesn't even know what they are.

Instead we just hook into the right place in the physical plan to provide the cached results, constrain the query on new data and store the new result.

Let's look at an example

The physical plan for

SELECT avg(price) FROM stock_prices WHERE symbol = 'AAPL' and timestamp > '2000-01-01'

looks something like this (lots of details omitted):

AggregateExec {
    mode: Final,
    aggr_expr: [Avg(price)],
    input: AggregateExec {
        mode: Partial,
        aggr_expr: [Avg(price)],
        input: FilterExec {
            predicate: (symbol = 'AAPL' and timestamp > '2000-01-01'),
            input: TableScanExec {
                table: stock_prices
            }
        }
    }
}

Notice how the input for the top-level AggregateExec is another AggregateExec? That's DataFusion enabling parallel execution by splitting the data into chunks and aggregating them separately. The output of the inner AggregateExec (note mode: Partial) will look something like:

| avg(price)[count] | avg(price)[sum] |
| --- | --- |
| 123.4 | 1000 |
| 125.4 | 1000 |
| 127.4 | 1000 |
| ... | ... |

The top-level AggregateExec (mode: Final) then combines these partial results to get the final answer.

This "combine partial results" is exactly what datafusion-query-cache uses to combine the cached result with the new data.

So datafusion-query-cache would rewrite the above query to have the following physical plan:

AggregateExec {
    mode: Final,
    aggr_expr: [Avg(price)],
    input: CacheUpdateAggregateExec {  // wraps the partial aggregations and stores the result for later
        input: UnionExec {
            inputs: [
                AggregateExec {  // computes aggregates for the new data
                    mode: Partial,
                    aggr_expr: [Avg(price)],
                    input: FilterExec {
                        predicate: ((symbol = 'AAPL' and timestamp > '2000-01-01') and timestamp > '{last run}'),
                        input: TableScanExec {
                            table: stock_prices
                        }
                    }
                },
                CachedAggregateExec {  // gets the cached result
                    cache_key: "SELECT avg(price)...",
                }
            ]
        }
    }
}

The beauty is, if we wrote a more complex query, say:

SELECT
    date_trunc('hour', timestamp) AS time_bucket,
    round(avg(value), 2) as avg_value,
    round(min(value), 2) as min_value,
    round(max(value), 2) as max_value
FROM stock_prices
WHERE symbol = 'AAPL' AND timestamp > '2000-01-01'
GROUP BY time_bucket
ORDER BY time_bucket DESC

datafusion-query-cache doesn't need to be any cleverer: DataFusion does the hard work of combining the partial results, accounting for the different buckets and aggregations and combining them correctly.

Prior art

Other databases have similar concepts, e.g. continuous aggregates in TimescaleDB, but they require explicit setup. In contrast, datafusion-query-cache analyses queries (including subqueries) and automatically applies the cache where it can.

What's supported

- [x] GROUP BY aggregation queries with a static lower bound (or no lower bound)
- [x] Aggregation queries (no GROUP BY) with a static lower bound (or no lower bound)
- [ ] Simple filter queries — this should be simple enough
- [ ] GROUP BY aggregation queries with a dynamic lower bound (e.g. timestamp > now() - interval '1 day') - this requires a FilterExec wrapping the UnionExec and discarding older data
- [ ] Aggregation queries (no GROUP BY) with a dynamic lower bound - this is harder; we probably have to rewrite the aggregation to include a GROUP BY clause, then filter, then aggregate again

How to use

datafusion-query-cache implements QueryPlanner, OptimizerRule, UserDefinedLogicalNodeCore and ExecutionPlan to customise query execution.

Usage is as simple as calling with_query_cache on a SessionStateBuilder; here's a complete (if minimal) example of creating a SessionContext:

async fn session_ctx() -> SessionContext {
    let state_builder = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_runtime_env(Arc::new(RuntimeEnv::default()))
        .with_default_features();

    // records.timestamp is the default (and only) temporal column to look at
    let temporal_col = Column::new(Some("records".to_string()), "timestamp".to_string());

    // create an in memory cache for the query results
    // (in reality, you'd want to impl the `QueryCache` trait and store the data somewhere persistent)
    let cache = Arc::new(datafusion_query_cache::MemoryQueryCache::default());

    // create the query cache config
    let query_cache_config = datafusion_query_cache::QueryCacheConfig::new(temporal_col, cache)
        .with_group_by_function("date_trunc");

    // call with_query_cache to register the planners and optimizers
    let state_builder = datafusion_query_cache::with_query_cache(state_builder, query_cache_config);
    SessionContext::new_with_state(state_builder.build())
}

See examples/demo.rs for a more complete working example.
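Running queries through the resulting context is then ordinary DataFusion usage. A minimal sketch, assuming a records table has been registered (e.g. via ctx.register_parquet) and reusing the session_ctx function from above:

```rust
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = session_ctx().await;
    // register the `records` table here, e.g. with ctx.register_parquet(...)
    let sql = "SELECT avg(price) FROM records WHERE timestamp > '2000-01-01'";
    ctx.sql(sql).await?.collect().await?; // first run: full scan, partials cached
    ctx.sql(sql).await?.collect().await?; // repeat run: cached partials + new rows only
    Ok(())
}
```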

@cardosofede

I need to understand more about how DataFusion works, but based on your description this makes a lot of sense.
One question: the predicate of the AggregateExec that has the original filter plus a condition based on the last run (which I think should be > instead of <): will that be configurable per table, or is there always a timestamp column?
Another question: what happens if new data is inserted in the middle and you want to refresh the cached data? Would it be good to use something like a deprecate-after-x, or what do you think?


samuelcolvin commented Oct 6, 2024

> that I think that is > instead of <

corrected, thank you.

> will be something configurable

You can currently configure:

- the temporal column or columns to base the calculation on, e.g. in Logfire we have created_at, but also start_timestamp and end_timestamp, which are generally pretty close to now()
- group-by functions: generally this will be date_trunc, but in Logfire we also have time_bucket, which roughly matches the function of the same name in Timescale
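For illustration, a Logfire-style config might look something like the following. This is a hypothetical sketch: it assumes with_group_by_function returns the builder and can be chained once per recognised function, which may not match the real API:

```rust
use std::sync::Arc;
use datafusion::common::Column;
use datafusion_query_cache::{MemoryQueryCache, QueryCacheConfig};

// Hypothetical sketch; assumes `with_group_by_function` can be chained.
fn logfire_style_config(cache: Arc<MemoryQueryCache>) -> QueryCacheConfig {
    let created_at = Column::new(Some("records".to_string()), "created_at".to_string());
    QueryCacheConfig::new(created_at, cache)
        .with_group_by_function("date_trunc")
        .with_group_by_function("time_bucket")
}
```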

> Another question: what happens if new data is inserted in the middle and you want to refresh the cached data? Would it be good to use something like a deprecate-after-x, or what do you think?

How you store cached results is up to you, but they're effectively a table of:

| query_fingerprint (string) | timestamp (i64) | results (binary) |
| --- | --- | --- |
| ... | 1234567 | ... |

You could also extract the table name pretty easily (or we could make it available), so you could purge the cache however you like.
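So a deprecate-after-x policy is easy to layer on top. A toy sketch over that conceptual table (field names are illustrative, not part of datafusion-query-cache):

```rust
// Toy sketch of a "deprecate after x" policy over the conceptual cache table;
// field names are illustrative, not a real API.
struct CacheRow {
    query_fingerprint: String,
    timestamp: i64, // watermark the cached partials are valid up to
    results: Vec<u8>,
}

fn purge_expired(rows: &mut Vec<CacheRow>, now: i64, max_age: i64) {
    // drop entries whose watermark is older than `max_age`
    rows.retain(|row| now - row.timestamp <= max_age);
}
```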

@cardosofede

That's really cool.
Actually, the fact that how you store cached results is up to you makes me think this could lead to complementary services attached to the database, where you just define the checkpoint and update policies, making your analytics systems more efficient without too much code.
I will try to review the code tomorrow to learn, thanks for teaching in public!

@metegenez

Do you have any suggestions for pruning old data using a dynamic lower bound? I think that's the cherry on top. From what I understand, the pattern timestamp > now() - interval '1 day' is commonly used and seems difficult to implement effectively, since the Final aggregation can union the new states but cannot extract old ones.

@samuelcolvin

@metegenez, yes I agree that's complicated.

I think there are two cases:

  1. select date_trunc(..., timestamp) bucket, avg(data) from records where timestamp > now() - interval '7 days' - I think we can solve this (albeit approximately) by filtering on the bucket column to exclude values more than 7 days old - e.g. run another FilterExec like bucket > {dynamic lower bound expr}, as sketched below
  2. For queries like select avg(data) from records where timestamp > now() - interval '7 days' it's fundamentally impossible to exclude some of the old data: you have one number like avg(data) = 42, and you can't filter out part of it. I think the solution here would be to rewrite (in the logical or physical plan) into two aggregations, where the inner aggregation is of the form select date_trunc(..., timestamp) bucket, avg(data) from records where timestamp > now() - interval '7 days', and the outer aggregation aggregates the result, then apply the same filtering technique to the inner grouped aggregation.

Luckily the first case, which is somewhat easier, is I think the more common one.
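To make case 1 concrete, here's a rough sketch of wrapping the unioned partials in an extra FilterExec. This is not the actual implementation, and DataFusion module paths and signatures vary by version:

```rust
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_plan::expressions::{col, lit, BinaryExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

// Rough sketch (not the real implementation): after unioning cached and fresh
// partial aggregates, drop buckets older than the dynamic lower bound, which
// has been evaluated to a concrete scalar at planning time.
fn prune_old_buckets(
    unioned: Arc<dyn ExecutionPlan>,
    lower_bound: ScalarValue, // e.g. now() - interval '7 days', pre-evaluated
) -> Result<Arc<dyn ExecutionPlan>> {
    let bucket = col("time_bucket", &unioned.schema())?;
    let predicate = Arc::new(BinaryExpr::new(bucket, Operator::Gt, lit(lower_bound)));
    Ok(Arc::new(FilterExec::try_new(predicate, unioned)?))
}
```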


sgrebnov commented Oct 9, 2024

This is very cool, wanted to share a few ideas/concepts that worked well for https://github.com/spiceai/spiceai – recently added simple caching using DataFusion with a slightly different approach – without solving the delta problem between prefetched/cached and actual data. In our case, we control dataset updates in most cases and just perform cache invalidation. Responding with outdated information per configurable TTL for other cases is expected behavior.

  1. Cache is based on the root LogicalPlan, where the query is first transformed into a logical plan that is used as a key.
    We used the root logical plan, but I imagine this can be generalized to apply the same approach at the execution plan level, returning cached items instead of actually executing. This works well for scenarios where predicate pushdowns are not fully supported, for example unavailable Parquet-encoded statistics, so there are repetitive executions/inputs even for different queries.
  2. I like the idea of not having opinions about where the cached data is stored; for us, moka worked the best. We compared it with a few other libraries in terms of performance, etc.
  3. Worked well for us: specifying max cache size (configurable cache size). We operate on streams, so we just wrap the response record batch stream and try caching records until we see that the response is too large and we don't want to cache it. Total size is implemented by adding weights (actual size) to cached items (part of moka's functionality).
  4. Worked well for us: cache invalidation approach – tracking input datasets as part of the cache based on the logical plan information allows for simple cache invalidation when a dataset is updated. We do this when we update the local dataset copy (materialized dataset copy).
  5. Cached items eviction algorithm: should be independent IMO. We use LRU + configurable TTL.

Cache implementation: https://github.com/spiceai/spiceai/blob/trunk/crates/cache/src/lru_cache.rs
Usage example: https://github.com/spiceai/spiceai/blob/trunk/crates/runtime/src/datafusion/query.rs#L167
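For illustration, a minimal sketch of the moka configuration this describes; the capacity and TTL values are made up, and the key/value types stand in for a logical-plan fingerprint and serialized record batches:

```rust
use std::time::Duration;
use moka::sync::Cache;

// Total cache size is bounded by per-entry weights (the serialized result
// size) rather than entry count, with eviction plus a configurable TTL.
fn build_results_cache() -> Cache<String, Vec<u8>> {
    Cache::builder()
        .weigher(|_plan_key: &String, results: &Vec<u8>| results.len() as u32)
        .max_capacity(512 * 1024 * 1024) // 512 MiB of weighted entries
        .time_to_live(Duration::from_secs(300))
        .build()
}
```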


alamb commented Oct 9, 2024

> I'd love to donate the project to datafusion-contrib

Done -- https://github.com/datafusion-contrib/datafusion-query-cache

@metegenez

> @metegenez, yes I agree that's complicated.
>
> I think there are two cases:
>
> 1. select date_trunc(..., timestamp) bucket, avg(data) from records where timestamp > now() - interval '7 days' - I think we can solve this (albeit approximately) by filtering on the bucket column to exclude values more than 7 days old - e.g. run another FilterExec like bucket > {dynamic lower bound expr}
> 2. For queries like select avg(data) from records where timestamp > now() - interval '7 days' it's fundamentally impossible to exclude some of the old data: you have one number like avg(data) = 42, and you can't filter out part of it. I think the solution here would be to rewrite (in the logical or physical plan) into two aggregations, where the inner aggregation is of the form select date_trunc(..., timestamp) bucket, avg(data) from records where timestamp > now() - interval '7 days', and the outer aggregation aggregates the result, then apply the same filtering technique to the inner grouped aggregation.
>
> Luckily the first case, which is somewhat easier, is I think the more common one.

The first method is the well-known pre-aggregation, which is widely used. It's possible to automate this by writing a logical plan analyzer for the operation, though it may not support all functions out of the box. Basic functions like count, min, max, sum, last, and first are straightforward, but more complex ones, like median, add complications.

In the second case, I recall implementing a retract mechanism for certain aggregation functions to support streaming window operations. While it's not achievable through SQL syntax, adding a physical operator to retract unwanted data might be a viable solution, especially if the filtered column is ordered.

To sum up, the first method offers more flexibility when adjusting time ranges, such as moving from the last hour to the last six hours, as long as the data has already been pre-aggregated.
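For intuition, the retract idea on a simple aggregate looks like this. A toy sketch, not DataFusion's Accumulator API (whose retract_batch works on record batches):

```rust
// Toy sketch of a retractable average: the state is (count, sum), so removing
// data that slid out of the window is just subtraction. DataFusion's real
// `Accumulator::retract_batch` applies the same idea to record batches.
#[derive(Default)]
struct RetractableAvg {
    count: u64,
    sum: f64,
}

impl RetractableAvg {
    fn update(&mut self, v: f64) {
        self.count += 1;
        self.sum += v;
    }
    /// Remove a previously added value, e.g. a row older than the window.
    fn retract(&mut self, v: f64) {
        self.count -= 1;
        self.sum -= v;
    }
    fn evaluate(&self) -> Option<f64> {
        (self.count > 0).then(|| self.sum / self.count as f64)
    }
}
```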

@samuelcolvin

I've moved the code to https://github.com/datafusion-contrib/datafusion-query-cache; I'll try to resume work on it soon.
