From 07cc0856bb5e7b3bc9adacefaad83dfbe6e87cb7 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sun, 12 Mar 2023 17:38:30 +0800 Subject: [PATCH 01/19] Add to libarrow_dataset Of Python --- python/pyarrow/includes/libarrow_dataset.pxd | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 160379708490b..21a181a0bd2b5 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -276,6 +276,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CCSVReadOptions read_options function[StreamWrapFunc] stream_transform_func + cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat): + pass + + cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFileFormat"(CFragmentScanOptions): + CJSONParseOptions parse_options + CJSONReadOptions read_options + cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const CResult[CExpression] Parse(const c_string & path) const From 4c190ed36da74d81a070cdfb9cbbe97dd2f0f6a3 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sun, 12 Mar 2023 18:10:21 +0800 Subject: [PATCH 02/19] add _json.pxd and fix bug --- python/pyarrow/_json.pxd | 31 ++++++++++++++++++++ python/pyarrow/includes/libarrow_dataset.pxd | 2 +- 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 python/pyarrow/_json.pxd diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd new file mode 100644 index 0000000000000..bccd117166b85 --- /dev/null +++ b/python/pyarrow/_json.pxd @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from pyarrow.includes.libarrow cimport * +from pyarrow.lib cimport _Weakrefable + + +cdef class ParseOptions(_Weakrefable): + cdef: + unique_ptr[CJSONParseOptions] options + +cdef class ReadOptions(_Weakrefable): + cdef: + unique_ptr[CJSONReadOptions] options + diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 21a181a0bd2b5..a712ab9ccdc8b 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -279,7 +279,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat): pass - cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFileFormat"(CFragmentScanOptions): + cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions): CJSONParseOptions parse_options CJSONReadOptions read_options From 129fa23e3e8bba6529fa4f74e31f0ff7ec2cf9e4 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Tue, 14 Mar 2023 22:32:29 +0800 Subject: [PATCH 03/19] support Json in dataset --- python/pyarrow/_dataset.pyx | 127 +++++++++++++++++++++++++++++++++++- python/pyarrow/_json.pxd | 6 ++ python/pyarrow/_json.pyx | 12 ++++ python/pyarrow/dataset.py | 2 +- 4 files changed, 143 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 16158f6749209..4319d3c7f631a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -975,7 +975,7 @@ cdef class FileSystemDataset(Dataset): The top-level schema of the Dataset. format : FileFormat File format of the fragments, currently only ParquetFileFormat, - IpcFileFormat, and CsvFileFormat are supported. + IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported. filesystem : FileSystem FileSystem of the fragments. root_partition : Expression, optional @@ -1070,7 +1070,7 @@ cdef class FileSystemDataset(Dataset): The top-level schema of the DataDataset. format : FileFormat File format to create fragments from, currently only - ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported. + ParquetFileFormat, IpcFileFormat, CsvFileFormat, and JsonFileFormat are supported. filesystem : FileSystem The filesystem which files are from. partitions : list[Expression], optional @@ -1171,6 +1171,7 @@ cdef class FileFormat(_Weakrefable): classes = { 'ipc': IpcFileFormat, 'csv': CsvFileFormat, + 'json':JsonFileFormat, 'parquet': _get_parquet_symbol('ParquetFileFormat'), 'orc': _get_orc_fileformat(), } @@ -1307,10 +1308,11 @@ cdef class Fragment(_Weakrefable): type_name = frombytes(sp.get().type_name()) classes = { - # IpcFileFormat, CsvFileFormat and OrcFileFormat do not have + # IpcFileFormat, CsvFileFormat, JsonFileFormat and OrcFileFormat do not have # corresponding subclasses of FileFragment 'ipc': FileFragment, 'csv': FileFragment, + 'json': FileFormat, 'orc': FileFragment, 'parquet': _get_parquet_symbol('ParquetFileFragment'), } @@ -1920,6 +1922,7 @@ cdef class FragmentScanOptions(_Weakrefable): classes = { 'csv': CsvFragmentScanOptions, + 'json': JsonFragmentScanOptions, 'parquet': _get_parquet_symbol('ParquetFragmentScanOptions'), } @@ -2175,6 +2178,124 @@ cdef class CsvFileWriteOptions(FileWriteOptions): FileWriteOptions.init(self, sp) self.csv_options = sp.get() +cdef class JsonFileFormat(FileFormat): + """ + FileFormat for JSON files. + + Parameters + ---------- + parse_options : pyarrow.json.ParseOptions + Options regarding json parsing. + default_fragment_scan_options : JsonFragmentScanOptions + Default options for fragments scan. + read_options : pyarrow.json.ReadOptions + General read options. + """ + cdef: + CJsonFileFormat* json_format + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, ParseOptions parse_options=None, + default_fragment_scan_options=None, + ReadOptions read_options=None): + self.init(shared_ptr[CFileFormat](new CJsonFileFormat())) + if parse_options is not None: + self.parse_options = parse_options + if read_options is not None: + if default_fragment_scan_options is not None: + raise ValueError('If `default_fragment_scan_options` is ' + 'given, cannot specify read_options') + self.default_fragment_scan_options = JsonFragmentScanOptions( + read_options=read_options) + elif isinstance(default_fragment_scan_options, dict): + self.default_fragment_scan_options = JsonFragmentScanOptions( + **default_fragment_scan_options) + elif isinstance(default_fragment_scan_options, JsonFragmentScanOptions): + self.default_fragment_scan_options = default_fragment_scan_options + elif default_fragment_scan_options is not None: + raise TypeError('`default_fragment_scan_options` must be either ' + 'a dictionary or an instance of ' + 'JsonFragmentScanOptions') + + cdef void init(self, const shared_ptr[CFileFormat]& sp): + FileFormat.init(self, sp) + self.json_format = sp.get() + + @property + def parse_options(self): + return ParseOptions.wrap(self.json_format.parse_options) + + @parse_options.setter + def parse_options(self, ParseOptions parse_options not None): + self.json_format.parse_options = deref(parse_options.options) + self.parse_options = parse_options + + cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): + if options.type_name == 'json': + self.json_format.default_fragment_scan_options = options.wrapped + self.default_fragment_scan_options.read_options = options.read_options + else: + super()._set_default_fragment_scan_options(options) + + def equals(self, JsonFileFormat other): + return (other and + self.parse_options.equals(other.parse_options) and + self.default_fragment_scan_options == + other.default_fragment_scan_options) + + def __reduce__(self): + return JsonFileFormat, (self.parse_options, + self.default_fragment_scan_options) + + def __repr__(self): + return f"" + + +cdef class JsonFragmentScanOptions(FragmentScanOptions): + """ + Scan-specific options for JSON fragments. + + Parameters + ---------- + read_options : pyarrow.json.ReadOptions + General read options. + """ + cdef: + CJSONFragmentScanOptions* json_options + + def __init__(self, ReadOptions read_options=None): + self.init(shared_ptr[CFragmentScanOptions]( + new CJsonFragmentScanOptions())) + if read_options is not None: + self.read_options = read_options + + # Avoid mistakingly creating attributes + __slots__ = () + + cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): + FragmentScanOptions.init(self, sp) + self.json_options = sp.get() + + @property + def read_options(self): + read_options = ReadOptions.wrap(self.json_options.read_options) + return read_options + + @read_options.setter + def read_options(self, ReadOptions read_options not None): + self.json_options.read_options = deref(read_options.options) + self.read_options = read_options + + def equals(self, JsonFragmentScanOptions other): + return ( + other and + self.read_options.equals(other.read_options)) + + def __reduce__(self): + return JsonFragmentScanOptions, (self.read_options) + cdef class Partitioning(_Weakrefable): diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd index bccd117166b85..b364346f486eb 100644 --- a/python/pyarrow/_json.pxd +++ b/python/pyarrow/_json.pxd @@ -25,7 +25,13 @@ cdef class ParseOptions(_Weakrefable): cdef: unique_ptr[CJSONParseOptions] options + @staticmethod + cdef ParseOptions wrap(CJSONParseOptions options) + cdef class ReadOptions(_Weakrefable): cdef: unique_ptr[CJSONReadOptions] options + + @staticmethod + cdef ReadOptions wrap(CJSONReadOptions options) diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 4c6d964bd12c1..3f3a481ef4253 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -83,6 +83,12 @@ cdef class ReadOptions(_Weakrefable): self.use_threads, self.block_size ) + + @staticmethod + cdef ReadOptions wrap(CJSONReadOptions options): + out = ReadOptions() + out.options.reset(new CJSONReadOptions(move(options))) + return out cdef class ParseOptions(_Weakrefable): @@ -197,6 +203,12 @@ cdef class ParseOptions(_Weakrefable): ) self.options.unexpected_field_behavior = v + + @staticmethod + cdef ParseOptions wrap(CJSONParseOptions options): + out = ParseOptions() + out.options.reset(new CJSONParseOptions(move(options))) + return out cdef _get_reader(input_file, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index de9469de445bf..1fd63651ffc5c 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -597,7 +597,7 @@ def dataset(source, schema=None, format=None, filesystem=None, Optionally provide the Schema for the Dataset, in which case it will not be inferred from the source. format : FileFormat or str - Currently "parquet", "ipc"/"arrow"/"feather", "csv", and "orc" are + Currently "parquet", "ipc"/"arrow"/"feather", "csv", "json", and "orc" are supported. For Feather, only version 2 files are supported. filesystem : FileSystem or URI string, default None If a single path is given as source and filesystem is None, then the From 955ed8ee08c949827030ae44a55e5f077126f4e6 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 15 Mar 2023 22:10:14 +0800 Subject: [PATCH 04/19] fix parse bug --- python/pyarrow/_dataset.pyx | 54 ++++++++++++++++++++----------------- python/pyarrow/_json.pxd | 4 +-- python/pyarrow/_json.pyx | 9 ++----- python/pyarrow/dataset.py | 2 ++ 4 files changed, 35 insertions(+), 34 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 4319d3c7f631a..08343c71682bd 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -35,6 +35,9 @@ from pyarrow._fs cimport FileSystem, FileSelector from pyarrow._csv cimport ( ConvertOptions, ParseOptions, ReadOptions, WriteOptions) from pyarrow.util import _is_iterable, _is_path_like, _stringify_path +from pyarrow._json cimport ParseOptions as JsonParseOptions +from pyarrow._json cimport ReadOptions as JsonReadOptions + _orc_fileformat = None @@ -2197,17 +2200,16 @@ cdef class JsonFileFormat(FileFormat): # Avoid mistakingly creating attributes __slots__ = () - def __init__(self, ParseOptions parse_options=None, + def __init__(self, JsonParseOptions parse_options=None, default_fragment_scan_options=None, - ReadOptions read_options=None): + JsonReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CJsonFileFormat())) - if parse_options is not None: - self.parse_options = parse_options - if read_options is not None: + if parse_options is not None or read_options is not None: if default_fragment_scan_options is not None: raise ValueError('If `default_fragment_scan_options` is ' 'given, cannot specify read_options') self.default_fragment_scan_options = JsonFragmentScanOptions( + parse_options=parse_options, read_options=read_options) elif isinstance(default_fragment_scan_options, dict): self.default_fragment_scan_options = JsonFragmentScanOptions( @@ -2223,15 +2225,6 @@ cdef class JsonFileFormat(FileFormat): FileFormat.init(self, sp) self.json_format = sp.get() - @property - def parse_options(self): - return ParseOptions.wrap(self.json_format.parse_options) - - @parse_options.setter - def parse_options(self, ParseOptions parse_options not None): - self.json_format.parse_options = deref(parse_options.options) - self.parse_options = parse_options - cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): if options.type_name == 'json': self.json_format.default_fragment_scan_options = options.wrapped @@ -2241,16 +2234,14 @@ cdef class JsonFileFormat(FileFormat): def equals(self, JsonFileFormat other): return (other and - self.parse_options.equals(other.parse_options) and self.default_fragment_scan_options == other.default_fragment_scan_options) def __reduce__(self): - return JsonFileFormat, (self.parse_options, - self.default_fragment_scan_options) + return JsonFileFormat, (self.default_fragment_scan_options) def __repr__(self): - return f"" + return f"" cdef class JsonFragmentScanOptions(FragmentScanOptions): @@ -2263,13 +2254,16 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): General read options. """ cdef: - CJSONFragmentScanOptions* json_options + CJsonFragmentScanOptions* json_options - def __init__(self, ReadOptions read_options=None): + def __init__(self,JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CJsonFragmentScanOptions())) if read_options is not None: self.read_options = read_options + if parse_options is not None: + self.parse_options = parse_options # Avoid mistakingly creating attributes __slots__ = () @@ -2278,23 +2272,33 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): FragmentScanOptions.init(self, sp) self.json_options = sp.get() + @property + def parse_options(self): + return JsonParseOptions.wrap(self.json_options.parse_options) + + @parse_options.setter + def parse_options(self, JsonParseOptions parse_options not None): + self.json_options.parse_options = parse_options.options + self.parse_options = parse_options + @property def read_options(self): - read_options = ReadOptions.wrap(self.json_options.read_options) + read_options = JsonReadOptions.wrap(self.json_options.read_options) return read_options @read_options.setter - def read_options(self, ReadOptions read_options not None): - self.json_options.read_options = deref(read_options.options) + def read_options(self, JsonReadOptions read_options not None): + self.json_options.read_options = read_options.options self.read_options = read_options def equals(self, JsonFragmentScanOptions other): return ( other and - self.read_options.equals(other.read_options)) + self.read_options.equals(other.read_options) and + self.parse_options.equals(other.parse_options)) def __reduce__(self): - return JsonFragmentScanOptions, (self.read_options) + return JsonFragmentScanOptions, (self.read_options,self.parse_options) cdef class Partitioning(_Weakrefable): diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd index b364346f486eb..ee784eedf1cc8 100644 --- a/python/pyarrow/_json.pxd +++ b/python/pyarrow/_json.pxd @@ -23,14 +23,14 @@ from pyarrow.lib cimport _Weakrefable cdef class ParseOptions(_Weakrefable): cdef: - unique_ptr[CJSONParseOptions] options + CJSONParseOptions options @staticmethod cdef ParseOptions wrap(CJSONParseOptions options) cdef class ReadOptions(_Weakrefable): cdef: - unique_ptr[CJSONReadOptions] options + CJSONReadOptions options @staticmethod cdef ReadOptions wrap(CJSONReadOptions options) diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 3f3a481ef4253..11ba103bf9839 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -40,8 +40,6 @@ cdef class ReadOptions(_Weakrefable): This will determine multi-threading granularity as well as the size of individual chunks in the Table. """ - cdef: - CJSONReadOptions options # Avoid mistakingly creating attributes __slots__ = () @@ -87,7 +85,7 @@ cdef class ReadOptions(_Weakrefable): @staticmethod cdef ReadOptions wrap(CJSONReadOptions options): out = ReadOptions() - out.options.reset(new CJSONReadOptions(move(options))) + out.options=options #shallow copy return out @@ -113,9 +111,6 @@ cdef class ParseOptions(_Weakrefable): the output """ - cdef: - CJSONParseOptions options - __slots__ = () def __init__(self, explicit_schema=None, newlines_in_values=None, @@ -207,7 +202,7 @@ cdef class ParseOptions(_Weakrefable): @staticmethod cdef ParseOptions wrap(CJSONParseOptions options): out = ParseOptions() - out.options.reset(new CJSONParseOptions(move(options))) + out.options = options #shallow copy return out diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 1fd63651ffc5c..6d8ec6fff744c 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -23,6 +23,8 @@ from pyarrow._dataset import ( # noqa CsvFileFormat, CsvFragmentScanOptions, + JsonFileFormat, + JsonFragmentScanOptions, Dataset, DatasetFactory, DirectoryPartitioning, From 62f836403194c7f9d421e8aae0d79842b91f8608 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 15 Mar 2023 23:50:31 +0800 Subject: [PATCH 05/19] include in api.h --- cpp/src/arrow/dataset/api.h | 3 +++ python/pyarrow/_dataset.pyx | 1 + 2 files changed, 4 insertions(+) diff --git a/cpp/src/arrow/dataset/api.h b/cpp/src/arrow/dataset/api.h index 6e8aab5e9ea8c..306c778cb4c4d 100644 --- a/cpp/src/arrow/dataset/api.h +++ b/cpp/src/arrow/dataset/api.h @@ -26,6 +26,9 @@ #ifdef ARROW_CSV #include "arrow/dataset/file_csv.h" #endif +#ifdef ARROW_JSON +#include "arrow/dataset/file_json.h" +#endif #include "arrow/dataset/file_ipc.h" #ifdef ARROW_ORC #include "arrow/dataset/file_orc.h" diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 08343c71682bd..0deea719a4147 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2181,6 +2181,7 @@ cdef class CsvFileWriteOptions(FileWriteOptions): FileWriteOptions.init(self, sp) self.csv_options = sp.get() + cdef class JsonFileFormat(FileFormat): """ FileFormat for JSON files. From 1cd684cd9f766e615988ba271c1f02a21cc9cb50 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Thu, 16 Mar 2023 20:11:07 +0800 Subject: [PATCH 06/19] add in dataset --- python/pyarrow/dataset.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 6d8ec6fff744c..a1b112056cdc0 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -298,6 +298,8 @@ def _ensure_format(obj): if not _orc_available: raise ValueError(_orc_msg) return OrcFileFormat() + elif obj == "json": + return JsonFileFormat() else: raise ValueError("format '{}' is not supported".format(obj)) From e32c47fdd66968d98ffac515110c908dea0c12f6 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 22 Mar 2023 19:53:59 +0800 Subject: [PATCH 07/19] Update python/pyarrow/_dataset.pyx Co-authored-by: Weston Pace --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 0deea719a4147..131eb8cca6ab1 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1174,7 +1174,7 @@ cdef class FileFormat(_Weakrefable): classes = { 'ipc': IpcFileFormat, 'csv': CsvFileFormat, - 'json':JsonFileFormat, + 'json': JsonFileFormat, 'parquet': _get_parquet_symbol('ParquetFileFormat'), 'orc': _get_orc_fileformat(), } From c94400cc1835f9a3971e8982b0d63aebf4d411d9 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 22 Mar 2023 19:54:16 +0800 Subject: [PATCH 08/19] Update python/pyarrow/_dataset.pyx Co-authored-by: Weston Pace --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 131eb8cca6ab1..0382e65d5d4b9 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1315,7 +1315,7 @@ cdef class Fragment(_Weakrefable): # corresponding subclasses of FileFragment 'ipc': FileFragment, 'csv': FileFragment, - 'json': FileFormat, + 'json': FileFragment, 'orc': FileFragment, 'parquet': _get_parquet_symbol('ParquetFileFragment'), } From d551a448bc4f745caebbdb37ebd579226341ccf3 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Thu, 23 Mar 2023 20:47:41 +0800 Subject: [PATCH 09/19] fix pickling bug --- python/pyarrow/_dataset.pyx | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 0382e65d5d4b9..e7ce8a398196d 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2188,10 +2188,10 @@ cdef class JsonFileFormat(FileFormat): Parameters ---------- - parse_options : pyarrow.json.ParseOptions - Options regarding json parsing. default_fragment_scan_options : JsonFragmentScanOptions Default options for fragments scan. + parse_options : pyarrow.json.ParseOptions + Options regarding json parsing. read_options : pyarrow.json.ReadOptions General read options. """ @@ -2201,9 +2201,9 @@ cdef class JsonFileFormat(FileFormat): # Avoid mistakingly creating attributes __slots__ = () - def __init__(self, JsonParseOptions parse_options=None, - default_fragment_scan_options=None, - JsonReadOptions read_options=None): + def __init__(self,default_fragment_scan_options=None, + JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CJsonFileFormat())) if parse_options is not None or read_options is not None: if default_fragment_scan_options is not None: @@ -2239,7 +2239,7 @@ cdef class JsonFileFormat(FileFormat): other.default_fragment_scan_options) def __reduce__(self): - return JsonFileFormat, (self.default_fragment_scan_options) + return JsonFileFormat, (self.default_fragment_scan_options,) def __repr__(self): return f"" @@ -2251,6 +2251,8 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): Parameters ---------- + parse_options : pyarrow.json.ParseOptions + Options regarding JSON parsing. read_options : pyarrow.json.ReadOptions General read options. """ @@ -2261,10 +2263,11 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): JsonReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CJsonFragmentScanOptions())) - if read_options is not None: - self.read_options = read_options if parse_options is not None: self.parse_options = parse_options + if read_options is not None: + self.read_options = read_options + # Avoid mistakingly creating attributes __slots__ = () @@ -2299,7 +2302,7 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): self.parse_options.equals(other.parse_options)) def __reduce__(self): - return JsonFragmentScanOptions, (self.read_options,self.parse_options) + return JsonFragmentScanOptions, (self.parse_options, self.read_options) cdef class Partitioning(_Weakrefable): From f9b53cb753e810efbbd75787fd0aad31cb4d0669 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Thu, 23 Mar 2023 21:17:00 +0800 Subject: [PATCH 10/19] test for pickle --- python/pyarrow/tests/test_dataset.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index ebacd0235916c..64fda804ed15d 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -35,6 +35,7 @@ import pyarrow as pa import pyarrow.compute as pc import pyarrow.csv +import pyarrow.json import pyarrow.feather import pyarrow.fs as fs from pyarrow.tests.util import (change_cwd, _filesystem_uri, @@ -798,12 +799,17 @@ def test_file_format_pickling(): formats = [ ds.IpcFileFormat(), ds.CsvFileFormat(), + ds.JsonFileFormat(), ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', ignore_empty_lines=True)), ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, column_names=['foo'])), ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, block_size=2**20)), + ds.JsonFileFormat(pa.json.ParseOptions(newlines_in_values=True, + unexpected_field_behavior="ignore")), + ds.JsonFileFormat(read_options=pa.json.ReadOptions( + use_threads=False,block_size=14)), ] try: formats.append(ds.OrcFileFormat()) @@ -829,6 +835,11 @@ def test_file_format_pickling(): def test_fragment_scan_options_pickling(): options = [ + ds.JsonFragmentScanOptions(), + ds.JsonFragmentScanOptions(pa.json.ParseOptions(newlines_in_values=False, + unexpected_field_behavior="error")), + ds.JsonFragmentScanOptions( + read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), ds.CsvFragmentScanOptions(), ds.CsvFragmentScanOptions( convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), From a271c22add194ff19a31716ee6cb1ea8b2639da9 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sat, 25 Mar 2023 22:02:03 +0800 Subject: [PATCH 11/19] add buffer test --- python/pyarrow/_dataset.pyx | 9 +++---- python/pyarrow/_json.pyx | 26 +++++++++++++++++++++ python/pyarrow/tests/test_dataset.py | 35 ++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index e7ce8a398196d..670416e435994 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2259,6 +2259,9 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): cdef: CJsonFragmentScanOptions* json_options + # Avoid mistakingly creating attributes + __slots__ = () + def __init__(self,JsonParseOptions parse_options=None, JsonReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( @@ -2268,10 +2271,6 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): if read_options is not None: self.read_options = read_options - - # Avoid mistakingly creating attributes - __slots__ = () - cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) self.json_options = sp.get() @@ -2283,7 +2282,6 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): @parse_options.setter def parse_options(self, JsonParseOptions parse_options not None): self.json_options.parse_options = parse_options.options - self.parse_options = parse_options @property def read_options(self): @@ -2293,7 +2291,6 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): @read_options.setter def read_options(self, JsonReadOptions read_options not None): self.json_options.read_options = read_options.options - self.read_options = read_options def equals(self, JsonFragmentScanOptions other): return ( diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 11ba103bf9839..2baa70271190e 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -82,6 +82,18 @@ cdef class ReadOptions(_Weakrefable): self.block_size ) + def equals(self, ReadOptions other): + return ( + self.use_threads == other.use_threads and + self.block_size == other.block_size + ) + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + @staticmethod cdef ReadOptions wrap(CJSONReadOptions options): out = ReadOptions() @@ -199,11 +211,25 @@ cdef class ParseOptions(_Weakrefable): self.options.unexpected_field_behavior = v + def equals(self, ParseOptions other): + return ( + self.explicit_schema == other.explicit_schema and + self.newlines_in_values == other.newlines_in_values and + self.unexpected_field_behavior == other.unexpected_field_behavior + ) + + def __eq__(self, other): + try: + return self.equals(other) + except TypeError: + return False + @staticmethod cdef ParseOptions wrap(CJSONParseOptions options): out = ParseOptions() out.options = options #shallow copy return out + cdef _get_reader(input_file, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 64fda804ed15d..a1ec127583d16 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -799,14 +799,14 @@ def test_file_format_pickling(): formats = [ ds.IpcFileFormat(), ds.CsvFileFormat(), - ds.JsonFileFormat(), ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', ignore_empty_lines=True)), ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, column_names=['foo'])), ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, block_size=2**20)), - ds.JsonFileFormat(pa.json.ParseOptions(newlines_in_values=True, + ds.JsonFileFormat(), + ds.JsonFileFormat(parse_options=pa.json.ParseOptions(newlines_in_values=True, unexpected_field_behavior="ignore")), ds.JsonFileFormat(read_options=pa.json.ReadOptions( use_threads=False,block_size=14)), @@ -835,16 +835,16 @@ def test_file_format_pickling(): def test_fragment_scan_options_pickling(): options = [ - ds.JsonFragmentScanOptions(), - ds.JsonFragmentScanOptions(pa.json.ParseOptions(newlines_in_values=False, - unexpected_field_behavior="error")), - ds.JsonFragmentScanOptions( - read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), ds.CsvFragmentScanOptions(), ds.CsvFragmentScanOptions( convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), ds.CsvFragmentScanOptions( read_options=pa.csv.ReadOptions(block_size=2**16)), + ds.JsonFragmentScanOptions(), + ds.JsonFragmentScanOptions(pa.json.ParseOptions(newlines_in_values=False, + unexpected_field_behavior="error")), + ds.JsonFragmentScanOptions( + read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), ] if pq is not None: @@ -981,6 +981,25 @@ def test_make_csv_fragment_from_buffer(dataset_reader): pickled = pickle.loads(pickle.dumps(fragment)) assert dataset_reader.to_table(pickled).equals(fragment.to_table()) +def test_make_json_fragment_from_buffer(dataset_reader): + content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n' + buffer = pa.py_buffer(content.encode('utf-8')) + + json_format = ds.JsonFileFormat() + fragment = json_format.make_fragment(buffer) + + # When buffer, fragment open returns a BufferReader, not NativeFile + assert isinstance(fragment.open(), pa.BufferReader) + + expected = pa.table([['a', 'b', 'c'], + [12, 11, 10], + ['dog', 'cat', 'rabbit']], + names=['alpha', 'num', 'animal']) + assert dataset_reader.to_table(fragment).equals(expected) + + pickled = pickle.loads(pickle.dumps(fragment)) + assert dataset_reader.to_table(pickled).equals(fragment.to_table()) + @pytest.mark.parquet def test_make_parquet_fragment_from_buffer(dataset_reader): @@ -5133,3 +5152,5 @@ def test_dataset_sort_by(tempdir, dstype): sorted_tab_dict = sorted_tab.to_table().to_pydict() assert sorted_tab_dict["a"] == [5, 7, 7, 35] assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] + +test_fragment_scan_options_pickling() \ No newline at end of file From 3a9f0dc2b1a394d386db693701ca42fbbb3442c6 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sat, 25 Mar 2023 22:32:54 +0800 Subject: [PATCH 12/19] test_format --- python/pyarrow/tests/test_dataset.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a1ec127583d16..8dd05c185866f 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3202,6 +3202,25 @@ def test_csv_fragment_options(tempdir, dataset_reader): assert result.equals( pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) +@pytest.mark.pandas +def test_json_format(tempdir, dataset_reader): + table = pa.table({'a': pa.array([1, 2, 3], type="int64"), + 'b': pa.array([.1, .2, .3], type="float64")}) + + path = str(tempdir / 'test.json') + out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') + with open(path, 'w') as f: + f.write(out) + + dataset = ds.dataset(path, format=ds.JsonFileFormat()) + result = dataset_reader.to_table(dataset) + assert result.equals(table) + + assert_dataset_fragment_convenience_methods(dataset) + + dataset = ds.dataset(path, format='json') + result = dataset_reader.to_table(dataset) + assert result.equals(table) def test_encoding(tempdir, dataset_reader): path = str(tempdir / 'test.csv') From 0837364e1acc9d0161537f483ea9964b19ff89cb Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sat, 25 Mar 2023 22:53:03 +0800 Subject: [PATCH 13/19] delete debug func --- python/pyarrow/tests/test_dataset.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8dd05c185866f..cd51508f27ff8 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -5170,6 +5170,4 @@ def test_dataset_sort_by(tempdir, dstype): sorted_tab = dt.sort_by([("a", "ascending")]) sorted_tab_dict = sorted_tab.to_table().to_pydict() assert sorted_tab_dict["a"] == [5, 7, 7, 35] - assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] - -test_fragment_scan_options_pickling() \ No newline at end of file + assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] \ No newline at end of file From 31b5557e473a7ff293cf2a0e2e99e88afad0b9c2 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Sun, 26 Mar 2023 10:51:57 +0800 Subject: [PATCH 14/19] add option test --- python/pyarrow/tests/test_dataset.py | 33 ++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index cd51508f27ff8..1073334af0866 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3222,6 +3222,39 @@ def test_json_format(tempdir, dataset_reader): result = dataset_reader.to_table(dataset) assert result.equals(table) +def test_json_format_options(tempdir, dataset_reader): + table = pa.table({'a': pa.array([1, 2, 3], type="int64"), + 'b': pa.array([.1, .2, .3], type="float64")}) + + path = str(tempdir / 'test.json') + out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') + with open(path, 'w') as f: + f.write(out) + + dataset = ds.dataset(path, format=ds.JsonFileFormat( + read_options=pa.json.ReadOptions(block_size=64))) + result = dataset_reader.to_table(dataset) + assert result.equals( + pa.table({'a': pa.array([1, 2, 3], type="int64"), + 'b': pa.array([.1, .2, .3], type="float64")})) + + +def test_json_fragment_options(tempdir, dataset_reader): + table = pa.table({'a': pa.array([1, 2, 3], type="int64"), + 'b': pa.array([.1, .2, .3], type="float64")}) + + path = str(tempdir / 'test.json') + out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') + with open(path, 'w') as f: + f.write(out) + + options = ds.JsonFragmentScanOptions(read_options=pa.json.ReadOptions(block_size=64)) + dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) + result = dataset_reader.to_table(dataset) + assert result.equals( + pa.table({'a': pa.array([1, 2, 3], type="int64"), + 'b': pa.array([.1, .2, .3], type="float64")})) + def test_encoding(tempdir, dataset_reader): path = str(tempdir / 'test.csv') From 1f4692d37e0c2b3197a3f7328015a4a87182cbf0 Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Mon, 10 Apr 2023 23:52:34 +0800 Subject: [PATCH 15/19] format --- python/pyarrow/tests/test_dataset.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 1073334af0866..a8cf918d8e715 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -809,7 +809,7 @@ def test_file_format_pickling(): ds.JsonFileFormat(parse_options=pa.json.ParseOptions(newlines_in_values=True, unexpected_field_behavior="ignore")), ds.JsonFileFormat(read_options=pa.json.ReadOptions( - use_threads=False,block_size=14)), + use_threads=False,block_size=14)), ] try: formats.append(ds.OrcFileFormat()) @@ -844,7 +844,7 @@ def test_fragment_scan_options_pickling(): ds.JsonFragmentScanOptions(pa.json.ParseOptions(newlines_in_values=False, unexpected_field_behavior="error")), ds.JsonFragmentScanOptions( - read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), + read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), ] if pq is not None: @@ -981,8 +981,11 @@ def test_make_csv_fragment_from_buffer(dataset_reader): pickled = pickle.loads(pickle.dumps(fragment)) assert dataset_reader.to_table(pickled).equals(fragment.to_table()) + def test_make_json_fragment_from_buffer(dataset_reader): - content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n' + content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + + '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + + '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n' buffer = pa.py_buffer(content.encode('utf-8')) json_format = ds.JsonFileFormat() @@ -3202,6 +3205,7 @@ def test_csv_fragment_options(tempdir, dataset_reader): assert result.equals( pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) + @pytest.mark.pandas def test_json_format(tempdir, dataset_reader): table = pa.table({'a': pa.array([1, 2, 3], type="int64"), @@ -3222,6 +3226,7 @@ def test_json_format(tempdir, dataset_reader): result = dataset_reader.to_table(dataset) assert result.equals(table) + def test_json_format_options(tempdir, dataset_reader): table = pa.table({'a': pa.array([1, 2, 3], type="int64"), 'b': pa.array([.1, .2, .3], type="float64")}) @@ -3230,7 +3235,7 @@ def test_json_format_options(tempdir, dataset_reader): out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') with open(path, 'w') as f: f.write(out) - + dataset = ds.dataset(path, format=ds.JsonFileFormat( read_options=pa.json.ReadOptions(block_size=64))) result = dataset_reader.to_table(dataset) @@ -3247,14 +3252,16 @@ def test_json_fragment_options(tempdir, dataset_reader): out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') with open(path, 'w') as f: f.write(out) - - options = ds.JsonFragmentScanOptions(read_options=pa.json.ReadOptions(block_size=64)) + + options = ds.JsonFragmentScanOptions( + read_options=pa.json.ReadOptions(block_size=64)) dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) result = dataset_reader.to_table(dataset) assert result.equals( pa.table({'a': pa.array([1, 2, 3], type="int64"), 'b': pa.array([.1, .2, .3], type="float64")})) + def test_encoding(tempdir, dataset_reader): path = str(tempdir / 'test.csv') @@ -5203,4 +5210,4 @@ def test_dataset_sort_by(tempdir, dstype): sorted_tab = dt.sort_by([("a", "ascending")]) sorted_tab_dict = sorted_tab.to_table().to_pydict() assert sorted_tab_dict["a"] == [5, 7, 7, 35] - assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] \ No newline at end of file + assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] From e274504c7797d1087ac9166570c46f0ab7ff913f Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Tue, 11 Apr 2023 22:37:17 +0800 Subject: [PATCH 16/19] further format --- python/pyarrow/_dataset.pyx | 5 +++-- python/pyarrow/_json.pyx | 2 +- python/pyarrow/tests/test_dataset.py | 14 +++++--------- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 670416e435994..3977f7af25b7d 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2230,6 +2230,7 @@ cdef class JsonFileFormat(FileFormat): if options.type_name == 'json': self.json_format.default_fragment_scan_options = options.wrapped self.default_fragment_scan_options.read_options = options.read_options + self.default_fragment_scan_options.parse_options = options.parse_options else: super()._set_default_fragment_scan_options(options) @@ -2285,8 +2286,8 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): @property def read_options(self): - read_options = JsonReadOptions.wrap(self.json_options.read_options) - return read_options + return JsonReadOptions.wrap(self.json_options.read_options) + @read_options.setter def read_options(self, JsonReadOptions read_options not None): diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 2baa70271190e..72c8bcb5545f8 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -97,7 +97,7 @@ cdef class ReadOptions(_Weakrefable): @staticmethod cdef ReadOptions wrap(CJSONReadOptions options): out = ReadOptions() - out.options=options #shallow copy + out.options = options #shallow copy return out diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a8cf918d8e715..e4a9925e20806 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -983,9 +983,9 @@ def test_make_csv_fragment_from_buffer(dataset_reader): def test_make_json_fragment_from_buffer(dataset_reader): - content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' - + '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' - + '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n' + content = '{"alpha" : "a", "num": 12, "animal" : "dog"}\n' + \ + '{"alpha" : "b", "num": 11, "animal" : "cat"}\n' + \ + '{"alpha" : "c", "num": 10, "animal" : "rabbit"}\n' buffer = pa.py_buffer(content.encode('utf-8')) json_format = ds.JsonFileFormat() @@ -3239,9 +3239,7 @@ def test_json_format_options(tempdir, dataset_reader): dataset = ds.dataset(path, format=ds.JsonFileFormat( read_options=pa.json.ReadOptions(block_size=64))) result = dataset_reader.to_table(dataset) - assert result.equals( - pa.table({'a': pa.array([1, 2, 3], type="int64"), - 'b': pa.array([.1, .2, .3], type="float64")})) + assert result.equals(table) def test_json_fragment_options(tempdir, dataset_reader): @@ -3257,9 +3255,7 @@ def test_json_fragment_options(tempdir, dataset_reader): read_options=pa.json.ReadOptions(block_size=64)) dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) result = dataset_reader.to_table(dataset) - assert result.equals( - pa.table({'a': pa.array([1, 2, 3], type="int64"), - 'b': pa.array([.1, .2, .3], type="float64")})) + assert result.equals(table) def test_encoding(tempdir, dataset_reader): From 40b0436a080de62db0a3686b35a7dd7fda70f67a Mon Sep 17 00:00:00 2001 From: Junming Chen Date: Wed, 12 Apr 2023 22:06:42 +0800 Subject: [PATCH 17/19] test for block size --- python/pyarrow/tests/test_dataset.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e4a9925e20806..d2c9c1c49ff2d 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3235,6 +3235,11 @@ def test_json_format_options(tempdir, dataset_reader): out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') with open(path, 'w') as f: f.write(out) + + with pytest.raises(ValueError, + match="try to increase block size"): + dataset = ds.dataset(path, format=ds.JsonFileFormat( + read_options=pa.json.ReadOptions(block_size=4))) dataset = ds.dataset(path, format=ds.JsonFileFormat( read_options=pa.json.ReadOptions(block_size=64))) @@ -3251,6 +3256,12 @@ def test_json_fragment_options(tempdir, dataset_reader): with open(path, 'w') as f: f.write(out) + with pytest.raises(ValueError, + match="try to increase block size"): + options = ds.JsonFragmentScanOptions( + read_options=pa.json.ReadOptions(block_size=4)) + dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) + options = ds.JsonFragmentScanOptions( read_options=pa.json.ReadOptions(block_size=64)) dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) From 0bec6d42bc395b675e002d0853c150c3968c10fb Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 12 Apr 2023 10:05:12 -0700 Subject: [PATCH 18/19] Lint pass with archery --python --fix --- python/pyarrow/_dataset.pyx | 18 ++++++++---------- python/pyarrow/_json.pxd | 3 +-- python/pyarrow/_json.pyx | 15 +++++++-------- python/pyarrow/includes/libarrow_dataset.pxd | 2 +- python/pyarrow/tests/test_dataset.py | 20 +++++++++++--------- 5 files changed, 28 insertions(+), 30 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3977f7af25b7d..98c9b38f4e645 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -39,7 +39,6 @@ from pyarrow._json cimport ParseOptions as JsonParseOptions from pyarrow._json cimport ReadOptions as JsonReadOptions - _orc_fileformat = None _orc_imported = False @@ -2201,9 +2200,9 @@ cdef class JsonFileFormat(FileFormat): # Avoid mistakingly creating attributes __slots__ = () - def __init__(self,default_fragment_scan_options=None, - JsonParseOptions parse_options=None, - JsonReadOptions read_options=None): + def __init__(self, default_fragment_scan_options=None, + JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CJsonFileFormat())) if parse_options is not None or read_options is not None: if default_fragment_scan_options is not None: @@ -2221,7 +2220,7 @@ cdef class JsonFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'JsonFragmentScanOptions') - + cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) self.json_format = sp.get() @@ -2236,8 +2235,8 @@ cdef class JsonFileFormat(FileFormat): def equals(self, JsonFileFormat other): return (other and - self.default_fragment_scan_options == - other.default_fragment_scan_options) + self.default_fragment_scan_options == + other.default_fragment_scan_options) def __reduce__(self): return JsonFileFormat, (self.default_fragment_scan_options,) @@ -2263,8 +2262,8 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): # Avoid mistakingly creating attributes __slots__ = () - def __init__(self,JsonParseOptions parse_options=None, - JsonReadOptions read_options=None): + def __init__(self, JsonParseOptions parse_options=None, + JsonReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CJsonFragmentScanOptions())) if parse_options is not None: @@ -2287,7 +2286,6 @@ cdef class JsonFragmentScanOptions(FragmentScanOptions): @property def read_options(self): return JsonReadOptions.wrap(self.json_options.read_options) - @read_options.setter def read_options(self, JsonReadOptions read_options not None): diff --git a/python/pyarrow/_json.pxd b/python/pyarrow/_json.pxd index ee784eedf1cc8..42a0a678a9b6a 100644 --- a/python/pyarrow/_json.pxd +++ b/python/pyarrow/_json.pxd @@ -31,7 +31,6 @@ cdef class ParseOptions(_Weakrefable): cdef class ReadOptions(_Weakrefable): cdef: CJSONReadOptions options - + @staticmethod cdef ReadOptions wrap(CJSONReadOptions options) - diff --git a/python/pyarrow/_json.pyx b/python/pyarrow/_json.pyx index 72c8bcb5545f8..70cde6e23fed3 100644 --- a/python/pyarrow/_json.pyx +++ b/python/pyarrow/_json.pyx @@ -81,23 +81,23 @@ cdef class ReadOptions(_Weakrefable): self.use_threads, self.block_size ) - + def equals(self, ReadOptions other): return ( self.use_threads == other.use_threads and self.block_size == other.block_size ) - + def __eq__(self, other): try: return self.equals(other) except TypeError: return False - + @staticmethod cdef ReadOptions wrap(CJSONReadOptions options): out = ReadOptions() - out.options = options #shallow copy + out.options = options # shallow copy return out @@ -210,7 +210,7 @@ cdef class ParseOptions(_Weakrefable): ) self.options.unexpected_field_behavior = v - + def equals(self, ParseOptions other): return ( self.explicit_schema == other.explicit_schema and @@ -223,13 +223,12 @@ cdef class ParseOptions(_Weakrefable): return self.equals(other) except TypeError: return False - + @staticmethod cdef ParseOptions wrap(CJSONParseOptions options): out = ParseOptions() - out.options = options #shallow copy + out.options = options # shallow copy return out - cdef _get_reader(input_file, shared_ptr[CInputStream]* out): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index a712ab9ccdc8b..4176a88c1bd9d 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -278,7 +278,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CJsonFileFormat "arrow::dataset::JsonFileFormat"(CFileFormat): pass - + cdef cppclass CJsonFragmentScanOptions "arrow::dataset::JsonFragmentScanOptions"(CFragmentScanOptions): CJSONParseOptions parse_options CJSONReadOptions read_options diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d2c9c1c49ff2d..334a1a2be1f3b 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -806,10 +806,11 @@ def test_file_format_pickling(): ds.CsvFileFormat(read_options=pa.csv.ReadOptions( skip_rows=3, block_size=2**20)), ds.JsonFileFormat(), - ds.JsonFileFormat(parse_options=pa.json.ParseOptions(newlines_in_values=True, - unexpected_field_behavior="ignore")), + ds.JsonFileFormat( + parse_options=pa.json.ParseOptions(newlines_in_values=True, + unexpected_field_behavior="ignore")), ds.JsonFileFormat(read_options=pa.json.ReadOptions( - use_threads=False,block_size=14)), + use_threads=False, block_size=14)), ] try: formats.append(ds.OrcFileFormat()) @@ -841,10 +842,11 @@ def test_fragment_scan_options_pickling(): ds.CsvFragmentScanOptions( read_options=pa.csv.ReadOptions(block_size=2**16)), ds.JsonFragmentScanOptions(), - ds.JsonFragmentScanOptions(pa.json.ParseOptions(newlines_in_values=False, - unexpected_field_behavior="error")), ds.JsonFragmentScanOptions( - read_options=pa.json.ReadOptions(use_threads=True,block_size=512)), + pa.json.ParseOptions(newlines_in_values=False, + unexpected_field_behavior="error")), + ds.JsonFragmentScanOptions( + read_options=pa.json.ReadOptions(use_threads=True, block_size=512)), ] if pq is not None: @@ -3235,9 +3237,9 @@ def test_json_format_options(tempdir, dataset_reader): out = table.to_pandas().to_json(orient='records')[1:-1].replace('},{', '}\n{') with open(path, 'w') as f: f.write(out) - + with pytest.raises(ValueError, - match="try to increase block size"): + match="try to increase block size"): dataset = ds.dataset(path, format=ds.JsonFileFormat( read_options=pa.json.ReadOptions(block_size=4))) @@ -3257,7 +3259,7 @@ def test_json_fragment_options(tempdir, dataset_reader): f.write(out) with pytest.raises(ValueError, - match="try to increase block size"): + match="try to increase block size"): options = ds.JsonFragmentScanOptions( read_options=pa.json.ReadOptions(block_size=4)) dataset = ds.dataset(path, format=ds.JsonFileFormat(options)) From 951d88df95a2c9d40e89fa5d48879c7ff07b83c1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 12 Apr 2023 10:14:10 -0700 Subject: [PATCH 19/19] It seems we have a cython linter now :) --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 98c9b38f4e645..02a70cd350fb8 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2242,7 +2242,7 @@ cdef class JsonFileFormat(FileFormat): return JsonFileFormat, (self.default_fragment_scan_options,) def __repr__(self): - return f"" + return "" cdef class JsonFragmentScanOptions(FragmentScanOptions):