Skip to content

Commit

Permalink
GH-35730: [C++] Add the ability to specify custom schema on a dataset…
Browse files Browse the repository at this point in the history
… write (#35860)

### Rationale for this change

The dataset write node previously allowed you to specify custom key/value metadata on a write node.  This was added to support saving schema metadata.  However, it doesn't capture field metadata or field nullability.  This PR replaces that capability with the ability to specify a custom schema instead.  The custom schema must have the same number of fields as the input to the write node and each field must have the same type.

### What changes are included in this PR?

Added `custom_schema` to `WriteNodeOptions` and removed `custom_metadata`.

### Are these changes tested?

Yes, I added a new C++ unit test to verify that the custom info is applied to written files.

### Are there any user-facing changes?

No.  Only new functionality (which is user facing)

* Closes: #35730

Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Nic Crane <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: anjakefala <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
5 people authored and raulcd committed Jun 1, 2023
1 parent 0334820 commit a40a702
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 29 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ add_arrow_dataset_test(file_test)
add_arrow_dataset_test(partition_test)
add_arrow_dataset_test(scanner_test)
add_arrow_dataset_test(subtree_test)
add_arrow_dataset_test(write_node_test)

if(ARROW_CSV)
add_arrow_dataset_test(file_csv_test)
Expand Down
53 changes: 45 additions & 8 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,16 @@ Status WriteBatch(

class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
public:
DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
FileSystemDatasetWriteOptions write_options)
: custom_metadata_(std::move(custom_metadata)),
: custom_schema_(std::move(custom_schema)),
write_options_(std::move(write_options)) {}

Status Init(const std::shared_ptr<Schema>& schema,
acero::BackpressureControl* backpressure_control,
acero::ExecPlan* plan) override {
if (custom_metadata_) {
schema_ = schema->WithMetadata(custom_metadata_);
if (custom_schema_) {
schema_ = custom_schema_;
} else {
schema_ = schema;
}
Expand Down Expand Up @@ -434,7 +434,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
});
}

std::shared_ptr<const KeyValueMetadata> custom_metadata_;
std::shared_ptr<Schema> custom_schema_;
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
FileSystemDatasetWriteOptions write_options_;
Future<> finished_ = Future<>::Make();
Expand All @@ -453,13 +453,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio

// The projected_schema is currently used by pyarrow to preserve the custom metadata
// when reading from a single input file.
const auto& custom_metadata = scanner->options()->projected_schema->metadata();
const auto& custom_schema = scanner->options()->projected_schema;

WriteNodeOptions write_node_options(write_options);
write_node_options.custom_schema = custom_schema;

acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options()}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"write", WriteNodeOptions{write_options, custom_metadata}},
{"write", std::move(write_node_options)},
});

return acero::DeclarationToStatus(std::move(plan), scanner->options()->use_threads);
Expand All @@ -475,16 +478,50 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;

const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();

if (custom_schema != nullptr) {
if (custom_metadata) {
return Status::TypeError(
"Do not provide both custom_metadata and custom_schema. If custom_schema is "
"used then custom_schema->metadata should be used instead of custom_metadata");
}

if (custom_schema->num_fields() != input_schema->num_fields()) {
return Status::TypeError(
"The provided custom_schema did not have the same number of fields as the "
"data. The custom schema can only be used to add metadata / nullability to "
"fields and cannot change the type or number of fields.");
}
for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
if (!input_schema->field(field_idx)->type()->Equals(
custom_schema->field(field_idx)->type())) {
return Status::TypeError("The provided custom_schema specified type ",
custom_schema->field(field_idx)->type()->ToString(),
" for field ", field_idx, "and the input data has type ",
input_schema->field(field_idx),
"The custom schema can only be used to add metadata / "
"nullability to fields and "
"cannot change the type or number of fields.");
}
}
}

if (custom_metadata) {
custom_schema = input_schema->WithMetadata(custom_metadata);
}

if (!write_options.partitioning) {
return Status::Invalid("Must provide partitioning");
}

std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_metadata, write_options);
std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);

ARROW_ASSIGN_OR_RAISE(
auto node,
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/file.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"

namespace arrow {
Expand Down Expand Up @@ -470,6 +471,15 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {

/// \brief Options to control how to write the dataset
FileSystemDatasetWriteOptions write_options;
/// \brief Optional schema to attach to all written batches
///
/// By default, we will use the output schema of the input.
///
/// This can be used to alter schema metadata, field nullability, or field metadata.
/// However, this cannot be used to change the type of data. If the custom schema does
/// not have the same number of fields and the same data types as the input then the
/// plan will fail.
std::shared_ptr<Schema> custom_schema;
/// \brief Optional metadata to attach to written batches
std::shared_ptr<const KeyValueMetadata> custom_metadata;
};
Expand Down
174 changes: 174 additions & 0 deletions cpp/src/arrow/dataset/write_node_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/plan.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"

#include "arrow/table.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {

namespace dataset {

class SimpleWriteNodeTest : public ::testing::Test {
protected:
void SetUp() override {
internal::Initialize();
mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
auto ipc_format = std::make_shared<dataset::IpcFileFormat>();

fs_write_options_.filesystem = mock_fs_;
fs_write_options_.base_dir = "/my_dataset";
fs_write_options_.basename_template = "{i}.arrow";
fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
fs_write_options_.partitioning = dataset::Partitioning::Default();
}

std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
dataset::FileSystemDatasetWriteOptions fs_write_options_;
};

TEST_F(SimpleWriteNodeTest, CustomNullability) {
// Create an input table with a nullable and a non-nullable type
ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
std::shared_ptr<Schema> test_schema =
schema({field("nullable_i32", uint32(), /*nullable=*/true),
field("non_nullable_i32", uint32(), /*nullable=*/false)});
std::shared_ptr<RecordBatch> record_batch =
RecordBatch::Make(test_schema, /*num_rows=*/1,
{batch.values[0].make_array(), batch.values[0].make_array()});
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
Table::FromRecordBatches({std::move(record_batch)}));

ASSERT_TRUE(table->field(0)->nullable());
ASSERT_FALSE(table->field(1)->nullable());

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_schema = test_schema;

// Write the data to disk (these plans use a project because it destroys whatever
// metadata happened to be in the table source node's output schema). This more
// accurately simulates reading from a dataset.
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the nullability
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(file_schema->field(0)->nullable());
ASSERT_FALSE(file_schema->field(1)->nullable());

// Invalid custom schema

// Incorrect # of fields
write_options.custom_schema = schema({});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});

ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr("did not have the same number of fields as the data")));

// Incorrect types
write_options.custom_schema =
schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});
ASSERT_THAT(
DeclarationToStatus(plan),
Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));

// Cannot have both custom_schema and custom_metadata
write_options.custom_schema = test_schema;
write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(std::move(table))},
{"project",
acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
{"write", write_options}});
ASSERT_THAT(DeclarationToStatus(plan),
Raises(StatusCode::TypeError,
::testing::HasSubstr(
"Do not provide both custom_metadata and custom_schema")));
}

TEST_F(SimpleWriteNodeTest, CustomMetadata) {
constexpr int64_t kRowsPerChunk = 1;
constexpr int64_t kNumChunks = 1;
// Create an input table with no schema metadata
std::shared_ptr<Table> table =
gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);

std::shared_ptr<KeyValueMetadata> custom_metadata =
key_value_metadata({{"foo", "bar"}});

dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.custom_metadata = custom_metadata;

// Write the data to disk
acero::Declaration plan = acero::Declaration::Sequence(
{{"table_source", acero::TableSourceNodeOptions(table)},
{"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
{"write", write_options}});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the schema metadata
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
ipc::RecordBatchFileReader::Open(file));
std::shared_ptr<Schema> file_schema = file_reader->schema();

ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}

} // namespace dataset
} // namespace arrow
53 changes: 53 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5074,6 +5074,59 @@ def test_dataset_partition_with_slash(tmpdir):
assert encoded_paths == file_paths


@pytest.mark.parquet
def test_write_dataset_preserve_nullability(tempdir):
# GH-35730
schema_nullable = pa.schema([
pa.field("x", pa.int64(), nullable=False),
pa.field("y", pa.int64(), nullable=True)])

arrays = [[1, 2, 3], [None, 5, None]]
table = pa.Table.from_arrays(arrays, schema=schema_nullable)

pq.write_to_dataset(table, tempdir / "nulltest1")
dataset = ds.dataset(tempdir / "nulltest1", format="parquet")
# nullability of field is preserved
assert dataset.to_table().schema.equals(schema_nullable)

ds.write_dataset(table, tempdir / "nulltest2", format="parquet")
dataset = ds.dataset(tempdir / "nulltest2", format="parquet")
assert dataset.to_table().schema.equals(schema_nullable)

ds.write_dataset([table, table], tempdir / "nulltest3", format="parquet")
dataset = ds.dataset(tempdir / "nulltest3", format="parquet")
assert dataset.to_table().schema.equals(schema_nullable)


def test_write_dataset_preserve_field_metadata(tempdir):
schema_metadata = pa.schema([
pa.field("x", pa.int64(), metadata={b'foo': b'bar'}),
pa.field("y", pa.int64())])

schema_no_meta = pa.schema([
pa.field("x", pa.int64()),
pa.field("y", pa.int64())])

arrays = [[1, 2, 3], [None, 5, None]]
table = pa.Table.from_arrays(arrays, schema=schema_metadata)
table_no_meta = pa.Table.from_arrays(arrays, schema=schema_no_meta)

# If no schema is provided the schema of the first table will be used
ds.write_dataset([table, table_no_meta], tempdir / "test1", format="parquet")
dataset = ds.dataset(tempdir / "test1", format="parquet")
assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)

ds.write_dataset([table_no_meta, table], tempdir / "test2", format="parquet")
dataset = ds.dataset(tempdir / "test2", format="parquet")
assert dataset.to_table().schema.equals(schema_no_meta, check_metadata=True)

# If a schema is provided it will override the schema of the input
ds.write_dataset([table_no_meta, table], tempdir / "test3", format="parquet",
schema=schema_metadata)
dataset = ds.dataset(tempdir / "test3", format="parquet")
assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)


@pytest.mark.parametrize('dstype', [
"fs", "mem"
])
Expand Down
4 changes: 2 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,12 @@ ExecPlan <- R6Class("ExecPlan",
},
Write = function(node, ...) {
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
final_metadata <- prepare_key_value_metadata(node$final_metadata())

ExecPlan_Write(
self,
node,
prepare_key_value_metadata(node$final_metadata()),
node$schema$WithMetadata(final_metadata),
...
)
},
Expand Down
Loading

0 comments on commit a40a702

Please sign in to comment.