From 06d489b394fa6688c32dbf9079c0df764b3bc09a Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 17 May 2019 15:11:45 -0500 Subject: [PATCH 1/4] wip --- cyberpandas/__init__.py | 4 ++ cyberpandas/_accessor.py | 22 ++++++++++ cyberpandas/_compat.py | 50 ++++++++++++++++++++++ cyberpandas/dask_ip_array.py | 34 +++++++++++++++ cyberpandas/ip_array.py | 82 +++++++++++++++++++++++++++++------- cyberpandas/parser.py | 15 +++---- 6 files changed, 183 insertions(+), 24 deletions(-) create mode 100644 cyberpandas/_compat.py create mode 100644 cyberpandas/dask_ip_array.py diff --git a/cyberpandas/__init__.py b/cyberpandas/__init__.py index 710b1b2..20c3ad8 100644 --- a/cyberpandas/__init__.py +++ b/cyberpandas/__init__.py @@ -8,6 +8,7 @@ from .ip_methods import ip_range from .parser import to_ipaddress from .mac_array import MACType, MACArray +from ._compat import HAS_DASK from pkg_resources import get_distribution, DistributionNotFound try: @@ -30,3 +31,6 @@ 'ip_range', 'to_ipaddress', ] + +if HAS_DASK: + from . import dask_ip_array diff --git a/cyberpandas/_accessor.py b/cyberpandas/_accessor.py index 646267d..b8a9bb2 100644 --- a/cyberpandas/_accessor.py +++ b/cyberpandas/_accessor.py @@ -1,7 +1,18 @@ import pandas as pd +from ._compat import HAS_DASK + def delegated_method(method, index, name, *args, **kwargs): + values = method(*args, **kwargs) + if HAS_DASK and hasattr(index, '__dask_graph__'): + import dask.dataframe as dd + # TODO; pass this info ahead of time, from the accessor + + result = dd.from_dask_array(values, index=index) + result.name = name + return result + return pd.Series(method(*args, **kwargs), index, name=name) @@ -16,8 +27,19 @@ def __get__(self, obj, type=None): index = object.__getattribute__(obj, '_index') name = object.__getattribute__(obj, '_name') result = self._get_result(obj) + + if HAS_DASK and hasattr(result, '__dask_graph__'): + import dask.dataframe as dd + + result = dd.from_dask_array(result, index=index) + result.name = name + return result + return pd.Series(result, index, name=name) + def _getresult(self, obj, type=None): + raise NotImplementedError + class DelegatedProperty(Delegated): def _get_result(self, obj, type=None): diff --git a/cyberpandas/_compat.py b/cyberpandas/_compat.py new file mode 100644 index 0000000..1c1cb6f --- /dev/null +++ b/cyberpandas/_compat.py @@ -0,0 +1,50 @@ +from functools import singledispatch +from collections import abc +import numpy as np + +try: + import dask.array + import dask.dataframe +except ImportError: + HAS_DASK = False +else: + HAS_DASK = True + + +@singledispatch +def asarray(values, *args, **kwargs): + return np.asarray(values, *args, **kwargs) + + +@singledispatch +def atleast_1d(values): + return np.atleast_1d(values) + +if HAS_DASK: + @asarray.register(dask.array.Array) + def _(values, *args, **kwargs): + return dask.array.asarray(values, *args, **kwargs) + + @atleast_1d.register(dask.array.Array) + def _(values): + return dask.array.atleast_1d(values) + + +def is_array_like(obj): + attrs = set(dir(obj)) + return bool(attrs & {'__array__', 'ndim', 'dtype'}) + + +def is_list_like(obj): + return isinstance(obj, abc.Sized) + + +ARRAY_TYPES = [ + np.ndarray +] +if HAS_DASK: + ARRAY_TYPES.append(dask.array.Array) + +__all__ = [ + 'HAS_DASK' +] \ No newline at end of file diff --git a/cyberpandas/dask_ip_array.py b/cyberpandas/dask_ip_array.py new file mode 100644 index 0000000..c6cb6de --- /dev/null +++ b/cyberpandas/dask_ip_array.py @@ -0,0 +1,34 @@ +import ipaddress + +import numpy as np +import dask.array as da +import dask.dataframe as dd +from dask.dataframe.extensions import make_scalar, make_array_nonempty, register_series_accessor +from .ip_array import IPAccessor, IPType, IPArray + + +@make_array_nonempty.register(IPType) +def _(dtype): + return IPArray._from_sequence([1, 2], dtype=dtype) + + +@make_scalar.register(ipaddress.IPv4Address) +@make_scalar.register(ipaddress.IPv6Address) +def _(x): + return ipaddress.ip_address(x) + + +@register_series_accessor("ip") +class DaskIPAccessor(IPAccessor): + @staticmethod + def _extract_array(obj): + # TODO: remove delayed trip + objs = obj.to_delayed() + dtype = obj.dtype._record_type + arrays = [da.from_delayed(x, shape=(np.nan,), dtype=dtype) for x in objs] + arr = da.concatenate(arrays) + return IPArray(arr) + + @property + def _constructor(self): + return dd.Series diff --git a/cyberpandas/ip_array.py b/cyberpandas/ip_array.py index e2cc67f..93f67ca 100644 --- a/cyberpandas/ip_array.py +++ b/cyberpandas/ip_array.py @@ -13,6 +13,7 @@ from .base import NumPyBackedExtensionArrayMixin from .common import _U8_MAX, _IPv4_MAX from .parser import _to_ipaddress_pyint, _as_ip_object +from . import _compat # ----------------------------------------------------------------------------- # Extension Type @@ -221,21 +222,23 @@ def __repr__(self): def _format_values(self): formatted = [] # TODO: perf - for i in range(len(self)): - hi, lo = self.data[i] - if lo == -1: - formatted.append("NA") - elif hi == 0 and lo <= _IPv4_MAX: - formatted.append(ipaddress.IPv4Address._string_from_ip_int( - int(lo))) - elif hi == 0: - formatted.append(ipaddress.IPv6Address._string_from_ip_int( - int(lo))) - else: - # TODO: - formatted.append(ipaddress.IPv6Address._string_from_ip_int( - (int(hi) << 64) + int(lo))) - return formatted + if isinstance(self.data, np.ndarray): + for i in range(len(self)): + hi, lo = self.data[i] + if lo == -1: + formatted.append("NA") + elif hi == 0 and lo <= _IPv4_MAX: + formatted.append(ipaddress.IPv4Address._string_from_ip_int( + int(lo))) + elif hi == 0: + formatted.append(ipaddress.IPv6Address._string_from_ip_int( + int(lo))) + else: + # TODO: + formatted.append(ipaddress.IPv6Address._string_from_ip_int( + (int(hi) << 64) + int(lo))) + return formatted + return self.data @staticmethod def _box_scalar(scalar): @@ -562,6 +565,7 @@ def _apply_mask(self, op, v4_prefixlen, v6_prefixlen): ipaddress.ip_network(u'0.0.0.0/{}'.format(v4_prefixlen)), op) v4_mask = IPArray([v4_net]) + import pdb; pdb.set_trace() self.data[is_v4] = v4_mask.data v6_net = getattr( @@ -658,6 +662,44 @@ def mask(self, mask): masked = np.bitwise_and(a, b).ravel().view(self.dtype._record_type) return type(self)(masked) + if _compat.HAS_DASK: + import dask.threaded + import dask.context + + def __dask_graph__(self): + return self.data.__dask_graph__() + + def __dask_keys__(self): + return self.data.__dask_keys__() + + def __dask_layers__(self): + return self.data.__dask_layers__() + + @property + def __dask_optimize__(self): + return self.data.__dask_optimize__ + + @property + def __dask_scheduler__(self): + return self.data.__dask_scheduler__ + + def __dask_postcompute__(self): + func, args = self.data.__dask_postcompute__() + return self._dask_finalize, (func, args, self.data.name) + + def __dask_postpersist__(self): + func, args = self.data.__dask_postpersist__() + return self._dask_finalize, (func, args, self.data.name) + + @staticmethod + def _dask_finalize(results, func, args, name): + results = [x.array.data for x in results] + ds = func(results, *args) + return IPArray(ds) + + # __dask_scheduler__ = staticmethod(dask.threaded.get) + # __dask_optimize__ = dask.context.globalmethod(dask.optimize, key='delayed_optimize') + # ----------------------------------------------------------------------------- # Accessor @@ -683,10 +725,18 @@ class IPAccessor: def __init__(self, obj): self._validate(obj) - self._data = obj.values + self._data = self._extract_array(obj) self._index = obj.index self._name = obj.name + @property + def _constructor(self): + return pd.Series + + @staticmethod + def _extract_array(obj): + return obj.array + @staticmethod def _validate(obj): if not is_ipaddress_type(obj): diff --git a/cyberpandas/parser.py b/cyberpandas/parser.py index 215381b..7139331 100644 --- a/cyberpandas/parser.py +++ b/cyberpandas/parser.py @@ -1,9 +1,9 @@ import ipaddress import numpy as np -from pandas.api.types import is_list_like from ._utils import pack, unpack +from . import _compat def to_ipaddress(values): @@ -36,7 +36,7 @@ def to_ipaddress(values): """ from . import IPArray - if not is_list_like(values): + if not _compat.is_list_like(values): values = [values] return IPArray(_to_ip_array(values)) @@ -47,19 +47,18 @@ def _to_ip_array(values): if isinstance(values, IPArray): return values.data + array_like = _compat.is_array_like(values) - if (isinstance(values, np.ndarray) and - values.ndim == 1 and + if (array_like and values.ndim == 1 and np.issubdtype(values.dtype, np.integer)): # We assume we're given the low bits here. values = values.astype("u8") - values = np.asarray(values, dtype=IPType._record_type) + values = _compat.asarray(values).astype(dtype=IPType._record_type) values['hi'] = 0 - elif not (isinstance(values, np.ndarray) and - values.dtype == IPType._record_type): + elif not (array_like and values.dtype == IPType._record_type): values = _to_int_pairs(values) - return np.atleast_1d(np.asarray(values, dtype=IPType._record_type)) + return _compat.atleast_1d(_compat.asarray(values, dtype=IPType._record_type)) def _to_int_pairs(values): From 9d7841d5be239ff08a10fa4443f2c51a93be5894 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 21 May 2019 10:15:32 -0500 Subject: [PATCH 2/4] dask-backed extension array --- cyberpandas/__init__.py | 2 +- cyberpandas/_accessor.py | 41 ++++++++------- cyberpandas/_compat.py | 19 +++---- cyberpandas/dask_ip_array.py | 6 ++- cyberpandas/ip_array.py | 97 ++++++++++++++++++++++++++---------- cyberpandas/mac_array.py | 2 +- cyberpandas/parser.py | 4 +- tests/test_dask.py | 52 +++++++++++++++++++ 8 files changed, 160 insertions(+), 63 deletions(-) create mode 100644 tests/test_dask.py diff --git a/cyberpandas/__init__.py b/cyberpandas/__init__.py index 20c3ad8..97bdcac 100644 --- a/cyberpandas/__init__.py +++ b/cyberpandas/__init__.py @@ -33,4 +33,4 @@ ] if HAS_DASK: - from . import dask_ip_array + from . import dask_ip_array # noqa diff --git a/cyberpandas/_accessor.py b/cyberpandas/_accessor.py index b8a9bb2..0961ce8 100644 --- a/cyberpandas/_accessor.py +++ b/cyberpandas/_accessor.py @@ -1,19 +1,27 @@ import pandas as pd -from ._compat import HAS_DASK +from ._compat import is_dask_collection def delegated_method(method, index, name, *args, **kwargs): values = method(*args, **kwargs) - if HAS_DASK and hasattr(index, '__dask_graph__'): + return wrap_result(values, index, name) + + +def wrap_result(values, index, name): + from cyberpandas.ip_array import IPType + + if is_dask_collection(values): + import dask.array as da import dask.dataframe as dd - # TODO; pass this info ahead of time, from the accessor - result = dd.from_dask_array(values, index=index) - result.name = name - return result + if isinstance(values.dtype, IPType): + return values.to_dask_series(index=index, name=name) - return pd.Series(method(*args, **kwargs), index, name=name) + elif isinstance(values, da.Array): + return dd.from_dask_array(values, columns=name, index=index) + + return pd.Series(values, index=index, name=name) class Delegated: @@ -27,17 +35,9 @@ def __get__(self, obj, type=None): index = object.__getattribute__(obj, '_index') name = object.__getattribute__(obj, '_name') result = self._get_result(obj) + return wrap_result(result, index, name) - if HAS_DASK and hasattr(result, '__dask_graph__'): - import dask.dataframe as dd - - result = dd.from_dask_array(result, index=index) - result.name = name - return result - - return pd.Series(result, index, name=name) - - def _getresult(self, obj, type=None): + def _get_result(self, obj, type=None): raise NotImplementedError @@ -47,8 +47,7 @@ def _get_result(self, obj, type=None): class DelegatedMethod(Delegated): - def __get__(self, obj, type=None): - index = object.__getattribute__(obj, '_index') - name = object.__getattribute__(obj, '_name') + def _get_result(self, obj, type=None): method = getattr(object.__getattribute__(obj, '_data'), self.name) - return delegated_method(method, index, name) + values = method() + return values diff --git a/cyberpandas/_compat.py b/cyberpandas/_compat.py index 1c1cb6f..34e878a 100644 --- a/cyberpandas/_compat.py +++ b/cyberpandas/_compat.py @@ -20,6 +20,14 @@ def asarray(values, *args, **kwargs): def atleast_1d(values): return np.atleast_1d(values) + +def is_dask_collection(x): + if HAS_DASK: + import dask + return dask.is_dask_collection(x) + return False + + if HAS_DASK: @asarray.register(dask.array.Array) def _(values, *args, **kwargs): @@ -37,14 +45,3 @@ def is_array_like(obj): def is_list_like(obj): return isinstance(obj, abc.Sized) - - -ARRAY_TYPES = [ - np.ndarray -] -if HAS_DASK: - ARRAY_TYPES.append(dask.array.Array) - -__all__ = [ - 'HAS_DASK' -] \ No newline at end of file diff --git a/cyberpandas/dask_ip_array.py b/cyberpandas/dask_ip_array.py index c6cb6de..a99bc1f 100644 --- a/cyberpandas/dask_ip_array.py +++ b/cyberpandas/dask_ip_array.py @@ -3,7 +3,8 @@ import numpy as np import dask.array as da import dask.dataframe as dd -from dask.dataframe.extensions import make_scalar, make_array_nonempty, register_series_accessor +from dask.dataframe.extensions import ( + make_scalar, make_array_nonempty, register_series_accessor) from .ip_array import IPAccessor, IPType, IPArray @@ -25,7 +26,8 @@ def _extract_array(obj): # TODO: remove delayed trip objs = obj.to_delayed() dtype = obj.dtype._record_type - arrays = [da.from_delayed(x, shape=(np.nan,), dtype=dtype) for x in objs] + arrays = [da.from_delayed(x.array.data, shape=(np.nan,), dtype=dtype) + for x in objs] arr = da.concatenate(arrays) return IPArray(arr) diff --git a/cyberpandas/ip_array.py b/cyberpandas/ip_array.py index 93f67ca..c08ee2c 100644 --- a/cyberpandas/ip_array.py +++ b/cyberpandas/ip_array.py @@ -216,29 +216,30 @@ def take(self, indices, allow_fill=False, fill_value=None): # ------------------------------------------------------------------------- def __repr__(self): - formatted = self._format_values() + if isinstance(self.data, np.ndarray): + formatted = self._format_values() + else: + formatted = self.data return "IPArray({!r})".format(formatted) def _format_values(self): formatted = [] # TODO: perf - if isinstance(self.data, np.ndarray): - for i in range(len(self)): - hi, lo = self.data[i] - if lo == -1: - formatted.append("NA") - elif hi == 0 and lo <= _IPv4_MAX: - formatted.append(ipaddress.IPv4Address._string_from_ip_int( - int(lo))) - elif hi == 0: - formatted.append(ipaddress.IPv6Address._string_from_ip_int( - int(lo))) - else: - # TODO: - formatted.append(ipaddress.IPv6Address._string_from_ip_int( - (int(hi) << 64) + int(lo))) - return formatted - return self.data + for i in range(len(self)): + hi, lo = self.data[i] + if lo == -1: + formatted.append("NA") + elif hi == 0 and lo <= _IPv4_MAX: + formatted.append(ipaddress.IPv4Address._string_from_ip_int( + int(lo))) + elif hi == 0: + formatted.append(ipaddress.IPv6Address._string_from_ip_int( + int(lo))) + else: + # TODO: + formatted.append(ipaddress.IPv6Address._string_from_ip_int( + (int(hi) << 64) + int(lo))) + return formatted @staticmethod def _box_scalar(scalar): @@ -323,6 +324,47 @@ def to_bytes(self): """ return self.data.tobytes() + def to_delayed(self): + """ + Convert an IPArray to a list of Delayed objects. + + This only works for IPArrays backed by a Dask Array. + Returns + ------- + List[dask.delayed.Delayed] + """ + from dask import delayed + cls = delayed(type(self)) + + return [cls(x) for x in self.data.to_delayed()] + + def to_dask_series(self, index=None, name=None): + """ + Convert to a dask Series + + index : dask.dataframe.Index, optional + name : str, optional + Name to use for the resulting dask Series. + + returns + ------- + dask.dataframe.Series + """ + import dask + import dask.dataframe as dd + + blocks = self.to_delayed() + if index is not None: + args = zip(blocks, index.to_delayed()) + divisions = index.divisions + else: + args = zip(blocks) + divisions = None + blocks = [dask.delayed(pd.Series)(*b) for b in args] + result = dd.from_delayed(blocks, meta=(name, self.dtype), + divisions=divisions) + return result + def astype(self, dtype, copy=True): if isinstance(dtype, IPType): if copy: @@ -565,7 +607,6 @@ def _apply_mask(self, op, v4_prefixlen, v6_prefixlen): ipaddress.ip_network(u'0.0.0.0/{}'.format(v4_prefixlen)), op) v4_mask = IPArray([v4_net]) - import pdb; pdb.set_trace() self.data[is_v4] = v4_mask.data v6_net = getattr( @@ -662,6 +703,14 @@ def mask(self, mask): masked = np.bitwise_and(a, b).ravel().view(self.dtype._record_type) return type(self)(masked) + def compute(self, **kwargs): + import dask + return dask.compute(self, **kwargs) + + def persist(self, *args, **kwargs): + import dask + return dask.persist(self, *args, **kwargs) + if _compat.HAS_DASK: import dask.threaded import dask.context @@ -685,21 +734,17 @@ def __dask_scheduler__(self): def __dask_postcompute__(self): func, args = self.data.__dask_postcompute__() - return self._dask_finalize, (func, args, self.data.name) + return self._dask_finalize, (func, args) def __dask_postpersist__(self): func, args = self.data.__dask_postpersist__() - return self._dask_finalize, (func, args, self.data.name) + return self._dask_finalize, (func, args) @staticmethod - def _dask_finalize(results, func, args, name): - results = [x.array.data for x in results] + def _dask_finalize(results, func, args): ds = func(results, *args) return IPArray(ds) - # __dask_scheduler__ = staticmethod(dask.threaded.get) - # __dask_optimize__ = dask.context.globalmethod(dask.optimize, key='delayed_optimize') - # ----------------------------------------------------------------------------- # Accessor diff --git a/cyberpandas/mac_array.py b/cyberpandas/mac_array.py index 183b4a5..40e522c 100644 --- a/cyberpandas/mac_array.py +++ b/cyberpandas/mac_array.py @@ -1,4 +1,4 @@ -from collections import Iterable +from collections.abc import Iterable import numpy as np import six diff --git a/cyberpandas/parser.py b/cyberpandas/parser.py index 7139331..9600dc6 100644 --- a/cyberpandas/parser.py +++ b/cyberpandas/parser.py @@ -50,6 +50,7 @@ def _to_ip_array(values): array_like = _compat.is_array_like(values) if (array_like and values.ndim == 1 and + isinstance(values.dtype, np.dtype) and np.issubdtype(values.dtype, np.integer)): # We assume we're given the low bits here. values = values.astype("u8") @@ -58,7 +59,8 @@ def _to_ip_array(values): elif not (array_like and values.dtype == IPType._record_type): values = _to_int_pairs(values) - return _compat.atleast_1d(_compat.asarray(values, dtype=IPType._record_type)) + return _compat.atleast_1d(_compat.asarray(values, + dtype=IPType._record_type)) def _to_int_pairs(values): diff --git a/tests/test_dask.py b/tests/test_dask.py new file mode 100644 index 0000000..555af21 --- /dev/null +++ b/tests/test_dask.py @@ -0,0 +1,52 @@ +import pytest +import cyberpandas +import pandas as pd +import pandas.util.testing as tm + +dd = pytest.importorskip('dask.dataframe') +dask = pytest.importorskip("dask") +da = pytest.importorskip("dask.array") + + +def test_constructor(): + a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data + b = da.from_array(a, chunks=2) + + iparr = cyberpandas.IPArray(b) + assert isinstance(iparr.data, da.Array) + da.utils.assert_eq(iparr.data, b) + da.utils.assert_eq(iparr.data, a) + + result, = dask.compute(iparr) + assert isinstance(result, cyberpandas.IPArray) + + +def test_basics(): + a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data + b = da.from_array(a, chunks=2) + + c = cyberpandas.IPArray(a) + d = cyberpandas.IPArray(b) + + da.utils.assert_eq(c.isna(), d.isna()) + da.utils.assert_eq(c.is_ipv4, d.is_ipv4) + + meta = dd.utils.meta_nonempty(pd.Series(c)) + expected = pd.Series(cyberpandas.IPArray(['0.0.0.1', '0.0.0.2'])) + tm.assert_series_equal(meta, expected) + + +def test_dask_series(): + a = cyberpandas.IPArray(cyberpandas.to_ipaddress([1, 2, 3, 4]).data) + b = cyberpandas.IPArray(da.from_array(a.data, chunks=2)).to_dask_series() + + b.loc[0] + + +def test_accessor(): + a = cyberpandas.to_ipaddress([1, 2, 3, 4]).data + a = pd.Series(cyberpandas.IPArray(a)) + b = dd.from_pandas(a, 2) + + dd.utils.assert_eq(a.ip.is_ipv4, b.ip.is_ipv4) + dd.utils.assert_eq(a.ip.netmask(), b.ip.netmask()) From 2ba3a525bea68c67c21c902593e4f532fa2c6752 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 21 May 2019 15:01:05 -0500 Subject: [PATCH 3/4] docs --- docs/source/usage.rst | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 7d0eedf..51e52a0 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -133,3 +133,37 @@ IP Accessor ser.ip.isna df['addresses'].ip.is_ipv6 + + +Dask Integration +---------------- + +:class:`IPArray` also works well with `Dask `_. +In this case the ``.data`` attribute backing an :class:`IPArray` +will be a Dask Array, rather than a NumPy ndarray. + +.. ipython:: python + + import dask.array as da + + arr = cyberpandas.ip_range(10) + arr.data + + dask_data = da.from_array(arr.data, chunks=2) + dask_data + + dask_arr = cyberpandas.IPArray(dask_data) + dask_arr + +These dask-backed IPArrays may be stored in a dask Series or DataFrame + +.. ipython:: python + + ds = dask_arr.to_dask_series() + ds + +An ``.ip`` accessor is provided for dask Series + +.. ipython:: python + + ds.ip.isna.compute() From c6536ec633fbe2ecbab48a6bede9ebc1576d6eff Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 21 May 2019 15:07:46 -0500 Subject: [PATCH 4/4] add dask --- ci/install-travis.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/install-travis.sh b/ci/install-travis.sh index 1921ade..145a7d0 100755 --- a/ci/install-travis.sh +++ b/ci/install-travis.sh @@ -29,6 +29,7 @@ source activate test-environment conda install \ coverage \ cython \ + dask \ flake8 \ hypothesis \ numpy \