Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(cloud-merge) Support shadow tablet to do cumulative compaction in cloud mode #37293

Merged
merged 15 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "service/backend_options.h"
#include "util/thread.h"
#include "util/uuid_generator.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -82,21 +83,40 @@ Status CloudBaseCompaction::prepare_compact() {
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
using namespace std::chrono;
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
cloud_tablet()->last_sync_time_s = 0;
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -314,6 +334,22 @@ Status CloudBaseCompaction::modify_rowsets() {
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
}
Expand Down
46 changes: 38 additions & 8 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
if (_tablet->tablet_state() != TABLET_RUNNING) {
if (_tablet->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction ||
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -110,11 +112,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
if (config::enable_parallel_cumu_compaction) {
// Set input version range to let meta-service judge version range conflict
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
}

compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
Expand All @@ -141,6 +143,18 @@ Status CloudCumulativeCompaction::prepare_compact() {
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
}
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -256,12 +270,27 @@ Status CloudCumulativeCompaction::modify_rowsets() {

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}

auto& stats = resp.stats();
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
{
Expand Down Expand Up @@ -344,8 +373,9 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
std::shared_lock rlock(_tablet->get_header_lock());
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
int64_t candidate_version =
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
int64_t candidate_version = std::max(
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
cloud_tablet()->alter_version() + 1);
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
cloud_tablet()->traverse_rowsets(
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {
Expand Down
18 changes: 10 additions & 8 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
auto sync_st = tablet->sync_rowsets();
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
tablet->tablet_id());
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

if (tablet->enable_unique_key_merge_on_write()) {
// If is mow, the tablet has no delete bitmap in base rowsets.
// So dont need to sync it.
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down
Loading
Loading