
parallelize bootstrap #145

Closed
aaronspring opened this issue May 23, 2019 · 19 comments · Fixed by #285, #332 or #355
Comments

@aaronspring
Collaborator

Code Sample

https://stackoverflow.com/questions/56281148/parallelized-bootstrapping-with-replacement-with-xarray-dask

Problem description

I want to perform N=1000 bootstrap iterations with replacement on gridded data. One computation takes about 0.5 s. I have exclusive access to a supercomputer node with 48 cores. Because the resampling iterations are independent of each other, I naively hoped to distribute the workload across all (or at least many) cores and get a speedup of about 0.8 * ncores, but I don't get that.

@ahuang11
Member

ahuang11 commented May 27, 2019

Because the resampling iterations are independent of each other, I naively hoped to distribute the workload across all (or at least many) cores and get a speedup of about 0.8 * ncores, but I don't get that.

You aren't able to achieve the performance increase or you are not sure about how to implement it?

If it's the latter, this is the solution I use to parallelize for loops in a straightforward way (parallel: 4.1 seconds; for loop: 10 seconds):

import time
import dask.bag as db

def waiting_func(time_to_process):
    # simulate a job by sleeping for the given number of seconds
    time.sleep(time_to_process)
    print(time_to_process)

job_times = [1, 3, 4, 2]

# parallel: jobs run concurrently across partitions (~4.1 s total)
db.from_sequence(job_times).map(waiting_func).compute()

# serial baseline (~10 s total)
for job_time in job_times:
    waiting_func(job_time)


However, it also depends on the partition size and how long each job lasts. If each job runs in nanoseconds, it's hard to reduce runtime through parallelization because the scheduling overhead dominates, unless you batch multiple jobs into one partition.
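That batching idea can be sketched with the standard library alone (a hypothetical thread-based analogue of dask.bag's partition_size; `tiny_job` and `run_batched` are made-up names for illustration):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def tiny_job(x):
    time.sleep(0.001)  # a job so cheap that per-task overhead matters
    return x * x

def run_batched(values, batch_size=100, workers=4):
    """Submit whole batches instead of single jobs, amortizing the per-task overhead."""
    batches = [values[i:i + batch_size] for i in range(0, len(values), batch_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = []
        # pool.map preserves input order, so results line up with values
        for chunk in pool.map(lambda b: [tiny_job(x) for x in b], batches):
            results.extend(chunk)
    return results
```

With very small `batch_size` the scheduling overhead dominates; with larger batches the same total work finishes faster, which is the partition-size trade-off described above.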

@aaronspring
Collaborator Author

I know a few ways to implement this (see the link), though not this particular one. Unfortunately, my approaches didn't scale.

@bradyrx
Collaborator

bradyrx commented Jun 1, 2019

Just some thoughts from my phone: maybe push the objects to numpy, apply jit there, then force them back to xarray? Not sure whether that would help here or not.

@ahuang11
Member

ahuang11 commented Jun 1, 2019 via email

@ahuang11
Member

ahuang11 commented Jun 1, 2019

Perhaps we could do some line profiling to see where it spends the most time per call.
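One way to get started with only the standard library is function-level profiling via cProfile (the per-line tables in this thread come from the separate line_profiler package, run with kernprof); a minimal sketch, with `hotspot` and `profile` as made-up names:

```python
import cProfile
import io
import pstats

def hotspot(n):
    # stand-in for an expensive call we want to locate in the report
    return sum(i * i for i in range(n))

def profile(func, *args):
    """Run func under cProfile and return the stats report as a string."""
    prof = cProfile.Profile()
    prof.enable()
    func(*args)
    prof.disable()
    buf = io.StringIO()
    pstats.Stats(prof, stream=buf).sort_stats("cumulative").print_stats()
    return buf.getvalue()

report = profile(hotspot, 100_000)
```

For true per-line numbers, line_profiler's `kernprof -l script.py` with an `@profile` decorator produces output in the Hits/Time/Per Hit/% Time format quoted in this thread.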

@bradyrx
Collaborator

bradyrx commented Jun 1, 2019

Agreed. You could pull the dimension names from the xarray object so that you know what is what once you drop down to a numpy array. Then do all the resampling/stacking/concatenating there and push the result back into the xarray object afterwards.
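A minimal numpy sketch of that round trip, assuming `values` and `dims` were pulled off a DataArray (`da.values`, `da.dims`); `resample_numpy` is a hypothetical helper, not climpred code:

```python
import numpy as np

def resample_numpy(values, dims, dim="init", n_boot=100, rng=None):
    """Bootstrap-resample a raw array along the axis named `dim`.

    The result can be wrapped back into xarray with
    xr.DataArray(out, dims=("bootstrap",) + tuple(dims)).
    """
    rng = np.random.default_rng() if rng is None else rng
    axis = dims.index(dim)          # recover the axis from the dimension name
    n = values.shape[axis]
    idx = rng.integers(0, n, size=(n_boot, n))  # indices drawn with replacement
    return np.stack([np.take(values, i, axis=axis) for i in idx])

data = np.arange(24.0).reshape(4, 3, 2)                  # pretend da.values
out = resample_numpy(data, dims=("init", "lat", "lon"), n_boot=10)
```

Keeping the dims tuple alongside the raw array is what makes the final wrap back into a labeled xarray object unambiguous.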

@aaronspring
Collaborator Author

This probably helps. The uninit bootstrapping function is the pain point of bootstrap_compute.

What I would really like to get out of this issue is a way to run the independent computations in parallel and gain speed that way.

@ahuang11
Member

ahuang11 commented Jun 1, 2019

Testing it right now :)

@aaronspring
Collaborator Author

Some benchmarking would eventually be nice.

@ahuang11
Member

ahuang11 commented Jun 1, 2019

267 100 31454182.0 314541.8 47.6 uninit_hind = resample_uninit(hind, hist)
275 100 15557988.0 155579.9 23.5 pers.append(compute_persistence(smp_hind, reference, metric=metric))

For the second one, we could pre-allocate the array (e.g. with np.zeros()) and fill it in, rather than appending to the end of a list.

Timer unit: 1e-06 s

Total time: 66.0949 s
...
Line #      Hits         Time  Per Hit   % Time  Line Contents
   242         1          3.0      3.0      0.0      if pers_sig is None:
   243         1          2.0      2.0      0.0          pers_sig = sig
   244                                           
   245         1          2.0      2.0      0.0      p = (100 - sig) / 100  # 0.05
   246         1          2.0      2.0      0.0      ci_low = p / 2  # 0.025
   247         1          3.0      3.0      0.0      ci_high = 1 - p / 2  # 0.975
   248         1          2.0      2.0      0.0      p_pers = (100 - pers_sig) / 100  # 0.5
   249         1          2.0      2.0      0.0      ci_low_pers = p_pers / 2
   250         1          2.0      2.0      0.0      ci_high_pers = 1 - p_pers / 2
   251                                           
   252         1        282.0    282.0      0.0      inits = hind.init.values
   253         1          3.0      3.0      0.0      init = []
   254         1          2.0      2.0      0.0      uninit = []
   255         1          2.0      2.0      0.0      pers = []
   256                                               # resample with replacement
   257                                               # DoTo: parallelize loop
   258       101        335.0      3.3      0.0      for _ in range(bootstrap):
   259       100       4565.0     45.6      0.0          smp = np.random.choice(inits, len(inits))
   260       100     161594.0   1615.9      0.2          smp_hind = hind.sel(init=smp)
   261                                                   # compute init skill
   262       100        273.0      2.7      0.0          init.append(
   263       100   13048879.0 130488.8     19.7              compute(smp_hind, reference, metric=metric, comparison=comparison))
   264                                                   # generate uninitialized ensemble from hist
   265       100        338.0      3.4      0.0          if hist is None:  # PM path, use reference = control
   266         1          2.0      2.0      0.0              hist = reference
   267       100   31454182.0 314541.8     47.6          uninit_hind = resample_uninit(hind, hist)
   268                                                   # compute uninit skill
   269       100        316.0      3.2      0.0          uninit.append(
   270       100        184.0      1.8      0.0              compute(uninit_hind,
   271       100        185.0      1.9      0.0                      reference,
   272       100        190.0      1.9      0.0                      metric=metric,
   273       100    5434717.0  54347.2      8.2                      comparison=comparison))
   274                                                   # compute persistence skill
   275       100   15557988.0 155579.9     23.5          pers.append(compute_persistence(smp_hind, reference, metric=metric))
   276         1      53347.0  53347.0      0.1      init = xr.concat(init, dim='bootstrap')
   277         1      55311.0  55311.0      0.1      uninit = xr.concat(uninit, dim='bootstrap')
   278         1      52387.0  52387.0      0.1      pers = xr.concat(pers, dim='bootstrap')
   279                                           
   280         1         16.0     16.0      0.0      if set(pers.coords) != set(init.coords):
   281                                                   init, pers = xr.broadcast(init, pers)
   282                                           
   283         1       3911.0   3911.0      0.0      init_ci = _distribution_to_ci(init, ci_low, ci_high)
   284         1       3890.0   3890.0      0.0      uninit_ci = _distribution_to_ci(uninit, ci_low, ci_high)
   285         1       3848.0   3848.0      0.0      pers_ci = _distribution_to_ci(pers, ci_low_pers, ci_high_pers)
   286                                           
   287         1       1445.0   1445.0      0.0      p_uninit_over_init = _pvalue_from_distributions(uninit, init)
   288         1       1387.0   1387.0      0.0      p_pers_over_init = _pvalue_from_distributions(pers, init)
   289                                           
   290                                               # calc skill
   291         1      85911.0  85911.0      0.1      init_skill = compute(hind, reference, metric=metric, comparison=comparison)
   292         1        200.0    200.0      0.0      uninit_skill = uninit.mean('bootstrap')
   293         1     149341.0 149341.0      0.2      pers_skill = compute_persistence(hind, reference, metric=metric)
   294                                           
   295         1         14.0     14.0      0.0      if set(pers_skill.coords) != set(init_skill.coords):
   296                                                   init_skill, pers_skill = xr.broadcast(init_skill, pers_skill)
   297                                           
   298                                               # wrap results together in one dataarray
   299         1       2456.0   2456.0      0.0      skill = xr.concat([init_skill, uninit_skill, pers_skill], 'kind')
   300         1        747.0    747.0      0.0      skill['kind'] = ['init', 'uninit', 'pers']
   301                                           
   302                                               # probability that i beats init
   303         1       2124.0   2124.0      0.0      p = xr.concat([p_uninit_over_init, p_pers_over_init], 'kind')
   304         1        800.0    800.0      0.0      p['kind'] = ['uninit', 'pers']
   305                                           
   306                                               # ci for each skill
   307         1          3.0      3.0      0.0      ci = xr.concat([init_ci, uninit_ci, pers_ci],
   308         1       3625.0   3625.0      0.0                     'kind').rename({'quantile': 'results'})
   309         1        800.0    800.0      0.0      ci['kind'] = ['init', 'uninit', 'pers']
   310                                           
   311         1       3937.0   3937.0      0.0      results = xr.concat([skill, p], 'results')
   312         1        878.0    878.0      0.0      results['results'] = ['skill', 'p']
   313         1         17.0     17.0      0.0      if set(results.coords) != set(ci.coords):
   314                                                   res_drop = [c for c in results.coords if c not in ci.coords]
   315                                                   ci_drop = [c for c in ci.coords if c not in results.coords]
   316                                                   results = results.drop(res_drop)
   317                                                   ci = ci.drop(ci_drop)
   318         1       4477.0   4477.0      0.0      results = xr.concat([results, ci], 'results')
   319         1          2.0      2.0      0.0      return results
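A sketch of the pre-allocation suggestion in plain numpy (the profile shows most time inside the compute calls themselves, so this mainly avoids growing a Python list and concatenating it afterwards; the fill values here are random stand-ins):

```python
import numpy as np

n_boot, n_points = 100, 50
rng = np.random.default_rng(0)

# pre-allocate the result array instead of appending to a Python list
pers = np.empty((n_boot, n_points))
for b in range(n_boot):
    # stand-in for compute_persistence(smp_hind, reference, metric=metric)
    pers[b] = rng.normal(size=n_points)
# no xr.concat over a growing list is needed afterwards
```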

@aaronspring
Collaborator Author

I was more curious about how the bootstrapping itself could be spread out and made fast by that means alone. I know that creating the uninitialized ensemble ruins the speed.

A different approach: they get speedups, but it seems they are dealing with huge satellite datasets: https://github.com/observingClouds/pyresample/blob/master/pyresample/_multi_proc.py

@aaronspring
Collaborator Author

I somehow got this working for the stats functions: https://stackoverflow.com/questions/56281148/parallelized-bootstrapping-with-replacement-with-xarray-dask/59512878#59512878

Will try the compute funcs soon...

@aaronspring
Collaborator Author

Will implement this soon. Just got it working: https://gist.github.com/aaronspring/4750d2b1eb0468ac59a034ed5a5f136c

@aaronspring aaronspring mentioned this issue Dec 29, 2019
@aaronspring
Collaborator Author

267 100 31454182.0 314541.8 47.6 uninit_hind = resample_uninit(hind, hist)
275 100 15557988.0 155579.9 23.5 pers.append(compute_persistence(smp_hind, reference, metric=metric))


@ahuang11 how do you do this line profiling?

@ahuang11
Member

ahuang11 commented Dec 31, 2019 via email

@aaronspring
Collaborator Author

Thanks.

@aaronspring
Collaborator Author

aaronspring commented Feb 8, 2020

It's getting better, but I'm convinced we can get even more!

new observations regarding bootstrap_hindcast:

  • building the graph takes very long; the compute itself is rather short.
  • best chunking: hind.lead 1, hist -1, obs -1
  • 10k tasks for bootstrap=16 are too many

new ideas to check:

  • first build the resampled ensembles, then calc skill on them in batch
  • is persisting the resampled ensembles useful?
  • check again whether calling compute after the bootstrap loop would be useful
  • try to write a few functions that resample along the member dim only
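The first idea (build all resampled ensembles at once, resampling along the member dim, then compute skill in batch) might look like this in plain numpy; `resample_members` is a hypothetical helper and the mean is only a stand-in for a real skill metric:

```python
import numpy as np

def resample_members(ens, n_boot, rng=None):
    """Build all bootstrap ensembles at once by resampling the member axis.

    ens has shape (member, init, ...); the result has shape
    (bootstrap, member, init, ...) via one fancy-indexing step,
    so a skill metric can then be computed over all bootstraps in batch.
    """
    rng = np.random.default_rng() if rng is None else rng
    n_member = ens.shape[0]
    # draw member indices with replacement for every bootstrap iteration
    idx = rng.integers(0, n_member, size=(n_boot, n_member))
    return ens[idx]  # fancy indexing broadcasts to (n_boot, n_member, ...)

ens = np.random.default_rng(0).normal(size=(10, 20, 5))  # (member, init, lead)
boot = resample_members(ens, n_boot=500)
skill = boot.mean(axis=1)  # e.g. the ensemble-mean step of a metric, in batch
```

Building the indices up front replaces 500 separate `.sel`/append steps with a single vectorized gather, which is the kind of loop this issue wants to eliminate.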

@aaronspring aaronspring pinned this issue Feb 8, 2020
@aaronspring aaronspring mentioned this issue Feb 8, 2020
@bradyrx
Collaborator

bradyrx commented Feb 8, 2020

We can consider wrapping with map_blocks explicitly in certain places, which in some cases drastically reduces the number of tasks.
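As a rough illustration with dask.array's map_blocks (not climpred's actual wrapper): reducing the bootstrap dimension inside each block keeps the graph at roughly one task per spatial chunk instead of one per bootstrap iteration.

```python
import numpy as np
import dask.array as da

def block_ci(block):
    # collapse the bootstrap axis of one block to its 2.5%/97.5% quantiles
    return np.quantile(block, [0.025, 0.975], axis=0)

# pretend bootstrap distribution: (bootstrap, y, x), unchunked along bootstrap
boot = da.random.random((1000, 40, 40), chunks=(1000, 20, 20))

# one task per spatial chunk; output chunks must be declared since the shape changes
ci = boot.map_blocks(block_ci, chunks=(2, 20, 20), dtype=float)
result = ci.compute()  # shape (2, 40, 40): low and high CI bounds per grid point
```

The key requirement is that the reduced dimension (here, bootstrap) is not chunked, so each block sees the full distribution for its grid points.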

@bradyrx bradyrx self-assigned this Feb 28, 2020
@aaronspring aaronspring linked a pull request Mar 1, 2020 that will close this issue
7 tasks
@bradyrx
Collaborator

bradyrx commented Mar 13, 2020

I don't think this is fully resolved yet, but https://github.com/bradyrx/climpred/pull/332 works toward it.

@bradyrx bradyrx reopened this Mar 13, 2020
@aaronspring aaronspring linked a pull request Apr 24, 2020 that will close this issue
@aaronspring aaronspring unpinned this issue Jun 11, 2020