GH-34216: [Python] Support for reading JSON Datasets With Python #34586

Merged: 21 commits, Apr 12, 2023. Diff shows changes from 16 commits.
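This PR exposes Arrow's JSON reader through the dataset layer: it adds JsonFileFormat and JsonFragmentScanOptions bindings in pyarrow._dataset, equality and wrapping support for pyarrow.json ReadOptions/ParseOptions, and a "json" shortcut in pyarrow.dataset.dataset(). A minimal usage sketch of the new API, assuming a hypothetical directory data/ of newline-delimited JSON files:

```python
import pyarrow.dataset as ds

# Hypothetical directory of newline-delimited JSON files.
dataset = ds.dataset("data/", format="json")
table = dataset.to_table()  # reads all fragments into a single Table
```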
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/api.h
@@ -26,6 +26,9 @@
#ifdef ARROW_CSV
#include "arrow/dataset/file_csv.h"
#endif
#ifdef ARROW_JSON
#include "arrow/dataset/file_json.h"
#endif
#include "arrow/dataset/file_ipc.h"
#ifdef ARROW_ORC
#include "arrow/dataset/file_orc.h"
132 changes: 129 additions & 3 deletions python/pyarrow/_dataset.pyx
@@ -35,6 +35,9 @@ from pyarrow._fs cimport FileSystem, FileSelector
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
from pyarrow._json cimport ParseOptions as JsonParseOptions
from pyarrow._json cimport ReadOptions as JsonReadOptions



_orc_fileformat = None
@@ -975,7 +978,7 @@ cdef class FileSystemDataset(Dataset):
The top-level schema of the Dataset.
format : FileFormat
File format of the fragments, currently only ParquetFileFormat,
IpcFileFormat, and CsvFileFormat are supported.
IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
FileSystem of the fragments.
root_partition : Expression, optional
@@ -1070,7 +1073,7 @@ cdef class FileSystemDataset(Dataset):
The top-level schema of the Dataset.
format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
partitions : list[Expression], optional
@@ -1171,6 +1174,7 @@ cdef class FileFormat(_Weakrefable):
classes = {
'ipc': IpcFileFormat,
'csv': CsvFileFormat,
'json': JsonFileFormat,
'parquet': _get_parquet_symbol('ParquetFileFormat'),
'orc': _get_orc_fileformat(),
}
@@ -1307,10 +1311,11 @@ cdef class Fragment(_Weakrefable):
type_name = frombytes(sp.get().type_name())

classes = {
# IpcFileFormat, CsvFileFormat and OrcFileFormat do not have
# IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have
# corresponding subclasses of FileFragment
'ipc': FileFragment,
'csv': FileFragment,
'json': FileFragment,
'orc': FileFragment,
'parquet': _get_parquet_symbol('ParquetFileFragment'),
}
@@ -1920,6 +1925,7 @@ cdef class FragmentScanOptions(_Weakrefable):

classes = {
'csv': CsvFragmentScanOptions,
'json': JsonFragmentScanOptions,
'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'),
}

@@ -2176,6 +2182,126 @@ cdef class CsvFileWriteOptions(FileWriteOptions):
self.csv_options = <CCsvFileWriteOptions*> sp.get()


cdef class JsonFileFormat(FileFormat):
"""
FileFormat for JSON files.

Parameters
----------
default_fragment_scan_options : JsonFragmentScanOptions
Default options for scanning fragments.
parse_options : pyarrow.json.ParseOptions
Options regarding JSON parsing.
read_options : pyarrow.json.ReadOptions
General read options.
"""
cdef:
CJsonFileFormat* json_format

# Avoid mistakenly creating attributes
__slots__ = ()

def __init__(self, default_fragment_scan_options=None,
JsonParseOptions parse_options=None,
JsonReadOptions read_options=None):
self.init(shared_ptr[CFileFormat](new CJsonFileFormat()))
if parse_options is not None or read_options is not None:
if default_fragment_scan_options is not None:
raise ValueError('If `default_fragment_scan_options` is '
'given, cannot specify parse_options '
'or read_options')
self.default_fragment_scan_options = JsonFragmentScanOptions(
parse_options=parse_options,
read_options=read_options)
elif isinstance(default_fragment_scan_options, dict):
self.default_fragment_scan_options = JsonFragmentScanOptions(
**default_fragment_scan_options)
elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions):
self.default_fragment_scan_options = default_fragment_scan_options
elif default_fragment_scan_options is not None:
raise TypeError('`default_fragment_scan_options` must be either '
'a dictionary or an instance of '
'JsonFragmentScanOptions')

cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
self.json_format = <CJsonFileFormat*> sp.get()

cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
if options.type_name == 'json':
self.json_format.default_fragment_scan_options = options.wrapped
self.default_fragment_scan_options.read_options = options.read_options
else:
super()._set_default_fragment_scan_options(options)

def equals(self, JsonFileFormat other):
return (other and
self.default_fragment_scan_options ==
other.default_fragment_scan_options)

def __reduce__(self):
return JsonFileFormat, (self.default_fragment_scan_options,)

def __repr__(self):
return f"<JsonFileFormat>"


cdef class JsonFragmentScanOptions(FragmentScanOptions):
"""
Scan-specific options for JSON fragments.

Parameters
----------
parse_options : pyarrow.json.ParseOptions
Options regarding JSON parsing.
read_options : pyarrow.json.ReadOptions
General read options.
"""
cdef:
CJsonFragmentScanOptions* json_options

# Avoid mistakenly creating attributes
__slots__ = ()

def __init__(self, JsonParseOptions parse_options=None,
JsonReadOptions read_options=None):
self.init(shared_ptr[CFragmentScanOptions](
new CJsonFragmentScanOptions()))
if parse_options is not None:
self.parse_options = parse_options
if read_options is not None:
self.read_options = read_options

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
self.json_options = <CJsonFragmentScanOptions*> sp.get()

@property
def parse_options(self):
return JsonParseOptions.wrap(self.json_options.parse_options)

@parse_options.setter
def parse_options(self, JsonParseOptions parse_options not None):
self.json_options.parse_options = parse_options.options

@property
def read_options(self):
read_options = JsonReadOptions.wrap(self.json_options.read_options)
return read_options

@read_options.setter
def read_options(self, JsonReadOptions read_options not None):
self.json_options.read_options = read_options.options

def equals(self, JsonFragmentScanOptions other):
return (
other and
self.read_options.equals(other.read_options) and
self.parse_options.equals(other.parse_options))

def __reduce__(self):
return JsonFragmentScanOptions, (self.parse_options, self.read_options)


cdef class Partitioning(_Weakrefable):

def __init__(self):
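For reference, a short sketch of constructing the format with explicit scan options as accepted by the __init__ added above; the schema, block size, and data/ path are illustrative assumptions, not part of this PR:

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.json as pj

# Explicit schema skips type inference; block_size tunes chunking granularity.
fmt = ds.JsonFileFormat(
    parse_options=pj.ParseOptions(
        explicit_schema=pa.schema([("id", pa.int64()), ("name", pa.string())]),
        unexpected_field_behavior="ignore"),
    read_options=pj.ReadOptions(use_threads=True, block_size=1 << 20))

# Equivalent: pass a JsonFragmentScanOptions (or a dict) directly.
fmt2 = ds.JsonFileFormat(
    default_fragment_scan_options=ds.JsonFragmentScanOptions(
        read_options=pj.ReadOptions(block_size=1 << 20)))

dataset = ds.dataset("data/", format=fmt)
```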
37 changes: 37 additions & 0 deletions python/pyarrow/_json.pxd
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport _Weakrefable


cdef class ParseOptions(_Weakrefable):
cdef:
CJSONParseOptions options

@staticmethod
cdef ParseOptions wrap(CJSONParseOptions options)

cdef class ReadOptions(_Weakrefable):
cdef:
CJSONReadOptions options

@staticmethod
cdef ReadOptions wrap(CJSONReadOptions options)

43 changes: 38 additions & 5 deletions python/pyarrow/_json.pyx
@@ -40,8 +40,6 @@ cdef class ReadOptions(_Weakrefable):
This will determine multi-threading granularity as well as
the size of individual chunks in the Table.
"""
cdef:
CJSONReadOptions options

# Avoid mistakenly creating attributes
__slots__ = ()
@@ -83,6 +81,24 @@ cdef class ReadOptions(_Weakrefable):
self.use_threads,
self.block_size
)

def equals(self, ReadOptions other):
return (
self.use_threads == other.use_threads and
self.block_size == other.block_size
)

def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return False

@staticmethod
cdef ReadOptions wrap(CJSONReadOptions options):
out = ReadOptions()
out.options = options  # shallow copy
return out


cdef class ParseOptions(_Weakrefable):
Expand All @@ -107,9 +123,6 @@ cdef class ParseOptions(_Weakrefable):
the output
"""

cdef:
CJSONParseOptions options

__slots__ = ()

def __init__(self, explicit_schema=None, newlines_in_values=None,
@@ -197,6 +210,26 @@ cdef class ParseOptions(_Weakrefable):
)

self.options.unexpected_field_behavior = v

def equals(self, ParseOptions other):
return (
self.explicit_schema == other.explicit_schema and
self.newlines_in_values == other.newlines_in_values and
self.unexpected_field_behavior == other.unexpected_field_behavior
)

def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return False

@staticmethod
cdef ParseOptions wrap(CJSONParseOptions options):
out = ParseOptions()
out.options = options  # shallow copy
return out



cdef _get_reader(input_file, shared_ptr[CInputStream]* out):
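A small sketch of what the new equals/__eq__ hooks and the existing __reduce__ make possible; option values are chosen purely for illustration:

```python
import pickle
import pyarrow.json as pj

a = pj.ReadOptions(use_threads=True, block_size=4096)
b = pj.ReadOptions(use_threads=True, block_size=4096)
assert a == b            # new __eq__ delegates to equals()
assert a.equals(b)
assert pickle.loads(pickle.dumps(a)) == a  # __reduce__ + __eq__ round-trip

p = pj.ParseOptions(newlines_in_values=True)
q = pj.ParseOptions(newlines_in_values=True)
assert p.equals(q)       # compares schema, newlines, and field behavior
```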
6 changes: 5 additions & 1 deletion python/pyarrow/dataset.py
@@ -23,6 +23,8 @@
from pyarrow._dataset import ( # noqa
CsvFileFormat,
CsvFragmentScanOptions,
JsonFileFormat,
JsonFragmentScanOptions,
Dataset,
DatasetFactory,
DirectoryPartitioning,
@@ -296,6 +298,8 @@ def _ensure_format(obj):
if not _orc_available:
raise ValueError(_orc_msg)
return OrcFileFormat()
elif obj == "json":
return JsonFileFormat()
else:
raise ValueError("format '{}' is not supported".format(obj))

@@ -597,7 +601,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
Optionally provide the Schema for the Dataset, in which case it will
not be inferred from the source.
format : FileFormat or str
Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are
Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are
supported. For Feather, only version 2 files are supported.
filesystem : FileSystem or URI string, default None
If a single path is given as source and filesystem is None, then the
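With the "json" shortcut registered in _ensure_format and the 'json' entry in the FragmentScanOptions registry, per-read overrides work the same way they do for CSV. A sketch, again assuming a hypothetical data/ path:

```python
import pyarrow.dataset as ds
import pyarrow.json as pj

dataset = ds.dataset("data/", format="json")

# Override JSON scan options for this read only, leaving the
# format's defaults untouched.
table = dataset.to_table(
    fragment_scan_options=ds.JsonFragmentScanOptions(
        read_options=pj.ReadOptions(block_size=1 << 20)))
```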
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -276,6 +276,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CCSVReadOptions read_options
function[StreamWrapFunc] stream_transform_func

cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat):
pass

cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions):
CJSONParseOptions parse_options
CJSONReadOptions read_options

cdef cppclass CPartitioning "arrow::dataset::Partitioning":
c_string type_name() const
CResult[CExpression] Parse(const c_string & path) const