diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 678acd31f84b9..29db7e3ef781f 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -912,12 +912,14 @@ HiveWriterParameters HiveDataSink::getWriterParameters( std::pair HiveDataSink::getWriterFileNames( std::optional bucketId) const { - std::string targetFileName; + auto targetFileName = insertTableHandle_->locationHandle()->targetFileName(); + const bool generateFileName = targetFileName.empty(); if (bucketId.has_value()) { + VELOX_CHECK(generateFileName); // TODO: add hive.file_renaming_enabled support. targetFileName = computeBucketedFileName( connectorQueryCtx_->queryId(), bucketId.value()); - } else { + } else if (generateFileName) { // targetFileName includes planNodeId and Uuid. As a result, different // table writers run by the same task driver or the same table writer // run in different task tries would have different targetFileNames. @@ -931,8 +933,9 @@ std::pair HiveDataSink::getWriterFileNames( const std::string writeFileName = isCommitRequired() ? fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid()) : targetFileName; - if (insertTableHandle_->tableStorageFormat() == - dwio::common::FileFormat::PARQUET) { + if (generateFileName && + insertTableHandle_->tableStorageFormat() == + dwio::common::FileFormat::PARQUET) { return { fmt::format("{}{}", targetFileName, ".parquet"), fmt::format("{}{}", writeFileName, ".parquet")}; diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index a916316bda45c..233f5a8991b98 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -47,8 +47,10 @@ class LocationHandle : public ISerializable { LocationHandle( std::string targetPath, std::string writePath, - TableType tableType) + TableType tableType, + std::string targetFileName = "") : targetPath_(std::move(targetPath)), + targetFileName_(std::move(targetFileName)), writePath_(std::move(writePath)), tableType_(tableType) {} @@ -56,6 +58,10 @@ class LocationHandle : public ISerializable { return targetPath_; } + const std::string& targetFileName() const { + return targetFileName_; + } + const std::string& writePath() const { return writePath_; } @@ -79,6 +85,8 @@ class LocationHandle : public ISerializable { private: // Target directory path. const std::string targetPath_; + // If non-empty, use this name instead of generating our own. + const std::string targetFileName_; // Staging directory path. const std::string writePath_; // Whether the table to be written is new, already existing or temporary. diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index c54ebd3e443be..c1e718478500f 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -1104,6 +1104,33 @@ TEST_F(BasicTableWriteTest, roundTrip) { assertEqualResults({data}, {copy}); } +TEST_F(BasicTableWriteTest, targetFileName) { + constexpr const char* kFileName = "test.dwrf"; + auto data = makeRowVector({makeFlatVector(10, folly::identity)}); + auto directory = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .values({data}) + .tableWrite( + directory->getPath(), + dwio::common::FileFormat::DWRF, + {}, + nullptr, + kFileName) + .planNode(); + auto results = AssertQueryBuilder(plan).copyResults(pool()); + auto* details = results->childAt(TableWriteTraits::kFragmentChannel) + ->asUnchecked>(); + auto detail = folly::parseJson(details->valueAt(1)); + auto fileWriteInfos = detail["fileWriteInfos"]; + ASSERT_EQ(1, fileWriteInfos.size()); + ASSERT_EQ(fileWriteInfos[0]["writeFileName"].asString(), kFileName); + plan = PlanBuilder().tableScan(asRowType(data->type())).planNode(); + AssertQueryBuilder(plan) + .split(makeHiveConnectorSplit( + fmt::format("{}/{}", directory->getPath(), kFileName))) + .assertResults(data); +} + class PartitionedTableWriterTest : public TableWriteTest, public testing::WithParamInterface { diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 8ec7dc813a91c..59b719458024b 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -341,7 +341,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::string& outputDirectoryPath, const dwio::common::FileFormat fileFormat, const std::vector& aggregates, - const std::shared_ptr& options) { + const std::shared_ptr& options, + const std::string& outputFileName) { return tableWrite( outputDirectoryPath, {}, @@ -352,7 +353,8 @@ PlanBuilder& PlanBuilder::tableWrite( aggregates, kHiveDefaultConnectorId, {}, - options); + options, + outputFileName); } PlanBuilder& PlanBuilder::tableWrite( @@ -405,7 +407,8 @@ PlanBuilder& PlanBuilder::tableWrite( const std::vector& aggregates, const std::string_view& connectorId, const std::unordered_map& serdeParameters, - const std::shared_ptr& options) { + const std::shared_ptr& options, + const std::string& outputFileName) { VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node"); auto rowType = planNode_->outputType(); @@ -428,7 +431,8 @@ PlanBuilder& PlanBuilder::tableWrite( auto locationHandle = std::make_shared( outputDirectoryPath, outputDirectoryPath, - connector::hive::LocationHandle::TableType::kNew); + connector::hive::LocationHandle::TableType::kNew, + outputFileName); std::shared_ptr bucketProperty; if (!partitionBy.empty() && bucketCount != 0) { bucketProperty = diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 4c4e74c66b2a8..092a471eaa86b 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -383,6 +383,11 @@ class PlanBuilder { /// @param aggregates Aggregations for column statistics collection during /// @param polymorphic options object to be passed to the writer. /// write, supported aggregation types vary for different column types. + + /// @param outputFileName Optional file name of the output. If specified + /// (non-empty), use it instead of generating the file name in Velox. Should + /// only be specified in non-bucketing write. + /// For example: /// Boolean: count, countIf. /// NumericType/Date/Timestamp: min, max, approx_distinct, count. @@ -393,7 +398,8 @@ class PlanBuilder { const dwio::common::FileFormat fileFormat = dwio::common::FileFormat::DWRF, const std::vector& aggregates = {}, - const std::shared_ptr& options = nullptr); + const std::shared_ptr& options = nullptr, + const std::string& outputFileName = ""); /// Adds a TableWriteNode to write all input columns into a partitioned Hive /// table without compression. @@ -447,6 +453,9 @@ class PlanBuilder { /// @param connectorId Name used to register the connector. /// @param serdeParameters Additional parameters passed to the writer. /// @param Option objects passed to the writer. + /// @param outputFileName Optional file name of the output. If specified + /// (non-empty), use it instead of generating the file name in Velox. Should + /// only be specified in non-bucketing write. PlanBuilder& tableWrite( const std::string& outputDirectoryPath, const std::vector& partitionBy, @@ -459,7 +468,8 @@ class PlanBuilder { const std::vector& aggregates = {}, const std::string_view& connectorId = kHiveDefaultConnectorId, const std::unordered_map& serdeParameters = {}, - const std::shared_ptr& options = nullptr); + const std::shared_ptr& options = nullptr, + const std::string& outputFileName = ""); /// Add a TableWriteMergeNode. PlanBuilder& tableWriteMerge(