Skip to content

Commit

Permalink
log= is now timer=, closes #7
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Apr 16, 2022
1 parent 1d7d03c commit 20262d1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,19 @@ print(await Registry(calc1, go).resolve(go))
# Prints 10
```

### Debug logging
### Tracking with a timer

You can pass a `timer=` callable to the `Registry` constructor to gather timing information about executed tasks.. Your function should take three positional arguments:

- `name` - the name of the function that is being timed
- `start` - the time that it started executing, using `time.perf_counter()` ([perf_counter() docs](https://docs.python.org/3/library/time.html#time.perf_counter))
- `end` - the time that it finished executing

You can use `print` here too:

You can pass a `log=` callable to the `Registry` constructor. Your function should take a single `message` argument - the easiest way to do this is to use `print`:
```python
combined = await Registry(
get_param_1, get_param_2, both, log=print
get_param_1, get_param_2, both, timer=print
).resolve(
both,
param1 = "http://www.example.com/",
Expand All @@ -101,11 +108,18 @@ combined = await Registry(
```
This will output:
```
Resolving ['both']
Run []
Run ['get_param_2', 'get_param_1']
Run ['both']
get_param_1 436633.584580685 436633.797921747
get_param_2 436633.641832699 436634.196364347
both 436634.196570217 436634.196575639
```
### Turning off parallel execution

By default, functions that can run in parallel according to the execution plan will run in parallel using `asyncio.gather()`.

You can disable this parallel exection by passing `parallel=False` to the `Registry` constructor, or by setting `registry.parallel = False` after the registry object has been created.

This is mainly useful for benchmarking the difference between parallel and serial execution for your project.

## Development

To contribute to this library, first checkout the code. Then create a new virtual environment:
Expand Down
31 changes: 23 additions & 8 deletions asyncinject/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import time

try:
import graphlib
Expand All @@ -8,11 +9,11 @@


class Registry:
def __init__(self, *fns, parallel=True, log=None):
def __init__(self, *fns, parallel=True, timer=None):
self._registry = {}
self._graph = None
self.parallel = parallel
self.log = log or (lambda *args: None)
self.timer = timer
for fn in fns:
self.register(fn)

Expand All @@ -21,6 +22,16 @@ def register(self, fn):
# Clear _graph cache:
self._graph = None

def _make_time_logger(self, awaitable):
async def inner():
start = time.perf_counter()
result = await awaitable
end = time.perf_counter()
self.timer(awaitable.__name__, start, end)
return result

return inner()

@property
def graph(self):
if self._graph is None:
Expand All @@ -39,11 +50,10 @@ async def resolve(self, fn, **kwargs):
results = await self.resolve_multi([name], results=kwargs)
return results[name]

async def resolve_multi(self, names, results=None):
def _plan(self, names, results=None):
if results is None:
results = {}

# Come up with an execution plan, just for these nodes
ts = graphlib.TopologicalSorter()
to_do = set(names)
done = set(results.keys())
Expand All @@ -62,21 +72,26 @@ async def resolve_multi(self, names, results=None):
plan.append(node_group)
ts.done(*node_group)

self.log("Resolving {}".format(names))
return plan

async def resolve_multi(self, names, results=None):
if results is None:
results = {}

for node_group in plan:
for node_group in self._plan(names, results):
awaitable_names = [name for name in node_group if name in self._registry]
self.log(" Run {}".format(sorted(awaitable_names)))
awaitables = [
self._registry[name](
**{k: v for k, v in results.items() if k in self.graph[name]},
)
for name in awaitable_names
]
if self.timer:
awaitables = [self._make_time_logger(a) for a in awaitables]
if self.parallel:
awaitable_results = await asyncio.gather(*awaitables)
else:
awaitable_results = (await fn() for fn in awaitables)
awaitable_results = [await fn for fn in awaitables]
results.update(dict(zip(awaitable_names, awaitable_results)))

return results
25 changes: 15 additions & 10 deletions tests/test_asyncinject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from asyncinject import Registry
from random import random

from pprint import pprint


@pytest.fixture
def complex_registry():
Expand Down Expand Up @@ -92,15 +94,18 @@ async def calc1():


@pytest.mark.asyncio
async def test_log(complex_registry):
async def test_timer(complex_registry):
collected = []
complex_registry.log = collected.append
complex_registry.timer = lambda name, start, end: collected.append(
(name, start, end)
)
await complex_registry.resolve("go")
assert collected == [
"Resolving ['go']",
" Run ['log']",
" Run ['c', 'd']",
" Run ['b']",
" Run ['a']",
" Run ['go']",
]
assert len(collected) == 6
names = [c[0] for c in collected]
starts = [c[1] for c in collected]
ends = [c[2] for c in collected]
assert all(isinstance(n, float) for n in starts)
assert all(isinstance(n, float) for n in ends)
assert names[0] == "log"
assert names[5] == "go"
assert sorted(names[1:5]) == ["a", "b", "c", "d"]

0 comments on commit 20262d1

Please sign in to comment.