Skip to content

Commit

Permalink
ARROW-13763: [Python] Close files in ParquetFile & ParquetDatasetPiece (
Browse files Browse the repository at this point in the history
#13821)

Will fix [ARROW-13763](https://issues.apache.org/jira/browse/ARROW-13763)

A separate Jira issue will be made to address closing files in V2 ParquetDataset, which needs to be handled in the C++ layer. 

Adds a context manager to `pq.ParquetFile` that closes the input file on exit, and ensures that files opened by reads within `pq.ParquetDataset` and `pq.read_table` are closed afterwards.

```python

# user opened file-like object will not be closed
with open('file.parquet', 'rb') as f:
    with pq.ParquetFile(f) as p:
        table = p.read()
        assert not f.closed  # did not inadvertently close the open file
        assert not p.closed
    assert not f.closed      # parquet context exit didn't close it
    assert not p.closed      # references the input file status
assert f.closed              # normal context exit close
assert p.closed              # ...

# path-like will be closed upon exit or `ParquetFile.close`
with pq.ParquetFile('file.parquet') as p:
    table = p.read()
    assert not p.closed
assert p.closed
```

Authored-by: Miles Granger <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
milesgranger authored Aug 17, 2022
1 parent b0422e5 commit 951663a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
19 changes: 16 additions & 3 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,7 @@ cdef class ParquetReader(_Weakrefable):
CMemoryPool* pool
unique_ptr[FileReader] reader
FileMetaData _metadata
shared_ptr[CRandomAccessFile] rd_handle

cdef public:
_column_idx_map
Expand All @@ -1175,7 +1176,6 @@ cdef class ParquetReader(_Weakrefable):
thrift_string_size_limit=None,
thrift_container_size_limit=None):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
CReaderProperties properties = default_reader_properties()
ArrowReaderProperties arrow_props = (
Expand Down Expand Up @@ -1221,10 +1221,10 @@ cdef class ParquetReader(_Weakrefable):
string_to_timeunit(coerce_int96_timestamp_unit))

self.source = source
get_reader(source, use_memory_map, &self.rd_handle)

get_reader(source, use_memory_map, &rd_handle)
with nogil:
check_status(builder.Open(rd_handle, properties, c_metadata))
check_status(builder.Open(self.rd_handle, properties, c_metadata))

# Set up metadata
with nogil:
Expand Down Expand Up @@ -1435,6 +1435,19 @@ cdef class ParquetReader(_Weakrefable):
.ReadColumn(column_index, &out))
return pyarrow_wrap_chunked_array(out)

def close(self):
    """
    Close the file handle held by this reader if it is still open.

    Does nothing when ``self.closed`` already reports True.
    """
    if self.closed:
        return
    # Closing is done without holding the GIL.
    with nogil:
        check_status(self.rd_handle.get().Close())

@property
def closed(self):
    """
    Whether the file handle held by this reader is closed.

    Returns True when no handle is held (``rd_handle`` is NULL);
    otherwise the result of the handle's own ``closed()`` query.
    """
    if self.rd_handle == NULL:
        return True
    with nogil:
        is_closed = self.rd_handle.get().closed()
    return is_closed


cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
Expand Down
25 changes: 22 additions & 3 deletions python/pyarrow/parquet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,16 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
thrift_string_size_limit=thrift_string_size_limit,
thrift_container_size_limit=thrift_container_size_limit,
)
self._close_source = getattr(source, 'closed', True)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()

def __enter__(self):
    """Enter the ``with`` block, yielding this object itself."""
    return self

def __exit__(self, *args, **kwargs):
    """
    Leave the ``with`` block by calling ``close()``.

    Returns None, so an exception raised inside the block propagates.
    """
    self.close()

def _build_nested_paths(self):
paths = self.reader.column_paths

Expand Down Expand Up @@ -376,6 +383,14 @@ def num_row_groups(self):
"""
return self.reader.num_row_groups

def close(self, force: bool = False):
    """
    Close the underlying reader when this object is allowed to.

    Parameters
    ----------
    force : bool, default False
        Close the reader even when ``_close_source`` is falsy.
    """
    if not (self._close_source or force):
        return
    self.reader.close()

@property
def closed(self) -> bool:
    """Whether the underlying reader reports itself as closed."""
    reader = self.reader
    return reader.closed

def read_row_group(self, i, columns=None, use_threads=True,
use_pandas_metadata=False):
"""
Expand Down Expand Up @@ -1129,8 +1144,8 @@ def get_metadata(self):
-------
metadata : FileMetaData
"""
f = self.open()
return f.metadata
with self.open() as parquet:
return parquet.metadata

def open(self):
"""
Expand Down Expand Up @@ -1204,6 +1219,9 @@ def read(self, columns=None, use_threads=True, partitions=None,
arr = pa.DictionaryArray.from_arrays(indices, dictionary)
table = table.append_column(name, arr)

# To ParquetFile the source looked like it was already open, so won't
# actually close it without overriding.
reader.close(force=True)
return table


Expand Down Expand Up @@ -1890,7 +1908,8 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
"""
tables = []
for piece in self._pieces:
table = piece.read(columns=columns, use_threads=use_threads,
table = piece.read(columns=columns,
use_threads=use_threads,
partitions=self._partitions,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)
Expand Down
52 changes: 52 additions & 0 deletions python/pyarrow/tests/parquet/test_parquet_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io
import os
from unittest import mock

import pytest

Expand Down Expand Up @@ -277,3 +278,54 @@ def test_pre_buffer(pre_buffer):
buf.seek(0)
pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
assert pf.read().num_rows == N


def test_parquet_file_explicitly_closed(tempdir):
    """
    Files opened internally from a path must be closed after use, while
    file objects opened by the caller must be left open.

    Covers read_table, ParquetDataset, and ParquetFile.
    """
    # Write a small parquet file to read back
    path = tempdir.joinpath('file.parquet')
    data = pa.table({'col1': [0, 1], 'col2': [0, 1]})
    pq.write_table(data, path)

    # Legacy read_table on a caller-opened handle: the handle stays open
    with open(path, 'rb') as handle:
        pq.read_table(handle, use_legacy_dataset=True)
        assert not handle.closed

    # Legacy read_table on a path: ParquetFile.close must be invoked
    with mock.patch.object(pq.ParquetFile, "close") as patched_close:
        pq.read_table(path, use_legacy_dataset=True)
        patched_close.assert_called()

    # Legacy ParquetDataset on a path: ParquetFile.close must be invoked
    with mock.patch.object(pq.ParquetFile, "close") as patched_close:
        pq.ParquetDataset(path, use_legacy_dataset=True).read()
        patched_close.assert_called()

    # Legacy ParquetDataset on a caller-opened handle: rejected, and the
    # handle stays open
    with open(path, 'rb') as handle:
        # ARROW-8075: support ParquetDataset from file-like, not just path-like
        with pytest.raises(TypeError, match='not a path-like object'):
            pq.ParquetDataset(handle, use_legacy_dataset=True).read()
        assert not handle.closed

    # ParquetFile on a caller-opened handle: leaving the ParquetFile
    # context does not close the handle
    with open(path, 'rb') as handle:
        with pq.ParquetFile(handle) as parquet_file:
            parquet_file.read()
            assert not handle.closed
            assert not parquet_file.closed
        # The ParquetFile context has exited; the caller's handle is untouched
        assert not handle.closed
        assert not parquet_file.closed
    # The caller's own context exit closed the handle, and the ParquetFile
    # reflects the state of that underlying file
    assert handle.closed
    assert parquet_file.closed

    # ParquetFile on a path: leaving the context closes the file
    with pq.ParquetFile(path) as parquet_file:
        parquet_file.read()
        assert not parquet_file.closed
    assert parquet_file.closed

0 comments on commit 951663a

Please sign in to comment.