GH-37969: [C++][Parquet] add more closed file checks for ParquetFileWriter (#38390)

### Rationale for this change
Operations on a closed ParquetFileWriter are not allowed, but they should not segfault either. ParquetFileWriter::Close() also resets its pimpl, so after that, any operation that needs this pointer leads to a segfault.
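
For illustration, a minimal standalone sketch of the failure mode (my reconstruction, mirroring the regression test added below; not code from this PR):

// Minimal repro sketch (assumes Arrow/Parquet C++; mirrors the new test).
#include <arrow/api.h>
#include <arrow/io/memory.h>
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

arrow::Status Repro() {
  // A one-column table; contents are irrelevant.
  arrow::StringBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({"a", "b", "c"}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
  auto schema = arrow::schema({arrow::field("letter", arrow::utf8())});
  auto table = arrow::Table::Make(schema, {array});

  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
  ARROW_ASSIGN_OR_RAISE(
      auto writer, parquet::arrow::FileWriter::Open(
                       *schema, arrow::default_memory_pool(), sink,
                       parquet::default_writer_properties(),
                       parquet::default_arrow_writer_properties()));

  ARROW_RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/1));
  ARROW_RETURN_NOT_OK(writer->Close());

  // Before this fix: segfault (the pimpl was already reset).
  // With this fix: Status::Invalid("Operation on closed file").
  return writer->WriteTable(*table, /*chunk_size=*/1);
}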

### What changes are included in this PR?
Adding more checks for a closed file.
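
In condensed form, the pattern this PR applies to every mutating writer method (a sketch only; it assumes the closed_ flag that Close() maintains in writer.cc, which the hunks below don't show):

// Self-contained sketch of the guard pattern; names mirror writer.cc.
// Assumption: the real FileWriterImpl::Close() sets closed_ = true and
// also flushes and closes the underlying sink.
#include <arrow/status.h>

class WriterLike {
 public:
  arrow::Status Close() {
    closed_ = true;
    return arrow::Status::OK();
  }

  // Every mutating method now starts with the same guard.
  arrow::Status NewRowGroup() {
    ARROW_RETURN_NOT_OK(CheckClosed());
    // ... real row-group logic would go here ...
    return arrow::Status::OK();
  }

 private:
  arrow::Status CheckClosed() const {
    return closed_ ? arrow::Status::Invalid("Operation on closed file")
                   : arrow::Status::OK();
  }
  bool closed_ = false;
};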

### Are these changes tested?
Yes.

### Are there any user-facing changes?
No.

* Closes: #37969

Authored-by: Quang Hoang <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
quanghgx authored Nov 16, 2023
1 parent 62cf42b commit 3e0ca5b
Showing 3 changed files with 44 additions and 1 deletion.
27 changes: 27 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5221,6 +5221,33 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+// Test writing table with a closed writer, should not segfault (GH-37969).
+TEST(TestArrowReadWrite, OperationsOnClosedWriter) {
+  // A sample table, type and structure does not matter in this test case
+  auto schema = ::arrow::schema({::arrow::field("letter", ::arrow::utf8())});
+  auto table = ::arrow::Table::Make(
+      schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")});
+
+  auto sink = CreateOutputStream();
+  ASSERT_OK_AND_ASSIGN(auto writer, parquet::arrow::FileWriter::Open(
+                                        *schema, ::arrow::default_memory_pool(), sink,
+                                        parquet::default_writer_properties(),
+                                        parquet::default_arrow_writer_properties()));
+
+  // Should be ok
+  ASSERT_OK(writer->WriteTable(*table, 1));
+
+  // Operations on closed writer are invalid
+  ASSERT_OK(writer->Close());
+
+  ASSERT_RAISES(Invalid, writer->NewRowGroup(1));
+  ASSERT_RAISES(Invalid, writer->WriteColumnChunk(table->column(0), 0, 1));
+  ASSERT_RAISES(Invalid, writer->NewBufferedRowGroup());
+  ASSERT_OK_AND_ASSIGN(auto record_batch, table->CombineChunksToBatch());
+  ASSERT_RAISES(Invalid, writer->WriteRecordBatch(*record_batch));
+  ASSERT_RAISES(Invalid, writer->WriteTable(*table, 1));
+}
+
 namespace {
 
 struct ColumnIndexObject {
12 changes: 12 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
@@ -306,6 +306,7 @@ class FileWriterImpl : public FileWriter {
   }
 
   Status NewRowGroup(int64_t chunk_size) override {
+    RETURN_NOT_OK(CheckClosed());
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
     }
@@ -325,6 +326,13 @@
     return Status::OK();
   }
 
+  Status CheckClosed() const {
+    if (closed_) {
+      return Status::Invalid("Operation on closed file");
+    }
+    return Status::OK();
+  }
+
   Status WriteColumnChunk(const Array& data) override {
     // A bit awkward here since cannot instantiate ChunkedArray from const Array&
     auto chunk = ::arrow::MakeArray(data.data());
@@ -334,6 +342,7 @@
 
   Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
                           int64_t size) override {
+    RETURN_NOT_OK(CheckClosed());
     if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
         arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
       if (row_group_writer_->buffered()) {
@@ -356,6 +365,7 @@
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
   Status WriteTable(const Table& table, int64_t chunk_size) override {
+    RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
@@ -392,6 +402,7 @@
   }
 
   Status NewBufferedRowGroup() override {
+    RETURN_NOT_OK(CheckClosed());
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
     }
@@ -400,6 +411,7 @@
   }
 
   Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(CheckClosed());
     if (batch.num_rows() == 0) {
       return Status::OK();
     }
6 changes: 5 additions & 1 deletion cpp/src/parquet/file_writer.cc
@@ -656,7 +656,11 @@ void ParquetFileWriter::AddKeyValueMetadata(
 }
 
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
-  return contents_->properties();
+  if (contents_) {
+    return contents_->properties();
+  } else {
+    throw ParquetException("Cannot get properties from closed file");
+  }
 }
 
 }  // namespace parquet
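
One design note (my reading, not stated in the PR): properties() returns a const reference and has no Status return channel, so the closed-file check throws ParquetException instead of returning Status::Invalid. A hypothetical caller would guard accordingly:

// Hypothetical caller-side handling; writer is a ParquetFileWriter
// that may already have been closed.
try {
  const auto& props = writer->properties();  // throws after Close()
  static_cast<void>(props);
} catch (const parquet::ParquetException& e) {
  // e.what(): "Cannot get properties from closed file"
}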
