Add support for dask-backed IPArrays #39

Open
wants to merge 4 commits into master
1 change: 1 addition & 0 deletions ci/install-travis.sh
@@ -29,6 +29,7 @@ source activate test-environment
conda install \
coverage \
cython \
dask \
flake8 \
hypothesis \
numpy \
4 changes: 4 additions & 0 deletions cyberpandas/__init__.py
@@ -8,6 +8,7 @@
from .ip_methods import ip_range
from .parser import to_ipaddress
from .mac_array import MACType, MACArray
from ._compat import HAS_DASK

from pkg_resources import get_distribution, DistributionNotFound
try:
@@ -30,3 +31,6 @@
'ip_range',
'to_ipaddress',
]

if HAS_DASK:
from . import dask_ip_array # noqa
33 changes: 27 additions & 6 deletions cyberpandas/_accessor.py
@@ -1,8 +1,27 @@
import pandas as pd

from ._compat import is_dask_collection


def delegated_method(method, index, name, *args, **kwargs):
return pd.Series(method(*args, **kwargs), index, name=name)
values = method(*args, **kwargs)
return wrap_result(values, index, name)


def wrap_result(values, index, name):
from cyberpandas.ip_array import IPType

if is_dask_collection(values):
import dask.array as da
import dask.dataframe as dd

if isinstance(values.dtype, IPType):
return values.to_dask_series(index=index, name=name)

elif isinstance(values, da.Array):
return dd.from_dask_array(values, columns=name, index=index)

return pd.Series(values, index=index, name=name)


class Delegated:
@@ -16,7 +35,10 @@ def __get__(self, obj, type=None):
index = object.__getattribute__(obj, '_index')
name = object.__getattribute__(obj, '_name')
result = self._get_result(obj)
return pd.Series(result, index, name=name)
return wrap_result(result, index, name)

def _get_result(self, obj, type=None):
raise NotImplementedError


class DelegatedProperty(Delegated):
@@ -25,8 +47,7 @@ def _get_result(self, obj, type=None):


class DelegatedMethod(Delegated):
def __get__(self, obj, type=None):
index = object.__getattribute__(obj, '_index')
name = object.__getattribute__(obj, '_name')
def _get_result(self, obj, type=None):
method = getattr(object.__getattribute__(obj, '_data'), self.name)
return delegated_method(method, index, name)
values = method()
return values
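
For orientation, a minimal sketch (not part of the diff) of what ``wrap_result`` does with an eager result, assuming this branch of cyberpandas is importable; dask-backed results instead take the dask Series branches shown above:

    import numpy as np
    import pandas as pd

    from cyberpandas._accessor import wrap_result

    # An eager (NumPy) result is wrapped in a plain pandas Series,
    # preserving the accessor's original index and name.
    index = pd.RangeIndex(3)
    out = wrap_result(np.array([True, False, True]), index, "is_ipv4")
    print(type(out))  # <class 'pandas.core.series.Series'>
    print(out.name)   # 'is_ipv4'
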
47 changes: 47 additions & 0 deletions cyberpandas/_compat.py
@@ -0,0 +1,47 @@
from functools import singledispatch
from collections import abc
import numpy as np

try:
    import dask.array
    import dask.dataframe
except ImportError:
    HAS_DASK = False
else:
    HAS_DASK = True


@singledispatch
def asarray(values, *args, **kwargs):
    return np.asarray(values, *args, **kwargs)


@singledispatch
def atleast_1d(values):
    return np.atleast_1d(values)


def is_dask_collection(x):
    if HAS_DASK:
        import dask
        return dask.is_dask_collection(x)
    return False


if HAS_DASK:
    @asarray.register(dask.array.Array)
    def _(values, *args, **kwargs):
        return dask.array.asarray(values, *args, **kwargs)

    @atleast_1d.register(dask.array.Array)
    def _(values):
        return dask.array.atleast_1d(values)


def is_array_like(obj):
    attrs = set(dir(obj))
    return bool(attrs & {'__array__', 'ndim', 'dtype'})


def is_list_like(obj):
    return isinstance(obj, abc.Sized)
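
A small sketch (not part of the diff) of how the ``singledispatch`` helpers above behave, assuming the module is importable and dask is installed:

    import numpy as np
    import dask.array as da

    from cyberpandas._compat import asarray, atleast_1d

    # Eager inputs (lists, NumPy arrays) fall through to the NumPy defaults.
    eager = atleast_1d(asarray([1, 2, 3], dtype="u8"))
    print(type(eager))  # <class 'numpy.ndarray'>

    # Dask arrays dispatch to dask.array.asarray / dask.array.atleast_1d,
    # so nothing is computed and the result stays a lazy dask Array.
    lazy = atleast_1d(asarray(da.arange(4, chunks=2)))
    print(type(lazy))   # <class 'dask.array.core.Array'>
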
36 changes: 36 additions & 0 deletions cyberpandas/dask_ip_array.py
@@ -0,0 +1,36 @@
import ipaddress

import numpy as np
import dask.array as da
import dask.dataframe as dd
from dask.dataframe.extensions import (
    make_scalar, make_array_nonempty, register_series_accessor)
from .ip_array import IPAccessor, IPType, IPArray


@make_array_nonempty.register(IPType)
def _(dtype):
    return IPArray._from_sequence([1, 2], dtype=dtype)


@make_scalar.register(ipaddress.IPv4Address)
@make_scalar.register(ipaddress.IPv6Address)
def _(x):
    return ipaddress.ip_address(x)


@register_series_accessor("ip")
class DaskIPAccessor(IPAccessor):
    @staticmethod
    def _extract_array(obj):
        # TODO: remove delayed trip
        objs = obj.to_delayed()
        dtype = obj.dtype._record_type
        arrays = [da.from_delayed(x.array.data, shape=(np.nan,), dtype=dtype)
                  for x in objs]
        arr = da.concatenate(arrays)
        return IPArray(arr)

    @property
    def _constructor(self):
        return dd.Series
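
Dask builds a small non-empty ``meta`` object for every extension-typed column; the ``make_array_nonempty`` registration above tells it how to do that for ``IPType``. A sketch (not part of the diff), assuming this branch is installed alongside dask:

    import cyberpandas  # importing the package also runs the dask registrations
    from dask.dataframe.extensions import make_array_nonempty

    from cyberpandas import IPType

    # Dask asks the registered factory for a length-2 dummy IPArray to use as meta.
    meta = make_array_nonempty(IPType())
    print(type(meta))  # <class 'cyberpandas.ip_array.IPArray'>
    print(len(meta))   # 2
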
99 changes: 97 additions & 2 deletions cyberpandas/ip_array.py
@@ -13,6 +13,7 @@
from .base import NumPyBackedExtensionArrayMixin
from .common import _U8_MAX, _IPv4_MAX
from .parser import _to_ipaddress_pyint, _as_ip_object
from . import _compat

# -----------------------------------------------------------------------------
# Extension Type
@@ -215,7 +216,10 @@ def take(self, indices, allow_fill=False, fill_value=None):
# -------------------------------------------------------------------------

def __repr__(self):
formatted = self._format_values()
if isinstance(self.data, np.ndarray):
formatted = self._format_values()
else:
formatted = self.data
return "IPArray({!r})".format(formatted)

def _format_values(self):
@@ -320,6 +324,47 @@ def to_bytes(self):
"""
return self.data.tobytes()

def to_delayed(self):
"""
Convert an IPArray to a list of Delayed objects.

This only works for IPArrays backed by a Dask Array.

Returns
-------
List[dask.delayed.Delayed]
"""
from dask import delayed
cls = delayed(type(self))

return [cls(x) for x in self.data.to_delayed()]

def to_dask_series(self, index=None, name=None):
"""
Convert to a dask Series.

Parameters
----------
index : dask.dataframe.Index, optional
Index for the resulting dask Series.
name : str, optional
Name to use for the resulting dask Series.

Returns
-------
dask.dataframe.Series
"""
import dask
import dask.dataframe as dd

blocks = self.to_delayed()
if index is not None:
args = zip(blocks, index.to_delayed())
divisions = index.divisions
else:
args = zip(blocks)
divisions = None
blocks = [dask.delayed(pd.Series)(*b) for b in args]
result = dd.from_delayed(blocks, meta=(name, self.dtype),
divisions=divisions)
return result

def astype(self, dtype, copy=True):
if isinstance(dtype, IPType):
if copy:
@@ -658,6 +703,48 @@ def mask(self, mask):
masked = np.bitwise_and(a, b).ravel().view(self.dtype._record_type)
return type(self)(masked)

def compute(self, **kwargs):
import dask
return dask.compute(self, **kwargs)

def persist(self, *args, **kwargs):
import dask
return dask.persist(self, *args, **kwargs)

if _compat.HAS_DASK:
import dask.threaded
import dask.context

def __dask_graph__(self):
return self.data.__dask_graph__()

def __dask_keys__(self):
return self.data.__dask_keys__()

def __dask_layers__(self):
return self.data.__dask_layers__()

@property
def __dask_optimize__(self):
return self.data.__dask_optimize__

@property
def __dask_scheduler__(self):
return self.data.__dask_scheduler__

def __dask_postcompute__(self):
func, args = self.data.__dask_postcompute__()
return self._dask_finalize, (func, args)

def __dask_postpersist__(self):
func, args = self.data.__dask_postpersist__()
return self._dask_finalize, (func, args)

@staticmethod
def _dask_finalize(results, func, args):
ds = func(results, *args)
return IPArray(ds)


# -----------------------------------------------------------------------------
# Accessor
@@ -683,10 +770,18 @@ class IPAccessor:

def __init__(self, obj):
self._validate(obj)
self._data = obj.values
self._data = self._extract_array(obj)
self._index = obj.index
self._name = obj.name

@property
def _constructor(self):
return pd.Series

@staticmethod
def _extract_array(obj):
return obj.array

@staticmethod
def _validate(obj):
if not is_ipaddress_type(obj):
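
The dask collection protocol added to ``IPArray`` above lets ``dask.compute`` treat a dask-backed array as a first-class collection and hand the computed blocks to ``_dask_finalize``. A minimal sketch (not part of the diff), mirroring the example in the docs below:

    import dask
    import dask.array as da

    import cyberpandas

    # An IPArray backed by a chunked dask array of the underlying record dtype.
    eager = cyberpandas.ip_range(10)
    lazy = cyberpandas.IPArray(da.from_array(eager.data, chunks=2))

    # dask.compute walks __dask_graph__/__dask_postcompute__; _dask_finalize
    # re-wraps the concatenated result, giving back a NumPy-backed IPArray.
    result, = dask.compute(lazy)
    print(type(result))       # <class 'cyberpandas.ip_array.IPArray'>
    print(type(result.data))  # numpy.ndarray again
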
2 changes: 1 addition & 1 deletion cyberpandas/mac_array.py
@@ -1,4 +1,4 @@
from collections import Iterable
from collections.abc import Iterable

import numpy as np
import six
17 changes: 9 additions & 8 deletions cyberpandas/parser.py
@@ -1,9 +1,9 @@
import ipaddress

import numpy as np
from pandas.api.types import is_list_like

from ._utils import pack, unpack
from . import _compat


def to_ipaddress(values):
@@ -36,7 +36,7 @@ def to_ipaddress(values):
"""
from . import IPArray

if not is_list_like(values):
if not _compat.is_list_like(values):
values = [values]

return IPArray(_to_ip_array(values))
@@ -47,19 +47,20 @@ def _to_ip_array(values):

if isinstance(values, IPArray):
return values.data
array_like = _compat.is_array_like(values)

if (isinstance(values, np.ndarray) and
values.ndim == 1 and
if (array_like and values.ndim == 1 and
isinstance(values.dtype, np.dtype) and
np.issubdtype(values.dtype, np.integer)):
# We assume we're given the low bits here.
values = values.astype("u8")
values = np.asarray(values, dtype=IPType._record_type)
values = _compat.asarray(values).astype(dtype=IPType._record_type)
values['hi'] = 0

elif not (isinstance(values, np.ndarray) and
values.dtype == IPType._record_type):
elif not (array_like and values.dtype == IPType._record_type):
values = _to_int_pairs(values)
return np.atleast_1d(np.asarray(values, dtype=IPType._record_type))
return _compat.atleast_1d(_compat.asarray(values,
dtype=IPType._record_type))


def _to_int_pairs(values):
34 changes: 34 additions & 0 deletions docs/source/usage.rst
@@ -133,3 +133,37 @@ IP Accessor

ser.ip.isna
df['addresses'].ip.is_ipv6


Dask Integration
----------------

:class:`IPArray` can also be backed by `Dask <https://dask.org>`_.
In that case the ``.data`` attribute of the :class:`IPArray`
is a Dask Array rather than a NumPy ndarray.

.. ipython:: python

    import dask.array as da

    arr = cyberpandas.ip_range(10)
    arr.data

    dask_data = da.from_array(arr.data, chunks=2)
    dask_data

    dask_arr = cyberpandas.IPArray(dask_data)
    dask_arr

These dask-backed IPArrays may be stored in a dask Series or DataFrame:

.. ipython:: python

    ds = dask_arr.to_dask_series()
    ds

An ``.ip`` accessor is also provided for dask Series:

.. ipython:: python

    ds.ip.isna.compute()
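
The registered accessor should also work for dask Series built directly from pandas with ``dd.from_pandas``; a sketch, assuming dask's extension-array support and this branch are both available:

    import dask.dataframe as dd
    import pandas as pd

    import cyberpandas

    pser = pd.Series(cyberpandas.ip_range(6))
    dser = dd.from_pandas(pser, npartitions=2)

    # DaskIPAccessor stitches the partition IPArrays back into one dask-backed
    # IPArray, so .ip attributes stay lazy until computed.
    dser.ip.isna.compute()
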