backup: consider using distsql for work distribution #40239
This would be potentially useful for #44480
49425: backupccl: use distsql to distribute load of ExportRequests r=dt a=pbardea

Prior to this change, the backup job synchronized parallelization through managing various channels. This change leverages DistSQL infrastructure to distribute this work. In particular, this change introduces a BackupProcessor whose primary responsibility is to issue an ExportRequest and report its progress back to the backup coordinator. A BackupProcessor is scheduled on each node and is responsible for exporting the spans whose leaseholders are on that node. This means that, hopefully, the ExportRequest will be executed on the same node. Backup progress and checkpointing are maintained by each processor sending the results from its export requests back to the coordinator through DistSQL's metadata stream.

Part of #40239.

Release note: None

49900: colexec: increase support of aggregate functions r=yuzefovich a=yuzefovich

**colexec: add support of avg on ints and intervals**

This commit adds support of the `avg` aggregate function on ints and intervals. Adding intervals support was straightforward, but ints support was a little trickier: the average of ints is a decimal, so there is some additional setup work needed to override the result type. Another complication was that we need to perform the summation using the "DECIMAL + INT" overload. In order to make the code more comprehensible, I introduced a separate `avgTmplInfo` struct that contains all the necessary information for the code generation.

Fixes: #38823.
Addresses: #38845.

Release note (sql change): Vectorized execution engine now supports `AVG` aggregate function on `Int`s and `Interval`s.

**colexec: add full support of sum and sum_int**

This commit adds full support of `sum` and `sum_int` when operating on integers (previously, `sum` wasn't supported at all, whereas `sum_int` was only supported on `INT8`). The complication here is that `sum` on ints returns a decimal and `sum_int` always returns `INT8` regardless of the argument type width. The former is resolved in a similar fashion to how `avg` is supported on ints, and the latter required the introduction of a special `assignFunc` that promotes the return type.

Fixes: #38845.

Release note (sql change): Vectorized execution engine now fully supports `SUM` aggregate function (previously, the summation of integers was incomplete).

**colexec: support min and max on int2 and int4**

Previously, `min` and `max` were disabled on INT2 and INT4 columns because there was a mismatch between the expected logical type (INT8) and the actual physical type (the type of the argument). This is now fixed by performing a cast and some extra templating. Additionally, this commit cleans up the way we populate the output types of the aggregate functions and removes some redundant return values.

Release note (sql change): Vectorized execution engine now supports `MIN` and `MAX` aggregate functions on columns of INT2 and INT4 types.

**sql: miscellaneous cleanups**

This commit does the following:
1. unifies the way we set "no filter" on the window functions as well as reuses the same constant for the "column omitted" index
2. sets `VectorizeMode` of the testing eval context to `on` by default
3. disables fallback to rowexec for operators-against-processors tests
4. updates execplan to be defensive about a nil processorConstructor.

Release note: None

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
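To make the BackupProcessor flow described above concrete, here is a minimal Go sketch of the idea: one worker per node exports the spans whose leaseholders it owns and streams per-span completion back to the coordinator. All of the names (`span`, `progress`, `runBackupProcessor`) are hypothetical stand-ins rather than the actual `backupccl` types, and plain channels stand in for DistSQL's metadata stream.

```go
package main

import (
	"fmt"
	"sync"
)

// span is a hypothetical stand-in for a key span assigned to one node's
// backup processor; it is not the CockroachDB roachpb.Span type.
type span struct {
	node       int
	start, end string
}

// progress models the per-span completion metadata a processor would
// stream back to the backup coordinator.
type progress struct {
	node int
	span span
}

// runBackupProcessor plays the role of the per-node BackupProcessor: it
// "exports" the spans whose leaseholders live on its node and reports
// each completed span on the shared metadata channel.
func runBackupProcessor(node int, spans []span, meta chan<- progress, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, sp := range spans {
		// A real processor would issue an ExportRequest here; the request
		// should usually be served locally because the span's leaseholder
		// is on this node.
		meta <- progress{node: node, span: sp}
	}
}

func main() {
	// Spans pre-grouped by the node holding their leases (assumed input).
	byNode := map[int][]span{
		1: {{1, "a", "c"}, {1, "c", "e"}},
		2: {{2, "e", "m"}},
		3: {{3, "m", "z"}},
	}

	meta := make(chan progress)
	var wg sync.WaitGroup
	for node, spans := range byNode {
		wg.Add(1)
		go runBackupProcessor(node, spans, meta, &wg)
	}
	go func() { wg.Wait(); close(meta) }()

	// The coordinator consumes the metadata stream to checkpoint progress.
	for p := range meta {
		fmt.Printf("node %d finished span [%s,%s)\n", p.node, p.span.start, p.span.end)
	}
}
```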
A summary of an offline discussion with @dt, with respect to distsql restore:

**Background**

Today, restore processes the import entries through the following pipeline:
Note that scattering the ranges takes a non-trivial amount of time and benefits from being done concurrently with the ImportRequests. It's also important to note that restore limits the number of in-flight requests in order to prevent over-congesting DistSender and causing other badness; this prevents us from simply sending all of the requests in parallel and waiting for them to be processed.

**Proposed DistSQL Version**

The leading distsql implementation of this flow is to create 2 processors: one which splits and scatters, and another that performs the ImportRequest. The first would stream its completed entries to the second. In particular, the SplitAndScatter processor would stream each entry to the processor that is colocated with the leaseholder of that range. This is beneficial because ImportRequests will usually go to the same node they were dispatched on. Otherwise, there may be a situation where all nodes attempt to send an ImportRequest to the same node, and the entire cluster is blocked from making progress until those ImportRequests are handled. Note that this solution requires a new router type where data can be explicitly sent to a particular node in a DistSQL flow.

**Alternatives**
#50637 tracks the work related to creating the router that will connect the split and scatter processor to the restore data processor.
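As a rough illustration of what such a router might do, here is a small Go sketch that sends each scattered entry to the stream feeding the node holding its leaseholder, rather than hashing it to an arbitrary stream. The names (`restoreEntry`, `routeByNode`) and the channel-based plumbing are assumptions for illustration only; the real router would be a DistSQL output router, not channels.

```go
package main

import "fmt"

// restoreEntry is a hypothetical import entry produced by the
// split-and-scatter stage; after scattering, the entry records which node
// now holds the lease for its span.
type restoreEntry struct {
	span            string
	leaseholderNode int
}

// routeByNode models the proposed router type: instead of hashing, it
// sends each entry to the stream feeding the processor on the node that
// holds the entry's leaseholder.
func routeByNode(entries []restoreEntry, streams map[int]chan restoreEntry) {
	for _, e := range entries {
		streams[e.leaseholderNode] <- e
	}
	for _, s := range streams {
		close(s)
	}
}

func main() {
	// One inbound stream per node's restore-data processor (3 nodes assumed).
	streams := map[int]chan restoreEntry{
		1: make(chan restoreEntry, 8),
		2: make(chan restoreEntry, 8),
		3: make(chan restoreEntry, 8),
	}

	entries := []restoreEntry{
		{"[a,c)", 2}, {"[c,f)", 1}, {"[f,k)", 3}, {"[k,z)", 2},
	}
	routeByNode(entries, streams)

	// Each processor then issues ImportRequests only for locally-led spans.
	for node, s := range streams {
		for e := range s {
			fmt.Printf("node %d ingests %s\n", node, e.span)
		}
	}
}
```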
After chatting with the DistSQL folks, the preferred approach is to use a
51562: backupccl: add RestoreData processor r=dt a=pbardea

This commit adds a processor which actually performs the ImportRequest. It has an input which accepts rows with 2 columns that should be sent from SplitAndScatter processors. Each row represents one span that the processor should ingest. The intention is that the spans directed to a processor on a given node have their leaseholders colocated on that same node (this work is done in the SplitAndScatter processor). All that remains is to send a request to ingest the data and stream its progress back to the coordinator upon completion.

Part of #40239.

Release note: None

51896: bulkio: Add schedule control statements. r=miretskiy a=miretskiy

Informs #51600

Introduce schedule control statements responsible for managing scheduled jobs.

```
PAUSE SCHEDULE 123
PAUSE SCHEDULES SELECT ...
RESUME SCHEDULES SELECT schedule_id FROM system.schedules_jobs ...
DROP SCHEDULE 123
```

Release Notes (enterprise): Implement schedule control statements to pause, resume, or delete scheduled jobs.

Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
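As a hedged sketch of the RestoreData processor's shape described in #51562: it drains two-column input rows (one span each), ingests each span, and streams progress back to the coordinator. The `row` and `ingestSpan` names and the channel-based plumbing are hypothetical simplifications, not the actual execinfra interfaces.

```go
package main

import "fmt"

// row is a hypothetical two-column input row for a restore-data-style
// processor: the start and end key of one span to ingest.
type row struct {
	startKey, endKey string
}

// ingestSpan stands in for issuing the actual ingestion request for one
// span; here it just reports how many bytes it pretends to have written.
func ingestSpan(r row) int {
	return len(r.startKey) + len(r.endKey) // placeholder "work"
}

// runRestoreDataProcessor drains its input rows, ingests each span, and
// streams a completion notice back to the coordinator.
func runRestoreDataProcessor(input <-chan row, toCoordinator chan<- string) {
	for r := range input {
		n := ingestSpan(r)
		toCoordinator <- fmt.Sprintf("ingested [%s,%s): %d bytes", r.startKey, r.endKey, n)
	}
	close(toCoordinator)
}

func main() {
	input := make(chan row, 3)
	input <- row{"a", "f"}
	input <- row{"f", "m"}
	input <- row{"m", "z"}
	close(input)

	progress := make(chan string)
	go runRestoreDataProcessor(input, progress)
	for msg := range progress {
		fmt.Println(msg)
	}
}
```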
Closed by #51959.
While watching `backup2tb`, which runs on 10 nodes, I noticed that during the BACKUP phase at various times some of the nodes are completely idle while others are busy, even though the leaseholders-per-store graph is pretty even. This isn't terribly surprising: the way the backup coordinator sends out work isn't really aware of who it is sending it to and just has a global limit on the max number of outstanding requests, so if more of those go to a single node, others may have no work sent to them at times. The 10-node cluster just seems to highlight this more than our 3-node clusters did.
There's a long-standing TODO around that limiter about this issue, but it also highlights another thing: BACKUP and RESTORE both have lots of work to do that they want to distribute among nodes, pushing the computation out to the data. They do this by sending KV commands (ExportRequest and ImportRequest). However, since they were originally built, distsql has been developed as a general-purpose way to distribute work and push computation close to the data, and it has since had years of polish.
Switching to distsql as the work-distribution layer of BACKUP and RESTORE could utilize its more sophisticated tools for routing work to the appropriate ranges as well as streaming metadata between longer-running workers and coordinators.
For backup, instead of one coordinator sending raw export requests individually, we'd just let distsql's existing partition-spans-by-node logic assign table spans to Export _processors_, with one processor on each node, and then have those processors send the ExportRequests (likely to themselves). We could do more granular request pacing/limiting in the processor since we can assume its work is all for one node, and we could potentially get better sequential access patterns as well, since spans are now grouped by node for us and could be coalesced (see the sketch below).
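A minimal sketch of that per-node grouping and coalescing, assuming the span-to-node assignment is already known (in the real flow it would come from DistSQL's span partitioning); `spanAssignment` and `coalesce` are illustrative names only.

```go
package main

import (
	"fmt"
	"sort"
)

// spanAssignment pairs a table span with the node that currently holds its
// lease; this grouping is assumed input here.
type spanAssignment struct {
	node       int
	start, end string
}

// coalesce merges adjacent spans assigned to the same node, so an export
// processor can issue fewer, larger requests with a more sequential access
// pattern.
func coalesce(spans []spanAssignment) []spanAssignment {
	sort.Slice(spans, func(i, j int) bool { return spans[i].start < spans[j].start })
	var out []spanAssignment
	for _, s := range spans {
		last := len(out) - 1
		if last >= 0 && out[last].node == s.node && out[last].end == s.start {
			out[last].end = s.end // extend the previous span in place
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	assigned := []spanAssignment{
		{1, "a", "c"}, {1, "c", "e"}, {2, "e", "h"}, {1, "h", "k"}, {2, "k", "z"},
	}
	for _, s := range coalesce(assigned) {
		fmt.Printf("node %d exports [%s,%s)\n", s.node, s.start, s.end)
	}
}
```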
For restore, we could eliminate ImportRequest and do the download/rekey in a processor that then sends AddSSTable requests (essentially doing the same thing, just not in a KV command).
I mentioned how ImportRequest currently works (e.g. sending itself requests) to our core buddy a few weeks ago and they were pretty dubious that it belonged in the KV API, given that it doesn't interact with the replica, i.e. doesn't declare keys / acquire latches / update the ts cache / etc. or read/write to it at all -- it is essentially a pure function of its input (files + rekey) that sends its output via another KV command (an AddSSTable request that we expect will go to itself). This pure computation could just as easily happen in a distsql processor, and not tie up a replica/`store.Send` for something that doesn't actually involve the Store.

Again, we built these this way before we had any alternative general-purpose distributed computation infra, but it might be worth revisiting that design now that distsql has not only been built but has also had years of polish.
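To illustrate why this step is a pure function of its input, here is a tiny Go sketch that rewrites the table-ID prefix of backed-up keys and hands the result off as an AddSSTable payload. The `kv` type and string-prefix rekeying are deliberate oversimplifications of the real SSTable-based code, and the key encoding shown is purely illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// kv is a simplified key/value pair read from a backup file; the real code
// operates on SSTable iterators, not in-memory slices.
type kv struct {
	key, value string
}

// rekey rewrites the table-ID prefix of each key from the old (backed-up)
// table ID to the new (restored) one. This is the "pure function of its
// input (files + rekey)" part: no replica state is touched.
func rekey(in []kv, oldPrefix, newPrefix string) []kv {
	out := make([]kv, 0, len(in))
	for _, e := range in {
		out = append(out, kv{
			key:   newPrefix + strings.TrimPrefix(e.key, oldPrefix),
			value: e.value,
		})
	}
	return out
}

func main() {
	// Entries as they appear in the backup, keyed under a hypothetical table /53.
	fileKVs := []kv{{"/53/1/a", "v1"}, {"/53/1/b", "v2"}}

	// A restore processor could do this rekeying itself and then hand the
	// result to an AddSSTable request, instead of wrapping the whole step
	// in an ImportRequest served by a replica.
	restored := rekey(fileKVs, "/53", "/107")
	for _, e := range restored {
		fmt.Printf("AddSSTable payload: %s => %s\n", e.key, e.value)
	}
}
```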