Skip to content

Commit

Permalink
GH-35095: [C++] Prevent write after close in arrow::ipc::IpcFormatWri…
Browse files Browse the repository at this point in the history
…ter (#37783)

This addresses GH-35095 by adding a flag to IpcFormatWriter to track when a writer has been closed, and check this flag before writes.

### Rationale for this change

This addresses #35095 , preventing stream and file IPC writers from writing record batches once the IPC writer has been closed. 

### What changes are included in this PR?

Adding a flag so that an IpcFormatWriter to track when it's been closed, a check before writes in IpcFormatWriter, and two tests to confirm it works as expected. 

### Are these changes tested?

Yes, the changes are tested. The two tests were added, and the C++ test suite ran. No unexpected failures appeared. 

### Are there any user-facing changes?

Other than newly returning an invalid status when writing after close, no, there should not be any user-facing changes. 

* Closes: #35095

Lead-authored-by: Chris Jordan-Squire <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
chrisjordansquire and kou authored Sep 21, 2023
1 parent 6eb08dd commit 6cd34f3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
19 changes: 19 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,22 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}

void TestWriteAfterClose() {
// Part of GH-35095.
std::shared_ptr<RecordBatch> batch_ints;
ASSERT_OK(MakeIntRecordBatch(&batch_ints));

auto schema = batch_ints->schema();

WriterHelper writer_helper;
ASSERT_OK(writer_helper.Init(schema, IpcWriteOptions::Defaults()));
ASSERT_OK(writer_helper.WriteBatch(batch_ints));
ASSERT_OK(writer_helper.Finish());

// Write after close raises status
ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
}

void TestWriteDifferentSchema() {
// Test writing batches with a different schema than the RecordBatchWriter
// was initialized with.
Expand Down Expand Up @@ -1991,6 +2007,9 @@ TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip()
TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) {
TestDictionaryRoundtrip();
}
TEST_F(TestFileFormat, WriteAfterClose) { TestWriteAfterClose(); }

TEST_F(TestStreamFormat, WriteAfterClose) { TestWriteAfterClose(); }

TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }

Expand Down
8 changes: 7 additions & 1 deletion cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
Status WriteRecordBatch(
const RecordBatch& batch,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata) override {
if (closed_) {
return Status::Invalid("Destination already closed");
}
if (!batch.schema()->Equals(schema_, false /* check_metadata */)) {
return Status::Invalid("Tried to write record batch with different schema");
}
Expand Down Expand Up @@ -1101,7 +1104,9 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {

Status Close() override {
RETURN_NOT_OK(CheckStarted());
return payload_writer_->Close();
RETURN_NOT_OK(payload_writer_->Close());
closed_ = true;
return Status::OK();
}

Status Start() {
Expand Down Expand Up @@ -1213,6 +1218,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
std::unordered_map<int64_t, std::shared_ptr<Array>> last_dictionaries_;

bool started_ = false;
bool closed_ = false;
IpcWriteOptions options_;
WriteStats stats_;
};
Expand Down

0 comments on commit 6cd34f3

Please sign in to comment.