diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h
index 6554dfc8cbcbd..c2ebd9d300727 100644
--- a/cpp/src/arrow/dataset/api.h
+++ b/cpp/src/arrow/dataset/api.h
@@ -26,6 +26,9 @@
 #ifdef ARROW_CSV
 #include "arrow/dataset/file_csv.h"
 #endif
+#ifdef ARROW_JSON
+#include "arrow/dataset/file_json.h"
+#endif
 #include "arrow/dataset/file_ipc.h"
 #ifdef ARROW_ORC
 #include "arrow/dataset/file_orc.h"
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index ba0a2860ecdaa..d2b5554ec1a03 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -36,6 +36,8 @@ from pyarrow._fs cimport FileSystem, FileSelector
 from pyarrow._csv cimport (
     ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
 from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
+from pyarrow._json cimport ParseOptions as JsonParseOptions
+from pyarrow._json cimport ReadOptions as JsonReadOptions
 
 _DEFAULT_BATCH_SIZE = 2**17
 
@@ -983,7 +985,7 @@ cdef class FileSystemDataset(Dataset):
         The top-level schema of the Dataset.
     format : FileFormat
         File format of the fragments, currently only ParquetFileFormat,
-        IpcFileFormat, and CsvFileFormat are supported.
+        IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
     filesystem : FileSystem
         FileSystem of the fragments.
     root_partition : Expression, optional
@@ -1078,7 +1080,7 @@ cdef class FileSystemDataset(Dataset):
             The top-level schema of the DataDataset.
         format : FileFormat
             File format to create fragments from, currently only
-            ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+            ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported.
         filesystem : FileSystem
            The filesystem which files are from.
         partitions : list[Expression], optional
@@ -1179,6 +1181,7 @@ cdef class FileFormat(_Weakrefable):
         classes = {
             'ipc': IpcFileFormat,
             'csv': CsvFileFormat,
+            'json': JsonFileFormat,
             'parquet': _get_parquet_symbol('ParquetFileFormat'),
             'orc': _get_orc_fileformat(),
         }
@@ -1315,10 +1318,11 @@ cdef class Fragment(_Weakrefable):
         type_name = frombytes(sp.get().type_name())
 
         classes = {
-            # IpcFileFormat, CsvFileFormat and OrcFileFormat do not have
+            # IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have
            # corresponding subclasses of FileFragment
             'ipc': FileFragment,
             'csv': FileFragment,
+            'json': FileFragment,
             'orc': FileFragment,
             'parquet': _get_parquet_symbol('ParquetFileFragment'),
         }
@@ -1928,6 +1932,7 @@ cdef class FragmentScanOptions(_Weakrefable):
 
         classes = {
             'csv': CsvFragmentScanOptions,
+            'json': JsonFragmentScanOptions,
             'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'),
         }
 
@@ -2184,6 +2189,126 @@ cdef class CsvFileWriteOptions(FileWriteOptions):
         self.csv_options = <CCsvFileWriteOptions*> sp.get()
 
 
+cdef class JsonFileFormat(FileFormat):
+    """
+    FileFormat for JSON files.
+
+    Parameters
+    ----------
+    default_fragment_scan_options : JsonFragmentScanOptions
+        Default options for fragments scan.
+    parse_options : pyarrow.json.ParseOptions
+        Options regarding json parsing.
+    read_options : pyarrow.json.ReadOptions
+        General read options.
+ """ + cdef: + CJsonFileFormat* json_format + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, default_fragment_scan_options=None, + JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): + self.init(shared_ptr[CFileFormat](new CJsonFileFormat())) + if parse_options is not None or read_options is not None: + if default_fragment_scan_options is not None: + raise ValueError('If `default_fragment_scan_options` is ' + 'given, cannot specify read_options') + self.default_fragment_scan_options = JsonFragmentScanOptions( + parse_options=parse_options, + read_options=read_options) + elif isinstance(default_fragment_scan_options, dict): + self.default_fragment_scan_options = JsonFragmentScanOptions( + **default_fragment_scan_options) + elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions): + self.default_fragment_scan_options = default_fragment_scan_options + elif default_fragment_scan_options is not None: + raise TypeError('`default_fragment_scan_options` must be either ' + 'a dictionary or an instance of ' + 'JsonFragmentScanOptions') + + cdef void init(self, const shared_ptr[CFileFormat]& sp): + FileFormat.init(self, sp) + self.json_format = sp.get() + + cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): + if options.type_name == 'json': + self.json_format.default_fragment_scan_options = options.wrapped + self.default_fragment_scan_options.read_options = options.read_options + self.default_fragment_scan_options.parse_options = options.parse_options + else: + super()._set_default_fragment_scan_options(options) + + def equals(self, JsonFileFormat other): + return (other and + self.default_fragment_scan_options == + other.default_fragment_scan_options) + + def __reduce__(self): + return JsonFileFormat, (self.default_fragment_scan_options,) + + def __repr__(self): + return "" + + +cdef class JsonFragmentScanOptions(FragmentScanOptions): + """ + Scan-specific options for JSON fragments. + + Parameters + ---------- + parse_options : pyarrow.json.ParseOptions + Options regarding JSON parsing. + read_options : pyarrow.json.ReadOptions + General read options. 
+ """ + cdef: + CJsonFragmentScanOptions* json_options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): + self.init(shared_ptr[CFragmentScanOptions]( + new CJsonFragmentScanOptions())) + if parse_options is not None: + self.parse_options = parse_options + if read_options is not None: + self.read_options = read_options + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): + FragmentScanOptions.init(self, sp) + self.json_options = sp.get() + + @property + def parse_options(self): + return JsonParseOptions.wrap(self.json_options.parse_options) + + @parse_options.setter + def parse_options(self, JsonParseOptions parse_options not None): + self.json_options.parse_options = parse_options.options + + @property + def read_options(self): + return JsonReadOptions.wrap(self.json_options.read_options) + + @read_options.setter + def read_options(self, JsonReadOptions read_options not None): + self.json_options.read_options = read_options.options + + def equals(self, JsonFragmentScanOptions other): + return ( + other and + self.read_options.equals(other.read_options) and + self.parse_options.equals(other.parse_options)) + + def __reduce__(self): + return JsonFragmentScanOptions, (self.parse_options, self.read_options) + + cdef class Partitioning(_Weakrefable): def __init__(self): diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd new file mode 100644 index 0000000000000..42a0a678a9b6a --- /dev/null +++ b/python/pyarrow/_json.pxd @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from pyarrow.includes.libarrow cimport * +from pyarrow.lib cimport _Weakrefable + + +cdef class ParseOptions(_Weakrefable): + cdef: + CJSONParseOptions options + + @staticmethod + cdef ParseOptions wrap(CJSONParseOptions options) + +cdef class ReadOptions(_Weakrefable): + cdef: + CJSONReadOptions options + + @staticmethod + cdef ReadOptions wrap(CJSONReadOptions options) diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 4c6d964bd12c1..70cde6e23fed3 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -40,8 +40,6 @@ cdef class ReadOptions(_Weakrefable): This will determine multi-threading granularity as well as the size of individual chunks in the Table. 
""" - cdef: - CJSONReadOptions options # Avoid mistakingly creating attributes __slots__ = () @@ -84,6 +82,24 @@ cdef class ReadOptions(_Weakrefable): self.block_size ) + def equals(self, ReadOptions other): + return ( + self.use_threads == other.use_threads and + self.block_size == other.block_size + ) + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + + @staticmethod + cdef ReadOptions wrap(CJSONReadOptions options): + out = ReadOptions() + out.options = options # shallow copy + return out + cdef class ParseOptions(_Weakrefable): """ @@ -107,9 +123,6 @@ cdef class ParseOptions(_Weakrefable): the output """ - cdef: - CJSONParseOptions options - __slots__ = () def __init__(self, explicit_schema=None, newlines_in_values=None, @@ -198,6 +211,25 @@ cdef class ParseOptions(_Weakrefable): self.options.unexpected_field_behavior = v + def equals(self, ParseOptions other): + return ( + self.explicit_schema == other.explicit_schema and + self.newlines_in_values == other.newlines_in_values and + self.unexpected_field_behavior == other.unexpected_field_behavior + ) + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + + @staticmethod + cdef ParseOptions wrap(CJSONParseOptions options): + out = ParseOptions() + out.options = options # shallow copy + return out + cdef _get_reader(input_file, shared_ptr[CInputStream]* out): use_memory_map = False diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c4da9686ecf08..8bec2080f38b9 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -23,6 +23,8 @@ from pyarrow._dataset import ( # noqa CsvFileFormat, CsvFragmentScanOptions, + JsonFileFormat, + JsonFragmentScanOptions, Dataset, DatasetFactory, DirectoryPartitioning, @@ -297,6 +299,8 @@ def _ensure_format(obj): if not _orc_available: raise ValueError(_orc_msg) return OrcFileFormat() + elif obj == "json": + return JsonFileFormat() else: raise ValueError("format '{}' is not supported".format(obj)) @@ -598,7 +602,7 @@ def dataset(source, schema=None, format=None, filesystem=None, Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. format : FileFormat or str - Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are + Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are supported. For Feather, only version 2 files are supported. 
     filesystem : FileSystem or URI string, default None
         If a single path is given as source and filesystem is None, then the
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index b554633e4b16a..201fb78217101 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CCSVReadOptions read_options
         function[StreamWrapFunc] stream_transform_func
 
+    cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat):
+        pass
+
+    cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions):
+        CJSONParseOptions parse_options
+        CJSONReadOptions read_options
+
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
         CResult[CExpression] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 20a1d51d00ece..66562b76c96b0 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -35,6 +35,7 @@
 import pyarrow as pa
 import pyarrow.compute as pc
 import pyarrow.csv
+import pyarrow.json
 import pyarrow.feather
 import pyarrow.fs as fs
 from pyarrow.tests.util import (change_cwd, _filesystem_uri,
@@ -805,6 +806,12 @@ def test_file_format_pickling():
             skip_rows=3, column_names=['foo'])),
         ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
             skip_rows=3, block_size=2**20)),
+        ds.JsonFileFormat(),
+        ds.JsonFileFormat(
+            parse_options=pa.json.ParseOptions(newlines_in_values=True,
+                                               unexpected_field_behavior="ignore")),
+        ds.JsonFileFormat(read_options=pa.json.ReadOptions(
+            use_threads=False, block_size=14)),
     ]
     try:
         formats.append(ds.OrcFileFormat())
@@ -835,6 +842,12 @@ def test_fragment_scan_options_pickling():
             convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
         ds.CsvFragmentScanOptions(
             read_options=pa.csv.ReadOptions(block_size=2**16)),
+        ds.JsonFragmentScanOptions(),
+        ds.JsonFragmentScanOptions(
+            pa.json.ParseOptions(newlines_in_values=False,
+                                 unexpected_field_behavior="error")),
+        ds.JsonFragmentScanOptions(
+            read_options=pa.json.ReadOptions(use_threads=True, block_size=512)),
     ]
 
     if pq is not None:
@@ -972,6 +985,28 @@ def test_make_csv_fragment_from_buffer(dataset_reader):
     assert dataset_reader.to_table(pickled).equals(fragment.to_table())
 
 
+def test_make_json_fragment_from_buffer(dataset_reader):
+    content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + \
+              '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + \
+              '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n'
+    buffer = pa.py_buffer(content.encode('utf-8'))
+
+    json_format = ds.JsonFileFormat()
+    fragment = json_format.make_fragment(buffer)
+
+    # When buffer, fragment open returns a BufferReader, not NativeFile
+    assert isinstance(fragment.open(), pa.BufferReader)
+
+    expected = pa.table([['a', 'b', 'c'],
+                         [12, 11, 10],
+                         ['dog', 'cat', 'rabbit']],
+                        names=['alpha', 'num', 'animal'])
+    assert dataset_reader.to_table(fragment).equals(expected)
+
+    pickled = pickle.loads(pickle.dumps(fragment))
+    assert dataset_reader.to_table(pickled).equals(fragment.to_table())
+
+
 @pytest.mark.parquet
 def test_make_parquet_fragment_from_buffer(dataset_reader):
     arrays = [
@@ -3174,6 +3209,69 @@ def test_csv_fragment_options(tempdir, dataset_reader):
         pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
 
 
+@pytest.mark.pandas
+def test_json_format(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    dataset = ds.dataset(path, format=ds.JsonFileFormat())
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+    assert_dataset_fragment_convenience_methods(dataset)
+
+    dataset = ds.dataset(path, format='json')
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
+def test_json_format_options(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    with pytest.raises(ValueError,
+                       match="try to increase block size"):
+        dataset = ds.dataset(path, format=ds.JsonFileFormat(
+            read_options=pa.json.ReadOptions(block_size=4)))
+
+    dataset = ds.dataset(path, format=ds.JsonFileFormat(
+        read_options=pa.json.ReadOptions(block_size=64)))
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
+def test_json_fragment_options(tempdir, dataset_reader):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.json')
+    out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{')
+    with open(path, 'w') as f:
+        f.write(out)
+
+    with pytest.raises(ValueError,
+                       match="try to increase block size"):
+        options = ds.JsonFragmentScanOptions(
+            read_options=pa.json.ReadOptions(block_size=4))
+        dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+
+    options = ds.JsonFragmentScanOptions(
+        read_options=pa.json.ReadOptions(block_size=64))
+    dataset = ds.dataset(path, format=ds.JsonFileFormat(options))
+    result = dataset_reader.to_table(dataset)
+    assert result.equals(table)
+
+
 def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')
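Usage sketch (editorial note, not part of the patch): with JsonFileFormat and JsonFragmentScanOptions wired into the dataset layer as above, line-delimited JSON files can be read through the same API as CSV or Parquet. The file name and option values below are illustrative only; the calls mirror the classes and tests added in this diff.

    import pyarrow.dataset as ds
    import pyarrow.json

    # "records.json" is a placeholder path to a newline-delimited JSON file.
    # The string "json" is mapped to JsonFileFormat() by _ensure_format().
    dataset = ds.dataset("records.json", format="json")
    table = dataset.to_table()

    # Equivalent, but with an explicit format object and scan options
    # (the option values here are arbitrary examples).
    fmt = ds.JsonFileFormat(
        parse_options=pyarrow.json.ParseOptions(newlines_in_values=True),
        read_options=pyarrow.json.ReadOptions(block_size=2**20))
    dataset = ds.dataset("records.json", format=fmt)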