Skip to content

Commit

Permalink
apache GH-35519: [C++][Parquet] Fixing exception handling in parquet FileSerializer (apache#35520)
Browse files Browse the repository at this point in the history

### Rationale for this change

Mentioned in apache#35519

### What changes are included in this PR?

Exception handling in `CheckRowsWritten`

### Are these changes tested?

No, I don't know how to mock it currently

### Are there any user-facing changes?

no

* Closes: apache#35519

Authored-by: mwish <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
mapleFU authored and rtpsw committed May 16, 2023
1 parent 125c9a6 commit a5ddb56
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,16 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
closed_ = true;
CheckRowsWritten();

for (size_t i = 0; i < column_writers_.size(); i++) {
if (column_writers_[i]) {
total_bytes_written_ += column_writers_[i]->Close();
// Avoid invalid state if ColumnWriter::Close() throws internally.
auto column_writers = std::move(column_writers_);
for (size_t i = 0; i < column_writers.size(); i++) {
if (column_writers[i]) {
total_bytes_written_ += column_writers[i]->Close();
total_compressed_bytes_written_ +=
column_writers_[i]->total_compressed_bytes_written();
column_writers_[i].reset();
column_writers[i]->total_compressed_bytes_written();
}
}

column_writers_.clear();

// Ensures all columns have been written
metadata_->set_num_rows(num_rows_);
metadata_->Finish(total_bytes_written_, row_group_ordinal_);
Expand Down Expand Up @@ -261,8 +260,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
}
} else if (buffered_row_group_ &&
column_writers_.size() > 0) { // when buffered_row_group = true
DCHECK(column_writers_[0] != nullptr);
int64_t current_col_rows = column_writers_[0]->rows_written();
for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
DCHECK(column_writers_[i] != nullptr);
int64_t current_col_rows_i = column_writers_[i]->rows_written();
if (current_col_rows != current_col_rows_i) {
ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
Expand Down Expand Up @@ -380,15 +381,15 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Attach (or merge) user-supplied key-value metadata onto the file.
//
// @param key_value_metadata metadata to store; may be nullptr (no-op when
//        nothing has been set yet).
//
// If no metadata has been set, the shared pointer is copied as-is; a
// std::move here would be pointless since the parameter is a const
// reference (moving from const is a copy — clang-tidy
// performance-move-const-arg). Otherwise the two sets are merged, with
// the incoming entries layered onto the existing ones.
void AddKeyValueMetadata(
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
  if (key_value_metadata_ == nullptr) {
    key_value_metadata_ = key_value_metadata;
  } else if (key_value_metadata != nullptr) {
    key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
  }
}

// Best-effort close on destruction: Close() may throw (it flushes row
// groups and footer), but destructors must never propagate exceptions,
// so everything is swallowed here. Callers who care about errors should
// call Close() explicitly before destruction.
~FileSerializer() override {
  try {
    // Explicitly qualified: virtual dispatch is unsafe while the object
    // is being destroyed (a derived override's state is already gone).
    FileSerializer::Close();
  } catch (...) {
  }
}
Expand Down

0 comments on commit a5ddb56

Please sign in to comment.