Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 15, 2024
1 parent 52d156b commit cc58284
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 71 deletions.
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ Status CloudCumulativeCompaction::prepare_compact() {
.tag("tablet_max_version", cloud_tablet()->max_version_unlocked())
.tag("cumulative_point", cloud_tablet()->cumulative_layer_point())
.tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0))
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0))
.tag("lightman 0711 alter_version", cloud_tablet()->alter_version());
.tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0));
return st;
}

Expand Down
11 changes: 6 additions & 5 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;

Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {

DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.sleep",
{ std::this_thread::sleep_for(std::chrono::seconds(600)); });
// new tablet has to exist
_new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id));
if (_new_tablet->tablet_state() == TABLET_RUNNING) {
Expand Down Expand Up @@ -100,7 +98,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
sc_job->set_id(_job_id);
sc_job->set_initiator(BackendOptions::get_localhost() + ':' +
std::to_string(config::heartbeat_service_port));
sc_job->set_alter_version(request.alter_version);
sc_job->set_alter_version(base_max_version);
auto* new_tablet_idx = sc_job->mutable_new_tablet_idx();
new_tablet_idx->set_tablet_id(_new_tablet->tablet_id());
new_tablet_idx->set_table_id(_new_tablet->table_id());
Expand Down Expand Up @@ -135,6 +133,9 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
<< ", new_tablet_id=" << request.new_tablet_id
<< ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id;
sc_job->set_alter_version(start_resp.alter_version());

DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.sleep",
{ std::this_thread::sleep_for(std::chrono::seconds(120)); });
// FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.

// Create a new tablet schema, should merge with dropped columns in light weight schema change
Expand All @@ -152,7 +153,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
delete_predicates.push_back(rs_meta);
}
}
RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, base_max_version));
RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, start_resp.alter_version()));

std::vector<ColumnId> return_columns;
return_columns.resize(_base_tablet_schema->num_columns());
Expand All @@ -169,7 +170,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, base_max_version);
reader_context.version = Version(0, start_resp.alter_version());

for (auto& split : rs_splits) {
RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
Expand Down
14 changes: 7 additions & 7 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,21 +546,21 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
std::function<bool(CloudTablet*)> filter_out;
if (compaction_type == CompactionType::BASE_COMPACTION) {
filter_out = [&submitted_base_compactions, &submitted_full_compactions](CloudTablet* t) {
return !!submitted_base_compactions.count(t->tablet_id()) ||
!!submitted_full_compactions.count(t->tablet_id()) ||
return submitted_base_compactions.contains(t->tablet_id()) ||
submitted_full_compactions.contains(t->tablet_id()) ||
t->tablet_state() != TABLET_RUNNING;
};
} else if (config::enable_parallel_cumu_compaction) {
filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) {
return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) ||
t->tablet_state() != TABLET_RUNNING;
return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
(t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1);
};
} else {
filter_out = [&tablet_preparing_cumu_compaction,
&submitted_cumu_compactions](CloudTablet* t) {
return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) ||
!!submitted_cumu_compactions.count(t->tablet_id()) ||
t->tablet_state() != TABLET_RUNNING;
return tablet_preparing_cumu_compaction.contains(t->tablet_id()) ||
submitted_cumu_compactions.contains(t->tablet_id()) ||
(t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1);
};
}

Expand Down
55 changes: 1 addition & 54 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
RETURN_IF_ERROR(sync_if_not_running());

if (query_version > 0) {
std::shared_lock rlock(_meta_lock);
Expand All @@ -131,58 +130,6 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
Status CloudTablet::sync_if_not_running() {
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}

// Serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);

{
std::shared_lock rlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
return Status::OK();
}
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
// MoW may go to here when load while schema change
return Status::Error<INVALID_TABLET_STATE>("invalid tablet state {}. tablet_id={}",
tablet_meta->tablet_state(), tablet_id());
}

TimestampedVersionTracker empty_tracker;
{
std::lock_guard wlock(_meta_lock);
RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
_rs_version_map.clear();
_stale_rs_version_map.clear();
std::swap(_timestamped_version_tracker, empty_tracker);
_tablet_meta->clear_rowsets();
_tablet_meta->clear_stale_rowset();
_max_version = -1;
}

st = _engine.meta_mgr().sync_tablet_rowsets(this);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
return st;
}

void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
std::unique_lock<std::shared_mutex>& meta_lock,
bool warmup_delta_data) {
Expand Down Expand Up @@ -574,7 +521,7 @@ std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compact
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
if (version.first != 0 && version.first < _cumulative_point &&
(_alter_version == -1 || version.first < _alter_version)) {
(_alter_version == -1 || version.second <= _alter_version)) {
candidate_rowsets.push_back(rs);
}
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ class CloudTablet final : public BaseTablet {

static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);

Status sync_if_not_running();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction,
<< proto_to_json(compaction);
int64_t alter_version = job_pb.schema_change().alter_version();
return (compaction.type() == TabletCompactionJobPB_CompactionType_BASE &&
compaction.input_versions(1) < alter_version) ||
compaction.input_versions(1) <= alter_version) ||
(compaction.type() == TabletCompactionJobPB_CompactionType_CUMULATIVE &&
compaction.input_versions(0) > alter_version);
}
Expand Down

0 comments on commit cc58284

Please sign in to comment.