Skip to content

Commit

Permalink
GH-36819: [R] Use RunWithCapturedR for reading Parquet files (#37274)
Browse files Browse the repository at this point in the history
### Rationale for this change

When we first added RunWithCapturedR to support reading files from R connections, none of the Parquet tests seemed to call R from another thread. Because RunWithCapturedR comes with some complexity, I didn't add it anywhere it wasn't strictly needed. A recent StackOverflow post exposed that reading very large parquet files do use multiple threads and thus need RunWithCapturedR.

### What changes are included in this PR?

The two most common calls to read a parquet in which a user might trigger this failure are now wrapped in RunWithCapturedR.

### Are these changes tested?

The changes are tested in the current suite.

### Are there any user-facing changes?

No.
* Closes: #36819

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
  • Loading branch information
paleolimbot and paleolimbot authored Sep 5, 2023
1 parent a526ba6 commit b5d36e9
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions r/src/parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "./arrow_types.h"

#include "./safe-call-into-r.h"

#if defined(ARROW_R_WITH_PARQUET)

#include <arrow/table.h>
Expand Down Expand Up @@ -129,7 +131,10 @@ std::shared_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFi
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadTable1(
const std::shared_ptr<parquet::arrow::FileReader>& reader) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
auto result =
RunWithCapturedRIfPossibleVoid([&]() { return reader->ReadTable(&table); });

StopIfNotOk(result);
return table;
}

Expand All @@ -138,15 +143,21 @@ std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadTable2(
const std::shared_ptr<parquet::arrow::FileReader>& reader,
const std::vector<int>& column_indices) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(column_indices, &table));
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadTable(column_indices, &table); });

StopIfNotOk(result);
return table;
}

// [[parquet::export]]
std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroup1(
const std::shared_ptr<parquet::arrow::FileReader>& reader, int i) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadRowGroup(i, &table));
auto result =
RunWithCapturedRIfPossibleVoid([&]() { return reader->ReadRowGroup(i, &table); });

StopIfNotOk(result);
return table;
}

Expand All @@ -155,7 +166,10 @@ std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroup2(
const std::shared_ptr<parquet::arrow::FileReader>& reader, int i,
const std::vector<int>& column_indices) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadRowGroup(i, column_indices, &table));
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroup(i, column_indices, &table); });

StopIfNotOk(result);
return table;
}

Expand All @@ -164,7 +178,10 @@ std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroups1(
const std::shared_ptr<parquet::arrow::FileReader>& reader,
const std::vector<int>& row_groups) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadRowGroups(row_groups, &table));
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroups(row_groups, &table); });

StopIfNotOk(result);
return table;
}

Expand All @@ -173,7 +190,10 @@ std::shared_ptr<arrow::Table> parquet___arrow___FileReader__ReadRowGroups2(
const std::shared_ptr<parquet::arrow::FileReader>& reader,
const std::vector<int>& row_groups, const std::vector<int>& column_indices) {
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadRowGroups(row_groups, column_indices, &table));
auto result = RunWithCapturedRIfPossibleVoid(
[&]() { return reader->ReadRowGroups(row_groups, column_indices, &table); });

StopIfNotOk(result);
return table;
}

Expand All @@ -199,7 +219,10 @@ int parquet___arrow___FileReader__num_row_groups(
std::shared_ptr<arrow::ChunkedArray> parquet___arrow___FileReader__ReadColumn(
const std::shared_ptr<parquet::arrow::FileReader>& reader, int i) {
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->ReadColumn(i - 1, &array));
auto result =
RunWithCapturedRIfPossibleVoid([&]() { return reader->ReadColumn(i - 1, &array); });

StopIfNotOk(result);
return array;
}

Expand Down

0 comments on commit b5d36e9

Please sign in to comment.