
Allow using dill for pickling #36

Merged (3 commits, May 21, 2021)
Changes from 2 commits
9 changes: 7 additions & 2 deletions .github/workflows/run_tests.yml
@@ -8,7 +8,7 @@ jobs:
tests:
runs-on: ubuntu-latest
strategy:
matrix:
python: ['3.5', '3.6', '3.7', '3.8', '3.9']
name: aioprocessing ${{ matrix.python }} tests
steps:
@@ -19,4 +19,9 @@ jobs:
- name: Install Flake8
run: pip install flake8
- run: flake8 .
-      - run: python runtests.py
+      - run: python runtests.py -v --failfast
+        timeout-minutes: 1
+      # tests should also pass when using multiprocess (dill)
+      - run: pip install multiprocess
+      - run: python runtests.py -v --failfast
+        timeout-minutes: 1
4 changes: 2 additions & 2 deletions README.md
@@ -5,7 +5,7 @@ aioprocessing

`aioprocessing` provides asynchronous, [`asyncio`](https://docs.python.org/3/library/asyncio.html) compatible, coroutine
versions of many blocking instance methods on objects in the [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)
-library. Here's an example demonstrating the `aioprocessing` versions of
+library. To use [`dill`](https://pypi.org/project/dill) for universal pickling, install using `pip install aioprocessing[dill]`. Here's an example demonstrating the `aioprocessing` versions of
`Event`, `Queue`, and `Lock`:

[example code collapsed in the diff view]

@@ -132,4 +132,4 @@ Keep in mind that, while the API exposes coroutines for interacting with
to a `ThreadPoolExecutor`, this means the caveats that apply with using
`ThreadPoolExecutor` with `asyncio` apply: namely, you won't be able to
cancel any of the coroutines, because the work being done in the worker
thread can't be interrupted.
7 changes: 3 additions & 4 deletions aioprocessing/__init__.py
@@ -1,5 +1,4 @@
-import multiprocessing
-
+from . import mp as multiprocessing  # noqa
from .connection import * # noqa
from .managers import * # noqa

@@ -27,8 +26,8 @@
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
-version = "1.1.1"
-version_info = (1, 1, 1, 0)
+version = "1.2.0"
+version_info = (1, 2, 0, 0)

if hasattr(multiprocessing, "get_context"):

24 changes: 12 additions & 12 deletions aioprocessing/connection.py
@@ -1,12 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
-from multiprocessing.connection import (
-    Listener,
-    Client,
-    deliver_challenge,
-    answer_challenge,
-    wait,
-)
-
+from .mp import connection as _connection
from .executor import CoroBuilder
from .util import run_in_executor

@@ -42,12 +36,12 @@ def __exit__(self, *args, **kwargs):

def AioClient(*args, **kwargs):
""" Returns an AioConnection instance. """
-    conn = Client(*args, **kwargs)
+    conn = _connection.Client(*args, **kwargs)
return AioConnection(conn)


class AioListener(metaclass=CoroBuilder):
-    delegate = Listener
+    delegate = _connection.Listener
coroutines = ["accept"]

def accept(self):
@@ -64,14 +58,20 @@ def __exit__(self, *args, **kwargs):

def coro_deliver_challenge(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
-    return run_in_executor(executor, deliver_challenge, *args, **kwargs)
+    return run_in_executor(
+        executor, _connection.deliver_challenge, *args, **kwargs
+    )


def coro_answer_challenge(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
-    return run_in_executor(executor, answer_challenge, *args, **kwargs)
+    return run_in_executor(
+        executor, _connection.answer_challenge, *args, **kwargs
+    )


def coro_wait(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
-    return run_in_executor(executor, wait, *args, **kwargs)
+    return run_in_executor(
+        executor, _connection.wait, *args, **kwargs
+    )
2 changes: 1 addition & 1 deletion aioprocessing/executor.py
@@ -1,8 +1,8 @@
-from multiprocessing import cpu_count
from functools import wraps
from concurrent.futures import ThreadPoolExecutor

from . import util
+from .mp import cpu_count


def init_executor(func):
6 changes: 3 additions & 3 deletions aioprocessing/locks.py
@@ -1,14 +1,14 @@
from .executor import CoroBuilder
-from multiprocessing import (
+from .mp import (
    Event,
    Lock,
    RLock,
    BoundedSemaphore,
    Condition,
    Semaphore,
    Barrier,
+    util as _util,
)
-from multiprocessing.util import register_after_fork

__all__ = [
"AioLock",
@@ -57,7 +57,7 @@ def __init__(self, *args, **kwargs):
def _after_fork(obj):
obj._threaded_acquire = False

-        register_after_fork(self, _after_fork)
+        _util.register_after_fork(self, _after_fork)

def coro_acquire(self, *args, **kwargs):
""" Non-blocking acquire.
29 changes: 11 additions & 18 deletions aioprocessing/managers.py
@@ -1,29 +1,22 @@
import asyncio
-from multiprocessing.util import register_after_fork
from queue import Queue
-from threading import (

[Review comment from Owner]: These imports should still come from threading. They're getting proxied, so we don't need the multiprocessing versions.

+from .mp import (
+    Barrier,
+    BoundedSemaphore,
+    Condition,
+    Event,
+    Lock,
+    RLock,
+    Semaphore,
+    managers as _managers,
+)
-from multiprocessing.managers import (
-    SyncManager,
-    MakeProxyType,
-    BarrierProxy,
-    EventProxy,
-    ConditionProxy,
-    AcquirerProxy,
-)

from aioprocessing.locks import _ContextManager
from .executor import _ExecutorMixin


-AioBaseQueueProxy = MakeProxyType(
+AioBaseQueueProxy = _managers.MakeProxyType(
"AioQueueProxy",
(
"task_done",
@@ -99,7 +92,7 @@ class AioQueueProxy(AioBaseQueueProxy, metaclass=ProxyCoroBuilder):
coroutines = ["get", "put"]


-class AioAcquirerProxy(AcquirerProxy, metaclass=ProxyCoroBuilder):
+class AioAcquirerProxy(_managers.AcquirerProxy, metaclass=ProxyCoroBuilder):
pool_workers = 1
coroutines = ["acquire", "release"]

@@ -166,19 +159,19 @@ def __iter__(self):
return _ContextManager(self)


-class AioBarrierProxy(BarrierProxy, metaclass=ProxyCoroBuilder):
+class AioBarrierProxy(_managers.BarrierProxy, metaclass=ProxyCoroBuilder):
    coroutines = ["wait"]


-class AioEventProxy(EventProxy, metaclass=ProxyCoroBuilder):
+class AioEventProxy(_managers.EventProxy, metaclass=ProxyCoroBuilder):
    coroutines = ["wait"]


-class AioConditionProxy(ConditionProxy, metaclass=ProxyCoroBuilder):
+class AioConditionProxy(_managers.ConditionProxy, metaclass=ProxyCoroBuilder):
    coroutines = ["wait", "wait_for"]


-class AioSyncManager(SyncManager):
+class AioSyncManager(_managers.SyncManager):
""" A mp.Manager that provides asyncio-friendly objects. """

pass
7 changes: 7 additions & 0 deletions aioprocessing/mp.py
@@ -0,0 +1,7 @@
# flake8: noqa
try:
from multiprocess import *
from multiprocess import connection, managers, util
except ImportError:
from multiprocessing import *
from multiprocessing import connection, managers, util
2 changes: 1 addition & 1 deletion aioprocessing/pool.py
@@ -1,8 +1,8 @@
-from multiprocessing import Pool
from asyncio import Future
import asyncio

from .executor import CoroBuilder
+from .mp import Pool

__all__ = ["AioPool"]

3 changes: 1 addition & 2 deletions aioprocessing/process.py
@@ -1,6 +1,5 @@
-from multiprocessing import Process
-
from .executor import CoroBuilder
+from .mp import Process

__all__ = ["AioProcess"]

3 changes: 1 addition & 2 deletions aioprocessing/queues.py
@@ -1,6 +1,5 @@
-from multiprocessing import Queue, SimpleQueue, JoinableQueue
-
from .executor import CoroBuilder
+from .mp import Queue, SimpleQueue, JoinableQueue


class AioBaseQueue(metaclass=CoroBuilder):
1 change: 1 addition & 0 deletions setup.py
@@ -16,6 +16,7 @@
),
zip_safe=False,
license="BSD",
+    extras_require={"dill": ["multiprocess"]},
keywords="asyncio multiprocessing coroutine",
url="https://github.com/dano/aioprocessing",
long_description=readme,
3 changes: 2 additions & 1 deletion tests/_base_test.py
@@ -1,6 +1,7 @@
import asyncio
import unittest
-import multiprocessing
+import aioprocessing.mp as multiprocessing


class BaseTest(unittest.TestCase):
4 changes: 2 additions & 2 deletions tests/connection_tests.py
@@ -1,10 +1,10 @@
import unittest
-import multiprocessing
-from multiprocessing import Process
from array import array

import aioprocessing
+import aioprocessing.mp as multiprocessing
from aioprocessing.connection import AioConnection, AioListener, AioClient
+from aioprocessing.mp import Process

from ._base_test import BaseTest

7 changes: 4 additions & 3 deletions tests/lock_tests.py
@@ -1,14 +1,15 @@
-import multiprocessing
import sys
import time
import asyncio
import unittest
import traceback

import aioprocessing
-from multiprocessing import Process, Event, Queue, get_all_start_methods
+import aioprocessing.mp as multiprocessing
+from aioprocessing.mp import Process, Event, Queue, get_all_start_methods

try:
-    from multiprocessing import get_context
+    from aioprocessing.mp import get_context
except ImportError:

def get_context(param):
1 change: 1 addition & 0 deletions tests/pickle_test.py
@@ -1,5 +1,6 @@
import pickle
import unittest

from aioprocessing.executor import _ExecutorMixin


2 changes: 1 addition & 1 deletion tests/process_test.py
@@ -1,5 +1,5 @@
import unittest
-import multiprocessing
+import aioprocessing.mp as multiprocessing

import aioprocessing
from ._base_test import BaseTest, _GenMixin
9 changes: 7 additions & 2 deletions tests/queue_test.py
@@ -1,8 +1,8 @@
import unittest
-import aioprocessing
-from multiprocessing import Process, Event
from concurrent.futures import ProcessPoolExecutor

+import aioprocessing
+from aioprocessing.mp import Process, Event, util
from ._base_test import BaseTest, _GenMixin


@@ -97,6 +97,11 @@ async def queue_put():


class ManagerQueueTest(BaseTest):
+    @unittest.skipIf(
+        "multiprocess.util" in str(util),
+        "concurrent.futures is not yet supported by uqfoundation "
+        "(https://github.com/uqfoundation/pathos/issues/90)"
+    )
[Review comment from Contributor Author, on lines +100 to +104]: sub-optimal to skip this test... especially for people that are using concurrent.futures.ProcessPoolExecutor in combination with aioprocessing, in an environment where multiprocess happens to be installed. As the author stated in uqfoundation/pathos#90, it should be an easy PR. I've tried figuring it out multiple times as I keep running into multiprocessing problems in async contexts, but repeatedly stranded.

Here's the traceback for this test case, caused by ProcessPoolExecutor internally using multiprocessing instead of multiprocess:
test_executor (tests.queue_test.ManagerQueueTest) ... Process SpawnProcess-1:
Traceback (most recent call last):
  File "~/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "~/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "~/.pyenv/versions/3.8.6/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "~/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
  File "~/.pyenv/versions/pi/lib/python3.8/site-packages/multiprocess/managers.py", line 959, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "~/.pyenv/versions/pi/lib/python3.8/site-packages/multiprocess/managers.py", line 809, in __init__
    self._incref()
  File "~/.pyenv/versions/pi/lib/python3.8/site-packages/multiprocess/managers.py", line 863, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "~/.pyenv/versions/pi/lib/python3.8/site-packages/multiprocess/connection.py", line 511, in Client
    answer_challenge(c, authkey)
  File "~/.pyenv/versions/pi/lib/python3.8/site-packages/multiprocess/connection.py", line 762, in answer_challenge
    raise AuthenticationError('digest sent was rejected')
multiprocess.context.AuthenticationError: digest sent was rejected

[Review comment from Contributor Author]: note that even if uqfoundation would add compatibility, and expose pathos.futures.ProcessPoolExecutor or so, this error would still persist for people trying to combine the concurrent.futures.ProcessPoolExecutor with aioprocessing[dill].

[Review comment from Owner]: I think there should be a note in the readme about this incompatibility when using dill, as well as documenting the workaround that forces usage of multiprocessing. People upgrading to this version could suddenly have their code break, so it should be very clear how to fix it.

[Review comment from Contributor Author]: Sure thing, added. Assuming semver and this being a potentially breaking change ^ I went ahead and bumped to 2.0.0 🤙

    def test_executor(self):
        m = aioprocessing.AioManager()
        q = m.AioQueue()