Skip to content

Commit

Permalink
[feature](cloud-compaction) Support shadow tablet to do cumulative co…
Browse files Browse the repository at this point in the history
…mpaction during schema change in cloud mode (#39558)

In cloud mode, when do schema change, shadow tablet encounters -235
because it cant do cumulative compaction in the case of a large number
of loads. And it will prevents the user from continuing to loads.
Implementation details:
1. When start schema change, record the end convert rowset version
`alter_version` into SchemaChangeJob.
2. For origin tablet, only can do base compaction in [0,
`alter_version`] and do cumulative compaction in (`alter_version`, N].
can not do compaction across `alter_verison` such as compaction [a,
`alter_version` + n].
3. For shadow tablet, cannot do base compaction and and do cumulative
compaction in (`alter_version`, N].
4. When the schema change failed because FE or BE coredump, it will
retry. When retry the schema change, it will get the `alter_version`
from meta_serive, and continue to do it.
5. When finish the schema change job or cancel it, we need to clear the
schema change job. Before this pr, it will cover by next schema change.

co-author(main author): @Lchangliang
original PR: #37293

---------

Co-authored-by: Lightman <[email protected]>
  • Loading branch information
2 people authored and gavinchou committed Sep 11, 2024
1 parent bd2453e commit a0b16b0
Show file tree
Hide file tree
Showing 38 changed files with 3,425 additions and 144 deletions.
38 changes: 37 additions & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "service/backend_options.h"
#include "util/thread.h"
#include "util/uuid_generator.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -82,21 +83,40 @@ Status CloudBaseCompaction::prepare_compact() {
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
using namespace std::chrono;
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
cloud_tablet()->last_sync_time_s = 0;
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -320,6 +340,22 @@ Status CloudBaseCompaction::modify_rowsets() {
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
}
Expand Down
46 changes: 38 additions & 8 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
if (_tablet->tablet_state() != TABLET_RUNNING) {
if (_tablet->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction ||
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
}

Expand Down Expand Up @@ -110,11 +112,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
if (config::enable_parallel_cumu_compaction) {
// Set input version range to let meta-service judge version range conflict
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
}

compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
Expand All @@ -141,6 +143,18 @@ Status CloudCumulativeCompaction::prepare_compact() {
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
}
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -262,12 +276,27 @@ Status CloudCumulativeCompaction::modify_rowsets() {

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}

auto& stats = resp.stats();
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
{
Expand Down Expand Up @@ -350,8 +379,9 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
std::shared_lock rlock(_tablet->get_header_lock());
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
int64_t candidate_version =
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
int64_t candidate_version = std::max(
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
cloud_tablet()->alter_version() + 1);
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
cloud_tablet()->traverse_rowsets(
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {
Expand Down
18 changes: 10 additions & 8 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
auto sync_st = tablet->sync_rowsets();
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
tablet->tablet_id());
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
Expand Down
5 changes: 4 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

if (tablet->enable_unique_key_merge_on_write()) {
// If is mow, the tablet has no delete bitmap in base rowsets.
// So dont need to sync it.
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down
Loading

0 comments on commit a0b16b0

Please sign in to comment.