Skip to content

Commit

Permalink
Revert "(cloud-merge) Support shadow tablet to do cumulative compacti…
Browse files Browse the repository at this point in the history
…on in cloud mode (apache#37293)"

This reverts commit b58b9e4.
  • Loading branch information
gavinchou committed Aug 4, 2024
1 parent ef7b9ad commit 8ad86a0
Show file tree
Hide file tree
Showing 39 changed files with 142 additions and 3,419 deletions.
38 changes: 1 addition & 37 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "service/backend_options.h"
#include "util/thread.h"
#include "util/uuid_generator.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -83,40 +82,21 @@ Status CloudBaseCompaction::prepare_compact() {
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
compaction_job->set_base_compaction_cnt(_base_compaction_cnt);
compaction_job->set_cumulative_compaction_cnt(_cumulative_compaction_cnt);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
using namespace std::chrono;
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
cloud::StartTabletJobResponse resp;
//auto st = cloud::meta_mgr()->prepare_tablet_job(job, &resp);
auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::STALE_TABLET_CACHE) {
// set last_sync_time to 0 to force sync tablet next time
cloud_tablet()->last_sync_time_s = 0;
} else if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
// tablet not found
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -334,22 +314,6 @@ Status CloudBaseCompaction::modify_rowsets() {
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
auto* cloud_tablet = (static_cast<CloudTablet*>(_tablet.get()));
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. The input version end must "
"less than or equal to alter_version."
"current alter version in BE is not correct."
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " current alter_version=" << cloud_tablet->alter_version()
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
cloud_tablet->set_alter_version(resp.alter_version());
return Status::InternalError(msg);
}
return st;
}
Expand Down
46 changes: 8 additions & 38 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Status CloudCumulativeCompaction::prepare_compact() {
if (_tablet->tablet_state() != TABLET_RUNNING &&
(!config::enable_new_tablet_do_compaction ||
static_cast<CloudTablet*>(_tablet.get())->alter_version() == -1)) {
if (_tablet->tablet_state() != TABLET_RUNNING) {
return Status::InternalError("invalid tablet state. tablet_id={}", _tablet->tablet_id());
}

Expand Down Expand Up @@ -112,11 +110,11 @@ Status CloudCumulativeCompaction::prepare_compact() {
_expiration = now + config::compaction_timeout_seconds;
compaction_job->set_expiration(_expiration);
compaction_job->set_lease(now + config::lease_compaction_interval_seconds * 4);

compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
// Set input version range to let meta-service check version range conflict
compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction);
if (config::enable_parallel_cumu_compaction) {
// Set input version range to let meta-service judge version range conflict
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
}
cloud::StartTabletJobResponse resp;
st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
if (!st.ok()) {
Expand All @@ -143,18 +141,6 @@ Status CloudCumulativeCompaction::prepare_compact() {
.tag("msg", resp.status().msg());
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("no suitable versions");
}
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}
Expand Down Expand Up @@ -270,27 +256,12 @@ Status CloudCumulativeCompaction::modify_rowsets() {

cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
(static_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version());
}
if (!st.ok()) {
if (resp.status().code() == cloud::TABLET_NOT_FOUND) {
cloud_tablet()->clear_cache();
} else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION) {
std::stringstream ss;
ss << "failed to prepare cumu compaction. Check compaction input versions "
"failed in schema change. "
"input_version_start="
<< compaction_job->input_versions(0)
<< " input_version_end=" << compaction_job->input_versions(1)
<< " schema_change_alter_version=" << resp.alter_version();
std::string msg = ss.str();
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
return st;
}

auto& stats = resp.stats();
LOG(INFO) << "tablet stats=" << stats.ShortDebugString();
{
Expand Down Expand Up @@ -373,9 +344,8 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
std::shared_lock rlock(_tablet->get_header_lock());
_base_compaction_cnt = cloud_tablet()->base_compaction_cnt();
_cumulative_compaction_cnt = cloud_tablet()->cumulative_compaction_cnt();
int64_t candidate_version = std::max(
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1),
cloud_tablet()->alter_version() + 1);
int64_t candidate_version =
std::max(cloud_tablet()->cumulative_layer_point(), _max_conflict_version + 1);
// Get all rowsets whose version >= `candidate_version` as candidate rowsets
cloud_tablet()->traverse_rowsets(
[&candidate_rowsets, candidate_version](const RowsetSharedPtr& rs) {
Expand Down
18 changes: 8 additions & 10 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,19 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
auto sync_st = tablet->sync_rowsets();
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
"invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
tablet->tablet_id());
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
Expand Down
5 changes: 1 addition & 4 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
tablet->last_sync_time_s = now;

// If is mow, the tablet has no delete bitmap in base rowsets.
// So dont need to sync it.
if (tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
if (tablet->enable_unique_key_merge_on_write()) {
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
Expand Down
Loading

0 comments on commit 8ad86a0

Please sign in to comment.