Add proposal for distributed execution #6012
Conversation
Force-pushed from 4a493f7 to d929716
Force-pushed from d929716 to 8bc3e77
Force-pushed from 8bc3e77 to db85d46
Signed-off-by: Filip Petkovski <[email protected]>
Force-pushed from db85d46 to a18a49d
Thanks for tackling this important problem! The explanation looks really great. However, I don't get one thing: if one series, let's say `{foo="bar"}`, is replicated in two nodes with different external label sets, then how would we end up not summing this series twice with a query such as `sum({foo="bar"})` if the `sum` would be distributed amongst these two nodes? 🤔
> Thanos stores usually have a small overlap with ingestion components (Prometheus/Receiver) due to eventual consistency from uploading and downloading TSDB blocks. As a result, the central aggregation needs a way to deduplicate samples between ingestion and storage components.
>
> The proposed way to do time-based deduplication is by removing identical samples in the `coalesce` operator in the Thanos Engine itself. In order for data from independent Queriers to not get deduplicated, aggregations happening in remote engines must always preserve external labels from the TSDB blocks that are being queried.
Oh, I see - so even with a distributed `sum()`, the external labels would be there and we would be able to deduplicate this way 💪
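To make the label-preservation rule concrete, here is a minimal Go sketch (illustrative only, not Thanos code; the external label names are made up) of how the central engine can tell which results are safe to deduplicate:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Partial sums returned by two independent remote engines. Because the
	// remote aggregations preserve external labels, the label sets differ
	// and a central sum() counts each result exactly once.
	fromQuerierEU := labels.FromStrings("prometheus", "prom-eu")
	fromQuerierUS := labels.FromStrings("prometheus", "prom-us")
	fmt.Println(labels.Equal(fromQuerierEU, fromQuerierUS)) // false -> both kept

	// A Prometheus/Receiver and a Store Gateway serving the same TSDB return
	// results with identical external labels and identical samples in the
	// overlapping time range, so only that overlap needs deduplication.
	fromIngester := labels.FromStrings("prometheus", "prom-eu")
	fromStoreGW := labels.FromStrings("prometheus", "prom-eu")
	fmt.Println(labels.Equal(fromIngester, fromStoreGW)) // true -> dedup overlap
}
```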
In the Sidecar approach, the samples wouldn't be identical. Other solutions such as Cortex have a "HA Tracker" - they choose one replica from a set of replicas and only ingest samples coming from that replica. Perhaps a "remote engine" could have the option to take a similar approach. In Thanos, we could detect that a node is a Sidecar and apply this deduplication approach instead. What do you think?
Actually, this probably isn't needed because the query engine should take care of this for us with the deduplication iterator 🤔 If there are extra samples, like in this case, then it shouldn't be a problem.
I've thought about this, and I wonder if samples not being identical is really an issue.
The idea is to always query TSDB replicas using a remote engine. The central querier will only talk to other (remote) Thanos queriers, never to stores. Remote queriers will talk to store components and deduplicate series before executing PromQL. If the same TSDB data is found both in Prometheus and in the object store, I would expect the resulting PromQL samples to be the same, and the central engine will deduplicate them.
But maybe this is something we need to double check.
I also wonder if there are some correctness pitfalls that we haven't thought about 🤔 maybe it's worth sharing this in #prometheus-dev or something. But overall I would opt for much speedier queries over some nitpicks in correctness
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>
Force-pushed from 15c1729 to fb10039
> ```
>     sum(rate(metric[2m])),
>     sum(rate(metric[2m]))
>   )
> )
> ```
Can you please explain this example a little bit more? Are the two `sum(rate(metric[2m]))` queries exactly the same, or will some filters like `sum(rate(metric{shard="1"}[2m]))` be applied first and then we do the coalesce?

If there are no filters applied, then basically all remote engines are running `sum(rate(metric[2m]))`? Then how does the coalesce work? Sounds like we need to do merge & sort, too.
Yes, that is correct: all remote engines run the same query because all of them are attached to different store components.
Coalesce works as it does now: it merges all series together. Sorting is applied at the end when we produce the final result: https://github.com/thanos-community/promql-engine/blob/main/engine/engine.go#L278. Having sorted samples between operators is not a requirement.
So deduplication only happens in the querier? Is it also possible for it to happen at the central querier, or do we disallow that in this architecture?
The only dedup that needs to happen in the central querier is to remove identical samples due to time-based overlap between prometheus/receiver and store-gw. I tried to explain this here, but happy to elaborate more: https://github.com/fpetkovski/thanos/blob/proposal-distributed-exec/docs/proposals-accepted/202301-distributed-query-execution.md#time-based-overlap-resolution.
My idea was to do the dedup in the Coalesce operator, or to have a new CoalesceWithDedup operator.
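As a rough illustration of that idea, here is a minimal Go sketch of what such a dedup-on-merge step could look like (the types and function are hypothetical simplifications, not the engine's actual operator API; it assumes series are keyed by their full label set, external labels included):

```go
package dedup

import "sort"

// Sample and Series are simplified stand-ins for the engine's internal types.
type Sample struct {
	T int64
	V float64
}

type Series struct {
	LabelsKey string // stable encoding of the full label set, external labels included
	Samples   []Sample
}

// coalesceWithDedup merges series coming from multiple remote engines.
// Series with the same label set are merged, and identical overlapping
// samples are dropped, resolving the time-based overlap between ingestion
// components and store gateways.
func coalesceWithDedup(inputs ...[]Series) []Series {
	merged := map[string][]Sample{}
	for _, in := range inputs {
		for _, s := range in {
			merged[s.LabelsKey] = append(merged[s.LabelsKey], s.Samples...)
		}
	}

	out := make([]Series, 0, len(merged))
	for key, samples := range merged {
		sort.Slice(samples, func(i, j int) bool { return samples[i].T < samples[j].T })
		deduped := make([]Sample, 0, len(samples))
		for _, s := range samples {
			// Replicas of the same TSDB produce identical samples, so keeping
			// only the first sample per timestamp is safe.
			if n := len(deduped); n > 0 && deduped[n-1].T == s.T {
				continue
			}
			deduped = append(deduped, s)
		}
		out = append(out, Series{LabelsKey: key, Samples: deduped})
	}
	return out
}
```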
> ```
> }
> ```
>
> The implementation of the `RemoteEngine` will be provided by Thanos itself and will use the gRPC Query API added in [https://github.com/thanos-io/thanos/pull/5250](https://github.com/thanos-io/thanos/pull/5250).
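For a sense of the shape such an abstraction could take, here is an illustrative Go sketch; the interface name, methods, and signatures below are hypothetical and simplified, not the actual engine API, and a real implementation would wrap the gRPC Query API mentioned above:

```go
package remote

import (
	"context"
	"time"
)

// Sample and Series are simplified stand-ins for decoded query results.
type Sample struct {
	T int64
	V float64
}

type Series struct {
	Labels  map[string]string // includes the external labels of the source TSDB
	Samples []Sample
}

// Engine models a remote Thanos Querier that can evaluate a PromQL fragment
// against its own store components. A distributed central engine would hold
// one Engine per discovered querier, dispatch subqueries to all of them in
// parallel, and then coalesce (and deduplicate) the partial results.
type Engine interface {
	// LabelSets reports the external label sets of the TSDBs reachable
	// through this engine, so the planner knows how results will be keyed.
	LabelSets() []map[string]string

	// RangeQuery evaluates the given PromQL expression remotely over
	// [start, end] with the given step and returns the resulting series,
	// external labels preserved.
	RangeQuery(ctx context.Context, query string, start, end time.Time, step time.Duration) ([]Series, error)
}
```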
To me, the centralized querier is like the query frontend and the remote engines are just queriers. What's the difference between the current architecture and the solution proposed here?

If we just want to have a planner/optimizer, would it be the same if we simply moved it to QFE?
The difference is that QFE works in a round-robin manner, whereas with distributed execution we want to delegate a subquery to all available queriers. There is also no discovery and no query engine in QFE. We could add those components, but then QFE would be almost the same as a querier.
Umm, I feel the same thing could apply to QFE as well: add discovery for queriers and send requests to all queriers at the same time in this mode.

Let me ask another question: would you mind adding an architecture diagram to this proposal? Do we still need QFE in this case, or do we not need it anymore?
> Add discovery for queriers and send requests to all queriers at the same time in this mode.
We already have this in queriers actually: https://github.com/thanos-io/thanos/blob/main/pkg/query/endpointset.go#L512-L522. The endpoint set can discover query APIs in the same way it can discover stores.
> Let me ask another question: would you mind adding an architecture diagram to this proposal? Do we still need QFE in this case, or do we not need it anymore?
Yes, happy to add more diagrams. I added a few already, but I am happy to extend them with more information: https://github.com/fpetkovski/thanos/blob/proposal-distributed-exec/docs/proposals-accepted/202301-distributed-query-execution.md#how
I added a section with deployment models to help visualize things: https://github.com/fpetkovski/thanos/blob/proposal-distributed-exec/docs/proposals-accepted/202301-distributed-query-execution.md#deployment-models.
Signed-off-by: Filip Petkovski <[email protected]>
This is going to be huuuuuge! Adding it to my list of important things to review. Thanks a lot for the proposal, @fpetkovski 🙇 I've been running some performance tests on Thanos Store and indeed noticed Query consuming upwards of 3x the Store's memory to answer my query. Can't wait for this proposal's implementation to land already. 💪
> Thanos Queriers currently need to pull in all data from Stores in memory before they can start evaluating a query. This has a large impact on the used memory inside a single querier, and drastically increases query execution latency.
>
> Even when a Querier is connected to other Queriers, it will still pull raw series instead of delegating parts of the execution to its downstreams. This document proposes a mode in the Thanos Querier where it will dispatch parts of the execution plan to different, independent Queriers.
I am trying to understand how much this helps query execution. Even if the central querier delegates part of the execution to other queriers and we do some downsampling to fetch less data, as far as I can see it only helps the central querier use less memory.

For remote queriers and stores, we still need to pull all raw data into memory and evaluate locally. This sounds worse if we are just using one querier.
Maybe I missed something. I can see it helps if we are using a layered architecture of queriers, like having a central querier and multiple sub-queriers. In this case, central queriers can fetch less data with the help of fetching downsampled and aggregated data.

If there is only one layer of queriers, I feel nothing changes.
Yes, layering is the proposed way of running the distributed engine. This is also how Mimir's sharding works, it's just that there's an embedded engine in the query frontend, so layering still exists, it's just not as explicit. A layered setup is easy to achieve, for example by running one querier for N store gateways, or even one querier per store gateway.

I don't think having shared storage for all queriers and trying to do some kind of filtering is a scalable approach; I tried to explain this a bit in the first section. You might have a different experience, but we notice a plateau with vertical sharding after 4 or 5 shards. Query latency also becomes far less predictable and we see timeouts from stores. I suspect this is because there is a lot of contention and load in stores with both vertical and horizontal sharding, since each subquery goes to the same stores. With this approach, we can scale horizontally indefinitely because the load will always be spread.

> In this case, central queriers can fetch less data with the help of fetching downsampled and aggregated data.

The bigger advantage is that subqueriers will execute queries in parallel on a much smaller subset of the data.
Signed-off-by: Filip Petkovski <[email protected]>
Force-pushed from 9443e0e to dc98052
I'd say let's merge this and do work under hidden flags until we're comfortable that everything is good just like we did with the Thanos engine originally 👍
docs/components/tools.md (outdated)

```diff
@@ -815,7 +815,7 @@ Flags:
       --rewrite.to-relabel-config-file=<file-path>
                                  Path to YAML file that contains relabel configs
                                  that will be applied to blocks
-      --tmp.dir="/tmp/thanos-rewrite"
+      --tmp.dir="/var/folders/0b/r42s6hv96hs2w84ycr2yf7qh0000gn/T/thanos-rewrite"
```
Seems like a random change 😄
For some reason `make docs` does this on my machine. Should be reverted now :)
Signed-off-by: Filip Petkovski <[email protected]>
👍 since there are no further comments, let's merge it and continue work under hidden flags. Also, we can always amend this proposal when new information comes up.
* Add proposal for distributed execution
* Apply formatting
* Apply formatting
* Improve wording
* Add deployment models
* Add more deployment modes
* Revert tools.md

Signed-off-by: Filip Petkovski <[email protected]>
Can this be used with Thanos receiver?
@damnever I think the answer is no. Data are spread across all receiver/ingester replicas, so distributed query execution doesn't help much, as you always need to query all stores.
@damnever @yeya24 One way I've thought we could do it is to allow queriers to select TSDBs, just like store gateways can through selectors. We could have a pool of queriers that hashmod some set of external labels and rewrite the query to only target their own TSDBs. Then this approach could work with receivers as well.
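To illustrate that idea, here is a minimal Go sketch of hashmod-based assignment (purely hypothetical; the function name, label encoding, and shard counts are made up and not existing Thanos configuration):

```go
package sharding

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// shardFor assigns an external label set to one of n querier shards by
// hashing a stable, sorted encoding of the labels (hashmod).
func shardFor(externalLabels map[string]string, n uint32) uint32 {
	keys := make([]string, 0, len(externalLabels))
	for k := range externalLabels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New32a()
	for _, k := range keys {
		fmt.Fprintf(h, "%s=%q;", k, externalLabels[k])
	}
	return h.Sum32() % n
}

// Example: a querier configured as shard 2 of 4 would only query TSDBs
// (including those exposed by receivers) for which shardFor(extLabels, 4) == 2,
// and the query rewrite would restrict it to those TSDBs.
```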
@fpetkovski I wonder whether it's already a good time to move the new promql engine from thanos-community to thanos-io. It feels weird to see a proposal here (thanos-io/thanos) for something that is implemented in thanos-community 🤔
Signed-off-by: Filip Petkovski [email protected]