
Commit

Exclude Index columns for exposed Spark DataFrame and disallow Koalas DataFrame with no index
HyukjinKwon committed Aug 19, 2019
1 parent c87a849 commit ad958b8
Showing 14 changed files with 173 additions and 138 deletions.
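
To make the intent concrete, here is a minimal sketch of the new round-trip behaviour, pieced together from the updated to_spark/to_koalas doctests in this diff. It assumes a running Spark session and the post-commit Koalas package; the variable names are only illustrative.

    import databricks.koalas as ks

    kdf = ks.DataFrame({'col1': [1, 2], 'col2': [3, 4]})

    # The exposed Spark DataFrame no longer carries the internal index column
    # (previously surfaced as __index_level_0__).
    sdf = kdf.to_spark()
    print(sdf)           # DataFrame[col1: bigint, col2: bigint]

    # Converting back attaches a fresh default index instead of reusing an
    # __index_level_0__ column, so only the data columns survive the round trip.
    print(sdf.to_koalas())
    #    col1  col2
    # 0     1     3
    # 1     2     4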
9 changes: 0 additions & 9 deletions databricks/koalas/base.py
@@ -304,9 +304,6 @@ def is_monotonic(self):
>>> ser.rename("a").to_frame().set_index("a").index.is_monotonic
True
"""
if len(self._kdf._internal.index_columns) == 0:
raise ValueError("Index must be set.")

col = self._scol
window = Window.orderBy(self._kdf._internal.index_scols).rowsBetween(-1, -1)
sdf = self._kdf._sdf.withColumn(
@@ -356,9 +353,6 @@ def is_monotonic_decreasing(self):
>>> ser.rename("a").to_frame().set_index("a").index.is_monotonic_decreasing
True
"""
if len(self._kdf._internal.index_columns) == 0:
raise ValueError("Index must be set.")

col = self._scol
window = Window.orderBy(self._kdf._internal.index_scols).rowsBetween(-1, -1)
sdf = self._kdf._sdf.withColumn(
@@ -705,9 +699,6 @@ def shift(self, periods=1, fill_value=None):
>>> df.index.shift(periods=3, fill_value=0)
Int64Index([0, 0, 0, 0, 1], dtype='int64')
"""
if len(self._internal.index_columns) == 0:
raise ValueError("Index must be set.")

if not isinstance(periods, int):
raise ValueError('periods should be an int; however, got [%s]' % type(periods))

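The guards removed above ("Index must be set.") were only needed while a Koalas DataFrame could exist without an index; after this commit every DataFrame carries one, so is_monotonic, is_monotonic_decreasing and Index.shift can always order by the index columns. For reference, a standalone PySpark sketch of the previous-row comparison these methods build on, using F.lag rather than the internal rowsBetween(-1, -1) window shown above (illustrative only, not the Koalas implementation):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(0, 1), (1, 3), (2, 2)], ['idx', 'value'])

    # Compare each value with the one in the previous row, ordered by the index.
    w = Window.orderBy('idx')
    with_prev = sdf.withColumn('prev', F.lag('value', 1).over(w))
    violations = with_prev.where(
        F.col('prev').isNotNull() & (F.col('value') < F.col('prev'))).count()
    print(violations == 0)  # False: 2 < 3 breaks monotonicity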
70 changes: 40 additions & 30 deletions databricks/koalas/frame.py
@@ -1598,9 +1598,7 @@ def index(self):
Index
"""
from databricks.koalas.indexes import Index, MultiIndex
if len(self._internal.index_map) == 0:
return None
elif len(self._internal.index_map) == 1:
if len(self._internal.index_map) == 1:
return Index(self)
else:
return MultiIndex(self)
@@ -1860,9 +1858,6 @@ class max type
lion mammal 80.5 run
monkey mammal NaN jump
"""
if len(self._internal.index_map) == 0:
raise NotImplementedError('Can\'t reset index because there is no index.')

multi_index = len(self._internal.index_map) > 1

def rename(index):
@@ -1877,7 +1872,15 @@ def rename(index):
if level is None:
new_index_map = [(column, name if name is not None else rename(i))
for i, (column, name) in enumerate(self._internal.index_map)]
index_map = []
new_data_columns = [
self._internal.scol_for(column).alias(name) for column, name in new_index_map]
sdf = self._sdf.select(new_data_columns + self._internal.data_columns)

# Now, the new internal Spark columns are named the same as the index names.
new_index_map = [(name, name) for column, name in new_index_map]

index_map = [('__index_level_0__', None)]
sdf = _InternalFrame.attach_default_index(sdf)
else:
if isinstance(level, (int, str)):
level = [level]
@@ -1915,10 +1918,13 @@ def rename(index):
index_name if index_name is not None else rename(index_name)))
index_map.remove(info)

sdf = self._sdf

if drop:
new_index_map = []

internal = self._internal.copy(
sdf=sdf,
data_columns=[column for column, _ in new_index_map] + self._internal.data_columns,
index_map=index_map,
column_index=None)
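
When every index level is reset, the new code above re-attaches a default index via _InternalFrame.attach_default_index before building the internal frame. As a rough illustration of what "attaching a default index" means in Spark terms (a hedged sketch; the actual implementation in this commit may differ), a sequential 0..n-1 index can be produced like this:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    def attach_default_index(sdf):
        # Number the rows over a global ordering and shift to start at 0.
        # Forcing a global ordering is not free, which is why the default index
        # is attached only when it is actually needed.
        w = Window.orderBy(F.monotonically_increasing_id())
        return sdf.withColumn('__index_level_0__', F.row_number().over(w) - 1)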
@@ -2382,13 +2388,13 @@ def to_koalas(self):
>>> spark_df = df.to_spark()
>>> spark_df
DataFrame[__index_level_0__: bigint, col1: bigint, col2: bigint]
DataFrame[col1: bigint, col2: bigint]
>>> kdf = spark_df.to_koalas()
>>> kdf
__index_level_0__ col1 col2
0 0 1 3
1 1 2 4
col1 col2
0 1 3
1 2 4
Calling to_koalas on a Koalas DataFrame simply returns itself.
@@ -2493,8 +2499,8 @@ def to_table(self, name: str, format: Optional[str] = None, mode: str = 'error',
>>> df.to_table('%s.my_table' % db, partition_cols='date')
"""
self._sdf.write.saveAsTable(name=name, format=format, mode=mode,
partitionBy=partition_cols, options=options)
self.to_spark().write.saveAsTable(name=name, format=format, mode=mode,
partitionBy=partition_cols, options=options)

def to_delta(self, path: str, mode: str = 'error',
partition_cols: Union[str, List[str], None] = None, **options):
@@ -2604,8 +2610,8 @@ def to_parquet(self, path: str, mode: str = 'error',
... mode = 'overwrite',
... partition_cols=['date', 'country'])
"""
self._sdf.write.parquet(path=path, mode=mode, partitionBy=partition_cols,
compression=compression)
self.to_spark().write.parquet(
path=path, mode=mode, partitionBy=partition_cols, compression=compression)

def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,
mode: str = 'error', partition_cols: Union[str, List[str], None] = None,
@@ -2657,13 +2663,16 @@ def to_spark_io(self, path: Optional[str] = None, format: Optional[str] = None,
>>> df.to_spark_io(path='%s/to_spark_io/foo.json' % path, format='json')
"""
self._sdf.write.save(path=path, format=format, mode=mode, partitionBy=partition_cols,
options=options)
self.to_spark().write.save(
path=path, format=format, mode=mode, partitionBy=partition_cols, options=options)

def to_spark(self):
"""
Return the current DataFrame as a Spark DataFrame.
.. note:: Index information is lost. So, if the index columns are not present among
the actual data columns, they are lost.
See Also
--------
DataFrame.to_koalas
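
Since to_spark now drops the index, callers who still need it in the Spark DataFrame can materialise it as an ordinary column first, for example with reset_index. This is a common workaround and not part of this diff:

    import databricks.koalas as ks

    kdf = ks.DataFrame({'a': [1, 2]}, index=[10, 20])
    sdf = kdf.reset_index().to_spark()  # the old index is now a regular column,
                                        # so it survives the conversion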
@@ -3653,14 +3662,21 @@ def pivot_table(self, values=None, index=None, columns=None,
sdf = sdf.fillna(fill_value)

if index is not None:
return DataFrame(sdf).set_index(index)
data_columns = [column for column in sdf.columns if column not in index]
index_map = [(column, column) for column in index]
internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
return DataFrame(internal)
else:
if isinstance(values, list):
index_values = values[-1]
else:
index_values = values

return DataFrame(sdf.withColumn(columns, F.lit(index_values))).set_index(columns)
sdf = sdf.withColumn(columns, F.lit(index_values))
data_columns = [column for column in sdf.columns if column not in columns]
index_map = [(column, column) for column in columns]
internal = _InternalFrame(sdf=sdf, data_columns=data_columns, index_map=index_map)
return DataFrame(internal)

def pivot(self, index=None, columns=None, values=None):
"""
@@ -4364,9 +4380,6 @@ def sort_index(self, axis: int = 0,
a 1 2 1
b 1 0 3
"""
if len(self._internal.index_map) == 0:
raise ValueError("Index should be set.")

if axis != 0:
raise ValueError("No other axes than 0 are supported at the moment")
if kind is not None:
@@ -4959,12 +4972,12 @@ def join(self, right: 'DataFrame', on: Optional[Union[str, List[str]]] = None,
original DataFrame’s index in the result.
>>> join_kdf = kdf1.join(kdf2.set_index('key'), on='key')
>>> join_kdf.sort_values(by=join_kdf.columns)
>>> join_kdf.sort_index()
key A B
0 K0 A0 B0
1 K1 A1 B1
2 K2 A2 B2
3 K3 A3 None
0 K3 A3 None
1 K0 A0 B0
2 K1 A1 B1
3 K2 A2 B2
"""
if on:
self = self.set_index(on)
@@ -5543,9 +5556,6 @@ def _cum(self, func, skipna: bool):
elif func.__name__ == "cumprod":
func = "cumprod"

if len(self._internal.index_columns) == 0:
raise ValueError("Index must be set.")

applied = []
for column in self.columns:
applied.append(getattr(self[column], func)(skipna))
79 changes: 43 additions & 36 deletions databricks/koalas/groupby.py
@@ -682,7 +682,6 @@ def cumsum(self):
"""
return self._cum(F.sum)

# TODO: Series support is not implemented yet.
def apply(self, func):
"""
Apply function `func` group-wise and combine the results together.
@@ -797,7 +796,31 @@ def apply(self, func):
return_schema = None  # schema will be inferred.
else:
return_schema = _infer_return_type(func).tpe
return self._apply(func, return_schema, retain_index=return_schema is None)

should_infer_schema = return_schema is None
input_groupnames = [s.name for s in self._groupkeys]

if should_infer_schema:
# Here we execute with the first 1000 records to get the return type.
# If there are fewer than 1000 records, the pandas API is used directly as a shortcut.
limit = 1000
pdf = self._kdf.head(limit + 1).to_pandas()
pdf = pdf.groupby(input_groupnames).apply(func)
kdf = DataFrame(pdf)
return_schema = kdf._sdf.schema
if len(pdf) <= limit:
return kdf

sdf = self._spark_group_map_apply(
func, return_schema, retain_index=should_infer_schema)

if should_infer_schema:
# If schema is inferred, we can restore indexes too.
internal = kdf._internal.copy(sdf=sdf)
else:
# Otherwise, the index is lost.
internal = _InternalFrame(sdf=sdf)
return DataFrame(internal)
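
The branch above is the sampling trick used throughout this commit: run the pandas function on the first limit + 1 rows to learn the result schema, and skip Spark entirely when that sample already covers all the data. A condensed restatement of the idea (the helper name is illustrative; DataFrame is the Koalas DataFrame and _sdf its underlying Spark DataFrame, as in the code above):

    def infer_schema_or_shortcut(kdf, groupnames, func, limit=1000):
        pdf = kdf.head(limit + 1).to_pandas()            # small prefix on the driver
        applied = DataFrame(pdf.groupby(groupnames).apply(func))
        if len(pdf) <= limit:                            # the sample was everything,
            return applied, None                         # so answer directly
        return None, applied._sdf.schema                 # else reuse the inferred schema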

# TODO: implement 'dropna' parameter
def filter(self, func):
@@ -843,24 +866,11 @@ def filter(self, func):
def pandas_filter(pdf):
return pdf.groupby(groupby_names).filter(func)

kdf = self._apply(pandas_filter, data_schema, retain_index=True)
return DataFrame(self._kdf._internal.copy(sdf=kdf._sdf))

def _apply(self, func, return_schema, retain_index):
should_infer_schema = return_schema is None
input_groupnames = [s.name for s in self._groupkeys]

if should_infer_schema:
# Here we execute with the first 1000 records to get the return type.
# If there are fewer than 1000 records, the pandas API is used directly as a shortcut.
limit = 1000
pdf = self._kdf.head(limit + 1).to_pandas()
pdf = pdf.groupby(input_groupnames).apply(func)
kdf = DataFrame(pdf)
return_schema = kdf._sdf.schema
if len(pdf) <= limit:
return kdf
sdf = self._spark_group_map_apply(
pandas_filter, data_schema, retain_index=True)
return DataFrame(self._kdf._internal.copy(sdf=sdf))

def _spark_group_map_apply(self, func, return_schema, retain_index):
index_columns = self._kdf._internal.index_columns
index_names = self._kdf._internal.index_names
data_columns = self._kdf._internal.data_columns
@@ -934,14 +944,7 @@ def rename_output(pdf):
input_groupkeys = [s._scol for s in self._groupkeys]
sdf = sdf.groupby(*input_groupkeys).apply(grouped_map_func)

if should_infer_schema:
# If schema is inferred, we can restore indexes too.
internal = kdf._internal.copy(sdf=sdf)
else:
# Otherwise, the index is lost.
internal = _InternalFrame(
sdf=sdf, data_columns=return_schema.fieldNames(), index_map=[])
return DataFrame(internal)
return sdf
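
For context, the function that _spark_group_map_apply hands to Spark is a grouped-map pandas UDF. A minimal standalone sketch of that pattern in plain PySpark (Spark 2.3+ API; this shows the general mechanism, not the Koalas-specific column renaming done above):

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    def as_grouped_map(func, return_schema):
        # Each group arrives as a pandas DataFrame; the wrapped function must
        # return a pandas DataFrame whose columns match return_schema.
        @pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)
        def grouped_map_func(pdf):
            return func(pdf)
        return grouped_map_func

    # usage: sdf.groupby('key').apply(as_grouped_map(my_func, schema))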

def rank(self, method='average', ascending=True):
"""
@@ -1007,7 +1010,6 @@ def rank(self, method='average', ascending=True):
"""
return self._rank(method, ascending)

# TODO: Series support is not implemented yet.
def transform(self, func):
"""
Apply function column-by-column to the GroupBy object.
@@ -1117,7 +1119,9 @@ def pandas_transform(pdf):
pdf = pdf.drop(columns=input_groupnames)
return pdf.transform(func)

if return_sig is None:
should_infer_schema = return_sig is None

if should_infer_schema:
# Here we execute with the first 1000 records to get the return type.
# If there are fewer than 1000 records, the pandas API is used directly as a shortcut.
limit = 1000
@@ -1128,16 +1132,22 @@ def pandas_transform(pdf):
if len(pdf) <= limit:
return pdf

applied_kdf = self._apply(pandas_transform, return_schema, retain_index=True)
# kdf inferred from pdf holds a correct index.
return DataFrame(kdf._internal.copy(sdf=applied_kdf._sdf))
sdf = self._spark_group_map_apply(
pandas_transform, return_schema, retain_index=True)
# If schema is inferred, we can restore indexes too.
internal = kdf._internal.copy(sdf=sdf)
else:
return_type = _infer_return_type(func).tpe
data_columns = self._kdf._internal.data_columns
return_schema = StructType([
StructField(c, return_type) for c in data_columns if c not in input_groupnames])

return self._apply(pandas_transform, return_schema, retain_index=False)
sdf = self._spark_group_map_apply(
pandas_transform, return_schema, retain_index=False)
# Otherwise, the index is lost.
internal = _InternalFrame(sdf=sdf)

return DataFrame(internal)

def nunique(self, dropna=True):
"""
@@ -1362,9 +1372,6 @@ def _cum(self, func):
elif func.__name__ == "cumprod":
func = "cumprod"

if len(self._kdf._internal.index_columns) == 0:
raise ValueError("Index must be set.")

applied = []
kdf = self._kdf
groupkey_columns = set(s.name for s in self._groupkeys)