From 251bea3e90a340577b19d1c111a8ee9c9a3bef8e Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 10 Nov 2020 14:36:16 -0800 Subject: [PATCH 1/5] Type annotations to Koalas accessors and Spark accessors --- databricks/koalas/accessors.py | 6 ++--- databricks/koalas/spark/accessors.py | 38 +++++++++++++++++++++------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/databricks/koalas/accessors.py b/databricks/koalas/accessors.py index ca5e6235e0..4114e2986b 100644 --- a/databricks/koalas/accessors.py +++ b/databricks/koalas/accessors.py @@ -185,7 +185,7 @@ def attach_id_column(self, id_type: str, column: Union[Any, Tuple]) -> "DataFram ).resolved_copy ) - def apply_batch(self, func, args=(), **kwds): + def apply_batch(self, func, args=(), **kwds) -> "DataFrame": """ Apply a function that takes pandas DataFrame and outputs pandas DataFrame. The pandas DataFrame given to the function is of a batch used internally. @@ -200,7 +200,7 @@ def apply_batch(self, func, args=(), **kwds): >>> # This case does not return the length of whole frame but of the batch internally ... # used. - ... def length(pdf) -> ks.DataFrame[int]: + ... def length(pdf) -> pd.DataFrame[int]: ... return pd.DataFrame([len(pdf)]) ... >>> df = ks.DataFrame({'A': range(1000)}) @@ -389,7 +389,7 @@ def apply_batch(self, func, args=(), **kwds): return DataFrame(internal) - def transform_batch(self, func, *args, **kwargs): + def transform_batch(self, func, *args, **kwargs) -> "DataFrame": """ Transform chunks with a function that takes pandas DataFrame and outputs pandas DataFrame. The pandas DataFrame given to the function is of a batch used internally. The length of diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index 471e1b6703..c5aeeba28a 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -19,7 +19,7 @@ but Spark has it. """ from distutils.version import LooseVersion -from typing import TYPE_CHECKING, Optional, Union, List +from typing import TYPE_CHECKING, Optional, Union, List, cast import pyspark from pyspark import StorageLevel @@ -59,7 +59,7 @@ def column(self) -> Column: """ return self._data._internal.spark_column_for(self._data._column_label) - def transform(self, func): + def transform(self, func) -> Union["ks.Series", "ks.Index"]: """ Applies a function that takes and returns a Spark column. It allows to natively apply a Spark function and column APIs with the Spark column internally used @@ -126,7 +126,7 @@ def transform(self, func): class SparkSeriesMethods(SparkIndexOpsMethods): def transform(self, func) -> "ks.Series": - return super().transform(func) + return cast(ks.Series, super().transform(func)) transform.__doc__ = SparkIndexOpsMethods.transform.__doc__ @@ -252,7 +252,7 @@ def analyzed(self) -> "ks.Series": class SparkIndexMethods(SparkIndexOpsMethods): def transform(self, func) -> "ks.Index": - return super().transform(func) + return cast(ks.Index, super().transform(func)) transform.__doc__ = SparkIndexOpsMethods.transform.__doc__ @@ -295,7 +295,7 @@ def schema(self, index_col: Optional[Union[str, List[str]]] = None) -> StructTyp """ return self.frame(index_col).schema - def print_schema(self, index_col: Optional[Union[str, List[str]]] = None): + def print_schema(self, index_col: Optional[Union[str, List[str]]] = None) -> None: """ Prints out the underlying Spark schema in the tree format. 
@@ -305,6 +305,10 @@ def print_schema(self, index_col: Optional[Union[str, List[str]]] = None): Column names to be used in Spark to represent Koalas' index. The index name in Koalas is ignored. By default, the index is always lost. + Returns + ------- + None + Examples -------- >>> df = ks.DataFrame({'a': list('abc'), @@ -634,7 +638,7 @@ def to_table( partition_cols: Optional[Union[str, List[str]]] = None, index_col: Optional[Union[str, List[str]]] = None, **options - ): + ) -> None: """ Write the DataFrame into a Spark table. :meth:`DataFrame.spark.to_table` is an alias of :meth:`DataFrame.to_table`. @@ -669,6 +673,10 @@ def to_table( options Additional options passed directly to Spark. + Returns + ------- + None + See Also -------- read_table @@ -705,7 +713,7 @@ def to_spark_io( partition_cols: Optional[Union[str, List[str]]] = None, index_col: Optional[Union[str, List[str]]] = None, **options - ): + ) -> None: """Write the DataFrame out to a Spark data source. :meth:`DataFrame.spark.to_spark_io` is an alias of :meth:`DataFrame.to_spark_io`. @@ -736,6 +744,10 @@ def to_spark_io( options : dict All other options passed directly into Spark's data source. + Returns + ------- + None + See Also -------- read_spark_io @@ -766,7 +778,7 @@ def to_spark_io( path=path, format=format, mode=mode, partitionBy=partition_cols, **options ) - def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None): + def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None) -> None: """ Prints the underlying (logical and physical) Spark plans to the console for debugging purpose. @@ -778,6 +790,10 @@ def explain(self, extended: Optional[bool] = None, mode: Optional[str] = None): mode : string, default ``None``. The expected output format of plans. + Returns + ------- + None + Examples -------- >>> df = ks.DataFrame({'id': range(10)}) @@ -1164,11 +1180,15 @@ def storage_level(self) -> StorageLevel: """ return self._kdf._cached.storageLevel - def unpersist(self): + def unpersist(self) -> None: """ The `unpersist` function is used to uncache the Koalas DataFrame when it is not used with `with` statement. + Returns + ------- + None + Examples -------- >>> df = ks.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], From 9dbd20e977d0a96cc917fe455434c41fdee6bf2f Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 10 Nov 2020 15:55:00 -0800 Subject: [PATCH 2/5] Fix Koalas accessor --- databricks/koalas/accessors.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/databricks/koalas/accessors.py b/databricks/koalas/accessors.py index 4114e2986b..7a722cc943 100644 --- a/databricks/koalas/accessors.py +++ b/databricks/koalas/accessors.py @@ -18,7 +18,7 @@ """ import inspect from distutils.version import LooseVersion -from typing import Any, Tuple, Union, TYPE_CHECKING +from typing import Any, Tuple, Union, TYPE_CHECKING, cast import types import numpy as np # noqa: F401 @@ -330,7 +330,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame": original_func = func func = lambda o: original_func(o, *args, **kwds) - self_applied = DataFrame(self._kdf._internal.resolved_copy) + self_applied = DataFrame(self._kdf._internal.resolved_copy) # type: DataFrame if should_infer_schema: # Here we execute with the first 1000 to get the return type. @@ -343,7 +343,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame": "The given function should return a frame; however, " "the return type was %s." 
% type(applied) ) - kdf = ks.DataFrame(applied) + kdf = ks.DataFrame(applied) # type: DataFrame if len(pdf) <= limit: return kdf @@ -389,7 +389,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame": return DataFrame(internal) - def transform_batch(self, func, *args, **kwargs) -> "DataFrame": + def transform_batch(self, func, *args, **kwargs) -> Union["DataFrame", "Series"]: """ Transform chunks with a function that takes pandas DataFrame and outputs pandas DataFrame. The pandas DataFrame given to the function is of a batch used internally. The length of @@ -450,7 +450,7 @@ def transform_batch(self, func, *args, **kwargs) -> "DataFrame": Returns ------- - DataFrame + DataFrame or Series See Also -------- @@ -594,12 +594,12 @@ def pandas_frame_func(f): if len(pdf) <= limit: # only do the short cut when it returns a frame to avoid # operations on different dataframes in case of series. - return kdf + return cast(ks.DataFrame, kdf) # Force nullability. return_schema = as_nullable_spark_type(kdf._internal.to_internal_spark_frame.schema) - self_applied = DataFrame(self._kdf._internal.resolved_copy) + self_applied = DataFrame(self._kdf._internal.resolved_copy) # type: DataFrame output_func = GroupBy._make_pandas_df_builder_func( self_applied, func, return_schema, retain_index=True From 836bad3e9ed2e3c98a9e06f513927cacf467055b Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 10 Nov 2020 15:57:39 -0800 Subject: [PATCH 3/5] Restore example for apply_batch --- databricks/koalas/accessors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/accessors.py b/databricks/koalas/accessors.py index 7a722cc943..a94454eb9b 100644 --- a/databricks/koalas/accessors.py +++ b/databricks/koalas/accessors.py @@ -200,7 +200,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame": >>> # This case does not return the length of whole frame but of the batch internally ... # used. - ... def length(pdf) -> pd.DataFrame[int]: + ... def length(pdf) -> ks.DataFrame[int]: ... return pd.DataFrame([len(pdf)]) ... 
>>> df = ks.DataFrame({'A': range(1000)}) From a6c7a07078c4d21f97bfb3389c92cf32a4b124c3 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 10 Nov 2020 17:40:15 -0800 Subject: [PATCH 4/5] String literal of forward ref --- databricks/koalas/spark/accessors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index c5aeeba28a..cb9c5e2c52 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -126,7 +126,7 @@ def transform(self, func) -> Union["ks.Series", "ks.Index"]: class SparkSeriesMethods(SparkIndexOpsMethods): def transform(self, func) -> "ks.Series": - return cast(ks.Series, super().transform(func)) + return cast("ks.Series", super().transform(func)) transform.__doc__ = SparkIndexOpsMethods.transform.__doc__ From 5203dc4fcb2c70df12c3ad2fb17d445fc72a48e4 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 10 Nov 2020 19:08:33 -0800 Subject: [PATCH 5/5] String literal of forward ref --- databricks/koalas/spark/accessors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py index cb9c5e2c52..38ab04139b 100644 --- a/databricks/koalas/spark/accessors.py +++ b/databricks/koalas/spark/accessors.py @@ -252,7 +252,7 @@ def analyzed(self) -> "ks.Series": class SparkIndexMethods(SparkIndexOpsMethods): def transform(self, func) -> "ks.Index": - return cast(ks.Index, super().transform(func)) + return cast("ks.Index", super().transform(func)) transform.__doc__ = SparkIndexOpsMethods.transform.__doc__
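Taken together, the five patches settle on one typing pattern for the accessor hierarchy: the shared base method is annotated with a Union of the subclass return types, and each subclass narrows the result back down with typing.cast using a string-literal forward reference, because the real classes (ks.Series, ks.Index) are only imported under TYPE_CHECKING and do not exist at runtime. Below is a minimal, runnable sketch of that pattern using hypothetical stand-in classes, not the actual Koalas accessors:

from typing import Callable, List, Union, cast


class IndexOpsMethods:
    """Hypothetical stand-in for SparkIndexOpsMethods."""

    def __init__(self, data: List[int]) -> None:
        self.data = data

    def transform(
        self, func: Callable[[int], int]
    ) -> Union["SeriesMethods", "IndexOpsMethods"]:
        # The shared implementation can only promise the union of the
        # possible subclass return types.
        return type(self)([func(x) for x in self.data])


class SeriesMethods(IndexOpsMethods):
    def transform(self, func: Callable[[int], int]) -> "SeriesMethods":
        # cast() does no work at runtime; it only narrows the static type.
        # The string literal "SeriesMethods" is the point of patches 4 and 5:
        # when the narrowed-to class is only imported under TYPE_CHECKING
        # (as ks.Series is in spark/accessors.py), a bare class reference
        # inside cast() would raise NameError at runtime.
        return cast("SeriesMethods", super().transform(func))


print(SeriesMethods([1, 2, 3]).transform(lambda x: x + 1).data)  # [2, 3, 4]

With this shape, a type checker sees SeriesMethods.transform as returning SeriesMethods while the runtime behavior is unchanged, which is exactly what the cast calls in SparkSeriesMethods and SparkIndexMethods achieve.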
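Patch 3 reverts the doctest change made in patch 1 because the subscripted return hint only works on Koalas' DataFrame: ks.DataFrame supports class subscription, so ks.DataFrame[int] can serve as a schema hint that apply_batch inspects, whereas pandas' DataFrame is not subscriptable and pd.DataFrame[int] raises TypeError the moment the annotated def executes. A simplified, hypothetical sketch of that mechanism (the real Koalas implementation differs in its details):

class TypedFrame:
    """Hypothetical mini-version of a class usable as TypedFrame[int]."""

    dtypes: tuple = ()

    def __class_getitem__(cls, params):
        # Record the requested dtypes on a throwaway subclass -- roughly how
        # a return annotation like ks.DataFrame[int] can carry a schema hint
        # for later inspection.
        params = params if isinstance(params, tuple) else (params,)
        return type("TypedFrame", (cls,), {"dtypes": params})


print(TypedFrame[int].dtypes)         # (<class 'int'>,)
print(TypedFrame[int, float].dtypes)  # (<class 'int'>, <class 'float'>)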