Add read_json and let to_json use spark.write.json
HyukjinKwon committed Sep 6, 2019
1 parent 3457f61 commit e9becfc
Showing 4 changed files with 110 additions and 141 deletions.
184 changes: 55 additions & 129 deletions databricks/koalas/generic.py
@@ -26,6 +26,7 @@

from pyspark import sql as spark
from pyspark.sql import functions as F
from pyspark.sql.readwriter import OptionUtils
from pyspark.sql.types import DataType, DoubleType, FloatType

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
@@ -569,158 +570,83 @@ def to_csv(self, path_or_buf=None, sep=",", na_rep='', float_format=None,
return validate_arguments_and_invoke_function(
kdf._to_internal_pandas(), self.to_csv, f, args)

def to_json(self, path_or_buf=None, orient=None, date_format=None,
double_precision=10, force_ascii=True, date_unit='ms',
default_handler=None, lines=False, compression='infer',
index=True):
def to_json(self, path=None, compression='uncompressed', num_files=None, **kwargs):
"""
Convert the object to a JSON string.
.. note:: Koalas `to_json` writes files to a path or URI. Unlike pandas, Koalas
respects HDFS properties such as 'fs.default.name'.
.. note:: Koalas writes JSON files into the directory `path`, producing multiple
`part-...` files in that directory when `path` is specified. This behaviour is
inherited from Apache Spark. The number of files can be controlled by `num_files`.
.. note:: The output JSON format differs from pandas'. It always uses `orient='records'`
for its output. This behaviour might have to change in the near future.
Note that NaN and None are converted to null, and datetime objects are
converted to UNIX timestamps.
.. note:: When `path` is not specified, all the data is loaded into the driver's
memory, so this method should only be used if the resulting JSON is expected to be small.
Parameters
----------
path_or_buf : string or file handle, optional
File path or object. If not specified, the result is returned as
path : string, optional
File path. If not specified, the result is returned as
a string.
orient : string
Indication of expected JSON string format.
* Series
- default is 'index'
- allowed values are: {'split','records','index','table'}
* DataFrame
- default is 'columns'
- allowed values are:
{'split','records','index','columns','values','table'}
* The format of the JSON string
- 'split' : dict like {'index' -> [index],
'columns' -> [columns], 'data' -> [values]}
- 'records' : list like
[{column -> value}, ... , {column -> value}]
- 'index' : dict like {index -> {column -> value}}
- 'columns' : dict like {column -> {index -> value}}
- 'values' : just the values array
- 'table' : dict like {'schema': {schema}, 'data': {data}}
describing the data, and the data component is
like ``orient='records'``.
date_format : {None, 'epoch', 'iso'}
Type of date conversion. 'epoch' = epoch milliseconds,
'iso' = ISO8601. The default depends on the `orient`. For
``orient='table'``, the default is 'iso'. For all other orients,
the default is 'epoch'.
double_precision : int, default 10
The number of decimal places to use when encoding
floating point values.
force_ascii : bool, default True
Force encoded string to be ASCII.
date_unit : string, default 'ms' (milliseconds)
The time unit to encode to, governs timestamp and ISO8601
precision. One of 's', 'ms', 'us', 'ns' for second, millisecond,
microsecond, and nanosecond respectively.
default_handler : callable, default None
Handler to call if object cannot otherwise be converted to a
suitable format for JSON. Should receive a single argument which is
the object to convert and return a serialisable object.
lines : bool, default False
If 'orient' is 'records' write out line delimited json format. Will
throw ValueError if incorrect 'orient' since others are not list
like.
compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}
date_format : str, default None
Format string for datetime objects.
compression : {'gzip', 'bz2', 'xz', None}
A string representing the compression to use in the output file,
only used when the first argument is a filename. By default, the
compression is inferred from the filename.
index : bool, default True
Whether to include the index values in the JSON string. Not
including the index (``index=False``) is only supported when
orient is 'split' or 'table'.
Examples
--------
>>> df = ks.DataFrame([['a', 'b'], ['c', 'd']],
... index=['row 1', 'row 2'],
... columns=['col 1', 'col 2'])
>>> df.to_json(orient='split')
'{"columns":["col 1","col 2"],\
"index":["row 1","row 2"],\
"data":[["a","b"],["c","d"]]}'
>>> df['col 1'].to_json(orient='split')
'{"name":"col 1","index":["row 1","row 2"],"data":["a","c"]}'
Encoding/decoding a Dataframe using ``'records'`` formatted JSON.
Note that index labels are not preserved with this encoding.
>>> df.to_json(orient='records')
>>> df.to_json()
'[{"col 1":"a","col 2":"b"},{"col 1":"c","col 2":"d"}]'
>>> df['col 1'].to_json(orient='records')
'["a","c"]'
Encoding/decoding a Dataframe using ``'index'`` formatted JSON:
>>> df.to_json(orient='index')
'{"row 1":{"col 1":"a","col 2":"b"},"row 2":{"col 1":"c","col 2":"d"}}'
>>> df['col 1'].to_json(orient='index')
'{"row 1":"a","row 2":"c"}'
Encoding/decoding a Dataframe using ``'columns'`` formatted JSON:
>>> df.to_json(orient='columns')
'{"col 1":{"row 1":"a","row 2":"c"},"col 2":{"row 1":"b","row 2":"d"}}'
>>> df['col 1'].to_json(orient='columns')
'{"row 1":"a","row 2":"c"}'
Encoding/decoding a Dataframe using ``'values'`` formatted JSON:
>>> df.to_json(orient='values')
'[["a","b"],["c","d"]]'
>>> df['col 1'].to_json(orient='values')
'["a","c"]'
Encoding with Table Schema
>>> df.to_json(orient='table') # doctest: +SKIP
'{"schema": {"fields":[{"name":"index","type":"string"},\
{"name":"col 1","type":"string"},\
{"name":"col 2","type":"string"}],\
"primaryKey":["index"],\
"pandas_version":"0.20.0"}, \
"data": [{"index":"row 1","col 1":"a","col 2":"b"},\
{"index":"row 2","col 1":"c","col 2":"d"}]}'
>>> df['col 1'].to_json(orient='table') # doctest: +SKIP
'{"schema": {"fields":[{"name":"index","type":"string"},\
{"name":"col 1","type":"string"}],"primaryKey":["index"],"pandas_version":"0.20.0"}, \
"data": [{"index":"row 1","col 1":"a"},{"index":"row 2","col 1":"c"}]}'
>>> df['col 1'].to_json()
'[{"col 1":"a"},{"col 1":"c"}]'
>>> df.to_json(path=r'%s/to_json/foo.json' % path, num_files=1)
>>> ks.read_json(
... path=r'%s/to_json/foo.json' % path
... ).sort_values(by="col 1")
col 1 col 2
0 a b
1 c d
>>> df['col 1'].to_json(path=r'%s/to_json/foo.json' % path, num_files=1)
>>> ks.read_json(
... path=r'%s/to_json/foo.json' % path
... ).sort_values(by="col 1")
col 1
0 a
1 c
"""
# Make sure locals() call is at the top of the function so we don't capture local variables.
args = locals()
if path is None:
# If path is none, just collect and use pandas's to_json.
kdf_or_ser = self
pdf = kdf_or_ser.to_pandas()
if isinstance(self, ks.Series):
pdf = pdf.to_frame()
# To make the format consistent and readable by `read_json`, convert it to pandas' and
# use 'records' orient for now.
return pdf.to_json(orient='records')

kdf = self
if isinstance(self, ks.Series):
kdf = self._kdf
sdf = kdf._sdf

if isinstance(self, ks.DataFrame):
f = pd.DataFrame.to_json
elif isinstance(self, ks.Series):
f = pd.Series.to_json
else:
raise TypeError('Constructor expects DataFrame or Series; however, '
'got [%s]' % (self,))
if num_files is not None:
sdf = sdf.repartition(num_files)

return validate_arguments_and_invoke_function(
kdf._to_internal_pandas(), self.to_json, f, args)
builder = sdf.select(self._internal.data_columns).write.mode("overwrite")
OptionUtils._set_opts(builder, compression=compression)
builder.options(**kwargs).format("json").save(path)
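
For reference, the distributed branch above is roughly equivalent to driving Spark's DataFrameWriter directly. A minimal sketch under that assumption (the SparkSession, sample data, and output path below are illustrative, not part of this commit):

from pyspark.sql import SparkSession

# Build a small DataFrame to write; `spark` and the output path are illustrative.
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a", "b"), ("c", "d")], ["col 1", "col 2"])

(sdf.repartition(1)                     # what num_files=1 translates to
    .write.mode("overwrite")
    .option("compression", "gzip")      # the compression= argument becomes this writer option
    .format("json")
    .save("/tmp/koalas_to_json_demo"))  # writes part-... files into this directory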

def to_excel(self, excel_writer, sheet_name="Sheet1", na_rep="", float_format=None,
columns=None, header=True, index=True, index_label=None, startrow=0,
27 changes: 26 additions & 1 deletion databricks/koalas/namespace.py
@@ -41,7 +41,7 @@
__all__ = ["from_pandas", "range", "read_csv", "read_delta", "read_table", "read_spark_io",
"read_parquet", "read_clipboard", "read_excel", "read_html", "to_datetime",
"get_dummies", "concat", "melt", "isna", "isnull", "notna", "notnull",
"read_sql_table", "read_sql_query", "read_sql"]
"read_sql_table", "read_sql_query", "read_sql", "read_json"]


def from_pandas(pobj: Union['pd.DataFrame', 'pd.Series']) -> Union['Series', 'DataFrame']:
@@ -241,6 +241,31 @@ def read_csv(path, header='infer', names=None, usecols=None,
return DataFrame(sdf)


def read_json(path: str, **options):
"""
Read JSON files at the given path into a DataFrame.
Parameters
----------
path : string
File path
Examples
--------
>>> df = ks.DataFrame([['a', 'b'], ['c', 'd']],
... columns=['col 1', 'col 2'])
>>> df.to_json(path=r'%s/read_json/foo.json' % path, num_files=1)
>>> ks.read_json(
... path=r'%s/read_json/foo.json' % path
... ).sort_values(by="col 1")
col 1 col 2
0 a b
1 c d
"""
return read_spark_io(path, format='json', options=options)
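
Under the hood, `read_json` defers to `read_spark_io`, which wraps Spark's DataFrameReader. A minimal sketch of the equivalent plain PySpark read, assuming the illustrative directory written in the earlier sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Spark reads every part-... file in the directory; each line is one JSON record,
# e.g. {"col 1":"a","col 2":"b"}.
sdf = spark.read.format("json").load("/tmp/koalas_to_json_demo")
sdf.show()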


def read_delta(path: str, version: Optional[str] = None, timestamp: Optional[str] = None,
**options) -> DataFrame:
"""
33 changes: 22 additions & 11 deletions databricks/koalas/tests/test_dataframe_conversion.py
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import string
import tempfile

import numpy as np
import pandas as pd
@@ -27,6 +29,12 @@
class DataFrameConversionTest(ReusedSQLTestCase, SQLTestUtils, TestUtils):
"""Test cases for "small data" conversion and I/O."""

def setUp(self):
self.tmp_dir = tempfile.mkdtemp(prefix=DataFrameConversionTest.__name__)

def tearDown(self):
shutil.rmtree(self.tmp_dir, ignore_errors=True)

@property
def pdf(self):
return pd.DataFrame({
@@ -162,16 +170,19 @@ def test_to_json(self):
pdf = self.pdf
kdf = ks.from_pandas(pdf)

self.assert_eq(kdf.to_json(), pdf.to_json())
self.assert_eq(kdf.to_json(orient='split'), pdf.to_json(orient='split'))
self.assert_eq(kdf.to_json(orient='records'), pdf.to_json(orient='records'))
self.assert_eq(kdf.to_json(orient='index'), pdf.to_json(orient='index'))
self.assert_eq(kdf.to_json(orient='values'), pdf.to_json(orient='values'))
self.assert_eq(kdf.to_json(orient='table'), pdf.to_json(orient='table'))
self.assert_eq(kdf.to_json(orient='records', lines=True),
pdf.to_json(orient='records', lines=True))
self.assert_eq(kdf.to_json(orient='split', index=False),
pdf.to_json(orient='split', index=False))
self.assert_eq(kdf.to_json(), pdf.to_json(orient='records'))

def test_to_json_with_path(self):
pdf = pd.DataFrame({'a': [1], 'b': ['a']})
kdf = ks.DataFrame(pdf)

kdf.to_json(self.tmp_dir, num_files=1)
expected = pdf.to_json(orient='records')

output_paths = [path for path in os.listdir(self.tmp_dir) if path.startswith("part-")]
assert len(output_paths) > 0
output_path = "%s/%s" % (self.tmp_dir, output_paths[0])
self.assertEqual("[%s]" % open(output_path).read().strip(), expected)
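
The bracket-wrapping in the assertion works because Spark writes newline-delimited JSON (one object per line, no enclosing array), while pandas' orient='records' produces a JSON array; with a single row and num_files=1 the two only differ by the surrounding brackets. For multi-row output one would also have to join the lines with commas. A hypothetical helper along those lines (not part of this commit) could rebuild the pandas-style string:

import glob
import os

# Hypothetical helper: rebuild a pandas-style orient='records' JSON array from
# Spark's newline-delimited part files. Assumes the combined output fits in memory.
def parts_to_json_array(directory):
    records = []
    for part in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(part) as f:
            records.extend(line for line in f.read().splitlines() if line)
    return "[%s]" % ",".join(records)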

def test_to_clipboard(self):
pdf = self.pdf
7 changes: 7 additions & 0 deletions docs/source/reference/io.rst
@@ -69,6 +69,13 @@ Excel
read_excel
DataFrame.to_excel

JSON
----
.. autosummary::
:toctree: api/

read_json

HTML
----
.. autosummary::
