From b3babf9a8c2f22c01284b3aa5167eb301c2a0bdd Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 29 Dec 2019 18:05:36 -0800 Subject: [PATCH] Add a hidden column `__natural_order__`. (#1146) Adding a hidden column `__natural_order__` for ordering the rows as it is, especially for window-like operations like `cumxxx`. --- databricks/koalas/frame.py | 99 ++++++++++++++++-------------- databricks/koalas/generic.py | 4 +- databricks/koalas/groupby.py | 25 ++++---- databricks/koalas/indexing.py | 7 ++- databricks/koalas/internal.py | 112 ++++++++++++++++++---------------- databricks/koalas/series.py | 31 ++++------ 6 files changed, 144 insertions(+), 134 deletions(-) diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py index 3b2987fdf9..4d5314e92f 100644 --- a/databricks/koalas/frame.py +++ b/databricks/koalas/frame.py @@ -47,7 +47,8 @@ from databricks import koalas as ks # For running doctests and reference resolution in PyCharm. from databricks.koalas.utils import validate_arguments_and_invoke_function, align_diff_frames from databricks.koalas.generic import _Frame -from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT +from databricks.koalas.internal import (_InternalFrame, HIDDEN_COLUMNS, NATURAL_ORDER_COLUMN_NAME, + SPARK_INDEX_NAME_FORMAT) from databricks.koalas.missing.frame import _MissingPandasLikeDataFrame from databricks.koalas.ml import corr from databricks.koalas.utils import column_index_level, name_like_string, scol_for, validate_axis @@ -512,7 +513,7 @@ def apply_op(kdf, this_column_index, that_column_index): applied.append(getattr(self[idx], op)(argument)) sdf = self._sdf.select( - self._internal.index_scols + [c._scol for c in applied]) + self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS)) internal = self._internal.copy(sdf=sdf, column_index=[c._internal.column_index[0] for c in applied], @@ -919,7 +920,7 @@ def applymap(self, func): applied.append(self[idx].apply(func)) sdf = self._sdf.select( - self._internal.index_scols + [c._scol for c in applied]) + self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS)) internal = self._internal.copy(sdf=sdf, column_index=[c._internal.column_index[0] for c in applied], column_scols=[scol_for(sdf, c._internal.data_columns[0]) @@ -1919,7 +1920,7 @@ def transform(self, func): applied.append(wrapped(self[idx]).rename(idx)) sdf = self._sdf.select( - self._internal.index_scols + [c._scol for c in applied]) + self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS)) internal = self._internal.copy(sdf=sdf, column_index=[c._internal.column_index[0] for c in applied], column_scols=[scol_for(sdf, c._internal.data_columns[0]) @@ -2092,8 +2093,9 @@ class locomotion rows = [self._internal.scols[lvl] == index for lvl, index in enumerate(key, level)] - sdf = self._sdf.select(scols) \ - .where(reduce(lambda x, y: x & y, rows)) + sdf = self._sdf.select(scols + list(HIDDEN_COLUMNS)) \ + .drop(NATURAL_ORDER_COLUMN_NAME) \ + .filter(reduce(lambda x, y: x & y, rows)) if len(key) == len(self._internal.index_scols): result = _col(DataFrame(_InternalFrame(sdf=sdf)).T) @@ -2251,13 +2253,13 @@ def where(self, cond, other=np.nan): data_col_name = self._internal.column_name_for(column) output.append( F.when( - sdf[tmp_cond_col_name.format(column)], sdf[data_col_name] + scol_for(sdf, tmp_cond_col_name.format(column)), scol_for(sdf, data_col_name) ).otherwise( - sdf[tmp_other_col_name.format(column)] + scol_for(sdf, tmp_other_col_name.format(column)) 
).alias(data_col_name)) - index_columns = self._internal.index_columns - sdf = sdf.select(*index_columns, *output) + index_scols = kdf._internal.index_scols + sdf = sdf.select(index_scols + output + list(HIDDEN_COLUMNS)) return DataFrame(self._internal.copy( sdf=sdf, @@ -2699,9 +2701,11 @@ def rename(index): if len(index_map) > 0: index_scols = [scol_for(self._sdf, column) for column, _ in index_map] sdf = self._sdf.select( - index_scols + new_data_scols + self._internal.column_scols) + index_scols + new_data_scols + self._internal.column_scols + + list(HIDDEN_COLUMNS)) else: - sdf = self._sdf.select(new_data_scols + self._internal.column_scols) + sdf = self._sdf.select( + new_data_scols + self._internal.column_scols + list(HIDDEN_COLUMNS)) # Now, new internal Spark columns are named as same as index name. new_index_map = [(column, name) for column, name in new_index_map] @@ -4035,7 +4039,7 @@ def dropna(self, axis=0, how='any', thresh=None, subset=None, inplace=False): else: raise TypeError('must specify how or thresh') - sdf = self._sdf.filter(pred) + sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(pred) internal = self._internal.copy(sdf=sdf) if inplace: self._internal = internal @@ -4910,7 +4914,8 @@ def columns(self, columns): sdf = self._sdf.select( self._internal.index_scols + [self._internal.scol_for(idx).alias(name) - for idx, name in zip(self._internal.column_index, data_columns)]) + for idx, name in zip(self._internal.column_index, data_columns)] + + list(HIDDEN_COLUMNS)) column_scols = [scol_for(sdf, col) for col in data_columns] self._internal = self._internal.copy(sdf=sdf, column_index=column_index, @@ -4931,7 +4936,8 @@ def columns(self, columns): sdf = self._sdf.select( self._internal.index_scols + [self._internal.scol_for(idx).alias(name) - for idx, name in zip(self._internal.column_index, data_columns)]) + for idx, name in zip(self._internal.column_index, data_columns)] + + list(HIDDEN_COLUMNS)) column_scols = [scol_for(sdf, col) for col in data_columns] self._internal = self._internal.copy(sdf=sdf, column_index=column_index, @@ -5133,7 +5139,8 @@ def select_dtypes(self, include=None, exclude=None): column_index.append(idx) sdf = self._sdf.select(self._internal.index_scols + - [self._internal.scol_for(col) for col in columns]) + [self._internal.scol_for(col) for col in columns] + + list(HIDDEN_COLUMNS)) return DataFrame(self._internal.copy( sdf=sdf, column_index=column_index, @@ -5296,7 +5303,8 @@ def drop(self, labels=None, axis=1, in zip(self._internal.data_columns, self._internal.column_index) if idx not in drop_column_index)) sdf = self._sdf.select( - self._internal.index_scols + [self._internal.scol_for(idx) for idx in idxes]) + self._internal.index_scols + [self._internal.scol_for(idx) for idx in idxes] + + list(HIDDEN_COLUMNS)) internal = self._internal.copy( sdf=sdf, column_index=list(idxes), @@ -5324,7 +5332,8 @@ def _sort(self, by: List[Column], ascending: Union[bool, List[bool]], (False, 'last'): lambda x: Column(getattr(x._jc, "desc_nulls_last")()), } by = [mapper[(asc, na_position)](scol) for scol, asc in zip(by, ascending)] - kdf = DataFrame(self._internal.copy(sdf=self._sdf.sort(*by))) # type: ks.DataFrame + sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME).sort(*by) + kdf = DataFrame(self._internal.copy(sdf=sdf)) # type: ks.DataFrame if inplace: self._internal = kdf._internal return None @@ -5717,7 +5726,7 @@ def isin(self, values): else: raise TypeError('Values should be iterable, Series, DataFrame or dict.') - sdf = self._sdf.select(_select_columns) 
+ sdf = self._sdf.select(_select_columns + list(HIDDEN_COLUMNS)) return DataFrame(self._internal.copy(sdf=sdf, column_scols=[scol_for(sdf, col) for col in self._internal.data_columns])) @@ -6273,7 +6282,8 @@ def update(self, other: 'DataFrame', join: str = 'left', overwrite: bool = True) update_sdf = update_sdf.withColumn(column_name, F.when(old_col.isNull(), new_col) .otherwise(old_col)) sdf = update_sdf.select([scol_for(update_sdf, col) - for col in self._internal.columns]) + for col in self._internal.columns] + + list(HIDDEN_COLUMNS)) internal = self._internal.copy(sdf=sdf, column_scols=[scol_for(sdf, col) for col in self._internal.data_columns]) @@ -6428,7 +6438,8 @@ def astype(self, dtype) -> 'DataFrame': for col_name, col in self.items(): results.append(col.astype(dtype=dtype)) sdf = self._sdf.select( - self._internal.index_scols + list(map(lambda ser: ser._scol, results))) + self._internal.index_scols + list(map(lambda ser: ser._scol, results)) + + list(HIDDEN_COLUMNS)) return DataFrame(self._internal.copy(sdf=sdf, column_scols=[scol_for(sdf, col) for col in self._internal.data_columns])) @@ -6479,7 +6490,8 @@ def add_prefix(self, prefix): sdf = self._sdf.select( self._internal.index_scols + [self._internal.scol_for(idx).alias(name) - for idx, name in zip(self._internal.column_index, data_columns)]) + for idx, name in zip(self._internal.column_index, data_columns)] + + list(HIDDEN_COLUMNS)) column_index = [tuple([prefix + i for i in idx]) for idx in self._internal.column_index] internal = self._internal.copy(sdf=sdf, column_index=column_index, @@ -6532,7 +6544,8 @@ def add_suffix(self, suffix): sdf = self._sdf.select( self._internal.index_scols + [self._internal.scol_for(idx).alias(name) - for idx, name in zip(self._internal.column_index, data_columns)]) + for idx, name in zip(self._internal.column_index, data_columns)] + + list(HIDDEN_COLUMNS)) column_index = [tuple([i + suffix for i in idx]) for idx in self._internal.column_index] internal = self._internal.copy(sdf=sdf, column_index=column_index, @@ -6699,22 +6712,16 @@ def _cum(self, func, skipna: bool): func = "cumsum" elif func.__name__ == "cumprod": func = "cumprod" - self = self.copy() - columns = self.columns - # add a temporal column to keep natural order. - self['__natural_order__'] = F.monotonically_increasing_id() applied = [] - for column in columns: + for column in self.columns: applied.append(getattr(self[column], func)(skipna)) sdf = self._sdf.select( - self._internal.index_scols + [c._scol for c in applied]) + self._internal.index_scols + [c._scol for c in applied] + list(HIDDEN_COLUMNS)) internal = self._internal.copy(sdf=sdf, column_index=[c._internal.column_index[0] for c in applied], column_scols=[scol_for(sdf, c._internal.data_columns[0]) for c in applied]) - # add a temporal column to keep natural order. 
- self = self.drop('__natural_order__') return DataFrame(internal) # TODO: implements 'keep' parameters @@ -6774,8 +6781,8 @@ def drop_duplicates(self, subset=None, inplace=False): else: subset = [sub if isinstance(sub, tuple) else (sub,) for sub in subset] - sdf = self._sdf.drop_duplicates(subset=[self._internal.column_name_for(idx) - for idx in subset]) + sdf = self._sdf.drop(*HIDDEN_COLUMNS) \ + .drop_duplicates(subset=[self._internal.column_name_for(idx) for idx in subset]) internal = self._internal.copy(sdf=sdf) if inplace: self._internal = internal @@ -6974,7 +6981,8 @@ def _reindex_index(self, index): kser = ks.Series(list(index)) labels = kser._internal._sdf.select(kser._scol.alias(index_column)) - joined_df = self._sdf.join(labels, on=index_column, how="right") + joined_df = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME) \ + .join(labels, on=index_column, how="right") internal = self._internal.copy(sdf=joined_df) return internal @@ -7002,7 +7010,7 @@ def _reindex_columns(self, columns): columns.append(name_like_string(label)) idx.append(label) - sdf = self._sdf.select(self._internal.index_scols + scols) + sdf = self._sdf.select(self._internal.index_scols + scols + list(HIDDEN_COLUMNS)) return self._internal.copy(sdf=sdf, column_index=idx, @@ -7505,7 +7513,7 @@ def filter(self, items=None, like=None, regex=None, axis=None): axis = validate_axis(axis, none_axis=1) index_scols = self._internal.index_scols - sdf = self._sdf + sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME) if items is not None: if is_list_like(items): @@ -7763,7 +7771,7 @@ def gen_new_column_index_entry(column_index_entry): new_data_scols = [scol_for(internal.sdf, old_col_name).alias(new_col_name) for old_col_name, new_col_name in zip(internal.data_columns, new_data_columns)] - sdf = internal.sdf.select(*(internal.index_scols + new_data_scols)) + sdf = internal.sdf.select(internal.index_scols + new_data_scols + list(HIDDEN_COLUMNS)) internal = internal.copy(sdf=sdf, column_index=new_column_index, column_scols=[scol_for(sdf, col) for col in new_data_columns]) if inplace: @@ -7845,7 +7853,7 @@ def pct_change(self, periods=1): 1980-02-01 NaN NaN NaN 1980-03-01 0.067912 0.073814 0.06883 """ - sdf = self._sdf + sdf = self._sdf.drop(NATURAL_ORDER_COLUMN_NAME) window = Window.orderBy(self._internal.index_columns).rowsBetween(-periods, -periods) for column_name in self._internal.data_columns: @@ -7919,9 +7927,8 @@ def idxmax(self, axis=0): c z 2 Name: 0, dtype: int64 """ - from databricks.koalas.series import Series sdf = self._sdf - max_cols = map(lambda x: F.max(x).alias(x), self._internal.data_columns) + max_cols = map(lambda x: F.max(scol_for(sdf, x)).alias(x), self._internal.data_columns) sdf_max = sdf.select(*max_cols) # `sdf_max` looks like below # +------+------+------+ @@ -7930,11 +7937,11 @@ def idxmax(self, axis=0): # | 3| 4.0| 400| # +------+------+------+ - conds = (F.col(column_name) == max_val + conds = (scol_for(sdf, column_name) == max_val for column_name, max_val in zip(sdf_max.columns, sdf_max.head())) cond = reduce(lambda x, y: x | y, conds) - kdf = DataFrame(self._internal.copy(sdf=sdf.where(cond))) + kdf = DataFrame(self._internal.copy(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond))) pdf = kdf.to_pandas() return ks.from_pandas(pdf.idxmax()) @@ -7999,16 +8006,15 @@ def idxmin(self, axis=0): c z 1 Name: 0, dtype: int64 """ - from databricks.koalas.series import Series sdf = self._sdf - min_cols = map(lambda x: F.min(x).alias(x), self._internal.data_columns) + min_cols = map(lambda x: F.min(scol_for(sdf, 
x)).alias(x), self._internal.data_columns) sdf_min = sdf.select(*min_cols) - conds = (F.col(column_name) == min_val + conds = (scol_for(sdf, column_name) == min_val for column_name, min_val in zip(sdf_min.columns, sdf_min.head())) cond = reduce(lambda x, y: x | y, conds) - kdf = DataFrame(self._internal.copy(sdf=sdf.where(cond))) + kdf = DataFrame(self._internal.copy(sdf=sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond))) pdf = kdf.to_pandas() return ks.from_pandas(pdf.idxmin()) @@ -8193,7 +8199,6 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, accuracy=10000): percentile_cols = [] for column in self._internal.data_columns: - sdf = self._sdf percentile_cols.append(F.expr( "approx_percentile(`%s`, array(%s), %s)" % (column, args, accuracy)) .alias(column)) diff --git a/databricks/koalas/generic.py b/databricks/koalas/generic.py index d01a3247aa..1f01930c5d 100644 --- a/databricks/koalas/generic.py +++ b/databricks/koalas/generic.py @@ -33,7 +33,7 @@ from databricks import koalas as ks # For running doctests and reference resolution in PyCharm. from databricks.koalas.indexing import AtIndexer, ILocIndexer, LocIndexer -from databricks.koalas.internal import _InternalFrame +from databricks.koalas.internal import _InternalFrame, NATURAL_ORDER_COLUMN_NAME from databricks.koalas.utils import validate_arguments_and_invoke_function, scol_for from databricks.koalas.window import Rolling, Expanding @@ -1426,7 +1426,7 @@ def first_valid_index(self): cond = reduce(lambda x, y: x & y, map(lambda x: x.isNotNull(), column_scols)) - first_valid_row = sdf.where(cond).first() + first_valid_row = sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond).first() first_valid_idx = tuple(first_valid_row[idx_col] for idx_col in self._internal.index_columns) diff --git a/databricks/koalas/groupby.py b/databricks/koalas/groupby.py index febf28e3bb..796b2cda79 100644 --- a/databricks/koalas/groupby.py +++ b/databricks/koalas/groupby.py @@ -30,12 +30,13 @@ from pyspark.sql import Window, functions as F from pyspark.sql.types import FloatType, DoubleType, NumericType, StructField, StructType -from pyspark.sql.functions import PandasUDFType, pandas_udf, Column, monotonically_increasing_id +from pyspark.sql.functions import PandasUDFType, pandas_udf, Column from databricks import koalas as ks # For running doctests and reference resolution in PyCharm. 
from databricks.koalas.typedef import _infer_return_type from databricks.koalas.frame import DataFrame -from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT +from databricks.koalas.internal import (_InternalFrame, HIDDEN_COLUMNS, NATURAL_ORDER_COLUMN_NAME, + SPARK_INDEX_NAME_FORMAT) from databricks.koalas.missing.groupby import _MissingPandasLikeDataFrameGroupBy, \ _MissingPandasLikeSeriesGroupBy from databricks.koalas.series import Series, _col @@ -884,7 +885,7 @@ def apply(self, func): pdf = self._kdf.head(limit)._to_internal_pandas() pdf = pdf.groupby(input_groupnames).apply(func) kdf = DataFrame(pdf) - return_schema = kdf._sdf.schema + return_schema = kdf._sdf.drop(*HIDDEN_COLUMNS).schema sdf = self._spark_group_map_apply( lambda pdf: pdf.groupby(input_groupnames).apply(func), @@ -939,7 +940,7 @@ def filter(self, func): if not isinstance(func, Callable): raise TypeError("%s object is not callable" % type(func)) - data_schema = self._kdf._sdf.schema + data_schema = self._kdf._sdf.drop(*HIDDEN_COLUMNS).schema groupby_names = [s.name for s in self._groupkeys] def pandas_filter(pdf): @@ -1026,7 +1027,7 @@ def rename_output(pdf): grouped_map_func = pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)(rename_output) - sdf = self._kdf._sdf + sdf = self._kdf._sdf.drop(*HIDDEN_COLUMNS) input_groupkeys = [s._scol for s in self._groupkeys] sdf = sdf.groupby(*input_groupkeys).apply(grouped_map_func) @@ -1150,7 +1151,8 @@ def idxmax(self, skipna=True): order_column = Column(kser._scol._jc.desc_nulls_last()) else: order_column = Column(kser._scol._jc.desc_nulls_first()) - window = Window.partitionBy(groupkey_cols).orderBy(order_column) + window = Window.partitionBy(groupkey_cols) \ + .orderBy(order_column, NATURAL_ORDER_COLUMN_NAME) sdf = sdf.withColumn(name, F.when(F.row_number().over(window) == 1, scol_for(sdf, index)) .otherwise(None)) @@ -1220,7 +1222,8 @@ def idxmin(self, skipna=True): order_column = Column(kser._scol._jc.asc_nulls_last()) else: order_column = Column(kser._scol._jc.asc_nulls_first()) - window = Window.partitionBy(groupkey_cols).orderBy(order_column) + window = Window.partitionBy(groupkey_cols) \ + .orderBy(order_column, NATURAL_ORDER_COLUMN_NAME) sdf = sdf.withColumn(name, F.when(F.row_number().over(window) == 1, scol_for(sdf, index)) .otherwise(None)) @@ -1451,7 +1454,7 @@ def head(self, n=5): tmp_col = '__row_number__' sdf = self._kdf._sdf window = Window.partitionBy([s._scol for s in groupkeys]) \ - .orderBy(F.monotonically_increasing_id()) + .orderBy(NATURAL_ORDER_COLUMN_NAME) sdf = sdf.withColumn(tmp_col, F.row_number().over(window)).filter(F.col(tmp_col) <= n) sdf = sdf.select(self._kdf._internal.scols) @@ -1641,7 +1644,7 @@ def pandas_transform(pdf): pdf = self._kdf.head(limit + 1)._to_internal_pandas() pdf = pdf.groupby(input_groupnames).transform(func) kdf = DataFrame(pdf) - return_schema = kdf._sdf.schema + return_schema = kdf._sdf.drop(*HIDDEN_COLUMNS).schema if len(pdf) <= limit: return kdf @@ -1891,9 +1894,7 @@ def _cum(self, func): func = "cumprod" applied = [] - kdf = self._kdf.copy() - # add a temporal column to keep natural order. - kdf['__natural_order__'] = F.monotonically_increasing_id() + kdf = self._kdf for column in self._agg_columns: # pandas groupby.cumxxx ignores the grouping key itself. 
applied.append(getattr(column.groupby(self._groupkeys), func)()) diff --git a/databricks/koalas/indexing.py b/databricks/koalas/indexing.py index 5ff219e6fb..c25120b985 100644 --- a/databricks/koalas/indexing.py +++ b/databricks/koalas/indexing.py @@ -25,7 +25,7 @@ from pyspark.sql.types import BooleanType from pyspark.sql.utils import AnalysisException -from databricks.koalas.internal import _InternalFrame +from databricks.koalas.internal import _InternalFrame, HIDDEN_COLUMNS, NATURAL_ORDER_COLUMN_NAME from databricks.koalas.exceptions import SparkPandasIndexingError, SparkPandasNotImplementedError from databricks.koalas.utils import column_index_level, name_like_string @@ -120,7 +120,8 @@ def __getitem__(self, key): cond = reduce(lambda x, y: x & y, [scol == row for scol, row in zip(self._internal.index_scols, row_sel)]) - pdf = self._internal.sdf.where(cond).select(self._internal.scol_for(col_sel)).toPandas() + pdf = self._internal.sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond) \ + .select(self._internal.scol_for(col_sel)).toPandas() if len(pdf) < 1: raise KeyError(name_like_string(row_sel)) @@ -170,7 +171,7 @@ def __getitem__(self, key): try: sdf = self._internal._sdf if cond is not None: - sdf = sdf.where(cond) + sdf = sdf.drop(NATURAL_ORDER_COLUMN_NAME).filter(cond) if limit is not None: if limit >= 0: sdf = sdf.limit(limit) diff --git a/databricks/koalas/internal.py b/databricks/koalas/internal.py index a7bbd11e06..1c4b67a33f 100644 --- a/databricks/koalas/internal.py +++ b/databricks/koalas/internal.py @@ -42,6 +42,10 @@ # A pattern to check if the name of a Spark column is a Koalas index name or not. SPARK_INDEX_NAME_PATTERN = re.compile(r"__index_level_[0-9]+__") +NATURAL_ORDER_COLUMN_NAME = '__natural_order__' + +HIDDEN_COLUMNS = set([NATURAL_ORDER_COLUMN_NAME]) + IndexMap = Tuple[str, Optional[Tuple[str, ...]]] @@ -109,15 +113,15 @@ class _InternalFrame(object): * `pandas_df` represents pandas DataFrame derived by the metadata >>> internal = kdf._internal - >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE - +-----------------+---+---+---+---+---+ - |__index_level_0__| A| B| C| D| E| - +-----------------+---+---+---+---+---+ - | 0| 1| 5| 9| 13| 17| - | 1| 2| 6| 10| 14| 18| - | 2| 3| 7| 11| 15| 19| - | 3| 4| 8| 12| 16| 20| - +-----------------+---+---+---+---+---+ + >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS + +-----------------+---+---+---+---+---+-----------------+ + |__index_level_0__| A| B| C| D| E|__natural_order__| + +-----------------+---+---+---+---+---+-----------------+ + | 0| 1| 5| 9| 13| 17|...| + | 1| 2| 6| 10| 14| 18|...| + | 2| 3| 7| 11| 15| 19|...| + | 3| 4| 8| 12| 16| 20|...| + +-----------------+---+---+---+---+---+-----------------+ >>> internal.data_columns ['A', 'B', 'C', 'D', 'E'] >>> internal.index_columns @@ -175,15 +179,15 @@ class _InternalFrame(object): +---+---+---+---+---+ >>> internal = kdf1._internal - >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE - +-----------------+---+---+---+---+---+ - |__index_level_0__| A| B| C| D| E| - +-----------------+---+---+---+---+---+ - | 0| 1| 5| 9| 13| 17| - | 1| 2| 6| 10| 14| 18| - | 2| 3| 7| 11| 15| 19| - | 3| 4| 8| 12| 16| 20| - +-----------------+---+---+---+---+---+ + >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS + +-----------------+---+---+---+---+---+-----------------+ + |__index_level_0__| A| B| C| D| E|__natural_order__| + +-----------------+---+---+---+---+---+-----------------+ + | 0| 1| 5| 9| 13| 17|...| + | 1| 2| 6| 10| 14| 18|...| + | 
2| 3| 7| 11| 15| 19|...| + | 3| 4| 8| 12| 16| 20|...| + +-----------------+---+---+---+---+---+-----------------+ >>> internal.data_columns ['B', 'C', 'D', 'E'] >>> internal.index_columns @@ -257,15 +261,15 @@ class _InternalFrame(object): +-----------------+---+---+---+---+---+ >>> internal = kdf2._internal - >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE - +-----------------+---+---+---+---+---+ - |__index_level_0__| A| B| C| D| E| - +-----------------+---+---+---+---+---+ - | 0| 1| 5| 9| 13| 17| - | 1| 2| 6| 10| 14| 18| - | 2| 3| 7| 11| 15| 19| - | 3| 4| 8| 12| 16| 20| - +-----------------+---+---+---+---+---+ + >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS + +-----------------+---+---+---+---+---+-----------------+ + |__index_level_0__| A| B| C| D| E|__natural_order__| + +-----------------+---+---+---+---+---+-----------------+ + | 0| 1| 5| 9| 13| 17|...| + | 1| 2| 6| 10| 14| 18|...| + | 2| 3| 7| 11| 15| 19|...| + | 3| 4| 8| 12| 16| 20|...| + +-----------------+---+---+---+---+---+-----------------+ >>> internal.data_columns ['B', 'C', 'D', 'E'] >>> internal.index_columns @@ -313,16 +317,16 @@ class _InternalFrame(object): 4 17 18 19 20 >>> internal = kdf3._internal - >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE - +-----------------+------+------+------+------+ - |__index_level_0__|(X, A)|(X, B)|(Y, C)|(Y, D)| - +-----------------+------+------+------+------+ - | 0| 1| 2| 3| 4| - | 1| 5| 6| 7| 8| - | 2| 9| 10| 11| 12| - | 3| 13| 14| 15| 16| - | 4| 17| 18| 19| 20| - +-----------------+------+------+------+------+ + >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS + +-----------------+------+------+------+------+-----------------+ + |__index_level_0__|(X, A)|(X, B)|(Y, C)|(Y, D)|__natural_order__| + +-----------------+------+------+------+------+-----------------+ + | 0| 1| 2| 3| 4|...| + | 1| 5| 6| 7| 8|...| + | 2| 9| 10| 11| 12|...| + | 3| 13| 14| 15| 16|...| + | 4| 17| 18| 19| 20|...| + +-----------------+------+------+------+------+-----------------+ >>> internal.data_columns ['(X, A)', '(X, B)', '(Y, C)', '(Y, D)'] >>> internal.column_index @@ -340,15 +344,15 @@ class _InternalFrame(object): Name: B, dtype: int64 >>> internal = kseries._internal - >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE - +-----------------+---+---+---+---+---+ - |__index_level_0__| A| B| C| D| E| - +-----------------+---+---+---+---+---+ - | 0| 1| 5| 9| 13| 17| - | 1| 2| 6| 10| 14| 18| - | 2| 3| 7| 11| 15| 19| - | 3| 4| 8| 12| 16| 20| - +-----------------+---+---+---+---+---+ + >>> internal.sdf.show() # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS + +-----------------+---+---+---+---+---+-----------------+ + |__index_level_0__| A| B| C| D| E|__natural_order__| + +-----------------+---+---+---+---+---+-----------------+ + | 0| 1| 5| 9| 13| 17|...| + | 1| 2| 6| 10| 14| 18|...| + | 2| 3| 7| 11| 15| 19|...| + | 3| 4| 8| 12| 16| 20|...| + +-----------------+---+---+---+---+---+-----------------+ >>> internal.scol Column >>> internal.data_columns @@ -402,6 +406,10 @@ def __init__(self, sdf: spark.DataFrame, :param scol: Spark Column to be managed. """ assert isinstance(sdf, spark.DataFrame) + + if NATURAL_ORDER_COLUMN_NAME not in sdf.columns: + sdf = sdf.withColumn(NATURAL_ORDER_COLUMN_NAME, F.monotonically_increasing_id()) + if index_map is None: # Here is when Koalas DataFrame is created directly from Spark DataFrame. 
assert not any(SPARK_INDEX_NAME_PATTERN.match(name) for name in sdf.schema.names), \ @@ -429,7 +437,7 @@ def __init__(self, sdf: spark.DataFrame, elif column_scols is None: index_columns = set(index_column for index_column, _ in self._index_map) self._column_scols = [scol_for(sdf, col) for col in sdf.columns - if col not in index_columns] + if col not in index_columns and col not in HIDDEN_COLUMNS] else: self._column_scols = column_scols @@ -468,7 +476,7 @@ def attach_default_index(sdf): default_index_type = get_option("compute.default_index_type") if default_index_type == "sequence": sequential_index = F.row_number().over( - Window.orderBy(F.monotonically_increasing_id().asc())) - 1 + Window.orderBy(NATURAL_ORDER_COLUMN_NAME)) - 1 scols = [scol_for(sdf, column) for column in sdf.columns] return sdf.select(sequential_index.alias(SPARK_INDEX_NAME_FORMAT(0)), *scols) elif default_index_type == "distributed-sequence": @@ -493,16 +501,18 @@ def attach_default_index(sdf): # zip it with partition key. sums = dict(zip(map(lambda count: count[0], sorted_counts), cumulative_counts)) + return_schema = StructType( + [StructField(SPARK_INDEX_NAME_FORMAT(0), LongType())] + list(sdf.schema)) + columns = [f.name for f in return_schema] + # 3. Group by partition id and assign each range. def default_index(pdf): current_partition_max = sums[pdf["__spark_partition_id"].iloc[0]] offset = len(pdf) pdf[SPARK_INDEX_NAME_FORMAT(0)] = list(range( current_partition_max - offset, current_partition_max)) - return pdf.drop(columns=["__spark_partition_id"]) + return pdf[columns] - return_schema = StructType( - [StructField(SPARK_INDEX_NAME_FORMAT(0), LongType())] + list(sdf.schema)) grouped_map_func = pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)(default_index) sdf = sdf.withColumn("__spark_partition_id", F.spark_partition_id()) diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py index b38ac97207..1453b72db5 100644 --- a/databricks/koalas/series.py +++ b/databricks/koalas/series.py @@ -39,7 +39,8 @@ from databricks.koalas.base import IndexOpsMixin from databricks.koalas.frame import DataFrame from databricks.koalas.generic import _Frame -from databricks.koalas.internal import IndexMap, _InternalFrame, SPARK_INDEX_NAME_FORMAT +from databricks.koalas.internal import (_InternalFrame, NATURAL_ORDER_COLUMN_NAME, + SPARK_INDEX_NAME_FORMAT) from databricks.koalas.missing.series import _MissingPandasLikeSeries from databricks.koalas.plot import KoalasSeriesPlotMethods from databricks.koalas.ml import corr @@ -3044,15 +3045,15 @@ def idxmax(self, skipna=True): >>> s.idxmax() 3 """ - sdf = self._internal._sdf + sdf = self._internal.sdf scol = self._scol index_scols = self._internal.index_scols # desc_nulls_(last|first) is used via Py4J directly because # it's not supported in Spark 2.3. if skipna: - sdf = sdf.orderBy(Column(scol._jc.desc_nulls_last()), F.monotonically_increasing_id()) + sdf = sdf.orderBy(Column(scol._jc.desc_nulls_last()), NATURAL_ORDER_COLUMN_NAME) else: - sdf = sdf.orderBy(Column(scol._jc.desc_nulls_first()), F.monotonically_increasing_id()) + sdf = sdf.orderBy(Column(scol._jc.desc_nulls_first()), NATURAL_ORDER_COLUMN_NAME) results = sdf.select([scol] + index_scols).take(1) if len(results) == 0: raise ValueError("attempt to get idxmin of an empty sequence") @@ -3158,9 +3159,9 @@ def idxmin(self, skipna=True): # asc_nulls_(last|first)is used via Py4J directly because # it's not supported in Spark 2.3. 
if skipna: - sdf = sdf.orderBy(Column(scol._jc.asc_nulls_last()), F.monotonically_increasing_id()) + sdf = sdf.orderBy(Column(scol._jc.asc_nulls_last()), NATURAL_ORDER_COLUMN_NAME) else: - sdf = sdf.orderBy(Column(scol._jc.asc_nulls_first()), F.monotonically_increasing_id()) + sdf = sdf.orderBy(Column(scol._jc.asc_nulls_first()), NATURAL_ORDER_COLUMN_NAME) results = sdf.select([scol] + index_scols).take(1) if len(results) == 0: raise ValueError("attempt to get idxmin of an empty sequence") @@ -3315,7 +3316,7 @@ def pop(self, item): for level, index in enumerate(item)] sdf = self._internal.sdf \ .select(cols) \ - .where(reduce(lambda x, y: x & y, rows)) + .filter(reduce(lambda x, y: x & y, rows)) if len(self._internal._index_map) == len(item): # if sdf has one column and one data, return data only without frame @@ -3520,7 +3521,7 @@ def mode(self, dropna=True) -> 'Series': ser_count = self.value_counts(dropna=dropna, sort=False) sdf_count = ser_count._internal.sdf most_value = ser_count.max() - sdf_most_value = sdf_count.where("count == {}".format(most_value)) + sdf_most_value = sdf_count.filter("count == {}".format(most_value)) sdf = sdf_most_value.select( F.col(SPARK_INDEX_NAME_FORMAT(0)).alias('0')) internal = _InternalFrame(sdf=sdf) @@ -4139,22 +4140,16 @@ def pct_change(self, periods=1): """ scol = self._internal.scol - window = Window.orderBy(F.monotonically_increasing_id()).rowsBetween(-periods, -periods) + window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME).rowsBetween(-periods, -periods) prev_row = F.lag(scol, periods).over(window) return self._with_new_scol((scol - prev_row) / prev_row) def _cum(self, func, skipna, part_cols=()): # This is used to cummin, cummax, cumsum, etc. - index_columns = self._internal.index_columns - - # address temporal column to keep natural order. - sdf = self._internal.sdf - if '__natural_order__' not in sdf.columns: - sdf = sdf.withColumn('__natural_order__', F.monotonically_increasing_id()) window = Window.orderBy( - '__natural_order__').partitionBy(*part_cols).rowsBetween( + NATURAL_ORDER_COLUMN_NAME).partitionBy(*part_cols).rowsBetween( Window.unboundedPreceding, Window.currentRow) if skipna: @@ -4229,9 +4224,7 @@ def _cum(self, func, skipna, part_cols=()): if func.__name__ == "cumprod": scol = F.exp(scol) - internal = self._internal.copy(sdf=sdf, scol=scol) - - return _col(DataFrame(internal)).rename(self.name) + return self._with_new_scol(scol).rename(self.name) # ---------------------------------------------------------------------- # Accessor Methods
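
Editor's note, outside the patch: a minimal standalone sketch of the technique this commit applies. It attaches a hidden `__natural_order__` column once (as `_InternalFrame.__init__` does above) and orders window-like operations such as cumsum by it, rather than calling `monotonically_increasing_id()` at each call site; the hidden column is excluded whenever the frame is exposed. The SparkSession setup and the toy DataFrame are assumptions for illustration only; the constant names mirror the patch.

    # Illustrative sketch (not part of the patch).
    from pyspark.sql import SparkSession, Window, functions as F

    spark = SparkSession.builder.getOrCreate()

    NATURAL_ORDER_COLUMN_NAME = '__natural_order__'   # mirrors internal.py above
    HIDDEN_COLUMNS = {NATURAL_ORDER_COLUMN_NAME}

    sdf = spark.createDataFrame([(1,), (2,), (3,), (4,)], ['A'])  # assumed toy data

    # Attach the hidden column once, as _InternalFrame.__init__ does in the patch.
    if NATURAL_ORDER_COLUMN_NAME not in sdf.columns:
        sdf = sdf.withColumn(NATURAL_ORDER_COLUMN_NAME, F.monotonically_increasing_id())

    # Window-like operations (cumsum here) order by the hidden column to keep
    # the rows' natural order, instead of regenerating an id per operation.
    window = (Window.orderBy(NATURAL_ORDER_COLUMN_NAME)
              .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    sdf = sdf.withColumn('A_cumsum', F.sum('A').over(window))

    # Hidden columns are dropped before the result is shown to the user.
    sdf.select([c for c in sdf.columns if c not in HIDDEN_COLUMNS]).show()

Computing the id a single time when the internal frame is constructed also avoids re-evaluating `monotonically_increasing_id()` for every operation, which is why the call sites in frame.py, groupby.py, and series.py switch to the shared column.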