Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column refactoring 2 #8130

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 53 additions & 15 deletions python/cudf/cudf/core/column/categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import pickle
from collections.abc import MutableSequence
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -35,6 +36,7 @@
min_signed_type,
min_unsigned_type,
)
from cudf.utils.utils import cached_property

if TYPE_CHECKING:
from cudf.core.column import (
Expand Down Expand Up @@ -819,7 +821,7 @@ def __contains__(self, item: ScalarLike) -> bool:
return self._encode(item) in self.as_numerical

def serialize(self) -> Tuple[dict, list]:
header = {} # type: Dict[Any, Any]
header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"], dtype_frames = self.dtype.serialize()
Expand Down Expand Up @@ -1341,23 +1343,13 @@ def find_last_value(self, value: ScalarLike, closest: bool = False) -> int:
"""
return self.as_numerical.find_last_value(self._encode(value))

@cached_property
def is_monotonic_increasing(self) -> bool:
    """True when this column is ordered and its codes are sorted ascending.

    Unordered categoricals are never considered monotonic.
    """
    if not self.ordered:
        return False
    return self.as_numerical.is_monotonic_increasing

@cached_property
def is_monotonic_decreasing(self) -> bool:
    """True when this column is ordered and its codes are sorted descending.

    Unordered categoricals are never considered monotonic.
    """
    if not self.ordered:
        return False
    return self.as_numerical.is_monotonic_decreasing

def as_categorical_column(
self, dtype: Dtype, **kwargs
Expand Down Expand Up @@ -1472,6 +1464,52 @@ def view(self, dtype: Dtype) -> ColumnBase:
"Categorical column views are not currently supported"
)

@staticmethod
def _concat(objs: MutableSequence[CategoricalColumn]) -> CategoricalColumn:
    """Concatenate a sequence of categorical columns into one.

    Merges the category sets of all inputs, recodes each input against
    the merged set, concatenates the integer code columns, and rebuilds
    a categorical column from the result.

    # TODO: This function currently assumes it is being called from
    # column._concat_columns, at least to the extent that all the
    # preprocessing in that function has already been done. That should be
    # improved as the concatenation API is solidified.
    """
    # First column with at least one valid value; fall back to the first
    # input when every column is entirely null.
    head = next((obj for obj in objs if obj.valid_count), objs[0])

    # Combine and de-dupe the categories across all inputs.
    cats = (
        cudf.concat([obj.cat().categories for obj in objs])
        .to_series()
        .drop_duplicates(ignore_index=True)
        ._column
    )

    # Recode every input against the merged category set, then operate
    # purely on the integer code columns until Categorical is ported to
    # the libcudf++ Category data type.
    recoded = [
        obj.cat()._set_categories(obj.cat().categories, cats, is_unique=True)
        for obj in objs
    ]
    code_cols = [col.cat().codes._column for col in recoded]
    head = head.cat().codes._column

    newsize = sum(len(col) for col in code_cols)
    if newsize > libcudf.MAX_COLUMN_SIZE:
        raise MemoryError(
            f"Result of concat cannot have "
            f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
        )
    if newsize == 0:
        col = column.column_empty(0, head.dtype, masked=True)
    else:
        # Drop zero-length inputs before handing off to libcudf.
        nonempty = [col for col in code_cols if len(col) > 0]
        col = libcudf.concat.concat_columns(nonempty)

    return column.build_categorical_column(
        categories=column.as_column(cats),
        codes=column.as_column(col.base_data, dtype=col.dtype),
        mask=col.base_mask,
        size=col.size,
        offset=col.offset,
    )


def _create_empty_categorical_column(
categorical_column: CategoricalColumn, dtype: "CategoricalDtype"
Expand Down
206 changes: 76 additions & 130 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
min_unsigned_type,
np_to_pa_dtype,
)
from cudf.utils.utils import mask_dtype
from cudf.utils.utils import cached_property, mask_dtype

T = TypeVar("T", bound="ColumnBase")

Expand Down Expand Up @@ -189,114 +189,6 @@ def __sizeof__(self) -> int:
n += bitmask_allocation_size_bytes(self.size)
return n

# Accessor stubs: subclasses that support categorical / string operations
# (CategoricalColumn, StringColumn) override these; the base column
# signals "not supported" by raising.
def cat(
    self, parent=None
) -> "cudf.core.column.categorical.CategoricalAccessor":
    """Return a categorical accessor; unsupported on the base column."""
    raise NotImplementedError()

def str(self, parent=None) -> "cudf.core.column.string.StringMethods":
    """Return a string-methods accessor; unsupported on the base column."""
    raise NotImplementedError()
@classmethod
def _concat(
    cls, objs: "MutableSequence[ColumnBase]", dtype: Dtype = None
) -> ColumnBase:
    """Concatenate a sequence of columns into a single column.

    Handles the empty-input case, common-dtype upcasting for numeric
    inputs, all-null column coercion, and categorical recategorization
    before delegating the raw concatenation to libcudf.
    """
    # Empty input: return an empty column of the requested dtype.
    if len(objs) == 0:
        dtype = pd.api.types.pandas_dtype(dtype)
        if is_categorical_dtype(dtype):
            dtype = CategoricalDtype()
        return column_empty(0, dtype=dtype, masked=True)

    # If all columns are `NumericalColumn` with different dtypes,
    # we cast them to a common dtype.
    # Notice, we can always cast pure null columns
    not_null_cols = list(filter(lambda o: o.valid_count > 0, objs))
    # The guard fires only when every non-null input is numerical and not
    # datetime (the inner list collects violators and requires none).
    if len(not_null_cols) > 0 and (
        len(
            [
                o
                for o in not_null_cols
                if not is_numerical_dtype(o.dtype)
                or np.issubdtype(o.dtype, np.datetime64)
            ]
        )
        == 0
    ):
        col_dtypes = [o.dtype for o in not_null_cols]
        # Use NumPy to find a common dtype
        common_dtype = np.find_common_type(col_dtypes, [])
        # Cast all columns to the common dtype
        for i in range(len(objs)):
            objs[i] = objs[i].astype(common_dtype)

    # Find the first non-null column:
    head = objs[0]
    for i, obj in enumerate(objs):
        if obj.valid_count > 0:
            head = obj
            break

    for i, obj in enumerate(objs):
        # Check that all columns are the same type:
        if not pd.api.types.is_dtype_equal(obj.dtype, head.dtype):
            # if all null, cast to appropriate dtype
            if obj.valid_count == 0:
                objs[i] = column_empty_like(
                    head, dtype=head.dtype, masked=True, newsize=len(obj)
                )
            else:
                raise ValueError("All columns must be the same type")

    cats = None
    is_categorical = all(is_categorical_dtype(o.dtype) for o in objs)

    # Combine CategoricalColumn categories
    if is_categorical:
        # Combine and de-dupe the categories
        cats = (
            cudf.concat([o.cat().categories for o in objs])
            .to_series()
            .drop_duplicates(ignore_index=True)
            ._column
        )
        # Recode every input against the merged category set.
        objs = [
            o.cat()._set_categories(
                o.cat().categories, cats, is_unique=True
            )
            for o in objs
        ]
        # Map `objs` into a list of the codes until we port Categorical to
        # use the libcudf++ Category data type.
        objs = [o.cat().codes._column for o in objs]
        head = head.cat().codes._column

    newsize = sum(map(len, objs))
    if newsize > libcudf.MAX_COLUMN_SIZE:
        raise MemoryError(
            f"Result of concat cannot have "
            f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
        )

    # Filter out inputs that have 0 length
    objs = [o for o in objs if len(o) > 0]

    # Perform the actual concatenation
    if newsize > 0:
        col = libcudf.concat.concat_columns(objs)
    else:
        col = column_empty(0, head.dtype, masked=True)

    # Re-wrap the concatenated codes as a categorical column.
    if is_categorical:
        col = build_categorical_column(
            categories=as_column(cats),
            codes=as_column(col.base_data, dtype=col.dtype),
            mask=col.base_mask,
            size=col.size,
            offset=col.offset,
        )

    return col

def dropna(self, drop_nan: bool = False) -> ColumnBase:
if drop_nan:
col = self.nans_to_nulls()
Expand Down Expand Up @@ -796,7 +688,7 @@ def find_last_value(self, value: ScalarLike, closest: bool = False) -> int:
return indices[-1]

def append(self, other: ColumnBase) -> ColumnBase:
    """Return a new column with ``other`` concatenated after this one."""
    pieces = [self, as_column(other)]
    return _concat_columns(pieces)

def quantile(
self,
Expand Down Expand Up @@ -932,27 +824,17 @@ def is_unique(self) -> bool:
def is_monotonic(self) -> bool:
    """Alias for is_monotonic_increasing (pandas-compatible spelling)."""
    return self.is_monotonic_increasing

@cached_property
def is_monotonic_increasing(self) -> bool:
    """True when the column contains no nulls and is sorted ascending."""
    if self.has_nulls:
        return False
    return self.as_frame()._is_sorted(ascending=None, null_position=None)

@cached_property
def is_monotonic_decreasing(self) -> bool:
    """True when the column contains no nulls and is sorted descending."""
    if self.has_nulls:
        return False
    return self.as_frame()._is_sorted(
        ascending=[False], null_position=None
    )
def get_slice_bound(
self, label: ScalarLike, side: builtins.str, kind: builtins.str
Expand Down Expand Up @@ -1211,7 +1093,7 @@ def unique(self) -> ColumnBase:
)

def serialize(self) -> Tuple[dict, list]:
header = {} # type: Dict[Any, Any]
header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"] = self.dtype.str
Expand Down Expand Up @@ -2226,7 +2108,7 @@ def serialize_columns(columns) -> Tuple[List[dict], List]:
frames : list
list of frames
"""
headers = [] # type List[Dict[Any, Any], ...]
headers: List[Dict[Any, Any]] = []
frames = []

if len(columns) > 0:
Expand Down Expand Up @@ -2346,3 +2228,67 @@ def full(size: int, fill_value: ScalarLike, dtype: Dtype = None) -> ColumnBase:
dtype: int8
"""
return ColumnBase.from_scalar(cudf.Scalar(fill_value, dtype), size)


def _concat_columns(objs: "MutableSequence[ColumnBase]") -> ColumnBase:
    """Concatenate a sequence of columns.

    Upcasts numeric inputs to a common dtype, coerces all-null inputs to
    the dtype of the first non-null column, dispatches categoricals to
    ``CategoricalColumn._concat``, and concatenates everything else via
    libcudf.

    Raises
    ------
    ValueError
        If any non-null input's dtype differs from the others.
    MemoryError
        If the result would exceed ``libcudf.MAX_COLUMN_SIZE``.
    """
    # TODO: This function currently assumes that len(objs) > 0. Concatenation
    # is only accessible via cudf.concat (which calls through to this from
    # methods like Index._concat or Series._concat), and that method
    # pre-filters on zero objects being provided. We may need to add logic for
    # handling an empty input sequence as more Frame logic becomes centralized.

    # If all columns are `NumericalColumn` with different dtypes,
    # we cast them to a common dtype.
    # Notice, we can always cast pure null columns
    not_null_col_dtypes = [o.dtype for o in objs if o.valid_count > 0]
    # BUG FIX: this condition previously read
    # `is_numerical_dtype(dtyp) and np.issubdtype(dtyp, np.datetime64)`,
    # which demands dtypes that are simultaneously numerical AND datetime —
    # the upcast branch could essentially never fire. The pre-refactor code
    # required every non-null input to be numerical and NOT datetime, so the
    # `not` is restored here.
    if len(not_null_col_dtypes) and all(
        is_numerical_dtype(dtyp) and not np.issubdtype(dtyp, np.datetime64)
        for dtyp in not_null_col_dtypes
    ):
        # Use NumPy to find a common dtype
        common_dtype = np.find_common_type(not_null_col_dtypes, [])
        # Cast all columns to the common dtype
        objs = [obj.astype(common_dtype) for obj in objs]

    # Find the first non-null column (fall back to the first input when
    # every column is entirely null):
    head = next((obj for obj in objs if obj.valid_count), objs[0])

    for i, obj in enumerate(objs):
        # Check that all columns are the same type:
        if not pd.api.types.is_dtype_equal(obj.dtype, head.dtype):
            # if all null, cast to appropriate dtype
            if obj.valid_count == 0:
                objs[i] = column_empty_like(
                    head, dtype=head.dtype, masked=True, newsize=len(obj)
                )
            else:
                raise ValueError("All columns must be the same type")

    # TODO: This logic should be generalized to a dispatch to
    # ColumnBase._concat so that all subclasses can override necessary
    # behavior. However, at the moment it's not clear what that API should
    # look like, so CategoricalColumn simply implements a minimal working API.
    if all(is_categorical_dtype(o.dtype) for o in objs):
        return cudf.core.column.categorical.CategoricalColumn._concat(
            cast(
                MutableSequence[
                    cudf.core.column.categorical.CategoricalColumn
                ],
                objs,
            )
        )

    newsize = sum(map(len, objs))
    if newsize > libcudf.MAX_COLUMN_SIZE:
        raise MemoryError(
            f"Result of concat cannot have "
            f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
        )
    elif newsize == 0:
        col = column_empty(0, head.dtype, masked=True)
    else:
        # Filter out inputs that have 0 length, then concatenate.
        objs = [o for o in objs if len(o) > 0]
        col = libcudf.concat.concat_columns(objs)
    return col
Loading