Commit
Add a hidden column __natural_order__. (#1146)
Adds a hidden column `__natural_order__` for keeping the rows in their natural order, especially for window-like operations such as `cumxxx`.
ueshin authored and HyukjinKwon committed Dec 30, 2019
1 parent 051f2c9 commit b3babf9
Showing 6 changed files with 144 additions and 134 deletions.
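
The gist of the change, as a minimal PySpark sketch (illustrative only, not the actual `_InternalFrame` code; apart from the two constant names introduced by this commit, everything below is assumed for the example): the internal Spark DataFrame carries a hidden ordering column assigned once from `F.monotonically_increasing_id()`, and the `select(...)` calls throughout this diff append `list(HIDDEN_COLUMNS)` so the column survives projections while staying invisible to users.

    from pyspark.sql import SparkSession, functions as F

    NATURAL_ORDER_COLUMN_NAME = '__natural_order__'
    HIDDEN_COLUMNS = {NATURAL_ORDER_COLUMN_NAME}

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1, 4.0), (2, 5.0), (3, 6.0)], ['a', 'b'])

    # Attach the hidden ordering column once, when the internal frame is built.
    sdf = sdf.withColumn(NATURAL_ORDER_COLUMN_NAME, F.monotonically_increasing_id())

    # Projections carry it along, as in the `+ list(HIDDEN_COLUMNS)` calls below ...
    sdf = sdf.select([F.col('a'), F.col('b')] + [F.col(c) for c in HIDDEN_COLUMNS])

    # ... while user-facing output never shows it.
    sdf.select([c for c in sdf.columns if c not in HIDDEN_COLUMNS]).show()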
99 changes: 52 additions & 47 deletions databricks/koalas/frame.py
@@ -47,7 +48,8 @@
 from databricks import koalas as ks  # For running doctests and reference resolution in PyCharm.
 from databricks.koalas.utils import validate_arguments_and_invoke_function, align_diff_frames
 from databricks.koalas.generic import _Frame
-from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
+from databricks.koalas.internal import (_InternalFrame, HIDDEN_COLUMNS, NATURAL_ORDER_COLUMN_NAME,
+                                        SPARK_INDEX_NAME_FORMAT)
 from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame
 from databricks.koalas.ml import corr
 from databricks.koalas.utils import column_index_level, name_like_string, scol_for, validate_axis
@@ -512,7 +513,7 @@ def apply_op(kdf, this_column_index, that_column_index):
                 applied.append(getattr(self[idx], op)(argument))

             sdf = self._sdf.select(
-                self._internal.index_scols + [c._scol for c in applied])
+                self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS))
             internal = self._internal.copy(sdf=sdf,
                                            column_index=[c._internal.column_index[0]
                                                          for c in applied],
@@ -919,7 +920,7 @@ def applymap(self, func):
             applied.append(self[idx].apply(func))

         sdf = self._sdf.select(
-            self._internal.index_scols + [c._scol for c in applied])
+            self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS))
         internal = self._internal.copy(sdf=sdf,
                                        column_index=[c._internal.column_index[0] for c in applied],
                                        column_scols=[scol_for(sdf, c._internal.data_columns[0])
@@ -1919,7 +1920,7 @@ def transform(self, func):
             applied.append(wrapped(self[idx]).rename(idx))

         sdf = self._sdf.select(
-            self._internal.index_scols + [c._scol for c in applied])
+            self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS))
         internal = self._internal.copy(sdf=sdf,
                                        column_index=[c._internal.column_index[0] for c in applied],
                                        column_scols=[scol_for(sdf, c._internal.data_columns[0])
@@ -2092,8 +2093,9 @@ class locomotion
         rows = [self._internal.scols[lvl] == index
                 for lvl, index in enumerate(key, level)]

-        sdf = self._sdf.select(scols) \
-            .where(reduce(lambda x, y: x & y, rows))
+        sdf = self._sdf.select(scols + list(HIDDEN_COLUMNS)) \
+            .drop(NATURAL_ORDER_COLUMN_NAME) \
+            .filter(reduce(lambda x, y: x & y, rows))

         if len(key) == len(self._internal.index_scols):
             result = _col(DataFrame(_InternalFrame(sdf=sdf)).T)
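
A pattern worth noting, first visible in the hunk above and repeated before most `filter`, `sort`, and `join` calls in this diff: `NATURAL_ORDER_COLUMN_NAME` is dropped before the row-changing operation. A plausible reading (an assumption here, since the `internal.py` side of the commit is not shown on this page) is that `_InternalFrame` re-attaches a fresh id whenever the hidden column is missing, so the resulting frame's natural order reflects its new row set rather than stale ids. A hedged sketch of such a re-attach step:

    from pyspark.sql import functions as F

    NATURAL_ORDER_COLUMN_NAME = '__natural_order__'

    def ensure_natural_order(sdf):
        # Hypothetical helper: if a filter/sort/join dropped the hidden
        # ordering column, assign a fresh one for the resulting frame.
        if NATURAL_ORDER_COLUMN_NAME not in sdf.columns:
            sdf = sdf.withColumn(NATURAL_ORDER_COLUMN_NAME,
                                 F.monotonically_increasing_id())
        return sdf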
@@ -2251,13 +2253,13 @@ def where(self, cond, other=np.nan):
             data_col_name = self._internal.column_name_for(column)
             output.append(
                 F.when(
-                    sdf[tmp_cond_col_name.format(column)], sdf[data_col_name]
+                    scol_for(sdf, tmp_cond_col_name.format(column)), scol_for(sdf, data_col_name)
                 ).otherwise(
-                    sdf[tmp_other_col_name.format(column)]
+                    scol_for(sdf, tmp_other_col_name.format(column))
                 ).alias(data_col_name))

-        index_columns = self._internal.index_columns
-        sdf = sdf.select(*index_columns, *output)
+        index_scols = kdf._internal.index_scols
+        sdf = sdf.select(index_scols + output + list(HIDDEN_COLUMNS))

         return DataFrame(self._internal.copy(
             sdf=sdf,
@@ -2699,9 +2701,11 @@ def rename(index):
         if len(index_map) > 0:
             index_scols = [scol_for(self._sdf, column) for column, _ in index_map]
             sdf = self._sdf.select(
-                index_scols + new_data_scols + self._internal.column_scols)
+                index_scols + new_data_scols + self._internal.column_scols +
+                list(HIDDEN_COLUMNS))
         else:
-            sdf = self._sdf.select(new_data_scols + self._internal.column_scols)
+            sdf = self._sdf.select(
+                new_data_scols + self._internal.column_scols + list(HIDDEN_COLUMNS))

         # Now, new internal Spark columns are named as same as index name.
         new_index_map = [(column, name) for column, name in new_index_map]
@@ -4035,7 +4039,7 @@ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False):
         else:
             raise TypeError('must specify how or thresh')

-        sdf = self._sdf.filter(pred)
+        sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(pred)
         internal = self._internal.copy(sdf=sdf)
         if inplace:
             self._internal = internal
@@ -4910,7 +4914,8 @@ def columns(self, columns):
             sdf = self._sdf.select(
                 self._internal.index_scols +
                 [self._internal.scol_for(idx).alias(name)
-                 for idx, name in zip(self._internal.column_index, data_columns)])
+                 for idx, name in zip(self._internal.column_index, data_columns)] +
+                list(HIDDEN_COLUMNS))
             column_scols = [scol_for(sdf, col) for col in data_columns]
             self._internal = self._internal.copy(sdf=sdf,
                                                  column_index=column_index,
@@ -4931,7 +4936,8 @@ def columns(self, columns):
             sdf = self._sdf.select(
                 self._internal.index_scols +
                 [self._internal.scol_for(idx).alias(name)
-                 for idx, name in zip(self._internal.column_index, data_columns)])
+                 for idx, name in zip(self._internal.column_index, data_columns)] +
+                list(HIDDEN_COLUMNS))
             column_scols = [scol_for(sdf, col) for col in data_columns]
             self._internal = self._internal.copy(sdf=sdf,
                                                  column_index=column_index,
@@ -5133,7 +5139,8 @@ def select_dtypes(self, include=None, exclude=None):
                 column_index.append(idx)

         sdf = self._sdf.select(self._internal.index_scols +
-                               [self._internal.scol_for(col) for col in columns])
+                               [self._internal.scol_for(col) for col in columns] +
+                               list(HIDDEN_COLUMNS))
         return DataFrame(self._internal.copy(
             sdf=sdf,
             column_index=column_index,
@@ -5296,7 +5303,8 @@ def drop(self, labels=None, axis=1,
                          in zip(self._internal.data_columns, self._internal.column_index)
                          if idx not in drop_column_index))
             sdf = self._sdf.select(
-                self._internal.index_scols + [self._internal.scol_for(idx) for idx in idxes])
+                self._internal.index_scols + [self._internal.scol_for(idx) for idx in idxes] +
+                list(HIDDEN_COLUMNS))
             internal = self._internal.copy(
                 sdf=sdf,
                 column_index=list(idxes),
@@ -5324,7 +5332,8 @@ def _sort(self, by: List[Column], ascending: Union[bool, List[bool]],
             (False, 'last'): lambda x: Column(getattr(x._jc, "desc_nulls_last")()),
         }
         by = [mapper[(asc, na_position)](scol) for scol, asc in zip(by, ascending)]
-        kdf = DataFrame(self._internal.copy(sdf=self._sdf.sort(*by)))  # type: ks.DataFrame
+        sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME).sort(*by)
+        kdf = DataFrame(self._internal.copy(sdf=sdf))  # type: ks.DataFrame
         if inplace:
             self._internal = kdf._internal
             return None
@@ -5717,7 +5726,7 @@ def isin(self, values):
         else:
             raise TypeError('Values should be iterable, Series, DataFrame or dict.')

-        sdf = self._sdf.select(_select_columns)
+        sdf = self._sdf.select(_select_columns + list(HIDDEN_COLUMNS))
         return DataFrame(self._internal.copy(sdf=sdf,
                                              column_scols=[scol_for(sdf, col)
                                                            for col in self._internal.data_columns]))
@@ -6273,7 +6282,8 @@ def update(self, other: 'DataFrame', join: str = 'left', overwrite: bool = True)
             update_sdf = update_sdf.withColumn(column_name, F.when(old_col.isNull(), new_col)
                                                .otherwise(old_col))
         sdf = update_sdf.select([scol_for(update_sdf, col)
-                                 for col in self._internal.columns])
+                                 for col in self._internal.columns] +
+                                list(HIDDEN_COLUMNS))
         internal = self._internal.copy(sdf=sdf,
                                        column_scols=[scol_for(sdf, col)
                                                      for col in self._internal.data_columns])
@@ -6428,7 +6438,8 @@ def astype(self, dtype) -> 'DataFrame':
         for col_name, col in self.items():
             results.append(col.astype(dtype=dtype))
         sdf = self._sdf.select(
-            self._internal.index_scols + list(map(lambda ser: ser._scol, results)))
+            self._internal.index_scols + list(map(lambda ser: ser._scol, results)) +
+            list(HIDDEN_COLUMNS))
         return DataFrame(self._internal.copy(sdf=sdf,
                                              column_scols=[scol_for(sdf, col)
                                                            for col in self._internal.data_columns]))
@@ -6479,7 +6490,8 @@ def add_prefix(self, prefix):
         sdf = self._sdf.select(
             self._internal.index_scols +
             [self._internal.scol_for(idx).alias(name)
-             for idx, name in zip(self._internal.column_index, data_columns)])
+             for idx, name in zip(self._internal.column_index, data_columns)] +
+            list(HIDDEN_COLUMNS))
         column_index = [tuple([prefix + i for i in idx]) for idx in self._internal.column_index]
         internal = self._internal.copy(sdf=sdf,
                                        column_index=column_index,
@@ -6532,7 +6544,8 @@ def add_suffix(self, suffix):
         sdf = self._sdf.select(
             self._internal.index_scols +
             [self._internal.scol_for(idx).alias(name)
-             for idx, name in zip(self._internal.column_index, data_columns)])
+             for idx, name in zip(self._internal.column_index, data_columns)] +
+            list(HIDDEN_COLUMNS))
         column_index = [tuple([i + suffix for i in idx]) for idx in self._internal.column_index]
         internal = self._internal.copy(sdf=sdf,
                                        column_index=column_index,
@@ -6699,22 +6712,16 @@ def _cum(self, func, skipna: bool):
             func = "cumsum"
         elif func.__name__ == "cumprod":
             func = "cumprod"
-        self = self.copy()
-        columns = self.columns
-        # add a temporal column to keep natural order.
-        self['__natural_order__'] = F.monotonically_increasing_id()
         applied = []
-        for column in columns:
+        for column in self.columns:
             applied.append(getattr(self[column], func)(skipna))

         sdf = self._sdf.select(
-            self._internal.index_scols + [c._scol for c in applied])
+            self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS))
         internal = self._internal.copy(sdf=sdf,
                                        column_index=[c._internal.column_index[0] for c in applied],
                                        column_scols=[scol_for(sdf, c._internal.data_columns[0])
                                                      for c in applied])
-        # add a temporal column to keep natural order.
-        self = self.drop('__natural_order__')
         return DataFrame(internal)

     # TODO: implements 'keep' parameters
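
This `_cum` hunk is the motivating case from the commit message: previously every call copied the frame and injected a temporary `__natural_order__` column; now the cumulative Series operations can order their window by the always-present hidden column. The Series-side window lives in a file not shown on this page, but a sketch of the shape it can take (the column name is the commit's; the rest is assumed):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    NATURAL_ORDER_COLUMN_NAME = '__natural_order__'

    # Running sum over the hidden natural order; `sdf` is assumed to carry
    # __natural_order__ already, as in the sketch near the top of this page.
    w = (Window.orderBy(NATURAL_ORDER_COLUMN_NAME)
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    cumsum_a = F.sum('a').over(w).alias('a')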
@@ -6774,8 +6781,8 @@ def drop_duplicates(self, subset=None, inplace=False):
         else:
             subset = [sub if isinstance(sub, tuple) else (sub,) for sub in subset]

-        sdf = self._sdf.drop_duplicates(subset=[self._internal.column_name_for(idx)
-                                                for idx in subset])
+        sdf = self._sdf.drop(*HIDDEN_COLUMNS) \
+            .drop_duplicates(subset=[self._internal.column_name_for(idx) for idx in subset])
         internal = self._internal.copy(sdf=sdf)
         if inplace:
             self._internal = internal
@@ -6974,7 +6981,8 @@ def _reindex_index(self, index):
             kser = ks.Series(list(index))
             labels = kser._internal._sdf.select(kser._scol.alias(index_column))

-            joined_df = self._sdf.join(labels, on=index_column, how="right")
+            joined_df = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME) \
+                .join(labels, on=index_column, how="right")
             internal = self._internal.copy(sdf=joined_df)

         return internal
@@ -7002,7 +7010,7 @@ def _reindex_columns(self, columns):
                 columns.append(name_like_string(label))
                 idx.append(label)

-        sdf = self._sdf.select(self._internal.index_scols + scols)
+        sdf = self._sdf.select(self._internal.index_scols + scols + list(HIDDEN_COLUMNS))

         return self._internal.copy(sdf=sdf,
                                    column_index=idx,
@@ -7505,7 +7513,7 @@ def filter(self, items=None, like=None, regex=None, axis=None):
         axis = validate_axis(axis, none_axis=1)

         index_scols = self._internal.index_scols
-        sdf = self._sdf
+        sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME)

         if items is not None:
             if is_list_like(items):
@@ -7763,7 +7771,7 @@ def gen_new_column_index_entry(column_index_entry):
             new_data_scols = [scol_for(internal.sdf, old_col_name).alias(new_col_name)
                               for old_col_name, new_col_name
                               in zip(internal.data_columns, new_data_columns)]
-            sdf = internal.sdf.select(*(internal.index_scols + new_data_scols))
+            sdf = internal.sdf.select(internal.index_scols + new_data_scols + list(HIDDEN_COLUMNS))
             internal = internal.copy(sdf=sdf, column_index=new_column_index,
                                      column_scols=[scol_for(sdf, col) for col in new_data_columns])
         if inplace:
@@ -7845,7 +7853,7 @@ def pct_change(self, periods=1):
        1980-02-01       NaN       NaN      NaN
        1980-03-01  0.067912  0.073814  0.06883
        """
-        sdf = self._sdf
+        sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME)
         window = Window.orderBy(self._internal.index_columns).rowsBetween(-periods, -periods)

         for column_name in self._internal.data_columns:
@@ -7919,9 +7927,8 @@ def idxmax(self, axis=0):
        c  z    2
        Name: 0, dtype: int64
        """
-        from databricks.koalas.series import Series
         sdf = self._sdf
-        max_cols = map(lambda x: F.max(x).alias(x), self._internal.data_columns)
+        max_cols = map(lambda x: F.max(scol_for(sdf, x)).alias(x), self._internal.data_columns)
         sdf_max = sdf.select(*max_cols)
         # `sdf_max` looks like below
         # +------+------+------+
@@ -7930,11 +7937,11 @@ def idxmax(self, axis=0):
         # |     3|   4.0|   400|
         # +------+------+------+

-        conds = (F.col(column_name) == max_val
+        conds = (scol_for(sdf, column_name) == max_val
                  for column_name, max_val in zip(sdf_max.columns, sdf_max.head()))
         cond = reduce(lambda x, y: x | y, conds)

-        kdf = DataFrame(self._internal.copy(sdf=sdf.where(cond)))
+        kdf = DataFrame(self._internal.copy(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond)))
         pdf = kdf.to_pandas()

         return ks.from_pandas(pdf.idxmax())
@@ -7999,16 +8006,15 @@ def idxmin(self, axis=0):
        c  z    1
        Name: 0, dtype: int64
        """
-        from databricks.koalas.series import Series
         sdf = self._sdf
-        min_cols = map(lambda x: F.min(x).alias(x), self._internal.data_columns)
+        min_cols = map(lambda x: F.min(scol_for(sdf, x)).alias(x), self._internal.data_columns)
         sdf_min = sdf.select(*min_cols)

-        conds = (F.col(column_name) == min_val
+        conds = (scol_for(sdf, column_name) == min_val
                  for column_name, min_val in zip(sdf_min.columns, sdf_min.head()))
         cond = reduce(lambda x, y: x | y, conds)

-        kdf = DataFrame(self._internal.copy(sdf=sdf.where(cond)))
+        kdf = DataFrame(self._internal.copy(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond)))
         pdf = kdf.to_pandas()

         return ks.from_pandas(pdf.idxmin())
@@ -8193,7 +8199,6 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000):

         percentile_cols = []
         for column in self._internal.data_columns:
-            sdf = self._sdf
             percentile_cols.append(F.expr(
                 "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy))
                 .alias(column))
4 changes: 2 additions & 2 deletions databricks/koalas/generic.py
@@ -33,7 +33,7 @@
 from databricks import koalas as ks  # For running doctests and reference resolution in PyCharm.
 from databricks.koalas.indexing import AtIndexer, ILocIndexer, LocIndexer
-from databricks.koalas.internal import _InternalFrame
+from databricks.koalas.internal import _InternalFrame, NATURAL_ORDER_COLUMN_NAME
 from databricks.koalas.utils import validate_arguments_and_invoke_function, scol_for
 from databricks.koalas.window import Rolling, Expanding

@@ -1426,7 +1426,7 @@ def first_valid_index(self):
         cond = reduce(lambda x, y: x & y,
                       map(lambda x: x.isNotNull(), column_scols))

-        first_valid_row = sdf.where(cond).first()
+        first_valid_row = sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond).first()
         first_valid_idx = tuple(first_valid_row[idx_col]
                                 for idx_col in self._internal.index_columns)
[The remaining 4 changed files are not shown on this page.]
