Skip to content

Commit

Permalink
[Fix](recycler) recycler need index_suffix and inverted index storage…
Browse files Browse the repository at this point in the history
… format to delete idx file (#38306)

## Proposed changes

The recycler did not correctly delete .idx files for inverted index storage format V2
or for the variant data type.
  • Loading branch information
airborne12 authored Jul 25, 2024
1 parent 73a710f commit 1293912
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 40 deletions.
74 changes: 51 additions & 23 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class InstanceRecycler::InvertedIndexIdCache {
: instance_id_(std::move(instance_id)), txn_kv_(std::move(txn_kv)) {}

// Return 0 if success, 1 if schema kv not found, negative for error
int get(int64_t index_id, int32_t schema_version, std::vector<int64_t>& res) {
int get(int64_t index_id, int32_t schema_version, InvertedIndexInfo& res) {
{
std::lock_guard lock(mtx_);
if (schemas_without_inverted_index_.count({index_id, schema_version})) {
Expand Down Expand Up @@ -407,26 +407,29 @@ class InstanceRecycler::InvertedIndexIdCache {
LOG(WARNING) << "malformed schema value, key=" << hex(schema_key);
return -1;
}
if (schema.index_size() > 0) {
res.reserve(schema.index_size());
if (schema.index_size() > 0 && schema.has_inverted_index_storage_format()) {
res.first = schema.inverted_index_storage_format();
res.second.reserve(schema.index_size());
for (auto& i : schema.index()) {
res.push_back(i.index_id());
if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
res.second.push_back(std::make_pair(i.index_id(), i.index_suffix_name()));
}
}
}
insert(index_id, schema_version, res);
return 0;
}

// Caches the lookup result for <index_id, schema_version>; `mtx_` is held while a
// container is updated.
// An empty index list (`ids`, or `index_info.second` in the new signature) means this
// schema has no inverted index: the key is then recorded in
// `schemas_without_inverted_index_`, otherwise the info is stored in
// `inverted_index_id_map_`.
void insert(int64_t index_id, int32_t schema_version, const std::vector<int64_t>& ids) {
if (ids.empty()) {
void insert(int64_t index_id, int32_t schema_version, const InvertedIndexInfo& index_info) {
if (index_info.second.empty()) {
TEST_SYNC_POINT("InvertedIndexIdCache::insert1");
std::lock_guard lock(mtx_);
schemas_without_inverted_index_.emplace(index_id, schema_version);
} else {
TEST_SYNC_POINT("InvertedIndexIdCache::insert2");
std::lock_guard lock(mtx_);
// try_emplace leaves an already cached entry for this key untouched.
inverted_index_id_map_.try_emplace({index_id, schema_version}, ids);
inverted_index_id_map_.try_emplace({index_id, schema_version}, index_info);
}
}

Expand All @@ -445,7 +448,7 @@ class InstanceRecycler::InvertedIndexIdCache {
}
};
// <index_id, schema_version> -> inverted index info (storage format + <index_id, index_suffix_name> list)
std::unordered_map<Key, std::vector<int64_t>, HashOfKey> inverted_index_id_map_;
std::unordered_map<Key, InvertedIndexInfo, HashOfKey> inverted_index_id_map_;
// Store <index_id, schema_version> of schema which doesn't have inverted index
std::unordered_set<Key, HashOfKey> schemas_without_inverted_index_;
};
Expand Down Expand Up @@ -1273,17 +1276,26 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta
const auto& rowset_id = rs_meta_pb.rowset_id_v2();
int64_t tablet_id = rs_meta_pb.tablet_id();
// process inverted indexes
std::vector<int64_t> index_ids;
std::vector<std::pair<int64_t, std::string>> index_ids;
index_ids.reserve(rs_meta_pb.tablet_schema().index_size());
for (auto& i : rs_meta_pb.tablet_schema().index()) {
index_ids.push_back(i.index_id());
if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
index_ids.push_back(std::make_pair(i.index_id(), i.index_suffix_name()));
}
}
std::vector<std::string> file_paths;
file_paths.reserve(num_segments * (1 + index_ids.size()));
auto tablet_schema = rs_meta_pb.tablet_schema();
for (int64_t i = 0; i < num_segments; ++i) {
file_paths.push_back(segment_path(tablet_id, rowset_id, i));
for (int64_t index_id : index_ids) {
file_paths.push_back(inverted_index_path(tablet_id, rowset_id, i, index_id));
if (tablet_schema.has_inverted_index_storage_format()) {
if (tablet_schema.inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
for (const auto& index_id : index_ids) {
file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
index_id.first, index_id.second));
}
} else if (!index_ids.empty()) {
file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
}
}
}
// TODO(AlexYue): seems this could be done in batch
Expand All @@ -1294,7 +1306,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
for (auto& rs : rowsets) {

for (const auto& rs : rowsets) {
{
std::lock_guard lock(recycled_tablets_mtx_);
if (recycled_tablets_.count(rs.tablet_id())) {
Expand All @@ -1317,14 +1330,21 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
int64_t num_segments = rs.num_segments();
if (num_segments <= 0) continue;

// process inverted indexes
std::vector<int64_t> index_ids;
// Process inverted indexes
std::vector<std::pair<int64_t, std::string>> index_ids;
// default format as v2.
InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V2;

if (rs.has_tablet_schema()) {
index_ids.reserve(rs.tablet_schema().index().size());
for (auto& index_pb : rs.tablet_schema().index()) {
index_ids.push_back(index_pb.index_id());
for (const auto& index : rs.tablet_schema().index()) {
if (index.has_index_type() && index.index_type() == IndexType::INVERTED) {
index_ids.emplace_back(index.index_id(), index.index_suffix_name());
}
}
if (rs.tablet_schema().has_inverted_index_storage_format()) {
index_format = rs.tablet_schema().inverted_index_storage_format();
}
} else { // Detached schema
} else {
if (!rs.has_index_id() || !rs.has_schema_version()) {
LOG(WARNING) << "rowset must have either schema or schema_version and index_id, "
"instance_id="
Expand All @@ -1333,8 +1353,9 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
ret = -1;
continue;
}
InvertedIndexInfo index_info;
int get_ret =
inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_ids);
inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info);
if (get_ret != 0) {
if (get_ret == 1) { // Schema kv not found
// Check tablet existence
Expand All @@ -1352,11 +1373,18 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
ret = -1;
continue;
}
index_format = index_info.first;
index_ids = std::move(index_info.second);
}
for (int64_t i = 0; i < num_segments; ++i) {
file_paths.push_back(segment_path(tablet_id, rowset_id, i));
for (int64_t index_id : index_ids) {
file_paths.push_back(inverted_index_path(tablet_id, rowset_id, i, index_id));
if (index_format == InvertedIndexStorageFormatPB::V1) {
for (const auto& index_id : index_ids) {
file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
index_id.first, index_id.second));
}
} else if (!index_ids.empty()) {
file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class InstanceRecycler {

// TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
// Inverted index metadata of one schema:
//   first  - inverted index storage format of the schema
//   second - <index_id, index_suffix_name> pair of each inverted index
using InvertedIndexInfo =
std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;

class InvertedIndexIdCache;
std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
Expand Down
15 changes: 12 additions & 3 deletions cloud/src/recycler/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,18 @@ inline std::string segment_path(int64_t tablet_id, const std::string& rowset_id,
return fmt::format("data/{}/{}_{}.dat", tablet_id, rowset_id, segment_id);
}

inline std::string inverted_index_path(int64_t tablet_id, const std::string& rowset_id,
int64_t segment_id, int64_t index_id) {
return fmt::format("data/{}/{}_{}_{}.idx", tablet_id, rowset_id, segment_id, index_id);
// Builds the path of a V2-format inverted index file. The name depends only on the
// tablet, rowset and segment: data/{tablet_id}/{rowset_id}_{segment_id}.idx
inline std::string inverted_index_path_v2(int64_t tablet_id, const std::string& rowset_id,
                                          int64_t segment_id) {
    std::string idx_path = fmt::format("data/{}/{}_{}.idx", tablet_id, rowset_id, segment_id);
    return idx_path;
}

// Builds the path of a V1-format inverted index file for one index of one segment:
//   data/{tablet_id}/{rowset_id}_{segment_id}_{index_id}.idx           (empty suffix)
//   data/{tablet_id}/{rowset_id}_{segment_id}_{index_id}@{suffix}.idx  (non-empty suffix)
//
// @param tablet_id          tablet that owns the rowset
// @param rowset_id          rowset id used as the file name prefix
// @param segment_id         segment ordinal inside the rowset
// @param index_id           id of the inverted index
// @param index_path_suffix  optional suffix; when non-empty it is appended as "@<suffix>"
// @return the formatted path
inline std::string inverted_index_path_v1(int64_t tablet_id, const std::string& rowset_id,
                                          int64_t segment_id, int64_t index_id,
                                          std::string_view index_path_suffix) {
    std::string suffix;
    if (!index_path_suffix.empty()) {
        // Append by length: a std::string_view is not guaranteed to be NUL-terminated,
        // so treating data() as a C string could read past the end of the view.
        suffix.reserve(index_path_suffix.size() + 1);
        suffix.push_back('@');
        suffix.append(index_path_suffix);
    }
    return fmt::format("data/{}/{}_{}_{}{}.idx", tablet_id, rowset_id, segment_id, index_id,
                       suffix);
}

inline std::string rowset_path_prefix(int64_t tablet_id, const std::string& rowset_id) {
Expand Down
52 changes: 38 additions & 14 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
accessor->put_file(path, "");
for (auto& index : rowset.tablet_schema().index()) {
auto path = inverted_index_path(rowset.tablet_id(), rowset.rowset_id_v2(), i,
index.index_id());
auto path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i,
index.index_id(), index.index_suffix_name());
accessor->put_file(path, "");
}
}
Expand Down Expand Up @@ -200,8 +200,8 @@ static int create_tmp_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
accessor->put_file(path, path);
for (auto& index : rowset.tablet_schema().index()) {
auto path = inverted_index_path(rowset.tablet_id(), rowset.rowset_id_v2(), i,
index.index_id());
auto path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i,
index.index_id(), index.index_suffix_name());
accessor->put_file(path, path);
}
}
Expand Down Expand Up @@ -247,7 +247,7 @@ static int create_committed_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor
auto path = segment_path(tablet_id, rowset_id, i);
accessor->put_file(path, "");
for (int j = 0; j < num_inverted_indexes; ++j) {
auto path = inverted_index_path(tablet_id, rowset_id, i, j);
auto path = inverted_index_path_v1(tablet_id, rowset_id, i, j, "");
accessor->put_file(path, "");
}
}
Expand Down Expand Up @@ -671,8 +671,11 @@ TEST(RecyclerTest, recycle_rowsets) {
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -750,8 +753,11 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -812,9 +818,12 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
std::vector<doris::TabletSchemaCloudPB> schemas;
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
schema.set_schema_version(i);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -873,7 +882,9 @@ TEST(RecyclerTest, recycle_tablet) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -945,8 +956,11 @@ TEST(RecyclerTest, recycle_indexes) {
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -1054,8 +1068,11 @@ TEST(RecyclerTest, recycle_partitions) {
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -1986,7 +2003,9 @@ TEST(RecyclerTest, recycle_deleted_instance) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -2254,7 +2273,9 @@ TEST(CheckerTest, DISABLED_abnormal_inverted_check) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down Expand Up @@ -2538,8 +2559,11 @@ TEST(RecyclerTest, delete_rowset_data) {
for (int i = 0; i < 5; ++i) {
auto& schema = schemas.emplace_back();
schema.set_schema_version(i);
schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
for (int j = 0; j < i; ++j) {
schema.add_index()->set_index_id(j);
auto index = schema.add_index();
index->set_index_id(j);
index->set_index_type(IndexType::INVERTED);
}
}

Expand Down

0 comments on commit 1293912

Please sign in to comment.