Implement concurrent filtering and postprocessing #526
Here's what I'm trying, running locally on naic-monitor:
(We can't run against the daemon with -cpuprofile, and it's not very sensible to run the daemon with -cpuprofile itself.) Unsurprisingly, parsing dominates this:
But the hashtable thing is interesting, because the profiler uses hashtables for its sparse matrices. Still, more heat than light here - the call tree for the profile doesn't really show the profiler at all.
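For context, the -cpuprofile flag mentioned above is assumed to be the usual runtime/pprof wiring; a minimal sketch (illustrative, not the actual sonalyze code), which also shows why profiling the daemon this way is awkward - the profile is only written out when StopCPUProfile runs at exit:

```go
package main

import (
	"flag"
	"log"
	"os"
	"runtime/pprof"
)

var cpuprofile = flag.String("cpuprofile", "", "write CPU profile to this file")

func main() {
	flag.Parse()
	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal(err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal(err)
		}
		// The profile is flushed only when StopCPUProfile runs, which is why this
		// works for a one-shot command but not for a long-running daemon.
		defer pprof.StopCPUProfile()
	}
	runCommand() // the actual work being profiled
}

func runCommand() {
	// placeholder for the command body
}
```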
Some manual instrumentation:
It really is the ~38 profiler runs that are expensive here; everything else is noise. Re the sparse matrix:
Profiling locally on my laptop does not shed much light. It still looks like reading data is the main culprit: the processing time barely shows up, and the profile computation not at all. It would be worth investigating whether caching works properly in the daemon. On naic-monitor it has only a 512MB resident set, but the cache size was set to 12GB, and it should really have a lot more data cached, given how often reports are created and so on.
Having isolated reading from the rest of it, here's the top profile of a request where every read hit the cache:
This illustrates that it is the underlying stream creation machinery that is expensive, although frankly the resulting call graph with profile annotations still feels wrong, as the profile package does not show up anywhere. It's worth observing that createInputStreams precedes ComputeAndFilter in the source code and that the Go profiler can get things wrong (golang/go#41338), so more likely than not we should view these as part of the same problem. Clearly main.builfFilters.func1 is also part of this - it's the constructed filter function called by ComputeAndFilter. Anyway, the profiler is not the issue here per se; stream creation and filtering are.
Another dimension is that this was for a particularly long time period - four weeks. There will be a lot of records to filter and process, so it's not insane that this can take a second or more.
Compiling the filters as a set of closures - to avoid redundant filters - is not much of a winner: there will basically always be at least a couple of clauses, one for the time span and probably one for a job, a user, or a host, and the function call overhead eats up the winnings. Basically:
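Something like the following sketch of that closure-per-clause shape, with an illustrative Sample stand-in rather than the actual record type:

```go
package closurefilter

// Sample is a stand-in for the real record type; only the fields used here.
type Sample struct {
	Timestamp int64
	Job       uint32
}

// recordFilter is one compiled clause; a record passes only if every clause accepts it.
type recordFilter func(*Sample) bool

// compileFilters builds the closure chain. There will essentially always be a
// time-span clause plus at least one of job/user/host, so each record pays a few
// indirect calls - roughly what eats up the savings from dropping redundant clauses.
func compileFilters(from, to int64, jobs map[uint32]bool) []recordFilter {
	fs := []recordFilter{
		func(r *Sample) bool { return r.Timestamp >= from && r.Timestamp <= to },
	}
	if len(jobs) > 0 {
		fs = append(fs, func(r *Sample) bool { return jobs[r.Job] })
	}
	return fs
}

func passes(fs []recordFilter, r *Sample) bool {
	for _, f := range fs {
		if !f(r) {
			return false
		}
	}
	return true
}
```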
To prove that ReadSampleStreams would return the same value, we need to prove (in the current implementation) that all files are cached, or (more generally) that all files have the same mtime as last time. Additionally, there's probably a ceiling on how long it can take for a record to arrive, so if the "to" timestamp is more than - say - 30 minutes in the past, we could reuse a query. Thus there could be a query cache, and maybe some way of asking the db whether there have been new data for the set of files involved in a query. This would remove all of the time for createInputStreams and maybe 25% of the time for ComputeAndFilter, for a total of about half the running time. But maintaining the cache will be interesting; if there's cache pressure it will have to be purged.
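A sketch of the validity check such a query cache could use; the names and the 30-minute settling window are assumptions, not existing code:

```go
package querycache

import "time"

// entry is one cached query result.
type entry struct {
	mtimes map[string]time.Time // mtime of each file when the result was computed
	to     time.Time            // upper end of the query's time window
	result any                  // the cached streams (opaque here)
}

// usable reports whether the cached result can be reused. A query whose window
// ended long enough ago is reusable outright: any records written since then have
// timestamps after "to" (assuming records arrive within settle of being generated)
// and cannot change the result. Otherwise every file the query read must be
// unchanged since the result was computed.
func (e *entry) usable(currentMtime func(string) (time.Time, bool), settle time.Duration) bool {
	if time.Since(e.to) >= settle { // e.g. settle = 30 * time.Minute
		return true
	}
	for path, seen := range e.mtimes {
		m, ok := currentMtime(path)
		if !ok || !m.Equal(seen) {
			return false
		}
	}
	return true
}
```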
Another thing that would make sense here is amortisation. For the specific use case of heavy-users.py we're going to be running the exact same command, except for the job number, for a number of jobs. Being able to ask for multiple profiles by job number would speed this up, because the input record set could be computed once and filtered multiple times, once for each job. Another thing that would help the script (but not fix the problem per se) is to pass in a host filter - the hosts for the job are known from the initial jobs command (in fact, right now there must be precisely one) and can be attached to the profile query. This should speed up record ingestion a lot. Testing this, it turns out to be a massive winner: per-invocation time drops from 1.10s to about 0.08s.
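The amortisation idea - compute the input record set once, then filter per job - would look roughly like this (hypothetical types and names, not an existing API):

```go
package amortize

// Stream is a stand-in for a merged sample stream; only the field used here.
type Stream struct {
	Job uint32
	// ... samples ...
}

// profilesForJobs assumes the streams have already been read and merged once (the
// expensive part) and then does a cheap per-job selection, instead of re-reading
// everything for each invocation of the profile command.
func profilesForJobs(streams []*Stream, jobs []uint32) map[uint32][]*Stream {
	byJob := make(map[uint32][]*Stream)
	for _, s := range streams {
		byJob[s.Job] = append(byJob[s.Job], s)
	}
	out := make(map[uint32][]*Stream, len(jobs))
	for _, job := range jobs {
		out[job] = byJob[job]
	}
	return out
}
```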
The insight that the host filter should help is important, but it's astonishing that it helps as much as it does. The code looks like it is linear in the size of the input set but there are nonlinear components hidden here:
The most robust solution would probably be an in-memory index that is built lazily and attached to each cached blob. Consider filtering records by job number. The first time through we have to scan all records in all the blobs (and we do this late, after building up an initial search set). Subsequently the files will be in memory, but if the first scan had built up an index we could go straight to that. The problem is that the index takes space and takes some time to build, and we don't know which attributes we'll need, so we shouldn't do more than we have to. An index for exact inclusive filtering is a table that maps a key to a set of rows. In our case this could be a set of row indices, since the table data are immutable; this would save space and be kind to the GC. (If space is a huge concern then we would do well to have the option of using 16-bit row indices, as that will almost always be enough - but not actually always.) The size of the index would have to be accounted for in the cached content. To make this effective we would apply the filter (ie use the index) early, during the building of the initial record set. Doing that now with the existing filter structure might also be beneficial. I think it's not done early now for code-structuring reasons - there's some filtering that can't be done until late in the game - but most filters are truly record filters, not inter-record filters, and can be applied at any time after ingestion.
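A sketch of the lazily built per-blob index described above (hypothetical types; 32-bit row indices shown, with 16-bit as the optional space optimization):

```go
package blobindex

// Sample is a stand-in for the record type stored in a cached blob.
type Sample struct {
	Job uint32
}

// jobIndex maps a job number to the rows of the (immutable) blob that carry it.
// It is built lazily the first time the blob is filtered by job, cached alongside
// the blob, and its size must be charged against the cache budget.
type jobIndex map[uint32][]uint32

func buildJobIndex(records []Sample) jobIndex {
	ix := make(jobIndex)
	for i, r := range records {
		ix[r.Job] = append(ix[r.Job], uint32(i))
	}
	return ix
}

// rowsForJob returns candidate row indices without scanning the whole blob;
// applying this while building the initial record set is the "filter early" idea.
func (ix jobIndex) rowsForJob(job uint32) []uint32 {
	return ix[job]
}
```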
Another matter is that a lot of concurrency is being left on the table right now. Most operations in postprocessing are massively parallelizable: we can construct partial sample streams per input blob in parallel; these can then be merged (this step is sequential but need not be awful); and after that everything happens per-stream. If the number of streams is large, then the speedup potential from throwing cores at the problem is very significant. For a large data set from a large number of nodes there will almost always be a large number of streams, as there is one stream per (node, jobid, cmd) triplet.
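A sketch of that parallel shape - fan out per input blob, merge sequentially, then fan out per stream - with hypothetical types; the build/merge/process functions are placeholders for the real work:

```go
package parallel

import (
	"runtime"
	"sync"
)

type Blob struct{ /* raw records from one input file */ }
type Stream struct{ /* samples for one (node, jobid, cmd) triplet */ }

func postprocess(
	blobs []*Blob,
	build func(*Blob) []*Stream, // per-blob partial stream construction
	merge func([][]*Stream) []*Stream, // sequential, but cheap relative to the rest
	process func(*Stream), // per-stream postprocessing
) []*Stream {
	sem := make(chan struct{}, runtime.NumCPU()) // bound concurrency to the core count
	var wg sync.WaitGroup

	// Phase 1: build partial sample streams per input blob, in parallel.
	partial := make([][]*Stream, len(blobs))
	for i, b := range blobs {
		wg.Add(1)
		go func(i int, b *Blob) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			partial[i] = build(b)
		}(i, b)
	}
	wg.Wait()

	// Phase 2: merge the partial streams (sequential).
	streams := merge(partial)

	// Phase 3: everything else is per-stream and embarrassingly parallel.
	for _, s := range streams {
		wg.Add(1)
		go func(s *Stream) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			process(s)
		}(s)
	}
	wg.Wait()
	return streams
}
```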
This is a massive win, bringing the time for a run of the profiler down from about 1200ms to about 180ms (more numbers coming).
Final end-to-end numbers for the move-filtering patch: without host filtering,
For #526 - move filtering earlier
Avoiding the allocation of a huge intermediate array is also a significant win. The main consumers are now various maps and hash functions used by
Actually the filter function specialization is a little trickier than I thought: the default filter function is used to exclude heartbeat records via excludeCommands. But it may still be worth it: running time for the test case drops from 28s to 25s in an informal test, i.e. about 10% off an already pretty good time. See also branch larstha-526-better-filter.
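Roughly what the specialization has to preserve (illustrative, not the code on the branch): even with no user-supplied clauses, the "default" filter still has to drop heartbeat records, so the fast path can only skip the clause loop, not the excludeCommands check:

```go
package specialize

// Sample is a stand-in for the record type; only the field used here.
type Sample struct {
	Cmd string
}

// buildFilter returns the cheapest filter that still honors excludeCommands.
func buildFilter(excludeCommands map[string]bool, clauses []func(*Sample) bool) func(*Sample) bool {
	if len(clauses) == 0 {
		// Specialized path: only the heartbeat exclusion remains.
		return func(r *Sample) bool { return !excludeCommands[r.Cmd] }
	}
	return func(r *Sample) bool {
		if excludeCommands[r.Cmd] {
			return false
		}
		for _, c := range clauses {
			if !c(r) {
				return false
			}
		}
		return true
	}
}
```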
For #526 - Change DB APIs to return slice-of-slice with shared data
Avoiding allocating the huge array took 25% off the running time of heavy-users.py (with either warm or cold cache), bringing us down to about 26s for this code for two months of data on a warm cache.
Getting rid of the bounds computation for the profile runs is a massive win, dropping the total running time for the heavy-users.py report by 50%; with this we are now down by over 90% relative to the original running time. To be sure, bounds info is used by several other verbs and they will continue to suffer, but they seem to be less affected than the profiler.
For #526 - don't compute unnecessary host bounds
For #526 - Refactor filter, create filter compiler
Compiling the filter to a bytecode program and executing that cut the time by half again.
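The general shape of a filter compiled to a flat program; the opcodes and record fields here are illustrative, not the actual ones in the filter compiler:

```go
package filtervm

// Sample is a stand-in for the record type; only the fields used here.
type Sample struct {
	Timestamp int64
	Job       uint32
}

type opcode uint8

const (
	opTimeRange opcode = iota // accept if from <= Timestamp <= to
	opJobEq                   // accept if Job == arg
)

// instr is one instruction; a record passes only if every instruction accepts it.
type instr struct {
	op       opcode
	from, to int64
	arg      uint32
}

// run executes the compiled program. Keeping the clauses in one flat slice avoids
// a chain of indirect closure calls and keeps the hot loop tight.
func run(prog []instr, r *Sample) bool {
	for _, in := range prog {
		switch in.op {
		case opTimeRange:
			if r.Timestamp < in.from || r.Timestamp > in.to {
				return false
			}
		case opJobEq:
			if r.Job != in.arg {
				return false
			}
		}
	}
	return true
}
```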
For the two remaining items (concurrency): file reading and parsing are being done in parallel, but there is one worker per CPU, and there will be plenty of idle time while we're waiting for I/O to complete if there is a cache miss. If there is a cache hit then the "I/O" is trivial but the workers will be busy. I think a worker pool separate from the I/O workers may be the best thing here. Implementing anything more here is not a priority, though; what remains is background work.
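One way the separate pools could be shaped (a sketch; the read/parse functions stand in for the real I/O and postprocessing stages):

```go
package pools

import (
	"runtime"
	"sync"
)

// run decouples the two pools: I/O workers mostly block on reads (on a cache miss),
// so oversubscribing them is fine, while the parse/postprocess workers are
// CPU-bound and sized to the core count.
func run(paths []string, read func(string) []byte, parse func([]byte)) {
	work := make(chan string, len(paths))
	for _, p := range paths {
		work <- p
	}
	close(work)

	blobs := make(chan []byte, runtime.NumCPU())

	var ioWG sync.WaitGroup
	for i := 0; i < 4*runtime.NumCPU(); i++ { // oversubscribed: these block on I/O
		ioWG.Add(1)
		go func() {
			defer ioWG.Done()
			for p := range work {
				blobs <- read(p)
			}
		}()
	}
	go func() {
		ioWG.Wait()
		close(blobs)
	}()

	var cpuWG sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ { // CPU-bound workers
		cpuWG.Add(1)
		go func() {
			defer cpuWG.Done()
			for b := range blobs {
				parse(b)
			}
		}()
	}
	cpuWG.Wait()
}
```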
Let's just defer the rest of this and handle it if it becomes an issue again.
(This bug has morphed a couple of times, but has mostly been about optimizing the filtering pipeline in the sonarlog and db layers. What remains now is making use of available parallelism - this is not high priority; perf is pretty good already.)
For the heavy-users script, we end up running the profile command for every job that passes the initial filtering. The profile command is actually pretty slow. I don't know precisely why, but there is one main loop that is quadratic-ish (though there's an assertion that "the inner loop will tend to be very short"), and then when we do bucketing - which I do - there's a bucketing loop that is cubic-ish. One would want to profile this. File I/O should not be a factor, because this is with the caching daemon.
I'm running this on four weeks of fox data - heavy-users.py fox 4w - which would be roughly May 24 through June 21. There are 38 output lines, unless I miscounted.