apacheGH-34216: [Python] Support for reading JSON Datasets With Python (apache#34586)

This PR adds support for reading JSON datasets with Python. As mentioned in [apache#34216](apache#34216), only reading is supported.

Please compare the differences between my implementation of _json.pyx/_json.pxd and _csv.pyx/_csv.pxd, since _csv.pxd uses a pointer to the C++ class while my implementation does not.
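For illustration, a minimal sketch of the usage this change enables; the directory path and the JSON contents are assumptions for the example, not part of this commit:

```python
import pyarrow.dataset as ds

# Each file under the (hypothetical) directory is line-delimited JSON,
# e.g. {"a": 1, "b": "x"}.
dataset = ds.dataset("path/to/json_dir", format="json")
table = dataset.to_table()   # reads every JSON file into one Table
print(table.schema)
```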

**What changes are included in this PR?**

C++: add an include for file_json.h in arrow/dataset/api.h
Python: reference the C++ code and support reading JSON datasets

**Are these changes tested?**
Yes
6 test cases added in tests/test_dataset.py

* Closes: apache#34216

Lead-authored-by: Junming Chen <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
2 people authored and liujiacheng777 committed May 11, 2023
1 parent b62e33f commit c7dee48
Showing 7 changed files with 314 additions and 9 deletions.
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/api.h
@@ -26,6 +26,9 @@
#ifdef ARROW_CSV
#include "arrow/dataset/file_csv.h"
#endif
#ifdef ARROW_JSON
#include "arrow/dataset/file_json.h"
#endif
#include "arrow/dataset/file_ipc.h"
#ifdef ARROW_ORC
#include "arrow/dataset/file_orc.h"
131 changes: 128 additions & 3 deletions python/pyarrow/_dataset.pyx
@@ -36,6 +36,8 @@ from pyarrow._fs cimport FileSystem, FileSelector
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
from pyarrow._json cimport ParseOptions as JsonParseOptions
from pyarrow._json cimport ReadOptions as JsonReadOptions


_DEFAULT_BATCH_SIZE = 2**17
@@ -983,7 +985,7 @@ cdef class FileSystemDataset(Dataset):
The top-level schema of the Dataset.
format : FileFormat
File format of the fragments, currently only ParquetFileFormat,
IpcFileFormat, and CsvFileFormat are supported.
IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
FileSystem of the fragments.
root_partition : Expression, optional
@@ -1078,7 +1080,7 @@ cdef class FileSystemDataset(Dataset):
The top-level schema of the DataDataset.
format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
partitions : list[Expression], optional
@@ -1179,6 +1181,7 @@ cdef class FileFormat(_Weakrefable):
classes = {
'ipc': IpcFileFormat,
'csv': CsvFileFormat,
'json': JsonFileFormat,
'parquet': _get_parquet_symbol('ParquetFileFormat'),
'orc': _get_orc_fileformat(),
}
@@ -1315,10 +1318,11 @@ cdef class Fragment(_Weakrefable):
type_name = frombytes(sp.get().type_name())

classes = {
# IpcFileFormat, CsvFileFormat and OrcFileFormat do not have
# IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have
# corresponding subclasses of FileFragment
'ipc': FileFragment,
'csv': FileFragment,
'json': FileFragment,
'orc': FileFragment,
'parquet': _get_parquet_symbol('ParquetFileFragment'),
}
@@ -1928,6 +1932,7 @@ cdef class FragmentScanOptions(_Weakrefable):

classes = {
'csv': CsvFragmentScanOptions,
'json': JsonFragmentScanOptions,
'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'),
}

@@ -2184,6 +2189,126 @@ cdef class CsvFileWriteOptions(FileWriteOptions):
self.csv_options = <CCsvFileWriteOptions*> sp.get()


cdef class JsonFileFormat(FileFormat):
"""
FileFormat for JSON files.

Parameters
----------
default_fragment_scan_options : JsonFragmentScanOptions
Default options for fragments scan.
parse_options : pyarrow.json.ParseOptions
Options regarding json parsing.
read_options : pyarrow.json.ReadOptions
General read options.
"""
cdef:
CJsonFileFormat* json_format

# Avoid mistakenly creating attributes
__slots__ = ()

def __init__(self, default_fragment_scan_options=None,
JsonParseOptions parse_options=None,
JsonReadOptions read_options=None):
self.init(shared_ptr[CFileFormat](new CJsonFileFormat()))
if parse_options is not None or read_options is not None:
if default_fragment_scan_options is not None:
raise ValueError('If `default_fragment_scan_options` is '
'given, cannot specify read_options')
self.default_fragment_scan_options = JsonFragmentScanOptions(
parse_options=parse_options,
read_options=read_options)
elif isinstance(default_fragment_scan_options, dict):
self.default_fragment_scan_options = JsonFragmentScanOptions(
**default_fragment_scan_options)
elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions):
self.default_fragment_scan_options = default_fragment_scan_options
elif default_fragment_scan_options is not None:
raise TypeError('`default_fragment_scan_options` must be either '
'a dictionary or an instance of '
'JsonFragmentScanOptions')

cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
self.json_format = <CJsonFileFormat*> sp.get()

cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
if options.type_name == 'json':
self.json_format.default_fragment_scan_options = options.wrapped
self.default_fragment_scan_options.read_options = options.read_options
self.default_fragment_scan_options.parse_options = options.parse_options
else:
super()._set_default_fragment_scan_options(options)

def equals(self, JsonFileFormat other):
return (other and
self.default_fragment_scan_options ==
other.default_fragment_scan_options)

def __reduce__(self):
return JsonFileFormat, (self.default_fragment_scan_options,)

def __repr__(self):
return "<JsonFileFormat>"


cdef class JsonFragmentScanOptions(FragmentScanOptions):
"""
Scan-specific options for JSON fragments.

Parameters
----------
parse_options : pyarrow.json.ParseOptions
Options regarding JSON parsing.
read_options : pyarrow.json.ReadOptions
General read options.
"""
cdef:
CJsonFragmentScanOptions* json_options

# Avoid mistakenly creating attributes
__slots__ = ()

def __init__(self, JsonParseOptions parse_options=None,
JsonReadOptions read_options=None):
self.init(shared_ptr[CFragmentScanOptions](
new CJsonFragmentScanOptions()))
if parse_options is not None:
self.parse_options = parse_options
if read_options is not None:
self.read_options = read_options

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
self.json_options = <CJsonFragmentScanOptions*> sp.get()

@property
def parse_options(self):
return JsonParseOptions.wrap(self.json_options.parse_options)

@parse_options.setter
def parse_options(self, JsonParseOptions parse_options not None):
self.json_options.parse_options = parse_options.options

@property
def read_options(self):
return JsonReadOptions.wrap(self.json_options.read_options)

@read_options.setter
def read_options(self, JsonReadOptions read_options not None):
self.json_options.read_options = read_options.options

def equals(self, JsonFragmentScanOptions other):
return (
other and
self.read_options.equals(other.read_options) and
self.parse_options.equals(other.parse_options))

def __reduce__(self):
return JsonFragmentScanOptions, (self.parse_options, self.read_options)


cdef class Partitioning(_Weakrefable):

def __init__(self):
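For context, a rough sketch of how the new JsonFileFormat and JsonFragmentScanOptions classes above could be combined; the schema, block size, and path below are illustrative assumptions, not values taken from this commit:

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.json as pj

# Explicit parse/read options instead of the defaults (hypothetical values).
parse_options = pj.ParseOptions(
    explicit_schema=pa.schema([("a", pa.int64()), ("b", pa.string())]),
    unexpected_field_behavior="ignore")
read_options = pj.ReadOptions(block_size=1 << 20)  # 1 MiB blocks

json_format = ds.JsonFileFormat(parse_options=parse_options,
                                read_options=read_options)
dataset = ds.dataset("path/to/json_dir", format=json_format)

# Equivalent: bundle the same options as default fragment scan options.
scan_options = ds.JsonFragmentScanOptions(parse_options=parse_options,
                                          read_options=read_options)
json_format2 = ds.JsonFileFormat(default_fragment_scan_options=scan_options)
```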
36 changes: 36 additions & 0 deletions python/pyarrow/_json.pxd
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport _Weakrefable


cdef class ParseOptions(_Weakrefable):
cdef:
CJSONParseOptions options

@staticmethod
cdef ParseOptions wrap(CJSONParseOptions options)

cdef class ReadOptions(_Weakrefable):
cdef:
CJSONReadOptions options

@staticmethod
cdef ReadOptions wrap(CJSONReadOptions options)
42 changes: 37 additions & 5 deletions python/pyarrow/_json.pyx
@@ -40,8 +40,6 @@ cdef class ReadOptions(_Weakrefable):
This will determine multi-threading granularity as well as
the size of individual chunks in the Table.
"""
cdef:
CJSONReadOptions options

# Avoid mistakenly creating attributes
__slots__ = ()
@@ -84,6 +82,24 @@
self.block_size
)

def equals(self, ReadOptions other):
return (
self.use_threads == other.use_threads and
self.block_size == other.block_size
)

def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return False

@staticmethod
cdef ReadOptions wrap(CJSONReadOptions options):
out = ReadOptions()
out.options = options # shallow copy
return out


cdef class ParseOptions(_Weakrefable):
"""
@@ -107,9 +123,6 @@
the output
"""

cdef:
CJSONParseOptions options

__slots__ = ()

def __init__(self, explicit_schema=None, newlines_in_values=None,
@@ -198,6 +211,25 @@

self.options.unexpected_field_behavior = v

def equals(self, ParseOptions other):
return (
self.explicit_schema == other.explicit_schema and
self.newlines_in_values == other.newlines_in_values and
self.unexpected_field_behavior == other.unexpected_field_behavior
)

def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return False

@staticmethod
cdef ParseOptions wrap(CJSONParseOptions options):
out = ParseOptions()
out.options = options # shallow copy
return out


cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
use_memory_map = False
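A small sketch of what the equality support added above enables; the option values are illustrative only:

```python
import pyarrow.json as pj

a = pj.ReadOptions(block_size=1 << 20)
b = pj.ReadOptions(block_size=1 << 20)
c = pj.ReadOptions(block_size=1 << 10)

assert a.equals(b)                    # compares use_threads and block_size
assert a == b                         # __eq__ delegates to equals()
assert a != c
assert (a == "not options") is False  # TypeError is caught, reported as False
```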
6 changes: 5 additions & 1 deletion python/pyarrow/dataset.py
@@ -23,6 +23,8 @@
from pyarrow._dataset import ( # noqa
CsvFileFormat,
CsvFragmentScanOptions,
JsonFileFormat,
JsonFragmentScanOptions,
Dataset,
DatasetFactory,
DirectoryPartitioning,
@@ -297,6 +299,8 @@ def _ensure_format(obj):
if not _orc_available:
raise ValueError(_orc_msg)
return OrcFileFormat()
elif obj == "json":
return JsonFileFormat()
else:
raise ValueError("format '{}' is not supported".format(obj))

@@ -598,7 +602,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
Optionally provide the Schema for the Dataset, in which case it will
not be inferred from the source.
format : FileFormat or str
Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are
Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are
supported. For Feather, only version 2 files are supported.
filesystem : FileSystem or URI string, default None
If a single path is given as source and filesystem is None, then the
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CCSVReadOptions read_options
function[StreamWrapFunc] stream_transform_func

cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat):
pass

cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions):
CJSONParseOptions parse_options
CJSONReadOptions read_options

cdef cppclass CPartitioning "arrow::dataset::Partitioning":
c_string type_name() const
CResult[CExpression] Parse(const c_string & path) const