Support tuple or None name for Series and Index. (#776)
ueshin authored Sep 16, 2019
1 parent da3cc58 commit 468a757
Showing 14 changed files with 345 additions and 246 deletions.
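In effect, a Series or Index name may now be a plain string, a tuple (one label of a MultiIndex column), or None, where previously only flat strings were handled. A minimal sketch of the intended behavior, with illustrative data that is not taken from the diff:

    import databricks.koalas as ks

    kdf = ks.DataFrame({'a': [1, 2, 3]})

    # A tuple name carries every level of a multi-level label.
    kdf.index.name = ('lvl1', 'lvl2')

    # None is accepted as a missing name; internally names are stored
    # as tuples, and single-element tuples read back as plain values.
    kdf.index.name = None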
135 changes: 76 additions & 59 deletions databricks/koalas/frame.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions databricks/koalas/generic.py
@@ -1172,7 +1172,7 @@ def abs(self):
         1    2.00
         2    3.33
         3    4.00
-        Name: abs(0), dtype: float64
+        Name: 0, dtype: float64

         Absolute numeric values in a DataFrame.
@@ -1189,7 +1189,7 @@ def abs(self):
         2  6  30   30
         3  7  40   50
         """
-        # TODO: The first example above should not have "Name: abs(0)".
+        # TODO: The first example above should not have "Name: 0".
         return _spark_col_apply(self, F.abs)

     # TODO: by argument only support the grouping name and as_index only for now. Documentation
93 changes: 50 additions & 43 deletions databricks/koalas/groupby.py
@@ -21,7 +21,7 @@
 import inspect
 from collections import Callable
 from functools import partial
-from typing import Any, List
+from typing import Any, List, Tuple, Union

 import numpy as np
 from pandas._libs.parsers import is_datetime64_dtype
@@ -141,7 +141,8 @@ def aggregate(self, func_or_funcs, *args, **kwargs):
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=data_columns,
                                   column_index=column_index if multi_aggs else None,
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
         kdf = DataFrame(internal)
         if not self._as_index:
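Note the recurring pattern in this file: index_map entries now pair the internal Spark column name with the group key's column_index entry, a tuple of label parts, instead of its flattened string name. A rough, hypothetical illustration of the shapes involved, assuming the internal convention that labels are stored as tuples:

    # Grouping by a column labelled ('a', 'x') in a frame with
    # two-level columns (shapes illustrative, not from the diff):
    #   before: index_map == [('__index_level_0__', 'a')]          # levels lost
    #   after:  index_map == [('__index_level_0__', ('a', 'x'))]   # full label kept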
@@ -415,13 +416,14 @@ def size(self):
         sdf = self._kdf._sdf
         sdf = sdf.groupby(*groupkey_cols).count()
         if (len(self._agg_columns) > 0) and (self._have_agg_columns):
-            name = self._agg_columns[0].name
+            name = self._agg_columns[0]._internal.data_columns[0]
             sdf = sdf.withColumnRenamed('count', name)
         else:
             name = 'count'
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[name],
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
         return _col(DataFrame(internal))

@@ -1070,10 +1072,10 @@ def idxmax(self, skipna=True):
         sdf = sdf.groupby(*groupkey_cols).agg(*stat_exprs)
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[ks.name for ks in self._agg_columns],
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
-        kdf = DataFrame(internal)
-        return kdf
+        return DataFrame(internal)

     # TODO: add axis parameter
     def idxmin(self, skipna=True):
@@ -1134,10 +1136,10 @@ def idxmin(self, skipna=True):
         sdf = sdf.groupby(*groupkey_cols).agg(*stat_exprs)
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[ks.name for ks in self._agg_columns],
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
-        kdf = DataFrame(internal)
-        return kdf
+        return DataFrame(internal)

     # TODO: add keep parameter
     def nsmallest(self, n=5):
@@ -1174,16 +1176,16 @@ def nsmallest(self, n=5):
             raise ValueError('idxmax do not support multi-index now')
         groupkeys = self._groupkeys
         sdf = self._kdf._sdf
-        name = self._agg_columns[0].name
-        index = self._kdf._internal.index_columns[0]
+        name = self._agg_columns[0]._internal.data_columns[0]
         window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name))
         sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[name],
-                                  index_map=[(s.name, s.name) for s in self._groupkeys] +
-                                            [(index, None)])
-        kdf = _col(DataFrame(internal))
-        return kdf
+                                  index_map=([(s._internal.data_columns[0],
+                                               s._internal.column_index[0])
+                                              for s in self._groupkeys]
+                                             + self._kdf._internal.index_map))
+        return _col(DataFrame(internal))

     # TODO: add keep parameter
     def nlargest(self, n=5):
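Appending self._kdf._internal.index_map, rather than a single anonymous index column, keeps the frame's original index as the innermost level of the result. A hedged sketch of the assumed result shape, with made-up values:

    # kdf.groupby(['b'])['a'].nsmallest(1) is assumed to return a Series
    # whose index stacks the group key over the original row index:
    #
    # b
    # 1  0    1
    # 2  2    2
    # Name: a, dtype: int64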
@@ -1220,16 +1222,16 @@ def nlargest(self, n=5):
             raise ValueError('idxmax do not support multi-index now')
         groupkeys = self._groupkeys
         sdf = self._kdf._sdf
-        name = self._agg_columns[0].name
-        index = self._kdf._internal.index_columns[0]
+        name = self._agg_columns[0]._internal.data_columns[0]
         window = Window.partitionBy([s._scol for s in groupkeys]).orderBy(F.col(name).desc())
         sdf = sdf.withColumn('rank', F.row_number().over(window)).filter(F.col('rank') <= n)
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[name],
-                                  index_map=[(s.name, s.name) for s in self._groupkeys] +
-                                            [(index, None)])
-        kdf = _col(DataFrame(internal))
-        return kdf
+                                  index_map=([(s._internal.data_columns[0],
+                                               s._internal.column_index[0])
+                                              for s in self._groupkeys]
+                                             + self._kdf._internal.index_map))
+        return _col(DataFrame(internal))

     def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None):
         """Fill NA/NaN values in group.
@@ -1633,7 +1635,7 @@ def value_counts(self, sort=None, ascending=None, dropna=True):
         groupkey_cols = [s._scol.alias('__index_level_{}__'.format(i))
                          for i, s in enumerate(groupkeys)]
         sdf = self._kdf._sdf
-        agg_column = self._agg_columns[0].name
+        agg_column = self._agg_columns[0]._internal.data_columns[0]
         sdf = sdf.groupby(*groupkey_cols).count().withColumnRenamed('count', agg_column)

         if sort:
@@ -1644,7 +1646,8 @@

         internal = _InternalFrame(sdf=sdf,
                                   data_columns=[agg_column],
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
         return _col(DataFrame(internal))

@@ -1675,7 +1678,8 @@ def _reduce_for_stat_function(self, sfun, only_numeric):
             sdf = sdf.sort(*groupkey_cols)
         internal = _InternalFrame(sdf=sdf,
                                   data_columns=data_columns,
-                                  index_map=[('__index_level_{}__'.format(i), s.name)
+                                  index_map=[('__index_level_{}__'.format(i),
+                                              s._internal.column_index[0])
                                              for i, s in enumerate(groupkeys)])
         kdf = DataFrame(internal)
         if not self._as_index:
@@ -1686,16 +1690,15 @@
 class DataFrameGroupBy(GroupBy):

     def __init__(self, kdf: DataFrame, by: List[Series], as_index: bool = True,
-                 agg_columns: List[str] = None):
+                 agg_columns: List[Union[str, Tuple[str, ...]]] = None):
         self._kdf = kdf
         self._groupkeys = by
         self._as_index = as_index
         self._have_agg_columns = True

         if agg_columns is None:
-            groupkey_names = set(s.name for s in self._groupkeys)
-            agg_columns = [col for col in self._kdf._internal.data_columns
-                           if col not in groupkey_names]
+            agg_columns = [idx for idx in self._kdf._internal.column_index
+                           if all(not self._kdf[idx]._equals(key) for key in self._groupkeys)]
             self._have_agg_columns = False
         self._agg_columns = [kdf[col] for col in agg_columns]
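Aggregation columns are now chosen by comparing Series objects (via _equals, presumably a comparison of the underlying Spark columns) instead of flat string names, which becomes ambiguous once labels are tuples. A comment-level sketch of the failure mode the old code risked:

    # With two-level columns such as [('a', 'x'), ('a', 'y')], several
    # Series can flatten to the same string name 'a', so a set-of-names
    # test may wrongly drop a column that merely shares its name with a
    # group key. Comparing the Series themselves keeps exactly the
    # non-key columns.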

@@ -1727,24 +1730,27 @@ def _diff(self, *args, **kwargs):
         kdf = self._kdf

         for column in self._agg_columns:
+            # pandas groupby.diff ignores the grouping key itself.
             applied.append(column.groupby(self._groupkeys)._diff(*args, **kwargs))

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
-        internal = kdf._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
+        internal = kdf._internal.copy(sdf=sdf,
+                                      data_columns=[c._internal.data_columns[0] for c in applied],
+                                      column_index=[c._internal.column_index[0] for c in applied])
         return DataFrame(internal)

     def _rank(self, *args, **kwargs):
         applied = []
         kdf = self._kdf
-        groupkey_columns = set(s.name for s in self._groupkeys)

         for column in self._agg_columns:
             # pandas groupby.rank ignores the grouping key itself.
-            if column.name not in groupkey_columns:
-                applied.append(column.groupby(self._groupkeys)._rank(*args, **kwargs))
+            applied.append(column.groupby(self._groupkeys)._rank(*args, **kwargs))

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
-        internal = kdf._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
+        internal = kdf._internal.copy(sdf=sdf,
+                                      data_columns=[c._internal.data_columns[0] for c in applied],
+                                      column_index=[c._internal.column_index[0] for c in applied])
         return DataFrame(internal)

     def _cum(self, func):
@@ -1760,29 +1766,30 @@ def _cum(self, func):

         applied = []
         kdf = self._kdf
-        groupkey_columns = set(s.name for s in self._groupkeys)

         for column in self._agg_columns:
             # pandas groupby.cumxxx ignores the grouping key itself.
-            if column.name not in groupkey_columns:
-                applied.append(getattr(column.groupby(self._groupkeys), func)())
+            applied.append(getattr(column.groupby(self._groupkeys), func)())

         sdf = kdf._sdf.select(
             kdf._internal.index_scols + [c._scol for c in applied])
-        internal = kdf._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
+        internal = kdf._internal.copy(sdf=sdf,
+                                      data_columns=[c._internal.data_columns[0] for c in applied],
+                                      column_index=[c._internal.column_index[0] for c in applied])
         return DataFrame(internal)

     def _fillna(self, *args, **kwargs):
         applied = []
         kdf = self._kdf
-        groupkey_columns = [s.name for s in self._groupkeys]

-        for column in kdf._internal.data_columns:
-            if column not in groupkey_columns:
-                applied.append(kdf[column].groupby(self._groupkeys)._fillna(*args, **kwargs))
+        for idx in kdf._internal.column_index:
+            if all(not self._kdf[idx]._equals(key) for key in self._groupkeys):
+                applied.append(kdf[idx].groupby(self._groupkeys)._fillna(*args, **kwargs))

         sdf = kdf._sdf.select(kdf._internal.index_scols + [c._scol for c in applied])
-        internal = kdf._internal.copy(sdf=sdf, data_columns=[c.name for c in applied])
+        internal = kdf._internal.copy(sdf=sdf,
+                                      data_columns=[c._internal.data_columns[0] for c in applied],
+                                      column_index=[c._internal.column_index[0] for c in applied])
         return DataFrame(internal)


67 changes: 34 additions & 33 deletions databricks/koalas/indexes.py
@@ -19,7 +19,7 @@
 """

 from functools import partial
-from typing import Any, List, Optional
+from typing import Any, List, Optional, Tuple, Union

 import pandas as pd
 from pandas.api.types import is_list_like
@@ -61,10 +61,12 @@ class Index(IndexOpsMixin):
     def __init__(self, kdf: DataFrame, scol: Optional[spark.Column] = None) -> None:
         assert len(kdf._internal._index_map) == 1
         if scol is None:
-            IndexOpsMixin.__init__(
-                self, kdf._internal.copy(scol=kdf._internal.index_scols[0]), kdf)
-        else:
-            IndexOpsMixin.__init__(self, kdf._internal.copy(scol=scol), kdf)
+            scol = kdf._internal.index_scols[0]
+        internal = kdf._internal.copy(scol=scol,
+                                      data_columns=kdf._internal.index_columns,
+                                      column_index=kdf._internal.index_names,
+                                      column_index_names=None)
+        IndexOpsMixin.__init__(self, internal, kdf)

     def _with_new_scol(self, scol: spark.Column) -> 'Index':
         """
@@ -124,30 +126,32 @@ def spark_type(self):
         return self.to_series().spark_type

     @property
-    def name(self) -> str:
+    def name(self) -> Union[str, Tuple[str, ...]]:
         """Return name of the Index."""
         return self.names[0]

     @name.setter
-    def name(self, name: str) -> None:
+    def name(self, name: Union[str, Tuple[str, ...]]) -> None:
         self.names = [name]

     @property
-    def names(self) -> List[str]:
+    def names(self) -> List[Union[str, Tuple[str, ...]]]:
         """Return names of the Index."""
-        return self._kdf._internal.index_names.copy()
+        return [name if name is None or len(name) > 1 else name[0]
+                for name in self._kdf._internal.index_names]

     @names.setter
-    def names(self, names: List[str]) -> None:
+    def names(self, names: List[Union[str, Tuple[str, ...]]]) -> None:
         if not is_list_like(names):
             raise ValueError('Names must be a list-like')
         internal = self._kdf._internal
         if len(internal.index_map) != len(names):
             raise ValueError('Length of new names must be {}, got {}'
                              .format(len(internal.index_map), len(names)))
+        names = [name if isinstance(name, tuple) else (name,) for name in names]
         self._kdf._internal = internal.copy(index_map=list(zip(internal.index_columns, names)))

-    def rename(self, name, inplace=False):
+    def rename(self, name: Union[str, Tuple[str, ...]], inplace: bool = False):
         """
         Alter Index name.
         Able to set new names without level. Defaults to returning new index.
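The getter unwraps single-element tuples and the setter re-wraps plain values, so round-trips behave as follows (derived from the code above, with idx an existing Index):

    idx.names = ['a']         # stored as [('a',)],  read back as ['a']
    idx.names = [('a', 'b')]  # stored unchanged,    read back as [('a', 'b')]
    idx.names = [None]        # stored as [(None,)], read back as [None]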
@@ -188,16 +192,17 @@ def rename(self, name, inplace=False):
         index_columns = self._kdf._internal.index_columns
         assert len(index_columns) == 1

-        sdf = self._kdf._sdf.select([self._scol] + self._kdf._internal.data_scols)
-        internal = self._kdf._internal.copy(sdf=sdf, index_map=[(sdf.schema[0].name, name)])
+        if isinstance(name, str):
+            name = (name,)
+        internal = self._kdf._internal.copy(index_map=[(index_columns[0], name)])

         if inplace:
             self._kdf._internal = internal
             return self
         else:
-            return DataFrame(internal).index
+            return Index(DataFrame(internal), self._scol)

-    def to_series(self, name: str = None) -> Series:
+    def to_series(self, name: Union[str, Tuple[str, ...]] = None) -> Series:
         """
         Create a Series with both index and values equal to the index keys
         useful with map for returning an indexer based on an index.
@@ -222,11 +227,16 @@ def to_series(self, name: str = None) -> Series:
         b    b
         c    c
         d    d
-        Name: __index_level_0__, dtype: object
+        Name: 0, dtype: object
         """
         kdf = self._kdf
         scol = self._scol
-        return Series(kdf._internal.copy(scol=scol if name is None else scol.alias(name)),
+        if name is not None:
+            scol = scol.alias(str(name))
+        column_index = [None] if len(kdf._internal.index_map) > 1 else kdf._internal.index_names
+        return Series(kdf._internal.copy(scol=scol,
+                                         column_index=column_index,
+                                         column_index_names=None),
                       anchor=kdf)

     def __getattr__(self, item: str) -> Any:
@@ -266,23 +276,14 @@ def __repr__(self):

 class MultiIndex(Index):

-    def __init__(self, kdf: DataFrame, scol: Optional[spark.Column] = None):
+    def __init__(self, kdf: DataFrame):
         assert len(kdf._internal._index_map) > 1
         self._kdf = kdf
-        if scol is None:
-            IndexOpsMixin.__init__(self, kdf._internal.copy(
-                scol=F.struct(self._kdf._internal.index_scols)), kdf)
-        else:
-            IndexOpsMixin.__init__(self, kdf._internal.copy(scol=scol), kdf)
-
-    def _with_new_scol(self, scol: spark.Column) -> 'MultiIndex':
-        """
-        Copy Koalas MultiIndex with the new Spark Column.
-        :param scol: the new Spark Column
-        :return: the copied MultiIndex
-        """
-        return MultiIndex(self._kdf, scol)
+        scol = F.struct(kdf._internal.index_scols)
+        data_columns = kdf._sdf.select(scol).columns
+        internal = kdf._internal.copy(scol=scol,
+                                      column_index=[(col, None) for col in data_columns],
+                                      column_index_names=None)
+        IndexOpsMixin.__init__(self, internal, kdf)

     def any(self, *args, **kwargs):
         raise TypeError("cannot perform any with this index type: MultiIndex")
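A MultiIndex is thus backed by a single Spark struct column that packs all index levels, letting one column stand in for the whole tuple of index values. A self-contained PySpark sketch of that packing, with an assumed toy schema:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1, 'a', 10), (2, 'b', 20)],
                                ['lvl1', 'lvl2', 'value'])

    # Pack both index levels into one struct column, mirroring
    # F.struct(index_scols) in the new MultiIndex.__init__.
    packed = sdf.select(F.struct('lvl1', 'lvl2').alias('idx'), 'value')
    packed.printSchema()  # idx: struct<lvl1: bigint, lvl2: string>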
2 changes: 1 addition & 1 deletion databricks/koalas/indexing.py
@@ -433,7 +433,7 @@ def raiseNotImplemented(description):
             column_index = None
         elif all(isinstance(key, Series) for key in cols_sel):
             columns = [_make_col(key) for key in cols_sel]
-            column_index = None
+            column_index = [key._internal.column_index[0] for key in cols_sel]
         elif all(isinstance(key, spark.Column) for key in cols_sel):
             columns = cols_sel
             column_index = None
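When every column selector is a Series, .loc now records each Series' own label in column_index rather than discarding it, so tuple labels survive selection. Illustrative, assuming kdf has columns 'a' and 'b':

    # Selecting by Series keeps the original labels on the result.
    kdf.loc[:, [kdf['a'], kdf['b']]]  # result columns stay labelled 'a' and 'b'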
(Diffs for the remaining nine changed files were not loaded.)