-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added tasks tutorial #1027
Merged
Merged
Added tasks tutorial #1027
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
Network IO and the DiscoveryStrategy | ||
==================================== | ||
|
||
This document assumes you have a basic understanding of asyncio tasks, as documented in `the tasks tutorial <../../basics/tasks_tutorial>`_. | ||
You will learn how to use the IPv8's ``DiscoveryStrategy`` class to avoid network congestion. | ||
|
||
The DiscoveryStrategy | ||
--------------------- | ||
|
||
IPv8 only manages one socket (``Endpoint``), which is most likely using the UDP protocol. | ||
If every ``Community`` starts sending at the exact same time and overpowers the UDP socket, this causes packet drops. | ||
To counter this, IPv8 has the ``DiscoveryStrategy`` class. | ||
|
||
An IPv8 instance will call each of its registered ``DiscoveryStrategy`` instances sequentially to avoid network I/O clashes. | ||
If you have an ``interval`` task in your ``TaskManager`` that leads to network I/O, you should consider converting it to a ``DiscoveryStrategy``. | ||
You can make your own subclass as follows: | ||
|
||
.. literalinclude:: discoverystrategy_tutorial_1.py | ||
:lines: 13-20 | ||
|
||
Note that a ``DiscoveryStrategy`` should be thread-safe. | ||
You can use the ``walk_lock`` for thread safety. | ||
|
||
Using a DiscoveryStrategy | ||
------------------------- | ||
|
||
You can register your ``DiscoveryStrategy`` with a running ``IPv8`` instance as follows: | ||
|
||
.. literalinclude:: discoverystrategy_tutorial_1.py | ||
:lines: 23-28 | ||
|
||
Note that we specify a ``target_peers`` argument. | ||
This argument specifies the amount of peers after which the ``DiscoveryStrategy`` should no longer be called. | ||
Calls will be resumed when the amount of peers in your ``Community`` dips below this value again. | ||
For example, the built-in ``RandomWalk`` strategy can be configured to stop finding new peers after if an overlay already has ``20`` or more peers. | ||
In this example we have used the magic value ``-1``, which causes ``IPv8`` to never stop calling this strategy. | ||
|
||
You can also load your strategy through the ``configuration`` or ``loader``. | ||
First, an example of how to do this with the ``configuration``: | ||
|
||
.. literalinclude:: discoverystrategy_tutorial_2.py | ||
:lines: 14-35 | ||
|
||
Note that you can add as many strategies as you want to an overlay. | ||
Also note that for IPv8 to link the name ``"MyDiscoveryStrategy"`` to a class, you need to define it in your ``Community``'s ``get_available_strategies()`` dictionary. | ||
|
||
Lastly, alternatively, the way to add your custom ``MyDiscoveryStrategy`` class to a ``CommunityLauncher`` is as follows: | ||
|
||
.. code-block:: python | ||
|
||
@overlay('my_module.some_submodule', 'MyCommunity') | ||
@walk_strategy(MyDiscoveryStrategy) | ||
class MyLauncher(CommunityLauncher): | ||
pass | ||
|
||
This is the shortest way. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
from os import urandom | ||
from random import choice | ||
|
||
from pyipv8.ipv8.community import Community | ||
from pyipv8.ipv8.peerdiscovery.discovery import DiscoveryStrategy | ||
from pyipv8.ipv8.types import IPv8 | ||
|
||
|
||
class MyCommunity(Community): | ||
community_id = urandom(20) | ||
|
||
|
||
class MyDiscoveryStrategy(DiscoveryStrategy): | ||
|
||
def take_step(self): | ||
with self.walk_lock: | ||
# Insert your logic here. For example: | ||
if self.overlay.get_peers(): | ||
peer = choice(self.overlay.get_peers()) | ||
self.overlay.send_introduction_request(peer) | ||
|
||
|
||
def main(ipv8_instance: IPv8): | ||
overlay = ipv8_instance.get_overlay(MyCommunity) | ||
target_peers = -1 | ||
ipv8_instance.add_strategy(overlay, | ||
MyDiscoveryStrategy(overlay), | ||
target_peers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import os | ||
|
||
from pyipv8.ipv8.community import Community | ||
from pyipv8.ipv8.configuration import DISPERSY_BOOTSTRAPPER, get_default_configuration | ||
from pyipv8.ipv8.peerdiscovery.discovery import DiscoveryStrategy | ||
|
||
|
||
class MyDiscoveryStrategy(DiscoveryStrategy): | ||
|
||
def take_step(self): | ||
pass | ||
|
||
|
||
class MyCommunity(Community): | ||
community_id = os.urandom(20) | ||
|
||
def get_available_strategies(self): | ||
return {"MyDiscoveryStrategy": MyDiscoveryStrategy} | ||
|
||
|
||
definition = { | ||
'strategy': "MyDiscoveryStrategy", | ||
'peers': -1, | ||
'init': {} | ||
} | ||
|
||
config = get_default_configuration() | ||
config['overlays'] = [{ | ||
'class': 'MyCommunity', | ||
'key': "anonymous id", | ||
'walkers': [definition], | ||
'bootstrappers': [DISPERSY_BOOTSTRAPPER.copy()], | ||
'initialize': {}, | ||
'on_start': [] | ||
}] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
Task Management | ||
=============== | ||
|
||
This document assumes you have a basic understanding of network overlays in IPv8, as documented in `the overlay tutorial <../../basics/overlay_tutorial>`_. | ||
You will learn how to use the IPv8's ``TaskManager`` class to manage ``asyncio`` tasks, when to use ``DiscoveryStrategy`` and how to use ``NumberCache`` to manage ``Future`` instances. | ||
|
||
What is a task? | ||
--------------- | ||
|
||
Essentially, a task is a way for ``asyncio`` to point to code that should be executed at some point. | ||
The ``asyncio`` library checks if it has something to execute and executes it until it has no more tasks to execute. | ||
However, there are many intricacies when dealing with tasks. | ||
Consider the following example: | ||
|
||
.. literalinclude:: tasks_tutorial_1.py | ||
|
||
Can you guess what will be printed? | ||
|
||
The correct answer is ``[2, 3, 5, 4]`` and equally important is that the answer changes to ``[2, 3, 5]`` if you omit the final ``await asyncio.sleep(1)``. | ||
Feel free to skip to the "The TaskManager" section if both of these answers are obvious to you. | ||
|
||
Let's run through ``execute_me()`` to explain this behavior: | ||
|
||
1. The ``execute_me_too(1)`` will not be executed, though your application will not crash. You will be informed of this through the ``RuntimeWarning: coroutine 'execute_me_too' was never awaited`` warning message. You should have awaited this ``execute_me_too`` call, we do this properly in the next line. | ||
2. By awaiting ``execute_me_too(2)`` the ``execute_me()`` call will wait until ``execute_me_too(2)`` has finished executing. This adds the value ``2`` to the ``COMPLETED`` list. | ||
3. After waiting for ``execute_me_too(2)`` to finish ``COMPLETED.append(3)`` is allowed to append the value ``3`` to the ``COMPLETED`` list. | ||
4. By creating a future for ``execute_me_too(4)``, we allow ``execute_me()`` to continue executing. | ||
5. While we wait for ``execute_me_too(4)`` to finish, ``COMPLETED.append(5)`` can already access the ``COMPLETED`` list and insert its value ``5``. | ||
6. While ``execute_me()`` waits for another second, ``execute_me_too(4)`` is allowed to add to the ``COMPLETED`` list. | ||
|
||
Note that if you had omitted ``await asyncio.sleep(1)``, the ``execute_me()`` call would not be waiting for anything anymore and return (outputting ``[2, 3, 5]``). | ||
Instead, the future returned by ``asyncio.ensure_future()`` should have been awaited, as follows: | ||
|
||
.. literalinclude:: tasks_tutorial_2.py | ||
:lines: 11-16 | ||
|
||
As an added bonus, this is also faster than the previous method. | ||
This new example will finish when the ``execute_me_too(4)`` call completes, instead of after waiting a full second. | ||
|
||
This concludes the basics of ``asyncio`` tasks. | ||
Now, we'll show you how to make your life easier by using IPv8's ``TaskManager`` class. | ||
|
||
The TaskManager | ||
--------------- | ||
|
||
Managing ``Future`` instances and ``coroutine`` instances is complex and error-prone and, to help you with managing these, IPv8 has the ``TaskManager`` class. | ||
The ``TaskManager`` takes care of managing all of your calls that have not completed yet and allows you to cancel them on demand. | ||
You can even find this class in a separate PyPi repository: https://pypi.org/project/ipv8-taskmanager/ | ||
|
||
Adding tasks | ||
^^^^^^^^^^^^ | ||
|
||
To add tasks to your ``TaskManager`` you can call ``register_task()``. | ||
You register a task with a name, which you can later use to inspect the state of a task or cancel it. | ||
For example: | ||
|
||
.. literalinclude:: tasks_tutorial_3.py | ||
|
||
This example prints ``[2]``. | ||
If you had not called ``wait_for_tasks()`` before ``shutdown_task_manager()`` in this example, all of your registered tasks would have been canceled, printing ``[]``. | ||
|
||
In some cases you may also not be interested in canceling a particular task. | ||
For this use case the ``register_anonymous_task()`` call exists, for example: | ||
|
||
.. literalinclude:: tasks_tutorial_4.py | ||
:lines: 16-19 | ||
|
||
Note that this example takes just over half a second to execute, all 20 calls to ``execute_me_too()`` are scheduled at (almost) the same time! | ||
|
||
Periodic and delayed tasks | ||
^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
Next to simply adding tasks, the ``TaskManager`` also allows you to invoke calls after a delay or periodically. | ||
The following example will add the value ``1`` to the ``COMPLETED`` list periodically and inject a single value of ``2`` after half a second: | ||
|
||
.. literalinclude:: tasks_tutorial_5.py | ||
:lines: 8-26 | ||
|
||
This example prints ``[1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]``. | ||
|
||
Note that you can also create a task with both an interval and a delay. | ||
|
||
Community Integration | ||
^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
For your convenience, every ``Community`` is also a ``TaskManager``. | ||
However, try to avoid spawning any tasks in the ``__init__`` of your ``Community``. | ||
Specifically, you could end up scheduling a task in between your ``Community`` and other ``Community`` initialization. | ||
Working with a half-initialized IPv8 may lead to problems. | ||
This practice also makes your ``Community`` more difficult to test. | ||
|
||
If you do end up in a situation where you absolutely must schedule a task from the ``__init__()``, you can use the ``TaskManager``'s ``cancel_all_pending_tasks()`` method to cancel all registered tasks. | ||
This is an example of how to deal with this in your unit tests: | ||
|
||
.. literalinclude:: tasks_tutorial_6.py | ||
:lines: 18-26 | ||
|
||
Futures and caches | ||
^^^^^^^^^^^^^^^^^^ | ||
|
||
Sometimes you may find that certain tasks belong to a message context. | ||
In other words, you may have a task that belongs to a *cache* (see `the storing states tutorial <../../basics/overlay_tutorial>`_). | ||
By registering a ``Future`` instance in your ``NumberCache`` subclass it will automatically be canceled when the ``NumberCache`` gets canceled or times out. | ||
You can do so using the ``register_future()`` method. | ||
This is a complete example: | ||
|
||
.. literalinclude:: tasks_tutorial_7.py | ||
|
||
This example prints: | ||
|
||
.. code-block:: console | ||
|
||
future0.result()=True | ||
future1.result()=False | ||
future2.cancelled()=True | ||
|
||
This example showcases the three states your ``Future`` may find itself in. | ||
First, it may have been called and received an explicit result (``True`` in this case). | ||
Second, the future may have timed out. | ||
By default a timed-out ``Future`` will have its result set to ``None``, but you can even give this method an exception class to have it raise an exception for you. | ||
In this example we set the value to ``False``. | ||
Third and last is the case where your future was cancelled, which will raise a ``asyncio.exceptions.CancelledError`` if you ``await`` it. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import asyncio | ||
|
||
COMPLETED = [] | ||
|
||
|
||
async def execute_me_too(i): | ||
await asyncio.sleep(0.5) | ||
COMPLETED.append(i) | ||
|
||
|
||
async def execute_me(): | ||
execute_me_too(1) # 1 | ||
await execute_me_too(2) # 2 | ||
COMPLETED.append(3) # 3 | ||
asyncio.ensure_future(execute_me_too(4)) # 4 | ||
COMPLETED.append(5) # 5 | ||
await asyncio.sleep(1) # 6 | ||
|
||
asyncio.run(execute_me()) | ||
print(COMPLETED) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import asyncio | ||
|
||
COMPLETED = [] | ||
|
||
|
||
async def execute_me_too(i): | ||
await asyncio.sleep(0.5) | ||
COMPLETED.append(i) | ||
|
||
|
||
async def execute_me(): | ||
await execute_me_too(2) | ||
COMPLETED.append(3) | ||
fut = asyncio.ensure_future(execute_me_too(4)) # store future | ||
COMPLETED.append(5) | ||
await fut # await future before exiting | ||
|
||
asyncio.run(execute_me()) | ||
print(COMPLETED) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import asyncio | ||
|
||
from pyipv8.ipv8.taskmanager import TaskManager | ||
|
||
COMPLETED = [] | ||
|
||
|
||
async def execute_me_too(i): | ||
await asyncio.sleep(0.5) | ||
COMPLETED.append(i) | ||
|
||
|
||
async def main(): | ||
task_manager = TaskManager() | ||
|
||
task_manager.register_task("execute_me_too1", execute_me_too, 1) | ||
task_manager.register_task("execute_me_too2", execute_me_too, 2) | ||
task_manager.cancel_pending_task("execute_me_too1") | ||
await task_manager.wait_for_tasks() | ||
|
||
await task_manager.shutdown_task_manager() | ||
print(COMPLETED) | ||
asyncio.get_event_loop().stop() | ||
|
||
asyncio.ensure_future(main()) | ||
asyncio.get_event_loop().run_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import asyncio | ||
|
||
from pyipv8.ipv8.taskmanager import TaskManager | ||
|
||
COMPLETED = [] | ||
|
||
|
||
async def execute_me_too(i): | ||
await asyncio.sleep(0.5) | ||
COMPLETED.append(i) | ||
|
||
|
||
async def main(): | ||
task_manager = TaskManager() | ||
|
||
for i in range(20): | ||
task_manager.register_anonymous_task("execute_me_too", | ||
execute_me_too, i) | ||
await task_manager.wait_for_tasks() | ||
|
||
await task_manager.shutdown_task_manager() | ||
print(COMPLETED) | ||
asyncio.get_event_loop().stop() | ||
|
||
asyncio.ensure_future(main()) | ||
asyncio.get_event_loop().run_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import asyncio | ||
|
||
from pyipv8.ipv8.taskmanager import TaskManager | ||
|
||
COMPLETED = [] | ||
|
||
|
||
async def execute_me_too(i, task_manager): | ||
if len(COMPLETED) == 20: | ||
task_manager.cancel_pending_task("keep adding 1") | ||
return | ||
COMPLETED.append(i) | ||
|
||
|
||
async def main(): | ||
task_manager = TaskManager() | ||
|
||
task_manager.register_task("keep adding 1", execute_me_too, | ||
1, task_manager, interval=0.1) | ||
task_manager.register_task("sneaky inject", execute_me_too, | ||
2, task_manager, delay=0.5) | ||
await task_manager.wait_for_tasks() | ||
|
||
await task_manager.shutdown_task_manager() | ||
print(COMPLETED) | ||
asyncio.get_event_loop().stop() | ||
|
||
asyncio.ensure_future(main()) | ||
asyncio.get_event_loop().run_forever() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@devos50 These lines and the next section heading are new since your last review.