From e9becfcaff4c199f878866bb792cb84c88137c7d Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 6 Sep 2019 15:04:06 +0900 Subject: [PATCH] Add read_json and let to_json use spark.write.json --- databricks/koalas/generic.py | 184 ++++++------------ databricks/koalas/namespace.py | 27 ++- .../koalas/tests/test_dataframe_conversion.py | 33 ++-- docs/source/reference/io.rst | 7 + 4 files changed, 110 insertions(+), 141 deletions(-) diff --git a/databricks/koalas/generic.py b/databricks/koalas/generic.py index 8e7a5d47c8..25f26f4909 100644 --- a/databricks/koalas/generic.py +++ b/databricks/koalas/generic.py @@ -26,6 +26,7 @@ from pyspark import sql as spark from pyspark.sql import functions as F +from pyspark.sql.readwriter import OptionUtils from pyspark.sql.types import DataType, DoubleType, FloatType from databricks import koalas as ks # For running doctests and reference resolution in PyCharm. @@ -569,158 +570,83 @@ def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None, return validate_arguments_and_invoke_function( kdf._to_internal_pandas(), self.to_csv, f, args) - def to_json(self, path_or_buf=None, orient=None, date_format=None, - double_precision=10, force_ascii=True, date_unit='ms', - default_handler=None, lines=False, compression='infer', - index=True): + def to_json(self, path=None, compression='uncompressed', num_files=None, **kwargs): """ Convert the object to a JSON string. + .. note:: Koalas `to_json` writes files to a path or URI. Unlike pandas', Koalas + respects HDFS's property such as 'fs.default.name'. + + .. note:: Koalas writes JSON files into the directory, `path`, and writes + multiple `part-...` files in the directory when `path` is specified. + This behaviour was inherited from Apache Spark. The number of files can + be controlled by `num_files`. + + .. note:: output JSON format is different from pandas'. It always use `orient='records'` + for its output. This behaviour might have to change in the near future. + Note NaN's and None will be converted to null and datetime objects will be converted to UNIX timestamps. - .. note:: This method should only be used if the resulting JSON is expected - to be small, as all the data is loaded into the driver's memory. - Parameters ---------- - path_or_buf : string or file handle, optional - File path or object. If not specified, the result is returned as + path : string, optional + File path. If not specified, the result is returned as a string. - orient : string - Indication of expected JSON string format. - - * Series - - - default is 'index' - - allowed values are: {'split','records','index','table'} - - * DataFrame - - - default is 'columns' - - allowed values are: - {'split','records','index','columns','values','table'} - - * The format of the JSON string - - - 'split' : dict like {'index' -> [index], - 'columns' -> [columns], 'data' -> [values]} - - 'records' : list like - [{column -> value}, ... , {column -> value}] - - 'index' : dict like {index -> {column -> value}} - - 'columns' : dict like {column -> {index -> value}} - - 'values' : just the values array - - 'table' : dict like {'schema': {schema}, 'data': {data}} - describing the data, and the data component is - like ``orient='records'``. - date_format : {None, 'epoch', 'iso'} - Type of date conversion. 'epoch' = epoch milliseconds, - 'iso' = ISO8601. The default depends on the `orient`. For - ``orient='table'``, the default is 'iso'. For all other orients, - the default is 'epoch'. - double_precision : int, default 10 - The number of decimal places to use when encoding - floating point values. - force_ascii : bool, default True - Force encoded string to be ASCII. - date_unit : string, default 'ms' (milliseconds) - The time unit to encode to, governs timestamp and ISO8601 - precision. One of 's', 'ms', 'us', 'ns' for second, millisecond, - microsecond, and nanosecond respectively. - default_handler : callable, default None - Handler to call if object cannot otherwise be converted to a - suitable format for JSON. Should receive a single argument which is - the object to convert and return a serialisable object. - lines : bool, default False - If 'orient' is 'records' write out line delimited json format. Will - throw ValueError if incorrect 'orient' since others are not list - like. - compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None} + date_format : str, default None + Format string for datetime objects. + compression : {'gzip', 'bz2', 'xz', None} A string representing the compression to use in the output file, only used when the first argument is a filename. By default, the compression is inferred from the filename. - index : bool, default True - Whether to include the index values in the JSON string. Not - including the index (``index=False``) is only supported when - orient is 'split' or 'table'. Examples -------- - >>> df = ks.DataFrame([['a', 'b'], ['c', 'd']], - ... index=['row 1', 'row 2'], ... columns=['col 1', 'col 2']) - >>> df.to_json(orient='split') - '{"columns":["col 1","col 2"],\ -"index":["row 1","row 2"],\ -"data":[["a","b"],["c","d"]]}' - - >>> df['col 1'].to_json(orient='split') - '{"name":"col 1","index":["row 1","row 2"],"data":["a","c"]}' - - Encoding/decoding a Dataframe using ``'records'`` formatted JSON. - Note that index labels are not preserved with this encoding. - - >>> df.to_json(orient='records') + >>> df.to_json() '[{"col 1":"a","col 2":"b"},{"col 1":"c","col 2":"d"}]' - >>> df['col 1'].to_json(orient='records') - '["a","c"]' - - Encoding/decoding a Dataframe using ``'index'`` formatted JSON: - - >>> df.to_json(orient='index') - '{"row 1":{"col 1":"a","col 2":"b"},"row 2":{"col 1":"c","col 2":"d"}}' - - >>> df['col 1'].to_json(orient='index') - '{"row 1":"a","row 2":"c"}' - - Encoding/decoding a Dataframe using ``'columns'`` formatted JSON: - - >>> df.to_json(orient='columns') - '{"col 1":{"row 1":"a","row 2":"c"},"col 2":{"row 1":"b","row 2":"d"}}' - - >>> df['col 1'].to_json(orient='columns') - '{"row 1":"a","row 2":"c"}' - - Encoding/decoding a Dataframe using ``'values'`` formatted JSON: - - >>> df.to_json(orient='values') - '[["a","b"],["c","d"]]' - - >>> df['col 1'].to_json(orient='values') - '["a","c"]' - - Encoding with Table Schema - - >>> df.to_json(orient='table') # doctest: +SKIP - '{"schema": {"fields":[{"name":"index","type":"string"},\ -{"name":"col 1","type":"string"},\ -{"name":"col 2","type":"string"}],\ -"primaryKey":["index"],\ -"pandas_version":"0.20.0"}, \ -"data": [{"index":"row 1","col 1":"a","col 2":"b"},\ -{"index":"row 2","col 1":"c","col 2":"d"}]}' - - >>> df['col 1'].to_json(orient='table') # doctest: +SKIP - '{"schema": {"fields":[{"name":"index","type":"string"},\ -{"name":"col 1","type":"string"}],"primaryKey":["index"],"pandas_version":"0.20.0"}, \ -"data": [{"index":"row 1","col 1":"a"},{"index":"row 2","col 1":"c"}]}' + >>> df['col 1'].to_json() + '[{"col 1":"a"},{"col 1":"c"}]' + + >>> df.to_json(path=r'%s/to_json/foo.json' % path, num_files=1) + >>> ks.read_json( + ... path=r'%s/to_json/foo.json' % path + ... ).sort_values(by="col 1") + col 1 col 2 + 0 a b + 1 c d + + >>> df['col 1'].to_json(path=r'%s/to_json/foo.json' % path, num_files=1) + >>> ks.read_json( + ... path=r'%s/to_json/foo.json' % path + ... ).sort_values(by="col 1") + col 1 + 0 a + 1 c """ - # Make sure locals() call is at the top of the function so we don't capture local variables. - args = locals() + if path is None: + # If path is none, just collect and use pandas's to_json. + kdf_or_ser = self + pdf = kdf_or_ser.to_pandas() + if isinstance(self, ks.Series): + pdf = pdf.to_frame() + # To make the format consistent and readable by `read_json`, convert it to pandas' and + # use 'records' orient for now. + return pdf.to_json(orient='records') + kdf = self + if isinstance(self, ks.Series): + kdf = self._kdf + sdf = kdf._sdf - if isinstance(self, ks.DataFrame): - f = pd.DataFrame.to_json - elif isinstance(self, ks.Series): - f = pd.Series.to_json - else: - raise TypeError('Constructor expects DataFrame or Series; however, ' - 'got [%s]' % (self,)) + if num_files is not None: + sdf = sdf.repartition(num_files) - return validate_arguments_and_invoke_function( - kdf._to_internal_pandas(), self.to_json, f, args) + builder = sdf.select(self._internal.data_columns).write.mode("overwrite") + OptionUtils._set_opts(builder, compression=compression) + builder.options(**kwargs).format("json").save(path) def to_excel(self, excel_writer, sheet_name="Sheet1", na_rep="", float_format=None, columns=None, header=True, index=True, index_label=None, startrow=0, diff --git a/databricks/koalas/namespace.py b/databricks/koalas/namespace.py index ec4a7135f0..3f59f07d4b 100644 --- a/databricks/koalas/namespace.py +++ b/databricks/koalas/namespace.py @@ -41,7 +41,7 @@ __all__ = ["from_pandas", "range", "read_csv", "read_delta", "read_table", "read_spark_io", "read_parquet", "read_clipboard", "read_excel", "read_html", "to_datetime", "get_dummies", "concat", "melt", "isna", "isnull", "notna", "notnull", - "read_sql_table", "read_sql_query", "read_sql"] + "read_sql_table", "read_sql_query", "read_sql", "read_json"] def from_pandas(pobj: Union['pd.DataFrame', 'pd.Series']) -> Union['Series', 'DataFrame']: @@ -241,6 +241,31 @@ def read_csv(path, header='infer', names=None, usecols=None, return DataFrame(sdf) +def read_json(path: str, **options): + """ + Convert a JSON string to pandas object. + + Parameters + ---------- + path : string + File path + + Examples + -------- + >>> df = ks.DataFrame([['a', 'b'], ['c', 'd']], + ... columns=['col 1', 'col 2']) + + >>> df.to_json(path=r'%s/read_json/foo.json' % path, num_files=1) + >>> ks.read_json( + ... path=r'%s/read_json/foo.json' % path + ... ).sort_values(by="col 1") + col 1 col 2 + 0 a b + 1 c d + """ + return read_spark_io(path, format='json', options=options) + + def read_delta(path: str, version: Optional[str] = None, timestamp: Optional[str] = None, **options) -> DataFrame: """ diff --git a/databricks/koalas/tests/test_dataframe_conversion.py b/databricks/koalas/tests/test_dataframe_conversion.py index ce4865ac6e..8ffda99a25 100644 --- a/databricks/koalas/tests/test_dataframe_conversion.py +++ b/databricks/koalas/tests/test_dataframe_conversion.py @@ -13,8 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import os +import shutil import string +import tempfile import numpy as np import pandas as pd @@ -27,6 +29,12 @@ class DataFrameConversionTest(ReusedSQLTestCase, SQLTestUtils, TestUtils): """Test cases for "small data" conversion and I/O.""" + def setUp(self): + self.tmp_dir = tempfile.mkdtemp(prefix=DataFrameConversionTest.__name__) + + def tearDown(self): + shutil.rmtree(self.tmp_dir, ignore_errors=True) + @property def pdf(self): return pd.DataFrame({ @@ -162,16 +170,19 @@ def test_to_json(self): pdf = self.pdf kdf = ks.from_pandas(pdf) - self.assert_eq(kdf.to_json(), pdf.to_json()) - self.assert_eq(kdf.to_json(orient='split'), pdf.to_json(orient='split')) - self.assert_eq(kdf.to_json(orient='records'), pdf.to_json(orient='records')) - self.assert_eq(kdf.to_json(orient='index'), pdf.to_json(orient='index')) - self.assert_eq(kdf.to_json(orient='values'), pdf.to_json(orient='values')) - self.assert_eq(kdf.to_json(orient='table'), pdf.to_json(orient='table')) - self.assert_eq(kdf.to_json(orient='records', lines=True), - pdf.to_json(orient='records', lines=True)) - self.assert_eq(kdf.to_json(orient='split', index=False), - pdf.to_json(orient='split', index=False)) + self.assert_eq(kdf.to_json(), pdf.to_json(orient='records')) + + def test_to_json_with_path(self): + pdf = pd.DataFrame({'a': [1], 'b': ['a']}) + kdf = ks.DataFrame(pdf) + + kdf.to_json(self.tmp_dir, num_files=1) + expected = pdf.to_json(orient='records') + + output_paths = [path for path in os.listdir(self.tmp_dir) if path.startswith("part-")] + assert len(output_paths) > 0 + output_path = "%s/%s" % (self.tmp_dir, output_paths[0]) + self.assertEqual("[%s]" % open(output_path).read().strip(), expected) def test_to_clipboard(self): pdf = self.pdf diff --git a/docs/source/reference/io.rst b/docs/source/reference/io.rst index 19e2a56085..59d309e282 100644 --- a/docs/source/reference/io.rst +++ b/docs/source/reference/io.rst @@ -69,6 +69,13 @@ Excel read_excel DataFrame.to_excel +JSON +---- +.. autosummary:: + :toctree: api/ + + read_json + HTML ---- .. autosummary::