From 3e0ca5b7fd7c0fbe0dbec6c4bf5652a177baefad Mon Sep 17 00:00:00 2001
From: Quang Hoang
Date: Thu, 16 Nov 2023 19:36:09 +0700
Subject: [PATCH] GH-37969: [C++][Parquet] add more closed file checks for
 ParquetFileWriter (#38390)

### Rationale for this change

Operations on a closed ParquetFileWriter are not allowed, but they should fail with an error rather than segfault. ParquetFileWriter::Close() also resets its pimpl, so after closing, any operation that needs this pointer leads to a segfault.

### What changes are included in this PR?

Additional checks for operations on a closed file.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.

* Closes: #37969

Authored-by: Quang Hoang
Signed-off-by: Antoine Pitrou
---
 .../parquet/arrow/arrow_reader_writer_test.cc | 27 +++++++++++++++++++
 cpp/src/parquet/arrow/writer.cc               | 12 +++++++++
 cpp/src/parquet/file_writer.cc                |  6 ++++-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index fb9e53870583c..a314ecbf747e7 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5221,6 +5221,33 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+// Test that operations on a closed writer do not segfault (GH-37969).
+TEST(TestArrowReadWrite, OperationsOnClosedWriter) {
+  // A sample table; its type and structure do not matter in this test case.
+  auto schema = ::arrow::schema({::arrow::field("letter", ::arrow::utf8())});
+  auto table = ::arrow::Table::Make(
+      schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")});
+
+  auto sink = CreateOutputStream();
+  ASSERT_OK_AND_ASSIGN(auto writer, parquet::arrow::FileWriter::Open(
+                                        *schema, ::arrow::default_memory_pool(), sink,
+                                        parquet::default_writer_properties(),
+                                        parquet::default_arrow_writer_properties()));
+
+  // Should be ok
+  ASSERT_OK(writer->WriteTable(*table, 1));
+
+  // Operations on a closed writer are invalid
+  ASSERT_OK(writer->Close());
+
+  ASSERT_RAISES(Invalid, writer->NewRowGroup(1));
+  ASSERT_RAISES(Invalid, writer->WriteColumnChunk(table->column(0), 0, 1));
+  ASSERT_RAISES(Invalid, writer->NewBufferedRowGroup());
+  ASSERT_OK_AND_ASSIGN(auto record_batch, table->CombineChunksToBatch());
+  ASSERT_RAISES(Invalid, writer->WriteRecordBatch(*record_batch));
+  ASSERT_RAISES(Invalid, writer->WriteTable(*table, 1));
+}
+
 namespace {
 
 struct ColumnIndexObject {
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 0c67e8d6bb3d4..300a6d8e054cc 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -306,6 +306,7 @@ class FileWriterImpl : public FileWriter {
   }
 
   Status NewRowGroup(int64_t chunk_size) override {
+    RETURN_NOT_OK(CheckClosed());
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
     }
@@ -325,6 +326,13 @@
     return Status::OK();
   }
 
+  Status CheckClosed() const {
+    if (closed_) {
+      return Status::Invalid("Operation on closed file");
+    }
+    return Status::OK();
+  }
+
   Status WriteColumnChunk(const Array& data) override {
     // A bit awkward here since cannot instantiate ChunkedArray from const Array&
     auto chunk = ::arrow::MakeArray(data.data());
@@ -334,6 +342,7 @@
 
   Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
                           int64_t size) override {
+    RETURN_NOT_OK(CheckClosed());
     if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
         arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
       if (row_group_writer_->buffered()) {
@@ -356,6 +365,7 @@
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
   Status WriteTable(const Table& table, int64_t chunk_size) override {
+    RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
@@ -392,6 +402,7 @@
   }
 
   Status NewBufferedRowGroup() override {
+    RETURN_NOT_OK(CheckClosed());
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
     }
@@ -400,6 +411,7 @@
   }
 
   Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(CheckClosed());
     if (batch.num_rows() == 0) {
       return Status::OK();
     }
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 9a92d4525d23d..5502e1f94a9d0 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -656,7 +656,11 @@ void ParquetFileWriter::AddKeyValueMetadata(
 }
 
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
-  return contents_->properties();
+  if (contents_) {
+    return contents_->properties();
+  } else {
+    throw ParquetException("Cannot get properties from closed file");
+  }
 }
 
 }  // namespace parquet
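
### Example: caller-visible behavior after this change

With this patch, misuse of the Arrow-level `parquet::arrow::FileWriter` after `Close()` surfaces as a regular `arrow::Status` instead of a crash. The sketch below illustrates this from the caller side; the output path, helper name, and chunk size are illustrative assumptions, not part of this patch:

```cpp
#include <iostream>

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/writer.h>

// Hypothetical helper: writes `table`, closes the writer, then shows that a
// second WriteTable() returns Status::Invalid rather than segfaulting.
arrow::Status WriteThenMisuse(const std::shared_ptr<arrow::Table>& table) {
  ARROW_ASSIGN_OR_RAISE(
      auto sink, arrow::io::FileOutputStream::Open("/tmp/example.parquet"));
  ARROW_ASSIGN_OR_RAISE(auto writer, parquet::arrow::FileWriter::Open(
                                         *table->schema(),
                                         arrow::default_memory_pool(), sink));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/1024));
  ARROW_RETURN_NOT_OK(writer->Close());

  // With the CheckClosed() guards above, this returns an Invalid status
  // instead of dereferencing freed writer state.
  arrow::Status st = writer->WriteTable(*table, 1024);
  std::cerr << "WriteTable after Close(): " << st.ToString() << std::endl;
  return arrow::Status::OK();
}
```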
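
### Note: `properties()` throws rather than returning Status

Unlike the Arrow-facing methods above, the low-level `ParquetFileWriter::properties()` returns a reference and has no `Status` channel, so this patch reports a closed file there by throwing `ParquetException`. A minimal caller-side sketch of guarding for that; the helper name is hypothetical:

```cpp
#include <iostream>

#include <parquet/exception.h>
#include <parquet/file_writer.h>

// Hypothetical helper: reads writer properties, tolerating a closed writer.
void PrintCreatedBy(parquet::ParquetFileWriter& writer) {
  try {
    const auto& props = writer.properties();
    std::cout << "created_by: " << props->created_by() << std::endl;
  } catch (const parquet::ParquetException& e) {
    // Thrown after Close(), once the internal contents_ has been reset.
    std::cerr << "properties() on closed writer: " << e.what() << std::endl;
  }
}
```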