Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add tests and big refactor #5

Merged
merged 8 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
coverage:
status:
patch:
default:
target: 100%
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ disallow_subclassing_any = false
show_error_codes = true
pretty = true

[[tool.mypy.overrides]]
modules = ['tests.*']
disallow_untyped_defs = false


# https://coverage.readthedocs.io/en/6.4/config.html
[tool.coverage.report]
Expand All @@ -140,7 +144,8 @@ ignore = [
"tests/**/*",
"**/*.c",
"tox.ini",
"Makefile"
"Makefile",
"codecov.yml",
]

# https://python-semantic-release.readthedocs.io/en/latest/configuration.html
Expand Down
5 changes: 3 additions & 2 deletions src/in_n_out/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

try:
__version__ = version("in-n-out")
except PackageNotFoundError:
except PackageNotFoundError: # pragma: no cover
__version__ = "uninstalled"
__author__ = "Talley Lambert"
__email__ = "[email protected]"

from ._inject import inject_dependencies
from ._processors import get_processor, set_processors
from ._processors import get_processor, processor, set_processors
from ._providers import get_provider, provider, set_providers
from ._type_resolution import (
resolve_single_type_hints,
Expand All @@ -24,6 +24,7 @@
"get_processor",
"get_provider",
"inject_dependencies",
"processor",
"provider",
"resolve_single_type_hints",
"resolve_type_hints",
Expand Down
2 changes: 1 addition & 1 deletion src/in_n_out/_inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _exec(*args: P.args, **kwargs: P.kwargs) -> R:
# first, get and call the provider functions for each parameter type:
_kwargs = {}
for param in _sig.parameters.values():
provider = get_provider(param.annotation)
provider: Optional[Callable] = get_provider(param.annotation)
if provider:
_kwargs[param.name] = provider()

Expand Down
100 changes: 59 additions & 41 deletions src/in_n_out/_processors.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,29 @@
import warnings
from typing import (
Any,
Callable,
Dict,
Mapping,
Optional,
Type,
TypeVar,
Union,
get_args,
get_origin,
cast,
get_type_hints,
overload,
)

T = TypeVar("T")
C = TypeVar("C", bound=Callable)
_NULL = object()
from ._store import _STORE, Processor, T

# add default processors
_PROCESSORS: Dict[Any, Callable[[Any], Any]] = {}


def processor(func: C) -> C:
def processor(func: Processor) -> Processor:
"""Decorator that declares `func` as a processor of its first parameter type."""
hints = get_type_hints(func)
hints.pop("return", None)
if not hints: # pragma: no cover
raise TypeError(f"{func} has no argument type hints. Cannot be a processor.")
hint0 = list(hints.values())[0]
if not hints:
warnings.warn(f"{func} has no argument type hints. Cannot be a processor.")
return func

if hint0 is not None:
if get_origin(hint0) == Union:
for arg in get_args(hint0):
if arg is not None:
_PROCESSORS[arg] = func
else:
_PROCESSORS[hint0] = func
hint0 = list(hints.values())[0]
set_processors({hint0: func})
return func


Expand All @@ -44,14 +34,47 @@ def get_processor(type_: Type[T]) -> Optional[Callable[[T], Any]]:
process here leaves a lot of ambiguity, it mostly means the function "can
do something" with a single input of the given type.
"""
if type_ in _PROCESSORS:
return _PROCESSORS[type_]
return _STORE._get(type_, provider=False, pop=False)


@overload
def clear_processor(type_: Type[T]) -> Union[Callable[[], T], None]:
...


@overload
def clear_processor(type_: object) -> Union[Callable[[], Optional[T]], None]:
...


if isinstance(type_, type):
for key, val in _PROCESSORS.items():
if isinstance(key, type) and issubclass(type_, key):
return val
return None
def clear_processor(
type_: Union[object, Type[T]], warn_missing: bool = False
) -> Union[Callable[[], T], Callable[[], Optional[T]], None]:
"""Clear provider for a given type.

Note: this does NOT yet clear sub/superclasses of type_. So if there is a registered
provider for Sequence, and you call clear_processor(list), the Sequence provider
will still be registered, and vice versa.

Parameters
----------
type_ : Type[T]
The provider type to clear
warn_missing : bool, optional
Whether to emit a warning if there was not type registered, by default False

Returns
-------
Optional[Callable[[], T]]
The provider function that was cleared, if any.
"""
result = _STORE._get(type_, provider=False, pop=True)

if result is None and warn_missing:
warnings.warn(
f"No processor was registered for {type_}, and warn_missing is True."
)
return result


class set_processors:
Expand Down Expand Up @@ -79,23 +102,18 @@ class set_processors:
"""

def __init__(
self, mapping: Dict[Type[T], Callable[..., Optional[T]]], clobber: bool = False
self, mapping: Mapping[Any, Callable[[T], Any]], clobber: bool = False
):
self._before = {}
for k in mapping:
if k in _PROCESSORS and not clobber:
raise ValueError(
f"Class {k} already has a processor and clobber is False"
)
self._before[k] = _PROCESSORS.get(k, _NULL)
_PROCESSORS.update(mapping)
self._before = _STORE._set(mapping, provider=False, clobber=clobber)

def __enter__(self) -> None:
return None

def __exit__(self, *_: Any) -> None:
for key, val in self._before.items():
if val is _NULL:
del _PROCESSORS[key]

for (type_, _), val in self._before.items():
MAP: dict = _STORE.processors
if val is _STORE._NULL:
del MAP[type_]
else:
_PROCESSORS[key] = val # type: ignore[assignment]
MAP[type_] = cast(Callable, val)
119 changes: 78 additions & 41 deletions src/in_n_out/_providers.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,20 @@
import warnings
from typing import (
Any,
Callable,
Dict,
Optional,
Type,
TypeVar,
Union,
cast,
get_args,
get_origin,
get_type_hints,
overload,
)

T = TypeVar("T")
C = TypeVar("C", bound=Callable)
_NULL = object()
from ._store import _STORE, Provider, T


# registry of Type -> "provider function"
# where each value is a function that is capable
# of retrieving an instance of its corresponding key type.
_PROVIDERS: Dict[Type, Callable[..., Optional[object]]] = {}
Provider = Callable[..., Optional[T]]


def provider(func: C) -> C:
def provider(func: Provider) -> Provider:
"""Decorator that declares `func` as a provider of its return type.

Note, If func returns `Optional[Type]`, it will be registered as a provider
Expand All @@ -37,16 +27,27 @@ def provider(func: C) -> C:
... return 42
"""
return_hint = get_type_hints(func).get("return")
if get_origin(return_hint) == Union:
args = get_args(return_hint)
if args and len(args) == 2 and type(None) in args:
return_hint = next(a for a in args if a is not type(None)) # noqa
if return_hint is not None:
_PROVIDERS[return_hint] = func
if return_hint is None:
warnings.warn(f"{func} has no return type hint. Cannot be a processor.")
else:
set_providers({return_hint: func})
return func


def get_provider(type_: Union[Any, Type[T]]) -> Optional[Provider]:
@overload
def get_provider(type_: Type[T]) -> Union[Callable[[], T], None]:
...


@overload
def get_provider(type_: object) -> Union[Callable[[], Optional[T]], None]:
# `object` captures passing get_provider(Optional[type])
...


def get_provider(
type_: Union[object, Type[T]]
) -> Union[Callable[[], T], Callable[[], Optional[T]], None]:
"""Return object provider function given a type.

An object provider is a function that returns an instance of a
Expand All @@ -56,14 +57,53 @@ def get_provider(type_: Union[Any, Type[T]]) -> Optional[Provider]:
`inject_dependencies`, allows us to inject objects into functions based on
type hints.
"""
if type_ in _PROVIDERS:
return cast(Provider, _PROVIDERS[type_])
return _get_provider(type_, pop=False)


def _get_provider(
type_: Union[object, Type[T]], pop: bool = False
) -> Union[Callable[[], T], Callable[[], Optional[T]], None]:
return _STORE._get(type_, provider=True, pop=pop)


@overload
def clear_provider(type_: Type[T]) -> Union[Callable[[], T], None]:
...


@overload
def clear_provider(type_: object) -> Union[Callable[[], Optional[T]], None]:
...


def clear_provider(
type_: Union[object, Type[T]], warn_missing: bool = False
) -> Union[Callable[[], T], Callable[[], Optional[T]], None]:
"""Clear provider for a given type.

Note: this does NOT yet clear sub/superclasses of type_. So if there is a registered
provider for Sequence, and you call clear_provider(list), the Sequence provider
will still be registered, and vice versa.

Parameters
----------
type_ : Type[T]
The provider type to clear
warn_missing : bool, optional
Whether to emit a warning if there was not type registered, by default False

Returns
-------
Optional[Callable[[], T]]
The provider function that was cleared, if any.
"""
result = _get_provider(type_, pop=True)

if isinstance(type_, type):
for key, val in _PROVIDERS.items():
if issubclass(type_, key):
return cast(Provider, val)
return None
if result is None and warn_missing:
warnings.warn(
f"No provider was registered for {type_}, and warn_missing is True."
)
return result


class set_providers:
Expand All @@ -90,23 +130,20 @@ class set_providers:
"""

def __init__(
self, mapping: Dict[Type[T], Callable[..., Optional[T]]], clobber: bool = False
self,
mapping: Dict[Type[T], Union[T, Callable[[], T]]],
clobber: bool = False,
) -> None:
self._before = {}
for k in mapping:
if k in _PROVIDERS and not clobber:
raise ValueError(
f"Class {k} already has a provider and clobber is False"
)
self._before[k] = _PROVIDERS.get(k, _NULL)
_PROVIDERS.update(mapping)
self._before = _STORE._set(mapping, provider=True, clobber=clobber)

def __enter__(self) -> None:
return None

def __exit__(self, *_: Any) -> None:
for key, val in self._before.items():
if val is _NULL:
del _PROVIDERS[key]

for (type_, optional), val in self._before.items():
MAP: dict = _STORE.opt_providers if optional else _STORE.providers
if val is _STORE._NULL:
del MAP[type_]
else:
_PROVIDERS[key] = val # type: ignore
MAP[type_] = cast(Callable, val)
Loading