Skip to content

Commit

Permalink
Add subfield filter for the delete file path column
Browse files Browse the repository at this point in the history
  • Loading branch information
yingsu00 committed Nov 15, 2023
1 parent f44ad01 commit 6edc1db
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 16 deletions.
8 changes: 7 additions & 1 deletion velox/connectors/hive/iceberg/DeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/type/Filter.h"

namespace facebook::velox::connector::hive::iceberg {
DeleteFileReader::DeleteFileReader(
Expand Down Expand Up @@ -82,7 +83,12 @@ void DeleteFileReader::createPositionalDeleteDataSource(
// TODO: Build filters on the path column: filePathColumn = baseFilePath_
// TODO: Build filters on the positionsColumn:
// positionsColumn >= baseReadOffset_ + splitOffsetInFile
SubfieldFilters subfieldFilters = {};
SubfieldFilters subfieldFilters;
std::vector<std::string> values = {baseFilePath_};
std::unique_ptr<common::Filter> pathFilter =
std::make_unique<common::BytesValues>(values, false);
subfieldFilters[common::Subfield(filePathColumn->name)] =
std::move(pathFilter);

auto deleteTableHandle = std::make_shared<HiveTableHandle>(
connectorId, deleteFileName, false, std::move(subfieldFilters), nullptr);
Expand Down
98 changes: 83 additions & 15 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,36 @@ namespace facebook::velox::connector::hive::iceberg {

class HiveIcebergTest : public HiveConnectorTestBase {
public:
void assertPositionalDeletes(const std::vector<int64_t>& deleteRows) {
void assertPositionalDeletes(
const std::vector<int64_t>& deleteRows,
bool multipleBaseFiles = false) {
assertPositionalDeletes(
deleteRows,
"SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) +
")");
"SELECT * FROM tmp WHERE c0 NOT IN (" + makeNotInList(deleteRows) + ")",
multipleBaseFiles);
}
void assertPositionalDeletes(
const std::vector<int64_t>& deleteRows,
std::string duckdbSql) {
std::string duckdbSql,
bool multipleBaseFiles = false) {
std::shared_ptr<TempFilePath> dataFilePath = writeDataFile(rowCount);
std::shared_ptr<TempFilePath> deleteFilePath =
writePositionDeleteFile(dataFilePath->path, deleteRows);

std::mt19937 gen{0};
int64_t numDeleteRowsBefore =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
int64_t numDeleteRowsAfter =
multipleBaseFiles ? folly::Random::rand32(0, 1000, gen) : 0;
std::shared_ptr<TempFilePath> deleteFilePath = writePositionDeleteFile(
dataFilePath->path,
deleteRows,
numDeleteRowsBefore,
numDeleteRowsAfter);

IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->path,
fileFomat_,
deleteRows.size(),
deleteRows.size() + numDeleteRowsBefore + numDeleteRowsAfter,
testing::internal::GetFileSize(
std::fopen(deleteFilePath->path.c_str(), "r")));

Expand Down Expand Up @@ -135,14 +147,50 @@ class HiveIcebergTest : public HiveConnectorTestBase {

std::shared_ptr<TempFilePath> writePositionDeleteFile(
const std::string& dataFilePath,
const std::vector<int64_t>& deleteRows) {
uint32_t numDeleteRows = deleteRows.size();
const std::vector<int64_t>& deleteRows,
int64_t numRowsBefore = 0,
int64_t numRowsAfter = 0) {
// if containsMultipleDataFiles == true, we will write rows for other base
// files before and after the target base file
uint32_t numDeleteRows = numRowsBefore + deleteRows.size() + numRowsAfter;

auto child = vectorMaker_.flatVector<int64_t>(std::vector<int64_t>{1UL});

auto filePathVector = vectorMaker_.flatVector<StringView>(
numDeleteRows, [&](auto row) { return StringView(dataFilePath); });
auto deletePositionsVector = vectorMaker_.flatVector<int64_t>(deleteRows);
auto filePathVector =
vectorMaker_.flatVector<StringView>(numDeleteRows, [&](auto row) {
if (row < numRowsBefore) {
std::string dataFilePathBefore = dataFilePath + "_before";
return StringView(dataFilePathBefore);
} else if (
row >= numRowsBefore && row < deleteRows.size() + numRowsBefore) {
return StringView(dataFilePath);
} else if (
row >= deleteRows.size() + numRowsBefore && row < numDeleteRows) {
std::string dataFilePathAfter = dataFilePath + "_after";
return StringView(dataFilePathAfter);
} else {
return StringView();
}
});

std::vector<int64_t> deleteRowsVec;
deleteRowsVec.reserve(numDeleteRows);

if (numRowsBefore > 0) {
auto rowsBefore = makeSequenceRows(numRowsBefore);
deleteRowsVec.insert(
deleteRowsVec.end(), rowsBefore.begin(), rowsBefore.end());
}
deleteRowsVec.insert(
deleteRowsVec.end(), deleteRows.begin(), deleteRows.end());
if (numRowsAfter > 0) {
auto rowsAfter = makeSequenceRows(numRowsAfter);
deleteRowsVec.insert(
deleteRowsVec.end(), rowsAfter.begin(), rowsAfter.end());
}

auto deletePositionsVector =
vectorMaker_.flatVector<int64_t>(deleteRowsVec);
RowVectorPtr deleteFileVectors = makeRowVector(
{pathColumn_->name, posColumn_->name},
{filePathVector, deletePositionsVector});
Expand Down Expand Up @@ -189,7 +237,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
ICEBERG_DELETE_FILE_POSITIONS_COLUMN();
};

TEST_F(HiveIcebergTest, positionalDeletes) {
TEST_F(HiveIcebergTest, positionalDeletesSingleBaseFile) {
folly::SingletonVault::singleton()->registrationComplete();

// Delete row 0, 1, 2, 3 from the first batch out of two.
Expand All @@ -201,12 +249,32 @@ TEST_F(HiveIcebergTest, positionalDeletes) {
// Delete random rows
assertPositionalDeletes(makeRandomDeleteRows(rowCount));
// Delete 0 rows
assertPositionalDeletes({}, "SELECT * FROM tmp");
assertPositionalDeletes({}, "SELECT * FROM tmp", false);
// Delete all rows
assertPositionalDeletes(
makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0");
makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", false);
// Delete rows that don't exist
assertPositionalDeletes({20000, 29999});
}

TEST_F(HiveIcebergTest, positionalDeletesMultipleBaseFiles) {
folly::SingletonVault::singleton()->registrationComplete();

// // Delete row 0, 1, 2, 3 from the first batch out of two.
// assertPositionalDeletes({0, 1, 2, 3}, true);
// Delete the first and last row in each batch (10000 rows per batch)
assertPositionalDeletes({0, 9999, 10000, 19999}, true);
// Delete several rows in the second batch (10000 rows per batch)
assertPositionalDeletes({10000, 10002, 19999}, true);
// Delete random rows
assertPositionalDeletes(makeRandomDeleteRows(rowCount), true);
// Delete 0 rows
assertPositionalDeletes({}, "SELECT * FROM tmp", true);
// Delete all rows
assertPositionalDeletes(
makeSequenceRows(rowCount), "SELECT * FROM tmp WHERE 1 = 0", true);
// Delete rows that don't exist
assertPositionalDeletes({20000, 29999}, true);
}

} // namespace facebook::velox::connector::hive::iceberg

0 comments on commit 6edc1db

Please sign in to comment.