Skip to content

Commit

Permalink
apacheGH-34484: [Substrait] add an option to disable augmented fields (
Browse files Browse the repository at this point in the history
…apache#41583)

### Rationale for this change

Augmented fields interfere with the schema passing between nodes.  When enabled they cause names/schema mismatching at the end of the plan.

### What changes are included in this PR?

Adds an option to disable augmented fields (defaulting to adding them), connects it everywhere it is called, and disables it in ReadRel conversion.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

There are no API related changes however this will allow Substrait plans that consume local files to work without requiring a project/emit relation after the read relation to remove the unexpected fields.

* GitHub Issue: apache#34484

Authored-by: David Sisson <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
EpsilonPrime authored and vibhatha committed May 25, 2024
1 parent 319a4f8 commit 1dffbc7
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 27 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class ConsumingSinkNode : public ExecNode,
std::atomic<int32_t> backpressure_counter_ = 0;
std::unique_ptr<util::SerialSequencingQueue> sequencer_;
};

static Result<ExecNode*> MakeTableConsumingSinkNode(ExecPlan* plan,
std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
}
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema;
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(*schema));
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(
*schema, options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ TEST_F(TestParquetFileFormat, CachedMetadata) {
// Read the file the first time, will read metadata
auto options = std::make_shared<ScanOptions>();
options->filter = literal(true);
ASSERT_OK_AND_ASSIGN(auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema));
ASSERT_OK_AND_ASSIGN(
auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema, options->add_augmented_fields));
options->projected_schema = projection_descr.schema;
options->projection = projection_descr.expression;
ASSERT_OK_AND_ASSIGN(auto generator, fragment->ScanBatchesAsync(options));
Expand Down
35 changes: 23 additions & 12 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// create the projected schema only if the provided expressions
// produces valid set of fields.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*projected_schema));
ProjectionDescr::Default(
*projected_schema, scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
ARROW_ASSIGN_OR_RAISE(scan_options->projection,
Expand All @@ -220,7 +221,8 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// if projected_fields are not found, we default to creating the projected_schema
// and projection from the dataset_schema.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*dataset_schema));
ProjectionDescr::Default(
*dataset_schema, scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
}
Expand All @@ -231,7 +233,7 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
ARROW_ASSIGN_OR_RAISE(
auto projection_descr,
ProjectionDescr::FromNames(scan_options->projected_schema->field_names(),
*dataset_schema));
*dataset_schema, scan_options->add_augmented_fields));
scan_options->projection = projection_descr.expression;
}

Expand Down Expand Up @@ -730,7 +732,8 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
const auto options = std::make_shared<ScanOptions>(*scan_options_);
ARROW_ASSIGN_OR_RAISE(auto empty_projection,
ProjectionDescr::FromNames(std::vector<std::string>(),
*scan_options_->dataset_schema));
*scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(options.get(), empty_projection);

auto total = std::make_shared<std::atomic<int64_t>>(0);
Expand Down Expand Up @@ -828,7 +831,8 @@ Result<ProjectionDescr> ProjectionDescr::FromExpressions(
}

Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> names,
const Schema& dataset_schema) {
const Schema& dataset_schema,
bool add_augmented_fields) {
std::vector<compute::Expression> exprs(names.size());
for (size_t i = 0; i < exprs.size(); ++i) {
// If name isn't in schema, try finding it by dotted path.
Expand All @@ -846,15 +850,19 @@ Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> name
}
}
auto fields = dataset_schema.fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
}
return ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
Schema(fields, dataset_schema.metadata()));
}

Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema);
Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema,
bool add_augmented_fields) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema,
add_augmented_fields);
}

void SetProjection(ScanOptions* options, ProjectionDescr projection) {
Expand Down Expand Up @@ -899,7 +907,8 @@ const std::shared_ptr<Schema>& ScannerBuilder::projected_schema() const {
Status ScannerBuilder::Project(std::vector<std::string> columns) {
ARROW_ASSIGN_OR_RAISE(
auto projection,
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema));
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
return Status::OK();
}
Expand Down Expand Up @@ -1052,8 +1061,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
});

auto fields = scan_options->dataset_schema->fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (scan_options->add_augmented_fields) {
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
}
}

return acero::MakeExecNode(
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Note: This must be true in order for any readahead to happen
bool use_threads = false;

/// If true the scanner will add augmented fields to the output schema.
bool add_augmented_fields = true;

/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;

Expand Down Expand Up @@ -287,10 +290,12 @@ struct ARROW_DS_EXPORT ProjectionDescr {

/// \brief Create a default projection referencing fields in the dataset schema
static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
const Schema& dataset_schema);
const Schema& dataset_schema,
bool add_augmented_fields = true);

/// \brief Make a projection that projects every field in the dataset schema
static Result<ProjectionDescr> Default(const Schema& dataset_schema);
static Result<ProjectionDescr> Default(const Schema& dataset_schema,
bool add_augmented_fields = true);
};

/// \brief Utility method to set the projection expression and schema
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,8 @@ TEST_P(TestScanner, ProjectionDefaults) {
}
// If we only specify a projection expression then infer the projected schema
// from the projection expression
auto projection_desc = ProjectionDescr::FromNames({"i32"}, *schema_);
auto projection_desc =
ProjectionDescr::FromNames({"i32"}, *schema_, /*add_augmented_fields=*/true);
{
ARROW_SCOPED_TRACE("User only specifies projection");
options_->projection = projection_desc->expression;
Expand Down Expand Up @@ -1148,7 +1149,8 @@ TEST_P(TestScanner, ProjectedScanNestedFromNames) {
});
ASSERT_OK_AND_ASSIGN(auto descr,
ProjectionDescr::FromNames({".struct.i32", "nested.right.f64"},
*options_->dataset_schema))
*options_->dataset_schema,
options_->add_augmented_fields))
SetProjection(options_.get(), std::move(descr));
auto batch_in = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_);
auto batch_out = ConstantArrayGenerator::Zeroes(
Expand Down Expand Up @@ -2106,7 +2108,8 @@ TEST(ScanOptions, TestMaterializedFields) {

auto set_projection_from_names = [&opts](std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts->dataset_schema));
std::move(names), *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
};

Expand Down Expand Up @@ -2160,7 +2163,8 @@ TEST(ScanOptions, TestMaterializedFields) {
// project top-level field, filter nothing
opts->filter = literal(true);
ASSERT_OK_AND_ASSIGN(projection,
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema));
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested")));

Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ class DatasetFixtureMixin : public ::testing::Test {
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema_;
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::FromNames(schema_->field_names(), *schema_));
ProjectionDescr::FromNames(schema_->field_names(), *schema_,
options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
SetFilter(literal(true));
}
Expand All @@ -398,7 +399,8 @@ class DatasetFixtureMixin : public ::testing::Test {
void SetProjectedColumns(std::vector<std::string> column_names) {
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema));
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(options_.get(), std::move(projection));
}

Expand Down Expand Up @@ -502,7 +504,8 @@ class FileFormatFixtureMixin : public ::testing::Test {
void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
opts_->dataset_schema = schema(std::move(fields));
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::Default(*opts_->dataset_schema));
ProjectionDescr::Default(*opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand All @@ -512,7 +515,8 @@ class FileFormatFixtureMixin : public ::testing::Test {

void Project(std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts_->dataset_schema));
std::move(names), *opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand Down Expand Up @@ -993,7 +997,8 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto i64 = field("i64", int64());
this->opts_->dataset_schema = schema({i32, i32, i64});
ASSERT_RAISES(Invalid,
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema));
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema,
/*add_augmented_fields=*/true));
}
void TestScanWithPushdownNulls() {
// Regression test for ARROW-15312
Expand Down Expand Up @@ -1933,7 +1938,8 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
scan_options_->dataset_schema = dataset_->schema();
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema()));
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema(),
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&

auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->use_threads = true;
scan_options->add_augmented_fields = false;

if (read.has_filter()) {
ARROW_ASSIGN_OR_RAISE(scan_options->filter,
Expand Down
81 changes: 81 additions & 0 deletions cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,86 @@ NamedTableProvider AlwaysProvideSameTable(std::shared_ptr<Table> table) {
};
}

TEST(Substrait, ExecReadRelWithLocalFiles) {
ASSERT_OK_AND_ASSIGN(std::string dir_string,
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));

std::string substrait_json = R"({
"relations": [
{
"root": {
"input": {
"read": {
"common": {
"direct": {}
},
"baseSchema": {
"names": [
"f32",
"f64"
],
"struct": {
"types": [
{
"fp32": {
"nullability": "NULLABILITY_REQUIRED"
}
},
{
"fp64": {
"nullability": "NULLABILITY_REQUIRED"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
},
"localFiles": {
"items": [
{
"uriFile": "file://[DIRECTORY_PLACEHOLDER]/byte_stream_split.zstd.parquet",
"parquet": {}
}
]
}
}
},
"names": [
"f32",
"f64"
]
}
}
],
"version": {
"minorNumber": 42,
"producer": "my-producer"
}
})";
const char* placeholder = "[DIRECTORY_PLACEHOLDER]";
substrait_json.replace(substrait_json.find(placeholder), strlen(placeholder),
dir_string);

ASSERT_OK_AND_ASSIGN(auto buf,
internal::SubstraitFromJSON("Plan", substrait_json,
/*ignore_unknown_fields=*/false));

ASSERT_OK_AND_ASSIGN(auto declarations,
DeserializePlans(*buf, acero::NullSinkNodeConsumer::Make));
ASSERT_EQ(declarations.size(), 1);
acero::Declaration* decl = &declarations[0];
ASSERT_EQ(decl->factory_name, "consuming_sink");
ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());
ASSERT_OK_AND_ASSIGN(auto sink_node, declarations[0].AddToPlan(plan.get()));
ASSERT_STREQ(sink_node->kind_name(), "ConsumingSinkNode");
ASSERT_EQ(sink_node->num_inputs(), 1);
auto& prev_node = sink_node->inputs()[0];
ASSERT_STREQ(prev_node->kind_name(), "SourceNode");

plan->StartProducing();
ASSERT_FINISHES_OK(plan->finished());
}

TEST(Substrait, RelWithHint) {
ASSERT_OK_AND_ASSIGN(auto buf,
internal::SubstraitFromJSON("Rel", R"({
Expand Down Expand Up @@ -2443,6 +2523,7 @@ TEST(SubstraitRoundTrip, BasicPlanEndToEnd) {

auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->projection = compute::project({}, {});
scan_options->add_augmented_fields = false;
const std::string filter_col_left = "shared";
const std::string filter_col_right = "distinct";
auto comp_left_value = compute::field_ref(filter_col_left);
Expand Down

0 comments on commit 1dffbc7

Please sign in to comment.