-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-34484: [Substrait] add an option to disable augmented fields #41583
Changes from 4 commits
dddaf0b
c9b53f3
455a011
564f5d1
488114b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,6 +114,9 @@ struct ARROW_DS_EXPORT ScanOptions { | |
/// Note: This must be true in order for any readahead to happen | ||
bool use_threads = false; | ||
|
||
/// If true the scanner will add augmented fields to the output schema. | ||
bool add_augmented_fields = true; | ||
|
||
/// Fragment-specific scan options. | ||
std::shared_ptr<FragmentScanOptions> fragment_scan_options; | ||
|
||
|
@@ -287,10 +290,12 @@ struct ARROW_DS_EXPORT ProjectionDescr { | |
|
||
/// \brief Create a default projection referencing fields in the dataset schema | ||
static Result<ProjectionDescr> FromNames(std::vector<std::string> names, | ||
const Schema& dataset_schema); | ||
const Schema& dataset_schema, | ||
bool add_augmented_fields); | ||
|
||
/// \brief Make a projection that projects every field in the dataset schema | ||
static Result<ProjectionDescr> Default(const Schema& dataset_schema); | ||
static Result<ProjectionDescr> Default(const Schema& dataset_schema, | ||
bool add_augmented_fields); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as above. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. Another alternative would be to pass a ScanOptions data structure through instead (though that would still require a default value). |
||
}; | ||
|
||
/// \brief Utility method to set the projection expression and schema | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1064,6 +1064,86 @@ NamedTableProvider AlwaysProvideSameTable(std::shared_ptr<Table> table) { | |
}; | ||
} | ||
|
||
TEST(Substrait, ExecReadRelWithLocalFiles) { | ||
ASSERT_OK_AND_ASSIGN(std::string dir_string, | ||
arrow::internal::GetEnvVar("PARQUET_TEST_DATA")); | ||
|
||
std::string substrait_json = R"({ | ||
"relations": [ | ||
{ | ||
"root": { | ||
"input": { | ||
"read": { | ||
"common": { | ||
"direct": {} | ||
}, | ||
"baseSchema": { | ||
"names": [ | ||
"f32", | ||
"f64" | ||
], | ||
"struct": { | ||
"types": [ | ||
{ | ||
"fp32": { | ||
"nullability": "NULLABILITY_REQUIRED" | ||
} | ||
}, | ||
{ | ||
"fp64": { | ||
"nullability": "NULLABILITY_REQUIRED" | ||
} | ||
} | ||
], | ||
"nullability": "NULLABILITY_REQUIRED" | ||
} | ||
}, | ||
"localFiles": { | ||
"items": [ | ||
{ | ||
"uriFile": "file://[DIRECTORY_PLACEHOLDER]/byte_stream_split.zstd.parquet", | ||
"parquet": {} | ||
} | ||
] | ||
} | ||
} | ||
}, | ||
"names": [ | ||
"f32", | ||
"f64" | ||
] | ||
} | ||
} | ||
], | ||
"version": { | ||
"minorNumber": 42, | ||
"producer": "my-producer" | ||
} | ||
})"; | ||
const char* placeholder = "[DIRECTORY_PLACEHOLDER]"; | ||
substrait_json.replace(substrait_json.find(placeholder), strlen(placeholder), | ||
dir_string); | ||
|
||
ASSERT_OK_AND_ASSIGN(auto buf, | ||
internal::SubstraitFromJSON("Plan", substrait_json, | ||
/*ignore_unknown_fields=*/false)); | ||
|
||
ASSERT_OK_AND_ASSIGN(auto declarations, | ||
DeserializePlans(*buf, acero::NullSinkNodeConsumer::Make)); | ||
ASSERT_EQ(declarations.size(), 1); | ||
acero::Declaration* decl = &declarations[0]; | ||
ASSERT_EQ(decl->factory_name, "consuming_sink"); | ||
ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make()); | ||
ASSERT_OK_AND_ASSIGN(auto sink_node, declarations[0].AddToPlan(plan.get())); | ||
ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode"); | ||
ASSERT_EQ(sink_node->num_inputs(), 1); | ||
auto& prev_node = sink_node->inputs()[0]; | ||
ASSERT_STREQ(prev_node->kind_name(), "SourceNode"); | ||
|
||
plan->StartProducing(); | ||
ASSERT_FINISHES_OK(plan->finished()); | ||
} | ||
|
||
TEST(Substrait, RelWithHint) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor nit: I think we have one or two spots in Python where we have to do a column selection to work around this issue. We can probably remove these now, e.g. https://github.com/apache/arrow/blob/main/python/pyarrow/tests/test_substrait.py#L93 |
||
ASSERT_OK_AND_ASSIGN(auto buf, | ||
internal::SubstraitFromJSON("Rel", R"({ | ||
|
@@ -2443,6 +2523,7 @@ TEST(SubstraitRoundTrip, BasicPlanEndToEnd) { | |
|
||
auto scan_options = std::make_shared<dataset::ScanOptions>(); | ||
scan_options->projection = compute::project({}, {}); | ||
scan_options->add_augmented_fields = false; | ||
const std::string filter_col_left = "shared"; | ||
const std::string filter_col_right = "distinct"; | ||
auto comp_left_value = compute::field_ref(filter_col_left); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we set a default value to ensure we don't break any existing consumers of this function?