diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index f8e12f16bc869..030c4610e5de6 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -23,7 +23,7 @@ from pyarrow.lib cimport _Weakrefable cdef class ConvertOptions(_Weakrefable): cdef: - CCSVConvertOptions options + unique_ptr[CCSVConvertOptions] options @staticmethod cdef ConvertOptions wrap(CCSVConvertOptions options) @@ -31,7 +31,7 @@ cdef class ConvertOptions(_Weakrefable): cdef class ParseOptions(_Weakrefable): cdef: - CCSVParseOptions options + unique_ptr[CCSVParseOptions] options @staticmethod cdef ParseOptions wrap(CCSVParseOptions options) @@ -39,7 +39,7 @@ cdef class ParseOptions(_Weakrefable): cdef class ReadOptions(_Weakrefable): cdef: - CCSVReadOptions options + unique_ptr[CCSVReadOptions] options public object encoding @staticmethod diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index a330664f641ef..04b9cfd2bcd31 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -77,10 +77,13 @@ cdef class ReadOptions(_Weakrefable): # Avoid mistakingly creating attributes __slots__ = () + # __init__() is not called when unpickling, initialize storage here + def __cinit__(self, *argw, **kwargs): + self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults())) + def __init__(self, *, use_threads=None, block_size=None, skip_rows=None, column_names=None, autogenerate_column_names=None, encoding='utf8'): - self.options = CCSVReadOptions.Defaults() if use_threads is not None: self.use_threads = use_threads if block_size is not None: @@ -99,11 +102,11 @@ cdef class ReadOptions(_Weakrefable): """ Whether to use multiple threads to accelerate reading. """ - return self.options.use_threads + return deref(self.options).use_threads @use_threads.setter def use_threads(self, value): - self.options.use_threads = value + deref(self.options).use_threads = value @property def block_size(self): @@ -112,11 +115,11 @@ cdef class ReadOptions(_Weakrefable): This will determine multi-threading granularity as well as the size of individual record batches or table chunks. """ - return self.options.block_size + return deref(self.options).block_size @block_size.setter def block_size(self, value): - self.options.block_size = value + deref(self.options).block_size = value @property def skip_rows(self): @@ -124,11 +127,11 @@ cdef class ReadOptions(_Weakrefable): The number of rows to skip before the column names (if any) and the CSV data. """ - return self.options.skip_rows + return deref(self.options).skip_rows @skip_rows.setter def skip_rows(self, value): - self.options.skip_rows = value + deref(self.options).skip_rows = value @property def column_names(self): @@ -136,13 +139,13 @@ cdef class ReadOptions(_Weakrefable): The column names of the target table. If empty, fall back on `autogenerate_column_names`. """ - return [frombytes(s) for s in self.options.column_names] + return [frombytes(s) for s in deref(self.options).column_names] @column_names.setter def column_names(self, value): - self.options.column_names.clear() + deref(self.options).column_names.clear() for item in value: - self.options.column_names.push_back(tobytes(item)) + deref(self.options).column_names.push_back(tobytes(item)) @property def autogenerate_column_names(self): @@ -152,11 +155,11 @@ cdef class ReadOptions(_Weakrefable): If false, column names will be read from the first CSV row after `skip_rows`. """ - return self.options.autogenerate_column_names + return deref(self.options).autogenerate_column_names @autogenerate_column_names.setter def autogenerate_column_names(self, value): - self.options.autogenerate_column_names = value + deref(self.options).autogenerate_column_names = value def equals(self, ReadOptions other): return ( @@ -172,7 +175,7 @@ cdef class ReadOptions(_Weakrefable): @staticmethod cdef ReadOptions wrap(CCSVReadOptions options): out = ReadOptions() - out.options = options + out.options.reset(new CCSVReadOptions(move(options))) out.encoding = 'utf8' # No way to know this return out @@ -221,10 +224,12 @@ cdef class ParseOptions(_Weakrefable): """ __slots__ = () + def __cinit__(self, *argw, **kwargs): + self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults())) + def __init__(self, *, delimiter=None, quote_char=None, double_quote=None, escape_char=None, newlines_in_values=None, ignore_empty_lines=None): - self.options = CCSVParseOptions.Defaults() if delimiter is not None: self.delimiter = delimiter if quote_char is not None: @@ -243,11 +248,11 @@ cdef class ParseOptions(_Weakrefable): """ The character delimiting individual cells in the CSV data. """ - return chr(self.options.delimiter) + return chr(deref(self.options).delimiter) @delimiter.setter def delimiter(self, value): - self.options.delimiter = _single_char(value) + deref(self.options).delimiter = _single_char(value) @property def quote_char(self): @@ -255,18 +260,18 @@ cdef class ParseOptions(_Weakrefable): The character used optionally for quoting CSV values (False if quoting is not allowed). """ - if self.options.quoting: - return chr(self.options.quote_char) + if deref(self.options).quoting: + return chr(deref(self.options).quote_char) else: return False @quote_char.setter def quote_char(self, value): if value is False: - self.options.quoting = False + deref(self.options).quoting = False else: - self.options.quote_char = _single_char(value) - self.options.quoting = True + deref(self.options).quote_char = _single_char(value) + deref(self.options).quoting = True @property def double_quote(self): @@ -274,11 +279,11 @@ cdef class ParseOptions(_Weakrefable): Whether two quotes in a quoted CSV value denote a single quote in the data. """ - return self.options.double_quote + return deref(self.options).double_quote @double_quote.setter def double_quote(self, value): - self.options.double_quote = value + deref(self.options).double_quote = value @property def escape_char(self): @@ -286,18 +291,18 @@ cdef class ParseOptions(_Weakrefable): The character used optionally for escaping special characters (False if escaping is not allowed). """ - if self.options.escaping: - return chr(self.options.escape_char) + if deref(self.options).escaping: + return chr(deref(self.options).escape_char) else: return False @escape_char.setter def escape_char(self, value): if value is False: - self.options.escaping = False + deref(self.options).escaping = False else: - self.options.escape_char = _single_char(value) - self.options.escaping = True + deref(self.options).escape_char = _single_char(value) + deref(self.options).escaping = True @property def newlines_in_values(self): @@ -306,11 +311,11 @@ cdef class ParseOptions(_Weakrefable): Setting this to True reduces the performance of multi-threaded CSV reading. """ - return self.options.newlines_in_values + return deref(self.options).newlines_in_values @newlines_in_values.setter def newlines_in_values(self, value): - self.options.newlines_in_values = value + deref(self.options).newlines_in_values = value @property def ignore_empty_lines(self): @@ -319,11 +324,11 @@ cdef class ParseOptions(_Weakrefable): If False, an empty line is interpreted as containing a single empty value (assuming a one-column CSV file). """ - return self.options.ignore_empty_lines + return deref(self.options).ignore_empty_lines @ignore_empty_lines.setter def ignore_empty_lines(self, value): - self.options.ignore_empty_lines = value + deref(self.options).ignore_empty_lines = value def equals(self, ParseOptions other): return ( @@ -338,7 +343,7 @@ cdef class ParseOptions(_Weakrefable): @staticmethod cdef ParseOptions wrap(CCSVParseOptions options): out = ParseOptions() - out.options = options + out.options.reset(new CCSVParseOptions(move(options))) return out def __getstate__(self): @@ -431,12 +436,15 @@ cdef class ConvertOptions(_Weakrefable): # Avoid mistakingly creating attributes __slots__ = () + def __cinit__(self, *argw, **kwargs): + self.options.reset( + new CCSVConvertOptions(CCSVConvertOptions.Defaults())) + def __init__(self, *, check_utf8=None, column_types=None, null_values=None, true_values=None, false_values=None, strings_can_be_null=None, include_columns=None, include_missing_columns=None, auto_dict_encode=None, auto_dict_max_cardinality=None, timestamp_parsers=None): - self.options = CCSVConvertOptions.Defaults() if check_utf8 is not None: self.check_utf8 = check_utf8 if column_types is not None: @@ -465,22 +473,22 @@ cdef class ConvertOptions(_Weakrefable): """ Whether to check UTF8 validity of string columns. """ - return self.options.check_utf8 + return deref(self.options).check_utf8 @check_utf8.setter def check_utf8(self, value): - self.options.check_utf8 = value + deref(self.options).check_utf8 = value @property def strings_can_be_null(self): """ Whether string / binary columns can have null values. """ - return self.options.strings_can_be_null + return deref(self.options).strings_can_be_null @strings_can_be_null.setter def strings_can_be_null(self, value): - self.options.strings_can_be_null = value + deref(self.options).strings_can_be_null = value @property def column_types(self): @@ -488,7 +496,7 @@ cdef class ConvertOptions(_Weakrefable): Explicitly map column names to column types. """ d = {frombytes(item.first): pyarrow_wrap_data_type(item.second) - for item in self.options.column_types} + for item in deref(self.options).column_types} return d @column_types.setter @@ -499,7 +507,7 @@ cdef class ConvertOptions(_Weakrefable): if isinstance(value, Mapping): value = value.items() - self.options.column_types.clear() + deref(self.options).column_types.clear() for item in value: if isinstance(item, Field): k = item.name @@ -508,51 +516,51 @@ cdef class ConvertOptions(_Weakrefable): k, v = item typ = pyarrow_unwrap_data_type(ensure_type(v)) assert typ != NULL - self.options.column_types[tobytes(k)] = typ + deref(self.options).column_types[tobytes(k)] = typ @property def null_values(self): """ A sequence of strings that denote nulls in the data. """ - return [frombytes(x) for x in self.options.null_values] + return [frombytes(x) for x in deref(self.options).null_values] @null_values.setter def null_values(self, value): - self.options.null_values = [tobytes(x) for x in value] + deref(self.options).null_values = [tobytes(x) for x in value] @property def true_values(self): """ A sequence of strings that denote true booleans in the data. """ - return [frombytes(x) for x in self.options.true_values] + return [frombytes(x) for x in deref(self.options).true_values] @true_values.setter def true_values(self, value): - self.options.true_values = [tobytes(x) for x in value] + deref(self.options).true_values = [tobytes(x) for x in value] @property def false_values(self): """ A sequence of strings that denote false booleans in the data. """ - return [frombytes(x) for x in self.options.false_values] + return [frombytes(x) for x in deref(self.options).false_values] @false_values.setter def false_values(self, value): - self.options.false_values = [tobytes(x) for x in value] + deref(self.options).false_values = [tobytes(x) for x in value] @property def auto_dict_encode(self): """ Whether to try to automatically dict-encode string / binary data. """ - return self.options.auto_dict_encode + return deref(self.options).auto_dict_encode @auto_dict_encode.setter def auto_dict_encode(self, value): - self.options.auto_dict_encode = value + deref(self.options).auto_dict_encode = value @property def auto_dict_max_cardinality(self): @@ -561,11 +569,11 @@ cdef class ConvertOptions(_Weakrefable): This value is per chunk. """ - return self.options.auto_dict_max_cardinality + return deref(self.options).auto_dict_max_cardinality @auto_dict_max_cardinality.setter def auto_dict_max_cardinality(self, value): - self.options.auto_dict_max_cardinality = value + deref(self.options).auto_dict_max_cardinality = value @property def include_columns(self): @@ -575,13 +583,13 @@ cdef class ConvertOptions(_Weakrefable): If empty, the Table will include all columns from the CSV file. If not empty, only these columns will be included, in this order. """ - return [frombytes(s) for s in self.options.include_columns] + return [frombytes(s) for s in deref(self.options).include_columns] @include_columns.setter def include_columns(self, value): - self.options.include_columns.clear() + deref(self.options).include_columns.clear() for item in value: - self.options.include_columns.push_back(tobytes(item)) + deref(self.options).include_columns.push_back(tobytes(item)) @property def include_missing_columns(self): @@ -593,11 +601,11 @@ cdef class ConvertOptions(_Weakrefable): or null by default). This option is ignored if `include_columns` is empty. """ - return self.options.include_missing_columns + return deref(self.options).include_missing_columns @include_missing_columns.setter def include_missing_columns(self, value): - self.options.include_missing_columns = value + deref(self.options).include_missing_columns = value @property def timestamp_parsers(self): @@ -612,7 +620,7 @@ cdef class ConvertOptions(_Weakrefable): c_string kind parsers = [] - for c_parser in self.options.timestamp_parsers: + for c_parser in deref(self.options).timestamp_parsers: kind = deref(c_parser).kind() if kind == b'strptime': parsers.append(frombytes(deref(c_parser).format())) @@ -635,12 +643,12 @@ cdef class ConvertOptions(_Weakrefable): else: raise TypeError("Expected list of str or ISO8601 objects") - self.options.timestamp_parsers = move(c_parsers) + deref(self.options).timestamp_parsers = move(c_parsers) @staticmethod cdef ConvertOptions wrap(CCSVConvertOptions options): out = ConvertOptions() - out.options = options + out.options.reset(new CCSVConvertOptions(move(options))) return out def equals(self, ConvertOptions other): @@ -694,14 +702,14 @@ cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out): if read_options is None: out[0] = CCSVReadOptions.Defaults() else: - out[0] = read_options.options + out[0] = deref(read_options.options) cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out): if parse_options is None: out[0] = CCSVParseOptions.Defaults() else: - out[0] = parse_options.options + out[0] = deref(parse_options.options) cdef _get_convert_options(ConvertOptions convert_options, @@ -709,7 +717,7 @@ cdef _get_convert_options(ConvertOptions convert_options, if convert_options is None: out[0] = CCSVConvertOptions.Defaults() else: - out[0] = convert_options.options + out[0] = deref(convert_options.options) cdef class CSVStreamingReader(RecordBatchReader): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 44f016f521877..0b6c695235cf4 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1710,7 +1710,7 @@ cdef class CsvFileFormat(FileFormat): @parse_options.setter def parse_options(self, ParseOptions parse_options not None): - self.csv_format.parse_options = parse_options.options + self.csv_format.parse_options = deref(parse_options.options) cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): if options.type_name == 'csv': @@ -1760,7 +1760,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @convert_options.setter def convert_options(self, ConvertOptions convert_options not None): - self.csv_options.convert_options = convert_options.options + self.csv_options.convert_options = deref(convert_options.options) @property def read_options(self): @@ -1768,7 +1768,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @read_options.setter def read_options(self, ReadOptions read_options not None): - self.csv_options.read_options = read_options.options + self.csv_options.read_options = deref(read_options.options) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 8f0f973a79140..dec0038a0e475 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1583,6 +1583,9 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: c_bool newlines_in_values c_bool ignore_empty_lines + CCSVParseOptions() + CCSVParseOptions(CCSVParseOptions&&) + @staticmethod CCSVParseOptions Defaults() @@ -1601,6 +1604,9 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: vector[c_string] include_columns c_bool include_missing_columns + CCSVConvertOptions() + CCSVConvertOptions(CCSVConvertOptions&&) + @staticmethod CCSVConvertOptions Defaults() @@ -1611,6 +1617,9 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: vector[c_string] column_names c_bool autogenerate_column_names + CCSVReadOptions() + CCSVReadOptions(CCSVReadOptions&&) + @staticmethod CCSVReadOptions Defaults() diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 395f9486315f5..4e95ab3bd6015 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -97,6 +97,7 @@ def check_options_class(cls, **attr_values): assert getattr(opts, name) == value +# The various options classes need to be picklable for dataset def check_options_class_pickling(cls, **attr_values): opts = cls(**attr_values) new_opts = pickle.loads(pickle.dumps(opts, @@ -115,6 +116,12 @@ def test_read_options(): autogenerate_column_names=[False, True], encoding=['utf8', 'utf16']) + check_options_class_pickling(cls, use_threads=True, + skip_rows=3, + column_names=["ab", "cd"], + autogenerate_column_names=False, + encoding='utf16') + assert opts.block_size > 0 opts.block_size = 12345 assert opts.block_size == 12345 @@ -133,7 +140,6 @@ def test_parse_options(): newlines_in_values=[False, True], ignore_empty_lines=[True, False]) - # ParseOptions needs to be picklable for dataset check_options_class_pickling(cls, delimiter='x', escape_char='y', quote_char=False, @@ -154,6 +160,14 @@ def test_convert_options(): auto_dict_encode=[False, True], timestamp_parsers=[[], [ISO8601, '%y-%m']]) + check_options_class_pickling( + cls, check_utf8=True, + strings_can_be_null=False, + include_columns=['def', 'abc'], + include_missing_columns=False, + auto_dict_encode=True, + timestamp_parsers=[ISO8601, '%y-%m']) + assert opts.auto_dict_max_cardinality > 0 opts.auto_dict_max_cardinality = 99999 assert opts.auto_dict_max_cardinality == 99999