diff --git a/velox/connectors/hive/iceberg/DeleteFileReader.cpp b/velox/connectors/hive/iceberg/DeleteFileReader.cpp index 143f2e73609b..daa428e2475c 100644 --- a/velox/connectors/hive/iceberg/DeleteFileReader.cpp +++ b/velox/connectors/hive/iceberg/DeleteFileReader.cpp @@ -18,6 +18,7 @@ #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" +#include "velox/type/Filter.h" namespace facebook::velox::connector::hive::iceberg { DeleteFileReader::DeleteFileReader( @@ -82,7 +83,12 @@ void DeleteFileReader::createPositionalDeleteDataSource( // TODO: Build filters on the path column: filePathColumn = baseFilePath_ // TODO: Build filters on the positionsColumn: // positionsColumn >= baseReadOffset_ + splitOffsetInFile - SubfieldFilters subfieldFilters = {}; + SubfieldFilters subfieldFilters; + std::vector values = {baseFilePath_}; + std::unique_ptr pathFilter = + std::make_unique(values, false); + subfieldFilters[common::Subfield(filePathColumn->name)] = + std::move(pathFilter); auto deleteTableHandle = std::make_shared( connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr); diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 6b599861312a..3b6c24d38d9a 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -34,24 +34,36 @@ namespace facebook::velox::connector::hive::iceberg { class HiveIcebergTest : public HiveConnectorTestBase { public: - void assertPositionalDeletes(const std::vector& deleteRows) { + void assertPositionalDeletes( + const std::vector& deleteRows, + bool multipleBaseFiles = false) { assertPositionalDeletes( deleteRows, - "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + - ")"); + "SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + ")", + multipleBaseFiles); } void assertPositionalDeletes( const std::vector& deleteRows, - std::string duckdbSql) { + std::string duckdbSql, + bool multipleBaseFiles = false) { std::shared_ptr dataFilePath = writeDataFile(rowCount); - std::shared_ptr deleteFilePath = - writePositionDeleteFile(dataFilePath->path, deleteRows); + + std::mt19937 gen{0}; + int64_t numDeleteRowsBefore = + multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0; + int64_t numDeleteRowsAfter = + multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0; + std::shared_ptr deleteFilePath = writePositionDeleteFile( + dataFilePath->path, + deleteRows, + numDeleteRowsBefore, + numDeleteRowsAfter); IcebergDeleteFile deleteFile( FileContent::kPositionalDeletes, deleteFilePath->path, fileFomat_, - deleteRows.size(), + deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter, testing::internal::GetFileSize( std::fopen(deleteFilePath->path.c_str(), "r"))); @@ -135,14 +147,50 @@ class HiveIcebergTest : public HiveConnectorTestBase { std::shared_ptr writePositionDeleteFile( const std::string& dataFilePath, - const std::vector& deleteRows) { - uint32_t numDeleteRows = deleteRows.size(); + const std::vector& deleteRows, + int64_t numRowsBefore = 0, + int64_t numRowsAfter = 0) { + // if containsMultipleDataFiles == true, we will write rows for other base + // files before and after the target base file + uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter; auto child = vectorMaker_.flatVector(std::vector{1UL}); - auto filePathVector = vectorMaker_.flatVector( - numDeleteRows, [&](auto row) { return StringView(dataFilePath); }); - auto deletePositionsVector = vectorMaker_.flatVector(deleteRows); + auto filePathVector = + vectorMaker_.flatVector(numDeleteRows, [&](auto row) { + if (row < numRowsBefore) { + std::string dataFilePathBefore = dataFilePath + "_before"; + return StringView(dataFilePathBefore); + } else if ( + row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) { + return StringView(dataFilePath); + } else if ( + row >= deleteRows.size() + numRowsBefore && row < numDeleteRows) { + std::string dataFilePathAfter = dataFilePath + "_after"; + return StringView(dataFilePathAfter); + } else { + return StringView(); + } + }); + + std::vector deleteRowsVec; + deleteRowsVec.reserve(numDeleteRows); + + if (numRowsBefore > 0) { + auto rowsBefore = makeSequenceRows(numRowsBefore); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end()); + } + deleteRowsVec.insert( + deleteRowsVec.end(), deleteRows.begin(), deleteRows.end()); + if (numRowsAfter > 0) { + auto rowsAfter = makeSequenceRows(numRowsAfter); + deleteRowsVec.insert( + deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end()); + } + + auto deletePositionsVector = + vectorMaker_.flatVector(deleteRowsVec); RowVectorPtr deleteFileVectors = makeRowVector( {pathColumn_->name, posColumn_->name}, {filePathVector, deletePositionsVector}); @@ -189,7 +237,7 @@ class HiveIcebergTest : public HiveConnectorTestBase { ICEBERG_DELETE_FILE_POSITIONS_COLUMN(); }; -TEST_F(HiveIcebergTest, positionalDeletes) { +TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) { folly::SingletonVault::singleton()->registrationComplete(); // Delete row 0, 1, 2, 3 from the first batch out of two. @@ -201,12 +249,32 @@ TEST_F(HiveIcebergTest, positionalDeletes) { // Delete random rows assertPositionalDeletes(makeRandomDeleteRows(rowCount)); // Delete 0 rows - assertPositionalDeletes({}, "SELECT * FROM tmp"); + assertPositionalDeletes({}, "SELECT * FROM tmp", false); // Delete all rows assertPositionalDeletes( - makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0"); + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false); // Delete rows that don't exist assertPositionalDeletes({20000, 29999}); } +TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) { + folly::SingletonVault::singleton()->registrationComplete(); + + // // Delete row 0, 1, 2, 3 from the first batch out of two. + // assertPositionalDeletes({0, 1, 2, 3}, true); + // Delete the first and last row in each batch (10000 rows per batch) + assertPositionalDeletes({0, 9999, 10000, 19999}, true); + // Delete several rows in the second batch (10000 rows per batch) + assertPositionalDeletes({10000, 10002, 19999}, true); + // Delete random rows + assertPositionalDeletes(makeRandomDeleteRows(rowCount), true); + // Delete 0 rows + assertPositionalDeletes({}, "SELECT * FROM tmp", true); + // Delete all rows + assertPositionalDeletes( + makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true); + // Delete rows that don't exist + assertPositionalDeletes({20000, 29999}, true); +} + } // namespace facebook::velox::connector::hive::iceberg \ No newline at end of file