Is this fixed: "can't send large data back to main thread"? #217

Closed
y-he2 opened this issue Jul 21, 2021 · 6 comments

Comments

@y-he2

y-he2 commented Jul 21, 2021

The described problem was supposed to have been fixed in:
https://stackoverflow.com/questions/47692566/python-multiprocessing-apply-async-assert-left-0-assertionerror
python/cpython#9027

However, I'm still encountering this error when using multiprocess.
I wonder whether that fix was merged into multiprocess?

@mmckerns
Member

For which versions of multiprocess and Python are you experiencing this error? Also, if you can give a simple example here that reproduces the error, that would be helpful.

@y-he2
Author

y-he2 commented Jul 23, 2021

Thanks for the quick reply.
Versions: multiprocess 0.70.12.2 py38h294d835_0, Python 3.8.5

Here's my helper module, which is supposed to apply a function defined in the module named by func_module_name to a tensor passed into parallel_tensor_apply(), in a shared-memory manner.

import numpy as np
import multiprocess as mp
from multiprocess import shared_memory
import os

def init_worker( func_module_name, shared_memory_block_master, data_shape_ref, data_type_ref ): 
	print( "Init worker processing:\n\t", mp.current_process() )

	global func_module
	try:
		func_module = __import__( func_module_name )
	except ModuleNotFoundError:
		print( "The function module file must be in the same folder as the calling script!" )
	print( dir( func_module ) )
	assert hasattr( func_module, 'proc' ), "The function module must include a function in form of proc( data_tensor, idx )!"

	global data_shape
	data_shape = data_shape_ref
	global data_type
	data_type = data_type_ref

	## Supposed to define a global ref to the shared memory block on each worker
	global shared_memory_block_ref
	shared_memory_block_ref = mp.shared_memory.SharedMemory( name = shared_memory_block_master.name )

def worker_proc( idx ):
	print( "Processing on worker:\n\t", mp.current_process() )

	# On each worker, access the global ref to create an ndarray backed by the shared memory buffer
	global data_shape
	global data_type
	shared_data_ref = np.ndarray( shape = data_shape, dtype = data_type, buffer = shared_memory_block_ref.buf )

	# print( shared_data_ref, flush = True )
	return( func_module.proc( shared_data_ref, idx ) )

def parallel_tensor_apply( func_module_name, data_tensor, index_set, max_processes = 99 ):
	if mp.current_process().name == 'MainProcess':
		shared_memory_block_master = shared_memory.SharedMemory( 
			create = True, 
			size = data_tensor.nbytes 
		)
		shared_data_master = np.ndarray( 
			shape = data_tensor.shape, 
			dtype = data_tensor.dtype, 
			buffer = shared_memory_block_master.buf 
		)
		## Copy the data tensor to the shared memory block once, performed only on the master process. 
		shared_data_master[:] = data_tensor[:]

		with mp.Pool( 
			processes = min( max_processes, os.cpu_count() ), 
			initializer = init_worker, 
			initargs = (func_module_name, shared_memory_block_master, data_tensor.shape, data_tensor.dtype)
		) as pool: 
			res = pool.map( 
				worker_proc, 
				index_set
			)
		return( res )
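
A hypothetical driver, just to show the intended call pattern (the names here are placeholders: assume the helper above is saved as parallel_helper.py and the work function lives in func_module.py):

import numpy as np
import parallel_helper  ## placeholder name for the helper module shown above

if __name__ == '__main__':
	data = np.random.rand( 1000, 1000 )
	## Applies func_module.proc( shared_tensor, idx ) once per index, sharing the tensor.
	res = parallel_helper.parallel_tensor_apply( 'func_module', data, range( 8 ) )
	print( len( res ) )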

However, when using multiprocess from a Jupyter Notebook, I'm getting the following error after (I think) the applied function executes successfully and tries to return the result. I'm not yet sure whether my shared-memory code is wrong or whether it's a bug in the multiprocess library, but I suspect the library still contains the bug mentioned in the links above.

Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\pool.py", line 576, in _handle_results
    task = get()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 253, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 321, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 340, in _get_more_data
    assert left > 0
AssertionError

Which is why I'm asking whether your fork (which is a good job, by the way) includes those fixes.

@y-he2
Author

y-he2 commented Jul 23, 2021

And I guess we can just assume that func_module.proc() simply returns a tensor larger than 2 GB for any input.
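
For illustration, a hypothetical func_module.py matching that assumption (the array size below is made up; anything whose pickle exceeds 2 GB will do) could be as simple as:

import numpy as np

def proc( data_tensor, idx ):
	## Ignore the shared tensor and return ~2.4 GB of float64 zeros,
	## which is more than the result pipe can receive in one message.
	return( np.zeros( 300_000_000 ) )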

@mmckerns
Member

How does this code run? It seems that some of your example is missing. Can you also simplify it to the minimum that exhibits the behavior you are experiencing? Also, does the behavior only happen when run from a Jupyter notebook?

@y-he2
Author

y-he2 commented Jul 25, 2021

Here's an absolutely minimal example that should be equivalent:

import numpy as np
import multiprocessing as mp
from multiprocessing import shared_memory
import os
import sys
def worker_proc( something ):
    import pickle
    ## "training.data" can be any object pickled into a file larger than 2GB (just pickle a large numpy tensor).
    with open( 'training.data', 'rb' ) as handle:
        temp_load = pickle.load( handle )
    return( temp_load )

if( __name__ == '__main__' ): 
    with mp.Pool( processes = 2 ) as pool: 
        res = pool.map( worker_proc, range( 2 ) )
    print( sys.getsizeof( res ) )

Error message when running from cmd with multiprocessing (yes, even that):

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\pool.py", line 576, in _handle_results
    task = get()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError

To answer your question: yes, the error appears in all of the following cases:
Using multiprocess in Jupyter
Using multiprocess from cmd
Using multiprocessing from cmd

So I could have been totally wrong: the error may lie in the multiprocessing library itself rather than in multiprocess, and the fix in the links I posted may not have been complete, so the error somehow remains in this scenario.

For my case I found a workaround that avoids using objects larger than 2 GB in the child processes. If you find this bug interesting, feel free to investigate further; otherwise let's close this issue, since the bug may well be in multiprocessing itself.
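
For reference, one way to avoid sending objects larger than 2 GB back from the children (a sketch only, not necessarily exactly what I did, and with the array kept small so it runs anywhere; imagine it being larger than 2 GB) is to have the worker write its result into a SharedMemory block and return only the block name, shape, and dtype, so nothing large ever travels through the result pipe:

import numpy as np
import multiprocess as mp
from multiprocess import shared_memory

def worker_proc( idx ):
    ## Pretend this is the > 2 GB result.
    result = np.full( ( 1000, 1000 ), idx, dtype = np.float64 )
    ## Copy it into a shared memory block instead of returning it through the pipe.
    block = shared_memory.SharedMemory( create = True, size = result.nbytes )
    view = np.ndarray( result.shape, dtype = result.dtype, buffer = block.buf )
    view[:] = result[:]
    ## Don't close the block here: the still-running worker keeps a handle open,
    ## so on Windows the block survives until the parent attaches to it.
    ## Return only small metadata, which pickles to a few bytes.
    return( ( block.name, result.shape, str( result.dtype ) ) )

if( __name__ == '__main__' ):
    with mp.Pool( processes = 2 ) as pool:
        meta = pool.map( worker_proc, range( 2 ) )
        ## Reattach while the workers are still alive, then copy the data out.
        results = []
        for name, shape, dtype in meta:
            block = shared_memory.SharedMemory( name = name )
            results.append( np.ndarray( shape, dtype = dtype, buffer = block.buf ).copy() )
            block.close()
            block.unlink()
    print( [ r.shape for r in results ] )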

mmckerns added this to the pathos-0.3.1 milestone Jan 31, 2023
@mmckerns
Member

closing this as a duplicate of #150
