Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-34484: [Substrait] add an option to disable augmented fields #41583

Merged
merged 5 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class ConsumingSinkNode : public ExecNode,
std::atomic<int32_t> backpressure_counter_ = 0;
std::unique_ptr<util::SerialSequencingQueue> sequencer_;
};

static Result<ExecNode*> MakeTableConsumingSinkNode(ExecPlan* plan,
std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
}
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema;
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(*schema));
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(
*schema, options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ TEST_F(TestParquetFileFormat, CachedMetadata) {
// Read the file the first time, will read metadata
auto options = std::make_shared<ScanOptions>();
options->filter = literal(true);
ASSERT_OK_AND_ASSIGN(auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema));
ASSERT_OK_AND_ASSIGN(
auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema, options->add_augmented_fields));
options->projected_schema = projection_descr.schema;
options->projection = projection_descr.expression;
ASSERT_OK_AND_ASSIGN(auto generator, fragment->ScanBatchesAsync(options));
Expand Down
35 changes: 23 additions & 12 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// create the projected schema only if the provided expressions
// produces valid set of fields.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*projected_schema));
ProjectionDescr::Default(
*projected_schema, scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
ARROW_ASSIGN_OR_RAISE(scan_options->projection,
Expand All @@ -220,7 +221,8 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// if projected_fields are not found, we default to creating the projected_schema
// and projection from the dataset_schema.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*dataset_schema));
ProjectionDescr::Default(
*dataset_schema, scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
}
Expand All @@ -231,7 +233,7 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
ARROW_ASSIGN_OR_RAISE(
auto projection_descr,
ProjectionDescr::FromNames(scan_options->projected_schema->field_names(),
*dataset_schema));
*dataset_schema, scan_options->add_augmented_fields));
scan_options->projection = projection_descr.expression;
}

Expand Down Expand Up @@ -730,7 +732,8 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
const auto options = std::make_shared<ScanOptions>(*scan_options_);
ARROW_ASSIGN_OR_RAISE(auto empty_projection,
ProjectionDescr::FromNames(std::vector<std::string>(),
*scan_options_->dataset_schema));
*scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(options.get(), empty_projection);

auto total = std::make_shared<std::atomic<int64_t>>(0);
Expand Down Expand Up @@ -828,7 +831,8 @@ Result<ProjectionDescr> ProjectionDescr::FromExpressions(
}

Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> names,
const Schema& dataset_schema) {
const Schema& dataset_schema,
bool add_augmented_fields) {
std::vector<compute::Expression> exprs(names.size());
for (size_t i = 0; i < exprs.size(); ++i) {
// If name isn't in schema, try finding it by dotted path.
Expand All @@ -846,15 +850,19 @@ Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> name
}
}
auto fields = dataset_schema.fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
}
return ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
Schema(fields, dataset_schema.metadata()));
}

Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema);
Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema,
bool add_augmented_fields) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema,
add_augmented_fields);
}

void SetProjection(ScanOptions* options, ProjectionDescr projection) {
Expand Down Expand Up @@ -899,7 +907,8 @@ const std::shared_ptr<Schema>& ScannerBuilder::projected_schema() const {
Status ScannerBuilder::Project(std::vector<std::string> columns) {
ARROW_ASSIGN_OR_RAISE(
auto projection,
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema));
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
return Status::OK();
}
Expand Down Expand Up @@ -1052,8 +1061,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
});

auto fields = scan_options->dataset_schema->fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (scan_options->add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
}

return acero::MakeExecNode(
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Note: This must be true in order for any readahead to happen
bool use_threads = false;

/// If true the scanner will add augmented fields to the output schema.
bool add_augmented_fields = true;

/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;

Expand Down Expand Up @@ -287,10 +290,12 @@ struct ARROW_DS_EXPORT ProjectionDescr {

/// \brief Create a default projection referencing fields in the dataset schema
static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
const Schema& dataset_schema);
const Schema& dataset_schema,
bool add_augmented_fields = true);

/// \brief Make a projection that projects every field in the dataset schema
static Result<ProjectionDescr> Default(const Schema& dataset_schema);
static Result<ProjectionDescr> Default(const Schema& dataset_schema,
bool add_augmented_fields = true);
};

/// \brief Utility method to set the projection expression and schema
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,8 @@ TEST_P(TestScanner, ProjectionDefaults) {
}
// If we only specify a projection expression then infer the projected schema
// from the projection expression
auto projection_desc = ProjectionDescr::FromNames({"i32"}, *schema_);
auto projection_desc =
ProjectionDescr::FromNames({"i32"}, *schema_, /*add_augmented_fields=*/true);
{
ARROW_SCOPED_TRACE("User only specifies projection");
options_->projection = projection_desc->expression;
Expand Down Expand Up @@ -1148,7 +1149,8 @@ TEST_P(TestScanner, ProjectedScanNestedFromNames) {
});
ASSERT_OK_AND_ASSIGN(auto descr,
ProjectionDescr::FromNames({".struct.i32", "nested.right.f64"},
*options_->dataset_schema))
*options_->dataset_schema,
options_->add_augmented_fields))
SetProjection(options_.get(), std::move(descr));
auto batch_in = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_);
auto batch_out = ConstantArrayGenerator::Zeroes(
Expand Down Expand Up @@ -2106,7 +2108,8 @@ TEST(ScanOptions, TestMaterializedFields) {

auto set_projection_from_names = [&opts](std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts->dataset_schema));
std::move(names), *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
};

Expand Down Expand Up @@ -2160,7 +2163,8 @@ TEST(ScanOptions, TestMaterializedFields) {
// project top-level field, filter nothing
opts->filter = literal(true);
ASSERT_OK_AND_ASSIGN(projection,
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema));
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested")));

Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ class DatasetFixtureMixin : public ::testing::Test {
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema_;
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::FromNames(schema_->field_names(), *schema_));
ProjectionDescr::FromNames(schema_->field_names(), *schema_,
options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
SetFilter(literal(true));
}
Expand All @@ -398,7 +399,8 @@ class DatasetFixtureMixin : public ::testing::Test {
void SetProjectedColumns(std::vector<std::string> column_names) {
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema));
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(options_.get(), std::move(projection));
}

Expand Down Expand Up @@ -502,7 +504,8 @@ class FileFormatFixtureMixin : public ::testing::Test {
void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
opts_->dataset_schema = schema(std::move(fields));
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::Default(*opts_->dataset_schema));
ProjectionDescr::Default(*opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand All @@ -512,7 +515,8 @@ class FileFormatFixtureMixin : public ::testing::Test {

void Project(std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts_->dataset_schema));
std::move(names), *opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand Down Expand Up @@ -993,7 +997,8 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto i64 = field("i64", int64());
this->opts_->dataset_schema = schema({i32, i32, i64});
ASSERT_RAISES(Invalid,
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema));
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema,
/*add_augmented_fields=*/true));
}
void TestScanWithPushdownNulls() {
// Regression test for ARROW-15312
Expand Down Expand Up @@ -1933,7 +1938,8 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
scan_options_->dataset_schema = dataset_->schema();
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema()));
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema(),
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&

auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->use_threads = true;
scan_options->add_augmented_fields = false;

if (read.has_filter()) {
ARROW_ASSIGN_OR_RAISE(scan_options->filter,
Expand Down
81 changes: 81 additions & 0 deletions cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,86 @@ NamedTableProvider AlwaysProvideSameTable(std::shared_ptr<Table> table) {
};
}

TEST(Substrait, ExecReadRelWithLocalFiles) {
ASSERT_OK_AND_ASSIGN(std::string dir_string,
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));

std::string substrait_json = R"({
"relations": [
{
"root": {
"input": {
"read": {
"common": {
"direct": {}
},
"baseSchema": {
"names": [
"f32",
"f64"
],
"struct": {
"types": [
{
"fp32": {
"nullability": "NULLABILITY_REQUIRED"
}
},
{
"fp64": {
"nullability": "NULLABILITY_REQUIRED"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
},
"localFiles": {
"items": [
{
"uriFile": "file://[DIRECTORY_PLACEHOLDER]/byte_stream_split.zstd.parquet",
"parquet": {}
}
]
}
}
},
"names": [
"f32",
"f64"
]
}
}
],
"version": {
"minorNumber": 42,
"producer": "my-producer"
}
})";
const char* placeholder = "[DIRECTORY_PLACEHOLDER]";
substrait_json.replace(substrait_json.find(placeholder), strlen(placeholder),
dir_string);

ASSERT_OK_AND_ASSIGN(auto buf,
internal::SubstraitFromJSON("Plan", substrait_json,
/*ignore_unknown_fields=*/false));

ASSERT_OK_AND_ASSIGN(auto declarations,
DeserializePlans(*buf, acero::NullSinkNodeConsumer::Make));
ASSERT_EQ(declarations.size(), 1);
acero::Declaration* decl = &declarations[0];
ASSERT_EQ(decl->factory_name, "consuming_sink");
ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());
ASSERT_OK_AND_ASSIGN(auto sink_node, declarations[0].AddToPlan(plan.get()));
ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode");
ASSERT_EQ(sink_node->num_inputs(), 1);
auto& prev_node = sink_node->inputs()[0];
ASSERT_STREQ(prev_node->kind_name(), "SourceNode");

plan->StartProducing();
ASSERT_FINISHES_OK(plan->finished());
}

TEST(Substrait, RelWithHint) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: I think we have one or two spots in python where we have to do a column selection to workaround this issue. We can probably remove these now.

e.g. https://github.com/apache/arrow/blob/main/python/pyarrow/tests/test_substrait.py#L93

ASSERT_OK_AND_ASSIGN(auto buf,
internal::SubstraitFromJSON("Rel", R"({
Expand Down Expand Up @@ -2443,6 +2523,7 @@ TEST(SubstraitRoundTrip, BasicPlanEndToEnd) {

auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->projection = compute::project({}, {});
scan_options->add_augmented_fields = false;
const std::string filter_col_left = "shared";
const std::string filter_col_right = "distinct";
auto comp_left_value = compute::field_ref(filter_col_left);
Expand Down
Loading