diff --git a/losoto/lib_operations.py b/losoto/lib_operations.py index d5740d2..9a60b5e 100644 --- a/losoto/lib_operations.py +++ b/losoto/lib_operations.py @@ -2,11 +2,22 @@ # Some utilities for operations -import sys, multiprocessing +import multiprocessing +import os import numpy as np from losoto.h5parm import h5parm from losoto._logging import logger as logging +def nproc(): + """ + Return the number of CPU cores _available_ to the current process, similar + to what the Linux `nproc` command does. This can be less than the total + number of CPU cores in the machine, which is returned by, e.g., + `multiprocessing.cpu_count()` + """ + return len(os.sched_getaffinity(0)) + + class multiprocManager(object): class multiThread(multiprocessing.Process): @@ -43,7 +54,7 @@ def __init__(self, procs=0, funct=None): and it will be linked to the output queue """ if procs == 0: - procs = multiprocessing.cpu_count() + procs = nproc() self.procs = procs self._threads = [] self.inQueue = multiprocessing.JoinableQueue() diff --git a/losoto/operations/faraday.py b/losoto/operations/faraday.py index 0839c88..4a34e80 100644 --- a/losoto/operations/faraday.py +++ b/losoto/operations/faraday.py @@ -117,7 +117,7 @@ def run( soltab, soltabOut='rotationmeasure000', refAnt='', maxResidual=1.,ncpu= tuples = [(t,coord_rr,coord_ll,wt,vl,solType,coord,maxResidual) for t,wt,vl in zip(list(np.arange(len(times))), weightsliced, valsliced)] if ncpu == 0: - ncpu = mp.cpu_count() + ncpu = nproc() with mp.Pool(ncpu) as pool: fitrm,fitweights = zip(*pool.starmap(_run_timestep,tuples)) diff --git a/losoto/operations/interpolatedirections.py b/losoto/operations/interpolatedirections.py index df587a5..941a224 100644 --- a/losoto/operations/interpolatedirections.py +++ b/losoto/operations/interpolatedirections.py @@ -260,7 +260,7 @@ def run( soltab, interp_dirs, soltabOut=None, prefix='interp_', ncpu=0): import sys sys.exit() # run the interpolation - ncpu = mp.cpu_count() if ncpu == 0 else ncpu # default use all cores + ncpu = nproc() if ncpu == 0 else ncpu # default use all cores with mp.Pool(ncpu) as pool: logging.info('Start interpolation.') results = pool.starmap(interpolate_directions3d, args) diff --git a/losoto/operations/prefactor_bandpass.py b/losoto/operations/prefactor_bandpass.py index 7c33ef5..c28af8f 100644 --- a/losoto/operations/prefactor_bandpass.py +++ b/losoto/operations/prefactor_bandpass.py @@ -519,7 +519,7 @@ def run(soltab, chanWidth='', outSoltabName='bandpass', BadSBList = '', interpol if autoFlag: if ncpu == 0: import multiprocessing - ncpu = multiprocessing.cpu_count() + ncpu = nproc() mpm = multiprocManager(ncpu, _flag_amplitudes) for s in range(nants): mpm.put([soltab.freq[:], amplitude_arraytmp[:, s, :, :], weights_arraytmp[:, s, :, :], diff --git a/losoto/operations/tec.py b/losoto/operations/tec.py index e53ab8c..0f9eb48 100644 --- a/losoto/operations/tec.py +++ b/losoto/operations/tec.py @@ -356,7 +356,7 @@ def run( soltab, soltabOut, refAnt, maxResidualFlag, maxResidualProp, ncpu ): selections.append(selection) if ncpu == 0: - ncpu = mp.cpu_count() + ncpu = nproc() with mp.Pool(ncpu) as pool: logging.info('Start TEC fitting.') results = pool.starmap(fit_tec_to_phases, args)