diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 4d65598183b755e..2f092c27323a9e4 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -173,8 +173,7 @@ Status CloudCumulativeCompaction::prepare_compact() { .tag("tablet_max_version", cloud_tablet()->max_version_unlocked()) .tag("cumulative_point", cloud_tablet()->cumulative_layer_point()) .tag("num_rowsets", cloud_tablet()->fetch_add_approximate_num_rowsets(0)) - .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)) - .tag("lightman 0711 alter_version", cloud_tablet()->alter_version()); + .tag("cumu_num_rowsets", cloud_tablet()->fetch_add_approximate_cumu_num_rowsets(0)); return st; } diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 89a3276bee5eba1..80428123cae8a9e 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -63,8 +63,6 @@ CloudSchemaChangeJob::~CloudSchemaChangeJob() = default; Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) { - DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.sleep", - { std::this_thread::sleep_for(std::chrono::seconds(600)); }); // new tablet has to exist _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id)); if (_new_tablet->tablet_state() == TABLET_RUNNING) { @@ -100,7 +98,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque sc_job->set_id(_job_id); sc_job->set_initiator(BackendOptions::get_localhost() + ':' + std::to_string(config::heartbeat_service_port)); - sc_job->set_alter_version(request.alter_version); + sc_job->set_alter_version(base_max_version); auto* new_tablet_idx = sc_job->mutable_new_tablet_idx(); new_tablet_idx->set_tablet_id(_new_tablet->tablet_id()); new_tablet_idx->set_table_id(_new_tablet->table_id()); @@ -135,6 +133,9 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque << ", new_tablet_id=" << request.new_tablet_id << ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id; sc_job->set_alter_version(start_resp.alter_version()); + + DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.sleep", + { std::this_thread::sleep_for(std::chrono::seconds(120)); }); // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert. // Create a new tablet schema, should merge with dropped columns in light weight schema change @@ -152,7 +153,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque delete_predicates.push_back(rs_meta); } } - RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, base_max_version)); + RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates, start_resp.alter_version())); std::vector return_columns; return_columns.resize(_base_tablet_schema->num_columns()); @@ -169,7 +170,7 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS; reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); - reader_context.version = Version(0, base_max_version); + reader_context.version = Version(0, start_resp.alter_version()); for (auto& split : rs_splits) { RETURN_IF_ERROR(split.rs_reader->init(&reader_context)); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index b1b455d2007e1f8..6771acba1d62a3b 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -546,21 +546,21 @@ std::vector CloudStorageEngine::_generate_cloud_compaction_task std::function filter_out; if (compaction_type == CompactionType::BASE_COMPACTION) { filter_out = [&submitted_base_compactions, &submitted_full_compactions](CloudTablet* t) { - return !!submitted_base_compactions.count(t->tablet_id()) || - !!submitted_full_compactions.count(t->tablet_id()) || + return submitted_base_compactions.contains(t->tablet_id()) || + submitted_full_compactions.contains(t->tablet_id()) || t->tablet_state() != TABLET_RUNNING; }; } else if (config::enable_parallel_cumu_compaction) { filter_out = [&tablet_preparing_cumu_compaction](CloudTablet* t) { - return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || - t->tablet_state() != TABLET_RUNNING; + return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || + (t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1); }; } else { filter_out = [&tablet_preparing_cumu_compaction, &submitted_cumu_compactions](CloudTablet* t) { - return !!tablet_preparing_cumu_compaction.count(t->tablet_id()) || - !!submitted_cumu_compactions.count(t->tablet_id()) || - t->tablet_state() != TABLET_RUNNING; + return tablet_preparing_cumu_compaction.contains(t->tablet_id()) || + submitted_cumu_compactions.contains(t->tablet_id()) || + (t->tablet_state() != TABLET_RUNNING && t->alter_version() == -1); }; } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c983b21ba5c0f3f..33db53afa82a788 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -106,7 +106,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, // There are only two tablet_states RUNNING and NOT_READY in cloud mode // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { - RETURN_IF_ERROR(sync_if_not_running()); if (query_version > 0) { std::shared_lock rlock(_meta_lock); @@ -131,58 +130,6 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) return st; } -// Sync tablet meta and all rowset meta if not running. -// This could happen when BE didn't finish schema change job and another BE committed this schema change job. -// It should be a quite rare situation. -Status CloudTablet::sync_if_not_running() { - if (tablet_state() == TABLET_RUNNING) { - return Status::OK(); - } - - // Serially execute sync to reduce unnecessary network overhead - std::lock_guard lock(_sync_meta_lock); - - { - std::shared_lock rlock(_meta_lock); - if (tablet_state() == TABLET_RUNNING) { - return Status::OK(); - } - } - - TabletMetaSharedPtr tablet_meta; - auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); - if (!st.ok()) { - if (st.is()) { - clear_cache(); - } - return st; - } - - if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] { - // MoW may go to here when load while schema change - return Status::Error("invalid tablet state {}. tablet_id={}", - tablet_meta->tablet_state(), tablet_id()); - } - - TimestampedVersionTracker empty_tracker; - { - std::lock_guard wlock(_meta_lock); - RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING)); - _rs_version_map.clear(); - _stale_rs_version_map.clear(); - std::swap(_timestamped_version_tracker, empty_tracker); - _tablet_meta->clear_rowsets(); - _tablet_meta->clear_stale_rowset(); - _max_version = -1; - } - - st = _engine.meta_mgr().sync_tablet_rowsets(this); - if (st.is()) { - clear_cache(); - } - return st; -} - void CloudTablet::add_rowsets(std::vector to_add, bool version_overlap, std::unique_lock& meta_lock, bool warmup_delta_data) { @@ -574,7 +521,7 @@ std::vector CloudTablet::pick_candidate_rowsets_to_base_compact std::shared_lock rlock(_meta_lock); for (const auto& [version, rs] : _rs_version_map) { if (version.first != 0 && version.first < _cumulative_point && - (_alter_version == -1 || version.first < _alter_version)) { + (_alter_version == -1 || version.second <= _alter_version)) { candidate_rowsets.push_back(rs); } } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index c563b2ebf38bc7e..1b4234db8866f29 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -215,8 +215,6 @@ class CloudTablet final : public BaseTablet { static void recycle_cached_data(const std::vector& rowsets); - Status sync_if_not_running(); - CloudStorageEngine& _engine; // this mutex MUST ONLY be used when sync meta diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index b9c957d2109c519..dd16df837be5ac2 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -56,7 +56,7 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction, << proto_to_json(compaction); int64_t alter_version = job_pb.schema_change().alter_version(); return (compaction.type() == TabletCompactionJobPB_CompactionType_BASE && - compaction.input_versions(1) < alter_version) || + compaction.input_versions(1) <= alter_version) || (compaction.type() == TabletCompactionJobPB_CompactionType_CUMULATIVE && compaction.input_versions(0) > alter_version); }