
Dask support improvements #367

Merged · 1 commit merged into astropy:main on Jul 13, 2023

Conversation

@astrofrog (Member) commented May 26, 2023

This is a work in progress as I try to improve the performance of the parallelization.

Issues I'm still seeing:

  • When using the progress bar, for some reason it stays stuck at low values and then very quickly jumps to 100%, whereas to_zarr should normally just operate chunk by chunk (see the sketch after this list for how the progress bar is attached). This might be due to returning both the array and the footprint, as well as the trimming of the image, which might add an extra layer of chunks to the whole graph. To be investigated.
  • We should really clean up all temp files as it's easy to fill one's /tmp directory otherwise
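
A minimal sketch (not from this PR's diff) of how a dask progress bar is typically attached to the to_zarr call referred to in the first bullet; the array, chunk size, and output path are placeholders:

    import dask
    import dask.array as da
    from dask.diagnostics import ProgressBar

    # Stand-in for the reprojected dask array discussed above (placeholder data).
    result = da.random.random((8192, 8192), chunks=(1024, 1024))

    # ProgressBar ticks once per completed task, so with one task per chunk the
    # bar should normally advance chunk by chunk as to_zarr writes blocks out.
    # (On platforms that spawn processes, run this under a __main__ guard.)
    with dask.config.set(scheduler="processes"), ProgressBar():
        result.to_zarr("example.zarr")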

Performance notes:

  • I have tried reprojecting a (32000, 8600) array to a similar WCS, and parallel mode is quite a bit faster (56 seconds with 16 processes, compared to 4m26s in synchronous mode). When using (2048, 2048) chunks, 4 processes are almost as fast as 16 processes (1m26s instead of 56s). Using (4096, 4096) chunks (to try and reduce overhead) actually makes things slower for 16 processes, presumably because it is not an efficient division of the data. Using (1024, 1024) chunks makes things a little faster (52s) with 16 processes, (512, 512) chunks a little faster still (49s), and going all the way down to (256, 256) is surprisingly still fine (49s). (A sketch of the kind of call being benchmarked follows this list.)

  • Using dask-distributed works well but one needs (for now) to change:

        if isinstance(parallel, int):
            workers = {"num_workers": parallel}
        else:
            workers = {}
        with dask.config.set(scheduler="processes", **workers):
            result.to_zarr(filename)

to

        result.to_zarr(filename)

With that change, using 16 single-threaded local workers on the large array example above, the runtime with (1024, 1024) chunks is 1 minute (compared to 52s with the regular processes scheduler). With (2048, 2048) chunks the runtime is 1m6s, so similar. It would be interesting to try an example spanning multiple machines to see how efficient the network communication is.

  • I'm struggling to get more than a factor of ~2x performance improvement with @Cadair's DKIST coadd example, which might be due to the size of the images (4096, 4096). This makes me wonder whether we should consider a separate parallelization implementation in the coadd function, with one image per process rather than splitting each image up. I don't fully understand why I can't get better than a 2x improvement though. In synchronous mode with (512, 512) chunks the code takes 1m22s, whereas with 16 processes it takes 45s. If I replace:
        array, footprint = reproject_func(
            array_in, wcs_in, wcs_out_sub, block_info[None]["chunk-shape"][1:]
        )

with

        array = np.random.random(block_info[None]["chunk-shape"][1:])
        footprint = np.random.random(block_info[None]["chunk-shape"][1:])

inside reproject_single_block, the synchronous mode takes 13s and parallel mode with 16 processes takes 15s, which gives an idea of the overhead of the chunking itself before any reprojection. Taking that into account, the main parallelizable part of the code takes 30s in parallel compared to 1m07s in synchronous mode, so still not massively faster.

  • One weird thing I've seen with @Cadair's example mentioned above is that the individual processes each seem to use well over 100% CPU, so something is multi-threaded and I can't figure out what it is. This could explain why it's hard to get significant performance improvements with more processes. I don't see this issue with the large mosaic example I was trying out, where all processes max out at 100%. If I load up @Cadair's example using the FITS files directly, I also don't see the issue: all processes max out at 100%. But if I run the same example with the data kept as Numpy arrays read from the FITS files and only the WCSes swapped for the GWCSes, I see the multi-threading again.
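
For context, a rough sketch of the kind of call being benchmarked in the first performance bullet above; the input file is a placeholder, and the keyword names assume the block_size and parallel options exposed on this branch:

    from astropy.io import fits
    from astropy.wcs import WCS

    from reproject import reproject_interp

    # Placeholder input: a large image, reprojected to a similar target WCS.
    hdu = fits.open("large_image.fits")[0]
    wcs_out = WCS(hdu.header)  # in practice a slightly different target WCS

    # 16 worker processes, with the computation split into (1024, 1024) chunks.
    array, footprint = reproject_interp(
        hdu, wcs_out, shape_out=(32000, 8600), block_size=(1024, 1024), parallel=16
    )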

TODOs:

  • Figure out how to properly support dask distributed without having to comment out the code above (a sketch of the kind of user-side setup being targeted follows this list). Perhaps we need a way to pass in a scheduler, though we should warn people not to use multi-threaded schedulers, since those can cause issues with some of the reprojection algorithms.
  • Make it possible to return dask arrays from reproject_interp
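
A minimal sketch of the user-side dask.distributed setup referred to in the first TODO, assuming the scheduler override shown earlier has been removed so that to_zarr picks up the active client; the worker counts simply mirror the timings quoted above:

    from dask.distributed import Client

    # 16 single-threaded local worker processes, matching the benchmark above.
    client = Client(n_workers=16, threads_per_worker=1)

    # ... build the reprojection graph as usual, then write it out; with an
    # active client, dask routes the computation through the distributed
    # scheduler rather than the local "processes" scheduler.
    # result.to_zarr(filename)

    client.close()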

@astrofrog marked this pull request as draft May 26, 2023 10:56
    # Sometimes compute() has to be called twice to return a Numpy array,
    # so we need to check here if this is the case and call the first compute()
    if isinstance(dask_array.ravel()[0].compute(), da.Array):
        dask_array = dask_array.compute()
@astrofrog commented on the diff above:

This is to avoid issues seen with @Cadair's data. Interestingly, a similar issue was seen with a different dataset in glue (glue-viz/glue#2399), so this may be a common problem, which is why it is worth having the workaround here.

    if block_size:
        output_array_dask = da.empty(shape_out, chunks=block_size)
    else:
        output_array_dask = da.empty(shape_out).rechunk(block_size_limit=8 * 1024**2)
@astrofrog commented on the diff above (May 26, 2023):

If we don't do this, then for a 4k × 4k image the default chunk is the whole image, so there is no gain from parallelizing. For small images that is true anyway (no gain from parallelization), but I do think the default chunk size is too big.
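
For illustration (not part of the diff), a quick way to see what the 8 MiB block_size_limit does to the default chunking, assuming a float64 output array:

    import dask.array as da

    # Without a limit, a (4096, 4096) float64 array fits in a single ~128 MiB
    # default chunk, so there is nothing to parallelize over.
    print(da.empty((4096, 4096)).chunks)

    # With an 8 MiB limit the array is split into many smaller blocks (roughly
    # (1024, 1024) for float64), giving the scheduler work to spread across
    # processes.
    print(da.empty((4096, 4096)).rechunk(block_size_limit=8 * 1024**2).chunks)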

codecov bot commented May 26, 2023

Codecov Report

Merging #367 (d37fc5f) into main (8ea36e7) will decrease coverage by 0.21%.
The diff coverage is 60.00%.

@@            Coverage Diff             @@
##             main     #367      +/-   ##
==========================================
- Coverage   92.50%   92.29%   -0.21%     
==========================================
  Files          24       24              
  Lines         840      844       +4     
==========================================
+ Hits          777      779       +2     
- Misses         63       65       +2     
Impacted Files Coverage Δ
reproject/utils.py 84.48% <60.00%> (-0.82%) ⬇️


@astrofrog commented May 26, 2023

@Cadair - recording this here because I'm not sure where else to put it, but if I run:

import numpy as np
import dkist
from astropy.wcs.utils import pixel_to_pixel

ds = dkist.Dataset.from_asdf("BLKGA_test.asdf")

N = 10000000

pixel_to_pixel(ds.flat[0][0].wcs, ds.flat[1][0].wcs, np.ones(N), np.ones(N))

I get the following CPU usage:
[plot: per-process CPU usage over time]

So there are indeed multi-threaded spikes. I don't think this is a problem per se although it would be nice to know where this is coming from. But if some of the transformation is indeed multi-threaded it does explain why I was having a hard time gaining performance with multi-processing as in your case the coordinate transforms are the bulk of the time.

@astrofrog commented May 26, 2023

Simpler example:

import numpy as np
import dkist
import dask

dask.config.set(scheduler='single-threaded')

ds = dkist.Dataset.from_asdf("BLKGA_test.asdf")

N = 100000000

ds.flat[0][0].wcs.low_level_wcs.pixel_to_world_values(np.ones(N), np.ones(N))

[plot: CPU usage over time during the transform]

Interestingly, there are two spikes in CPU usage, one near the start of the transform and one near the end (ignoring the ones close to t=0), regardless of how many coordinates are transformed.

@astrofrog marked this pull request as ready for review July 13, 2023 10:11
@astrofrog commented:
I'm going to merge this as-is, since I'm now going to do a reasonably big refactor of the dask machinery and want to keep that separate from this simple diff.

@astrofrog merged commit 00351ae into astropy:main Jul 13, 2023
@astrofrog commented:
Just to close the loop on the question of what in the solar examples is multi-threaded: it looks like this is because GWCS uses astropy.modeling, and in particular AffineTransformation2D, which in turn uses np.dot, which is multi-threaded on some installations (via the underlying BLAS). An easy way to disable the multi-threading before doing profiling work is:

import os
os.environ['OPENBLAS_NUM_THREADS'] = '1' 

before Numpy and Scipy are imported.
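
Spelled out as a complete snippet (a sketch; the OPENBLAS_NUM_THREADS variable only affects OpenBLAS builds of Numpy):

    import os

    # Must be set before Numpy/Scipy (and hence OpenBLAS) are first imported;
    # once the BLAS thread pool has been initialized the setting is ignored.
    os.environ["OPENBLAS_NUM_THREADS"] = "1"

    import numpy as np  # noqa: E402
    import scipy  # noqa: E402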
