GH-35730: [C++] Add the ability to specify custom schema on a dataset write #35860

Merged
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
@@ -151,6 +151,7 @@ add_arrow_dataset_test(file_test)
add_arrow_dataset_test(partition_test)
add_arrow_dataset_test(scanner_test)
add_arrow_dataset_test(subtree_test)
add_arrow_dataset_test(write_node_test)

if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
53 changes: 45 additions & 8 deletions cpp/src/arrow/dataset/file_base.cc
@@ -387,16 +387,16 @@ Status WriteBatch(

class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
public:
DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
FileSystemDatasetWriteOptions write_options)
: custom_metadata_(std::move(custom_metadata)),
: custom_schema_(std::move(custom_schema)),
write_options_(std::move(write_options)) {}

Status Init(const std::shared_ptr<Schema>& schema,
acero::BackpressureControl* backpressure_control,
acero::ExecPlan* plan) override {
if (custom_metadata_) {
schema_ = schema->WithMetadata(custom_metadata_);
if (custom_schema_) {
schema_ = custom_schema_;
} else {
schema_ = schema;
}
@@ -434,7 +434,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
});
}

std::shared_ptr<const KeyValueMetadata> custom_metadata_;
std::shared_ptr<Schema> custom_schema_;
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
FileSystemDatasetWriteOptions write_options_;
Future<> finished_ = Future<>::Make();
@@ -453,13 +453,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio

// The projected_schema is currently used by pyarrow to preserve the custom metadata
// when reading from a single input file.
const auto& custom_metadata = scanner->options()->projected_schema->metadata();
const auto& custom_schema = scanner->options()->projected_schema;

WriteNodeOptions write_node_options(write_options);
write_node_options.custom_schema = custom_schema;

acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options()}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"write", WriteNodeOptions{write_options, custom_metadata}},
{"write", std::move(write_node_options)},
});

return acero::DeclarationToStatus(std::move(plan), scanner->options()->use_threads);
@@ -475,16 +478,50 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;

const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();

if (custom_schema != nullptr) {
if (custom_metadata) {
return Status::TypeError(
"Do not provide both custom_metadata and custom_schema. If custom_schema is "
"used then custom_schema->metadata should be used instead of custom_metadata");
}

if (custom_schema->num_fields() != input_schema->num_fields()) {
return Status::TypeError(
"The provided custom_schema did not have the same number of fields as the "
"data. The custom schema can only be used to add metadata / nullability to "
"fields and cannot change the type or number of fields.");
}
for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
if (!input_schema->field(field_idx)->type()->Equals(
custom_schema->field(field_idx)->type())) {
Member:

Should we also test that the names of the fields are equal?

Member Author:

Changing the names should be safe. Admittedly, a user could also do this name change by inserting a project node before the write node.

I could be convinced otherwise, but I don't think this does any harm, and as a user I would expect this behavior, so the changed names wouldn't be surprising.
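
For illustration, a hypothetical sketch of that approach (not part of this diff): rename the fields with a project node placed just before the write. Here table and fs_write_options are assumed placeholders for the input data and write configuration.

// Hypothetical sketch only; identifiers below are illustrative, not from this PR.
dataset::WriteNodeOptions write_opts(fs_write_options);
acero::Declaration rename_then_write = acero::Declaration::Sequence(
    {{"table_source", acero::TableSourceNodeOptions(table)},
     // Project the same columns but give them new output names.
     {"project", acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)},
                                           {"renamed_x", "renamed_y"})},
     {"write", write_opts}});
// (assuming a Status-returning context)
ARROW_RETURN_NOT_OK(acero::DeclarationToStatus(std::move(rename_then_write)));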

return Status::TypeError("The provided custom_schema specified type ",
custom_schema->field(field_idx)->type()->ToString(),
" for field ", field_idx, "and the input data has type ",
input_schema->field(field_idx),
"The custom schema can only be used to add metadata / "
"nullability to fields and "
"cannot change the type or number of fields.");
}
}
}

if (custom_metadata) {
custom_schema = input_schema->WithMetadata(custom_metadata);
}

if (!write_options.partitioning) {
return Status::Invalid("Must provide partitioning");
}

std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_metadata, write_options);
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);

ARROW_ASSIGN_OR_RAISE(
auto node,
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -33,6 +33,7 @@
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/file.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"

namespace arrow {
@@ -470,6 +471,15 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {

/// \brief Options to control how to write the dataset
FileSystemDatasetWriteOptions write_options;
/// \brief Optional schema to attach to all written batches
///
/// By default, we will use the output schema of the input.
///
/// This can be used to alter schema metadata, field nullability, or field metadata.
However, this cannot be used to change the type of data. If the custom schema does
not have the same number of fields and the same data types as the input, the plan
will fail.
std::shared_ptr<Schema> custom_schema;
/// \brief Optional metadata to attach to written batches
std::shared_ptr<const KeyValueMetadata> custom_metadata;
Member:

Instead of removing this option (a breaking change), could we in theory still allow the user to specify either of the two?

(I am not using the C++ API for this, so I don't know how useful this would be / how cumbersome it is to specify the schema if you only want to specify metadata. From the DatasetWriter point of view, this is a fine change of course since there we already have the full schema)

Member Author:

If it were a new feature I would argue it's not worth it (a user could technically use DeclarationToSchema to get the output schema of the plan leading up to the write and then attach custom metadata to that). However, given that we have already released custom_metadata, and I would like Acero's API to start being stable, I suppose I should set an example. Thanks for the nudge. I have restored custom_metadata.
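
For reference, a hypothetical sketch of that workaround (not part of this diff): derive the plan's output schema with acero::DeclarationToSchema, attach metadata to it, and pass the result as custom_schema. Here upstream and fs_write_options are assumed placeholders, and the metadata keys/values are illustrative.

// Hypothetical sketch only; "upstream" is a Declaration producing the data to write
// and "fs_write_options" is a FileSystemDatasetWriteOptions, both assumed.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> plan_schema,
                      acero::DeclarationToSchema(upstream));
dataset::WriteNodeOptions write_opts(fs_write_options);
// Attach the desired metadata to the derived schema.
write_opts.custom_schema =
    plan_schema->WithMetadata(key_value_metadata({{"foo", "bar"}}));
acero::Declaration plan =
    acero::Declaration::Sequence({upstream, {"write", std::move(write_opts)}});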

};
174 changes: 174 additions & 0 deletions cpp/src/arrow/dataset/write_node_test.cc
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/plan.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"

#include "arrow/table.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {

namespace dataset {

class SimpleWriteNodeTest : public ::testing::Test {
protected:
void SetUp() override {
internal::Initialize();
mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
auto ipc_format = std::make_shared<dataset::IpcFileFormat>();

fs_write_options_.filesystem = mock_fs_;
fs_write_options_.base_dir = "/my_dataset";
fs_write_options_.basename_template = "{i}.arrow";
fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
fs_write_options_.partitioning = dataset::Partitioning::Default();
}

std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
dataset::FileSystemDatasetWriteOptions fs_write_options_;
};

TEST_F(SimpleWriteNodeTest, CustomNullability) {
// Create an input table with a nullable and a non-nullable type
ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
std::shared_ptr<Schema> test_schema =
schema({field("nullable_i32", uint32(), /*nullable=*/true),
field("non_nullable_i32", uint32(), /*nullable=*/false)});
std::shared_ptr<RecordBatch> record_batch =
RecordBatch::Make(test_schema, /*num_rows=*/1,
{batch.values[0].make_array(), batch.values[0].make_array()});
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(record_batch)}));

ASSERT_TRUE(table->field(0)->nullable());
ASSERT_FALSE(table->field(1)->nullable());

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_schema = test_schema;

// Write the data to disk (these plans use a project because it destroys whatever
// metadata happened to be in the table source node's output schema). This more
// accurately simulates reading from a dataset.
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the nullability
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(file_schema->field(0)->nullable());
ASSERT_FALSE(file_schema->field(1)->nullable());

// Invalid custom schema

// Incorrect # of fields
write_options.custom_schema = schema({});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});

ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr("did not have the same number of fields as the data")));

// Incorrect types
write_options.custom_schema =
schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});
ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));

// Cannot have both custom_schema and custom_metadata
write_options.custom_schema = test_schema;
write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(std::move(table))},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});
ASSERT_THAT(DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr(
"Do not provide both custom_metadata and custom_schema")));
}

TEST_F(SimpleWriteNodeTest, CustomMetadata) {
constexpr int64_t kRowsPerChunk = 1;
constexpr int64_t kNumChunks = 1;
// Create an input table with no schema metadata
std::shared_ptr<Table> table =
gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);

std::shared_ptr<KeyValueMetadata> custom_metadata =
key_value_metadata({{"foo", "bar"}});

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_metadata = custom_metadata;

// Write the data to disk
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
{"write", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the schema metadata
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}

} // namespace dataset
} // namespace arrow
53 changes: 53 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -5172,6 +5172,59 @@ def test_dataset_partition_with_slash(tmpdir):
assert encoded_paths == file_paths


@pytest.mark.parquet
def test_write_dataset_preserve_nullability(tempdir):
# GH-35730
schema_nullable = pa.schema([
pa.field("x", pa.int64(), nullable=False),
pa.field("y", pa.int64(), nullable=True)])

arrays = [[1, 2, 3], [None, 5, None]]
table = pa.Table.from_arrays(arrays, schema=schema_nullable)

pq.write_to_dataset(table, tempdir / "nulltest1")
dataset = ds.dataset(tempdir / "nulltest1", format="parquet")
# nullability of field is preserved
assert dataset.to_table().schema.equals(schema_nullable)

ds.write_dataset(table, tempdir / "nulltest2", format="parquet")
dataset = ds.dataset(tempdir / "nulltest2", format="parquet")
assert dataset.to_table().schema.equals(schema_nullable)

ds.write_dataset([table, table], tempdir / "nulltest3", format="parquet")
dataset = ds.dataset(tempdir / "nulltest3", format="parquet")
assert dataset.to_table().schema.equals(schema_nullable)


def test_write_dataset_preserve_field_metadata(tempdir):
schema_metadata = pa.schema([
pa.field("x", pa.int64(), metadata={b'foo': b'bar'}),
pa.field("y", pa.int64())])

schema_no_meta = pa.schema([
pa.field("x", pa.int64()),
pa.field("y", pa.int64())])

arrays = [[1, 2, 3], [None, 5, None]]
table = pa.Table.from_arrays(arrays, schema=schema_metadata)
table_no_meta = pa.Table.from_arrays(arrays, schema=schema_no_meta)

# If no schema is provided the schema of the first table will be used
ds.write_dataset([table, table_no_meta], tempdir / "test1", format="parquet")
dataset = ds.dataset(tempdir / "test1", format="parquet")
assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)

ds.write_dataset([table_no_meta, table], tempdir / "test2", format="parquet")
dataset = ds.dataset(tempdir / "test2", format="parquet")
assert dataset.to_table().schema.equals(schema_no_meta, check_metadata=True)

# If a schema is provided it will override the schema of the input
ds.write_dataset([table_no_meta, table], tempdir / "test3", format="parquet",
schema=schema_metadata)
dataset = ds.dataset(tempdir / "test3", format="parquet")
assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)


@pytest.mark.parametrize('dstype', [
"fs", "mem"
])
4 changes: 2 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion r/R/query-engine.R
@@ -236,10 +236,12 @@ ExecPlan <- R6Class("ExecPlan",
},
Write = function(node, ...) {
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
final_metadata <- prepare_key_value_metadata(node$final_metadata())

ExecPlan_Write(
self,
node,
prepare_key_value_metadata(node$final_metadata()),
node$schema$WithMetadata(final_metadata),
...
)
},