Skip to content

Commit

Permalink
Allow file name to be supplied to non-bucketing table write node (fac…
Browse files Browse the repository at this point in the history
…ebookincubator#10903)

Summary:
Pull Request resolved: facebookincubator#10903

For some non Presto/Spark use cases, we need to use `PlanBuilder` to
construct plan and supply file name directly to table write.

Differential Revision: D62032439
  • Loading branch information
Yuhta authored and facebook-github-bot committed Sep 4, 2024
1 parent 5f4e7e5 commit 584ce97
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 11 deletions.
11 changes: 7 additions & 4 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,12 +912,14 @@ HiveWriterParameters HiveDataSink::getWriterParameters(

std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
std::optional<uint32_t> bucketId) const {
std::string targetFileName;
auto targetFileName = insertTableHandle_->locationHandle()->targetFileName();
const bool generateFileName = targetFileName.empty();
if (bucketId.has_value()) {
VELOX_CHECK(generateFileName);
// TODO: add hive.file_renaming_enabled support.
targetFileName = computeBucketedFileName(
connectorQueryCtx_->queryId(), bucketId.value());
} else {
} else if (generateFileName) {
// targetFileName includes planNodeId and Uuid. As a result, different
// table writers run by the same task driver or the same table writer
// run in different task tries would have different targetFileNames.
Expand All @@ -931,8 +933,9 @@ std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
const std::string writeFileName = isCommitRequired()
? fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid())
: targetFileName;
if (insertTableHandle_->tableStorageFormat() ==
dwio::common::FileFormat::PARQUET) {
if (generateFileName &&
insertTableHandle_->tableStorageFormat() ==
dwio::common::FileFormat::PARQUET) {
return {
fmt::format("{}{}", targetFileName, ".parquet"),
fmt::format("{}{}", writeFileName, ".parquet")};
Expand Down
10 changes: 9 additions & 1 deletion velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,21 @@ class LocationHandle : public ISerializable {
LocationHandle(
std::string targetPath,
std::string writePath,
TableType tableType)
TableType tableType,
std::string targetFileName = "")
: targetPath_(std::move(targetPath)),
targetFileName_(std::move(targetFileName)),
writePath_(std::move(writePath)),
tableType_(tableType) {}

const std::string& targetPath() const {
return targetPath_;
}

const std::string& targetFileName() const {
return targetFileName_;
}

const std::string& writePath() const {
return writePath_;
}
Expand All @@ -79,6 +85,8 @@ class LocationHandle : public ISerializable {
private:
// Target directory path.
const std::string targetPath_;
// If non-empty, use this name instead of generating our own.
const std::string targetFileName_;
// Staging directory path.
const std::string writePath_;
// Whether the table to be written is new, already existing or temporary.
Expand Down
27 changes: 27 additions & 0 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,33 @@ TEST_F(BasicTableWriteTest, roundTrip) {
assertEqualResults({data}, {copy});
}

TEST_F(BasicTableWriteTest, targetFileName) {
constexpr const char* kFileName = "test.dwrf";
auto data = makeRowVector({makeFlatVector<int64_t>(10, folly::identity)});
auto directory = TempDirectoryPath::create();
auto plan = PlanBuilder()
.values({data})
.tableWrite(
directory->getPath(),
dwio::common::FileFormat::DWRF,
{},
nullptr,
kFileName)
.planNode();
auto results = AssertQueryBuilder(plan).copyResults(pool());
auto* details = results->childAt(TableWriteTraits::kFragmentChannel)
->asUnchecked<SimpleVector<StringView>>();
auto detail = folly::parseJson(details->valueAt(1));
auto fileWriteInfos = detail["fileWriteInfos"];
ASSERT_EQ(1, fileWriteInfos.size());
ASSERT_EQ(fileWriteInfos[0]["writeFileName"].asString(), kFileName);
plan = PlanBuilder().tableScan(asRowType(data->type())).planNode();
AssertQueryBuilder(plan)
.split(makeHiveConnectorSplit(
fmt::format("{}/{}", directory->getPath(), kFileName)))
.assertResults(data);
}

class PartitionedTableWriterTest
: public TableWriteTest,
public testing::WithParamInterface<uint64_t> {
Expand Down
12 changes: 8 additions & 4 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ PlanBuilder& PlanBuilder::tableWrite(
const std::string& outputDirectoryPath,
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
const std::shared_ptr<dwio::common::WriterOptions>& options,
const std::string& outputFileName) {
return tableWrite(
outputDirectoryPath,
{},
Expand All @@ -352,7 +353,8 @@ PlanBuilder& PlanBuilder::tableWrite(
aggregates,
kHiveDefaultConnectorId,
{},
options);
options,
outputFileName);
}

PlanBuilder& PlanBuilder::tableWrite(
Expand Down Expand Up @@ -405,7 +407,8 @@ PlanBuilder& PlanBuilder::tableWrite(
const std::vector<std::string>& aggregates,
const std::string_view& connectorId,
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
const std::shared_ptr<dwio::common::WriterOptions>& options,
const std::string& outputFileName) {
VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node");
auto rowType = planNode_->outputType();

Expand All @@ -428,7 +431,8 @@ PlanBuilder& PlanBuilder::tableWrite(
auto locationHandle = std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath,
outputDirectoryPath,
connector::hive::LocationHandle::TableType::kNew);
connector::hive::LocationHandle::TableType::kNew,
outputFileName);
std::shared_ptr<HiveBucketProperty> bucketProperty;
if (!partitionBy.empty() && bucketCount != 0) {
bucketProperty =
Expand Down
12 changes: 10 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,9 @@ class PlanBuilder {
/// @param aggregates Aggregations for column statistics collection during
/// @param polymorphic options object to be passed to the writer.
/// write, supported aggregation types vary for different column types.
/// @param outputFileName Optional file name of the output. If specified
/// (non-empty), use it instead of generating the file name in Velox. Should
/// only be specified in non-bucketing write.
/// For example:
/// Boolean: count, countIf.
/// NumericType/Date/Timestamp: min, max, approx_distinct, count.
Expand All @@ -393,7 +396,8 @@ class PlanBuilder {
const dwio::common::FileFormat fileFormat =
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {},
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr,
const std::string& outputFileName = "");

/// Adds a TableWriteNode to write all input columns into a partitioned Hive
/// table without compression.
Expand Down Expand Up @@ -447,6 +451,9 @@ class PlanBuilder {
/// @param connectorId Name used to register the connector.
/// @param serdeParameters Additional parameters passed to the writer.
/// @param Option objects passed to the writer.
/// @param outputFileName Optional file name of the output. If specified
/// (non-empty), use it instead of generating the file name in Velox. Should
/// only be specified in non-bucketing write.
PlanBuilder& tableWrite(
const std::string& outputDirectoryPath,
const std::vector<std::string>& partitionBy,
Expand All @@ -459,7 +466,8 @@ class PlanBuilder {
const std::vector<std::string>& aggregates = {},
const std::string_view& connectorId = kHiveDefaultConnectorId,
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr,
const std::string& outputFileName = "");

/// Add a TableWriteMergeNode.
PlanBuilder& tableWriteMerge(
Expand Down

0 comments on commit 584ce97

Please sign in to comment.