From 72f347a4d155caa4d28e2f72fbe41f5c2322a3f7 Mon Sep 17 00:00:00 2001
From: JaySon
Date: Tue, 28 Mar 2023 15:00:53 +0800
Subject: [PATCH] This is an automated cherry-pick of #7168

Signed-off-by: ti-chi-bot
---
 dbms/src/Common/TiFlashMetrics.h              |  19 +++-
 dbms/src/Interpreters/Settings.h              |   5 +
 dbms/src/Server/StorageConfigParser.h         |   4 +-
 .../DeltaMerge/Remote/DataStore/DataStore.h   |  12 +++
 .../Remote/DataStore/DataStoreS3.cpp          |  19 ++++
 .../DeltaMerge/Remote/DataStore/DataStoreS3.h |   1 +
 dbms/src/Storages/S3/S3Common.cpp             |   3 +
 dbms/src/Storages/S3/S3Common.h               |   2 +-
 dbms/src/Storages/S3/S3GCManager.cpp          | 101 +++++++++++++++---
 dbms/src/Storages/S3/S3GCManager.h            |   4 +
 .../Storages/S3/tests/gtest_s3gcmanager.cpp   |  28 ++++-
 dbms/src/Storages/Transaction/TMTContext.cpp  |  26 ++++-
 12 files changed, 200 insertions(+), 24 deletions(-)

diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h
index 7dff6e8d68b..563d22ee480 100644
--- a/dbms/src/Common/TiFlashMetrics.h
+++ b/dbms/src/Common/TiFlashMetrics.h
@@ -229,7 +229,7 @@ namespace DB
     M(tiflash_disaggregated_details, "", Counter, \
       F(type_cftiny_read, {{"type", "cftiny_read"}}), \
       F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \
-    M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \
+    M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot and ingest SST", \
       Histogram, /* these commands usually cost several seconds, increase the start bucket to 50ms */ \
       F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \
       F(type_ingest_sst_sst2dt, {{"type", "ingest_sst_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \
@@ -321,7 +321,24 @@ namespace DB
       F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
       F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
       F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
-      F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))
+      F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
+    M(tiflash_storage_s3_gc_seconds, "S3 GC subprocess duration in seconds", \
+      Histogram, /* these commands usually cost several seconds, increase the start bucket to 500ms */ \
+      F(type_total, {{"type", "total"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_one_store, {{"type", "one_store"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_read_locks, {{"type", "read_locks"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_clean_locks, {{"type", "clean_locks"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_clean_manifests, {{"type", "clean_manifests"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_scan_then_clean_data_files, {{"type", "scan_then_clean_data_files"}}, ExpBuckets{0.5, 2, 20}), \
+      F(type_clean_one_lock, {{"type", "clean_one_lock"}}, ExpBuckets{0.5, 2, 20})) \
+    M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
+      F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
+      F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
+      F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.001, 2, 20}))

 // clang-format on
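A reading aid for the new histograms: assuming ExpBuckets{start, base, size} yields `size` exponential buckets start * base^i (consistent with how it is used throughout this header), ExpBuckets{0.5, 2, 20} covers 0.5s up to about 0.5 * 2^19 = 262144s (~72h), so even very slow GC rounds land in a real bucket. A standalone snippet to enumerate the bounds under that assumption:

    #include <cstdio>

    int main()
    {
        // assumed ExpBuckets{0.5, 2, 20} semantics: start=0.5s, factor=2, 20 buckets
        double bound = 0.5;
        for (int i = 0; i < 20; ++i)
        {
            std::printf("bucket %2d: le %.1fs\n", i, bound);
            bound *= 2;
        }
        return 0;
    }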
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index efb488bb4d6..9023a9865de 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -244,7 +244,12 @@ struct Settings
     M(SettingInt64, dt_compression_level, 1, "The compression level.") \
     \
     M(SettingInt64, remote_checkpoint_interval_seconds, 30, "The interval of uploading checkpoint to the remote store. Unit is second.") \
+    M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
     M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
+    M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted if the time since its last modification is less than this threshold.") \
     M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
     M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
     M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h
index 704592b0c5a..302b48d1bc8 100644
--- a/dbms/src/Server/StorageConfigParser.h
+++ b/dbms/src/Server/StorageConfigParser.h
@@ -99,9 +99,9 @@ struct StorageS3Config
     String bucket;
     String access_key_id;
     String secret_access_key;
-    UInt64 max_connections = 1024;
+    UInt64 max_connections = 4096;
     UInt64 connection_timeout_ms = 1000;
-    UInt64 request_timeout_ms = 3000;
+    UInt64 request_timeout_ms = 7000;
     String root;

     inline static String S3_ACCESS_KEY_ID = "S3_ACCESS_KEY_ID";
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h
index d59138e203a..a86a99c8827 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h
+++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h
@@ -94,7 +94,19 @@ class IDataStore : boost::noncopyable
      */
     virtual bool putCheckpointFiles(const PS::V3::LocalCheckpointFiles & local_files, StoreID store_id, UInt64 upload_seq) = 0;

     virtual std::unordered_map<String, Int64> getDataFileSizes(const std::unordered_set<String> & lock_keys) = 0;
+
+    // Attach tagging to the keys on remote store
+    virtual void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) = 0;
 };
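The new remote_gc_min_age_seconds setting describes an age guard: files modified more recently than the threshold are exempt from remote compaction, regardless of their valid rate or size. A minimal sketch of such a guard under the stated defaults; the helper name and its parameters are illustrative, only the three remote_gc_* names come from the hunk above:

    #include <chrono>
    #include <cstdint>

    // Hypothetical helper; not part of this patch.
    bool shouldCompact(int64_t file_size, std::chrono::system_clock::time_point mtime,
                       double valid_rate,
                       int64_t remote_gc_min_age_seconds, // default 3600
                       double remote_gc_ratio,            // default 0.5
                       int64_t remote_gc_small_size)      // default 128 * 1024
    {
        using namespace std::chrono;
        const auto age_sec = duration_cast<seconds>(system_clock::now() - mtime).count();
        if (age_sec < remote_gc_min_age_seconds)
            return false; // modified too recently, skip this round
        // existing thresholds: low valid rate or small total size
        return valid_rate < remote_gc_ratio || file_size < remote_gc_small_size;
    }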
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp
index 50b3e11e404..8305776fbf3 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp
+++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp
@@ -156,6 +156,7 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
     auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
     const auto remote_dir = S3::S3Filename::fromDMFileOID(remote_oid).toFullKey();
     std::vector<std::future<void>> results;
+    results.reserve(target_short_fnames.size());
     for (const auto & fname : target_short_fnames)
     {
         auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
@@ -175,6 +176,24 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
     }
 }

+void DataStoreS3::setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging)
+{
+    auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
+    std::vector<std::future<void>> results;
+    results.reserve(keys.size());
+    for (const auto & k : keys)
+    {
+        auto task = std::make_shared<std::packaged_task<void()>>([&s3_client, &tagging, key = k] {
+            rewriteObjectWithTagging(*s3_client, key, String(tagging));
+        });
+        results.emplace_back(task->get_future());
+        DataStoreS3Pool::get().scheduleOrThrowOnError([task] { (*task)(); });
+    }
+    for (auto & f : results)
+    {
+        f.get();
+    }
+}
+
 IPreparedDMFileTokenPtr DataStoreS3::prepareDMFile(const S3::DMFileOID & oid, UInt64 page_id)
 {
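setTaggingsForKeys above fans one rewriteObjectWithTagging call per key out to DataStoreS3Pool and then joins on the futures, so a DMFile with many sub-objects is tagged in parallel instead of serially. The same fan-out/join pattern reduced to standard C++, as a sketch: std::async stands in for the TiFlash thread pool, and tagOneObject for the actual S3 call.

    #include <future>
    #include <string>
    #include <vector>

    // Placeholder for the per-object S3 request.
    void tagOneObject(const std::string & key, const std::string & tagging) { /* S3 call here */ }

    void setTaggingsForKeys(const std::vector<std::string> & keys, const std::string & tagging)
    {
        std::vector<std::future<void>> results;
        results.reserve(keys.size());
        for (const auto & key : keys)
            results.emplace_back(std::async(std::launch::async, tagOneObject, key, tagging));
        for (auto & f : results)
            f.get(); // join; rethrows the first failure, if any
    }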
diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h
index 83d31dd81f8..5f1719c951c 100644
--- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h
+++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h
@@ -53,6 +53,7 @@ class DataStoreS3 final : public IDataStore
     std::unordered_map<String, Int64> getDataFileSizes(const std::unordered_set<String> & lock_keys) override;

+    void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) override;
 #ifndef DBMS_PUBLIC_GTEST
 private:
 #else
diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp
index cc264e432ef..b03512f95c9 100644
--- a/dbms/src/Storages/S3/S3Common.cpp
+++ b/dbms/src/Storages/S3/S3Common.cpp
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -362,7 +363,9 @@ void rewriteObjectWithTagging(const TiFlashS3Client & client, const String & key
     // The copy_source format is "${source_bucket}/${source_key}"
     auto copy_source = client.bucket() + "/" + (client.root() == "/" ? "" : client.root()) + key;
     client.setBucketAndKeyWithRoot(req, key);
+    // metadata directive and tagging directive must be set to `REPLACE`
     req.WithCopySource(copy_source) //
+        .WithMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE)
         .WithTagging(tagging)
         .WithTaggingDirective(Aws::S3::Model::TaggingDirective::REPLACE);
     ProfileEvents::increment(ProfileEvents::S3CopyObject);
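The S3Common.cpp change above is the heart of this fix. rewriteObjectWithTagging copies an object onto itself to refresh its tagging and mtime, but AWS S3 rejects a self-copy that does not replace anything, so with only the tagging directive set the request fails on AWS; adding MetadataDirective::REPLACE makes the self-copy legal. A reduced sketch of such a request with the AWS C++ SDK (bucket, key, and tag values are placeholders):

    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/CopyObjectRequest.h>

    // Sketch: self-copy that only rewrites tagging.
    Aws::S3::Model::CopyObjectOutcome tagInPlace(const Aws::S3::S3Client & client)
    {
        Aws::S3::Model::CopyObjectRequest req;
        req.WithBucket("my-bucket")
            .WithKey("s123/data/dat_1_0")
            .WithCopySource("my-bucket/s123/data/dat_1_0") // same object as the destination
            .WithMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE) // required for a self-copy
            .WithTagging("tiflash_deleted=true")
            .WithTaggingDirective(Aws::S3::Model::TaggingDirective::REPLACE);
        return client.CopyObject(req);
    }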
diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h
index 0dc15c7bcda..883c3b1939a 100644
--- a/dbms/src/Storages/S3/S3Common.h
+++ b/dbms/src/Storages/S3/S3Common.h
@@ -85,7 +85,7 @@ class TiFlashS3Client : public Aws::S3::S3Client

 enum class S3GCMethod
 {
-    Lifecycle,
+    Lifecycle = 1,
     ScanThenDelete,
 };

diff --git a/dbms/src/Storages/S3/S3GCManager.cpp b/dbms/src/Storages/S3/S3GCManager.cpp
index 21542da8ac3..1224686f19b 100644
--- a/dbms/src/Storages/S3/S3GCManager.cpp
+++ b/dbms/src/Storages/S3/S3GCManager.cpp
@@ -14,10 +14,14 @@
 #include
 #include
+#include
+#include
 #include
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -35,7 +39,9 @@
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
@@ -53,10 +59,12 @@ S3GCManager::S3GCManager(
     pingcap::pd::ClientPtr pd_client_,
     OwnerManagerPtr gc_owner_manager_,
     S3LockClientPtr lock_client_,
+    DM::Remote::IDataStorePtr remote_data_store_,
     S3GCConfig config_)
     : pd_client(std::move(pd_client_))
     , gc_owner_manager(std::move(gc_owner_manager_))
     , lock_client(std::move(lock_client_))
+    , remote_data_store(std::move(remote_data_store_))
     , shutdown_called(false)
     , config(config_)
     , log(Logger::get())
@@ -91,6 +99,10 @@ bool S3GCManager::runOnAllStores()
         lifecycle_has_been_set = true;
     }

+    Stopwatch watch;
+    SCOPE_EXIT({
+        GET_METRIC(tiflash_storage_s3_gc_seconds, type_total).Observe(watch.elapsedSeconds());
+    });
     const std::vector<UInt64> all_store_ids = getAllStoreIds();
     LOG_TRACE(log, "all_store_ids: {}", all_store_ids);
     // Get all store status from pd after getting the store ids from S3.
@@ -108,6 +120,10 @@ bool S3GCManager::runOnAllStores()
             break;
         }

+        Stopwatch store_gc_watch;
+        SCOPE_EXIT({
+            GET_METRIC(tiflash_storage_s3_gc_seconds, type_one_store).Observe(store_gc_watch.elapsedSeconds());
+        });
         std::optional<metapb::Store> s = std::nullopt;
         if (auto iter = stores_from_pd.find(gc_store_id); iter != stores_from_pd.end())
         {
@@ -138,6 +154,7 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
     const Aws::Utils::DateTime gc_timepoint = Aws::Utils::DateTime::Now();
     LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={}", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));

+    Stopwatch watch;
     // Get the latest manifest
     const auto manifests = CheckpointManifestS3Set::getFromS3(*client, gc_store_id);
     if (manifests.empty())
@@ -154,13 +171,16 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
         config.manifest_expired_hour,
         gc_timepoint));
     LOG_INFO(log, "latest manifest, key={} n_locks={}", manifests.latestManifestKey(), valid_lock_files.size());
+    GET_METRIC(tiflash_storage_s3_gc_seconds, type_read_locks).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

     // Scan and remove the expired locks
     const auto lock_prefix = S3Filename::getLockPrefix();
     cleanUnusedLocks(gc_store_id, lock_prefix, manifests.latestUploadSequence(), valid_lock_files, gc_timepoint);
+    GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_locks).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

     // clean the outdated manifest objects
     removeOutdatedManifest(manifests, &gc_timepoint);
+    GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_manifests).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

     switch (config.method)
     {
@@ -175,6 +195,7 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
         // After removing the expired lock, we need to scan the data files
         // with expired delmark
         tryCleanExpiredDataFiles(gc_store_id, gc_timepoint);
+        GET_METRIC(tiflash_storage_s3_gc_seconds, type_scan_then_clean_data_files).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);
         break;
     }
     }
@@ -187,6 +208,7 @@ void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
     const Aws::Utils::DateTime gc_timepoint = Aws::Utils::DateTime::Now();
     LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={}", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));

+    Stopwatch watch;
     // If the store id is tombstone, then run gc on the store as if no locks.
     // Scan and remove all expired locks
     LOG_INFO(log, "store is tombstone, clean all locks");
@@ -194,11 +216,13 @@ void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
     // clean all by setting `safe_sequence` to MaxUInt64 and empty `valid_lock_files`
     std::unordered_set<String> valid_lock_files;
     cleanUnusedLocks(gc_store_id, lock_prefix, std::numeric_limits<UInt64>::max(), valid_lock_files, gc_timepoint);
+    GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_locks).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

     // clean all manifest objects
     auto client = S3::ClientFactory::instance().sharedTiFlashClient();
     const auto manifests = CheckpointManifestS3Set::getFromS3(*client, gc_store_id);
     removeOutdatedManifest(manifests, nullptr);
+    GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_manifests).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

     switch (config.method)
     {
@@ -216,6 +240,7 @@ void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
         // After all the locks removed, the data files may still be locked by another
         // store id, we need to scan the data files with expired delmark
         tryCleanExpiredDataFiles(gc_store_id, gc_timepoint);
+        GET_METRIC(tiflash_storage_s3_gc_seconds, type_scan_then_clean_data_files).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);
         break;
     }
     }
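cleanUnusedLocks decides which lock objects survive from two inputs: the safe upload sequence and the set of lock keys referenced by the latest manifest. The tombstone path above passes safe_sequence = max and an empty valid set, which makes every lock of the store eligible for removal. An illustrative predicate capturing that contract (not the actual implementation):

    #include <cstdint>
    #include <string>
    #include <unordered_set>

    // A lock survives only if it is newer than safe_sequence or still referenced
    // by the latest manifest. With safe_sequence = UINT64_MAX and an empty valid
    // set, as in runForTombstoneStore, nothing survives.
    bool lockIsStillNeeded(uint64_t lock_upload_seq, uint64_t safe_sequence,
                           const std::unordered_set<std::string> & valid_lock_files,
                           const std::string & lock_key)
    {
        return lock_upload_seq > safe_sequence || valid_lock_files.count(lock_key) > 0;
    }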
@@ -268,6 +293,10 @@ void S3GCManager::cleanUnusedLocks(

 void S3GCManager::cleanOneLock(const String & lock_key, const S3FilenameView & lock_filename_view, const Aws::Utils::DateTime & timepoint)
 {
+    Stopwatch watch;
+    SCOPE_EXIT({
+        GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_one_lock).Observe(watch.elapsedSeconds());
+    });
     const auto unlocked_datafilename_view = lock_filename_view.asDataFile();
     RUNTIME_CHECK(unlocked_datafilename_view.isDataFile());
     const auto unlocked_datafile_key = unlocked_datafilename_view.toFullKey();
@@ -276,6 +305,7 @@ void S3GCManager::cleanOneLock(const String & lock_key, const S3FilenameView & l
     // delete S3 lock file
     auto client = S3::ClientFactory::instance().sharedTiFlashClient();
     deleteObject(*client, lock_key);
+    const auto elapsed_remove_lock = watch.elapsedMillisecondsFromLastTime() / 1000.0;

     // TODO: If `lock_key` is the only lock to datafile and GCManager crashes
     //       after the lock deleted but before delmark uploaded, then the
     //       datafile can never be physically removed.
     //       Need another logic to cover this corner case.
     const auto delmark_object_info = S3::tryGetObjectInfo(*client, unlocked_datafile_delmark_key);
+    const auto elapsed_try_get_delmark = watch.elapsedMillisecondsFromLastTime() / 1000.0;
+    double elapsed_try_mark_delete = 0.0;
+    double elapsed_lifecycle_mark_delete = 0.0;
     if (!delmark_object_info.exist)
     {
         bool ok;
         String err_msg;
         try
         {
             // delmark not exist, let's try to create a delmark through S3LockService
             std::tie(ok, err_msg) = lock_client->sendTryMarkDeleteRequest(unlocked_datafile_key, config.mark_delete_timeout_seconds);
+            elapsed_try_mark_delete = watch.elapsedMillisecondsFromLastTime() / 1000.0;
         }
         catch (DB::Exception & e)
         {
@@ -319,6 +353,16 @@ void S3GCManager::cleanOneLock(const String & lock_key, const S3FilenameView & l
                 // 1 day, we consider it is long enough that no other write node
                 // will try to access the data file.
                 lifecycleMarkDataFileDeleted(unlocked_datafile_key);
+                elapsed_lifecycle_mark_delete = watch.elapsedMillisecondsFromLastTime() / 1000.0;
+                LOG_INFO(
+                    log,
+                    "cleanOneLock done, method={} key={} remove_lock={:.3f} get_delmark={:.3f} mark_delete={:.3f} lifecycle_mark_delete={:.3f}",
+                    magic_enum::enum_name(config.method),
+                    unlocked_datafile_key,
+                    elapsed_remove_lock,
+                    elapsed_try_get_delmark,
+                    elapsed_try_mark_delete,
+                    elapsed_lifecycle_mark_delete);
                 return;
             }
             case S3GCMethod::ScanThenDelete:
                 break;
             }
         }
         else
         {
-            LOG_INFO(log, "delmark create failed, key={} reason={}", unlocked_datafile_key, err_msg);
+            LOG_INFO(
+                log,
+                "delmark create failed, method={} key={} reason={} remove_lock={:.3f} get_delmark={:.3f} mark_delete={:.3f} lifecycle_mark_delete={:.3f}",
+                magic_enum::enum_name(config.method),
+                unlocked_datafile_key,
+                err_msg,
+                elapsed_remove_lock,
+                elapsed_try_get_delmark,
+                elapsed_try_mark_delete,
+                elapsed_lifecycle_mark_delete);
         }
         // no matter delmark create success or not, leave it to later GC round.
         return;
@@ -342,8 +395,18 @@ void S3GCManager::cleanOneLock(const String & lock_key, const S3FilenameView & l
     }
     case S3GCMethod::ScanThenDelete:
     {
+        const auto elapsed_scan_try_remove_datafile = watch.elapsedMillisecondsFromLastTime() / 1000.0;
         // delmark exist, check whether we need to physically remove the datafile
         removeDataFileIfDelmarkExpired(unlocked_datafile_key, unlocked_datafile_delmark_key, timepoint, delmark_object_info.last_modification_time);
+        LOG_INFO(
+            log,
+            "cleanOneLock done, method={} key={} remove_lock={:.3f} get_delmark={:.3f} mark_delete={:.3f} scan_try_physical_remove={:.3f}",
+            magic_enum::enum_name(config.method),
+            unlocked_datafile_key,
+            elapsed_remove_lock,
+            elapsed_try_get_delmark,
+            elapsed_try_mark_delete,
+            elapsed_scan_try_remove_datafile);
         return;
     }
     }
 }
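For context on the Lifecycle branch of cleanOneLock: tagging an object only deletes it if the bucket carries a lifecycle rule that expires objects with the GC tag, which runOnAllStores ensures before the first round (lifecycle_has_been_set). A sketch of such a rule via the AWS C++ SDK, assuming the tag pair is tiflash_deleted=true and using the 1-day expiry mentioned in the comment above; the rule ID is made up:

    #include <aws/s3/model/ExpirationStatus.h>
    #include <aws/s3/model/LifecycleExpiration.h>
    #include <aws/s3/model/LifecycleRule.h>
    #include <aws/s3/model/LifecycleRuleFilter.h>
    #include <aws/s3/model/Tag.h>

    Aws::S3::Model::LifecycleRule makeGCExpireRule()
    {
        Aws::S3::Model::Tag tag;
        tag.SetKey("tiflash_deleted"); // assumed tag key
        tag.SetValue("true");          // assumed tag value

        Aws::S3::Model::LifecycleRuleFilter filter;
        filter.SetTag(tag); // rule applies only to tagged objects

        Aws::S3::Model::LifecycleExpiration expire;
        expire.SetDays(1); // "1 day ... long enough", per the comment above

        Aws::S3::Model::LifecycleRule rule;
        rule.SetID("tiflash-gc"); // made-up rule id
        rule.SetFilter(filter);
        rule.SetStatus(Aws::S3::Model::ExpirationStatus::Enabled);
        rule.SetExpiration(expire);
        return rule;
    }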
@@ -430,7 +493,7 @@ void S3GCManager::lifecycleMarkDataFileDeleted(const String & datafile_key)
     {
         // CheckpointDataFile is a single object, add tagging for it and update its mtime
         rewriteObjectWithTagging(*client, datafile_key, String(TaggingObjectIsDeleted));
-        LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging", datafile_key);
+        LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging");
     }
     else
     {
@@ -439,13 +502,21 @@ void S3GCManager::lifecycleMarkDataFileDeleted(const String & datafile_key)
         // scanning only the sub objects of given key of this DMFile
         // TODO: If GCManager unexpectedly exits in the middle, it will leave some broken
         //       sub files for DMFile, try to clean them later.
-        S3::listPrefix(*client, datafile_key + "/", [&client, &datafile_key, &sub_logger](const Aws::S3::Model::Object & object) {
-            const auto & sub_key = object.GetKey();
-            rewriteObjectWithTagging(*client, sub_key, String(TaggingObjectIsDeleted));
-            LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging, sub_key={}", datafile_key, sub_key);
+        std::vector<String> sub_keys;
+        S3::listPrefix(*client, datafile_key + "/", [&sub_keys](const Aws::S3::Model::Object & object) {
+            sub_keys.emplace_back(object.GetKey());
             return PageResult{.num_keys = 1, .more = true};
         });
-        LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging, all sub keys are deleted", datafile_key);
+        // set tagging for all sub keys in parallel
+        remote_data_store->setTaggingsForKeys(sub_keys, TaggingObjectIsDeleted);
+        for (const auto & sub_key : sub_keys)
+        {
+            LOG_INFO(sub_logger, "datafile deleted by lifecycle tagging, sub_key={}", sub_key);
+        }
+        LOG_INFO(
+            sub_logger,
+            "datafile deleted by lifecycle tagging, all sub keys are deleted, n_sub_keys={}",
+            sub_keys.size());
     }
 }

@@ -461,7 +532,7 @@ void S3GCManager::physicalRemoveDataFile(const String & datafile_key)
     {
         // CheckpointDataFile is a single object, remove it.
         deleteObject(*client, datafile_key);
-        LOG_INFO(sub_logger, "datafile deleted, key={}", datafile_key);
+        LOG_INFO(sub_logger, "datafile deleted");
     }
     else
     {
@@ -470,13 +541,14 @@ void S3GCManager::physicalRemoveDataFile(const String & datafile_key)
         // only the sub objects of given key of this DMFile.
         // TODO: If GCManager unexpectedly exits in the middle, it will leave some broken
         //       sub files for DMFile, try to clean them later.
-        S3::listPrefix(*client, datafile_key + "/", [&client, &datafile_key, &sub_logger](const Aws::S3::Model::Object & object) {
+        S3::listPrefix(*client, datafile_key + "/", [&client, &sub_logger](const Aws::S3::Model::Object & object) {
             const auto & sub_key = object.GetKey();
             deleteObject(*client, sub_key);
-            LOG_INFO(sub_logger, "datafile deleted, sub_key={}", datafile_key, sub_key);
+            LOG_INFO(sub_logger, "datafile deleted, sub_key={}", sub_key);
             return PageResult{.num_keys = 1, .more = true};
         });
-        LOG_INFO(sub_logger, "datafile deleted, all sub keys are deleted", datafile_key);
+        LOG_INFO(sub_logger, "datafile deleted, all sub keys are deleted");
     }
 }

@@ -577,7 +649,12 @@ S3GCManagerService::S3GCManagerService(
     const S3GCConfig & config)
     : global_ctx(context.getGlobalContext())
 {
-    manager = std::make_unique<S3GCManager>(std::move(pd_client), std::move(gc_owner_manager_), std::move(lock_client), config);
+    manager = std::make_unique<S3GCManager>(
+        std::move(pd_client),
+        std::move(gc_owner_manager_),
+        std::move(lock_client),
+        context.getSharedContextDisagg()->remote_data_store,
+        config);

     timer = global_ctx.getBackgroundPool().addTask(
         [this]() {
diff --git a/dbms/src/Storages/S3/S3GCManager.h b/dbms/src/Storages/S3/S3GCManager.h
index a11106084df..06954a2dc5d 100644
--- a/dbms/src/Storages/S3/S3GCManager.h
+++ b/dbms/src/Storages/S3/S3GCManager.h
@@ -16,6 +16,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -87,6 +88,7 @@ class S3GCManager
         pingcap::pd::ClientPtr pd_client_,
         OwnerManagerPtr gc_owner_manager_,
         S3LockClientPtr lock_client_,
+        DM::Remote::IDataStorePtr remote_data_store_,
         S3GCConfig config_);

     ~S3GCManager() = default;
@@ -132,6 +134,8 @@ class S3GCManager
     const OwnerManagerPtr gc_owner_manager;
     const S3LockClientPtr lock_client;

+    DM::Remote::IDataStorePtr remote_data_store;
+
     std::atomic<bool> shutdown_called;

     bool lifecycle_has_been_set = false;
diff --git a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
index 1390842bb8e..aa1fb9a2438 100644
--- a/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
+++ b/dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
@@ -15,11 +15,12 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -28,8 +29,6 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -64,18 +63,35 @@ class S3GCManagerTest : public DB::base::TiFlashStorageTestBasic
     void SetUp() override
     {
+        DB::base::TiFlashStorageTestBasic::SetUp();
+
         auto config = getConfig();
         mock_s3_client = ClientFactory::instance().sharedTiFlashClient();
         auto mock_gc_owner = OwnerManager::createMockOwner("owner_0");
         auto mock_lock_client = std::make_shared<MockS3LockClient>(mock_s3_client);
         auto mock_pd_client = std::make_shared<pingcap::pd::MockPDClient>();
-        gc_mgr = std::make_unique<S3GCManager>(mock_pd_client, mock_gc_owner, mock_lock_client, config);
+        auto data_store = std::make_shared<DM::Remote::DataStoreS3>(::DB::tests::TiFlashTestEnv::getMockFileProvider());
+        gc_mgr = std::make_unique<S3GCManager>(mock_pd_client, mock_gc_owner, mock_lock_client, data_store, config);
         ::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*mock_s3_client);

-        dir = getTemporaryPath();
-        dropDataOnDisk(dir);
-        createIfNotExist(dir);
+        tmp_dir = getTemporaryPath();
+        data_file_path_pattern1 = tmp_dir + "/data1_{index}";
+        data_file_id_pattern1 = "data1_{index}";
+        manifest_file_path1 = tmp_dir + "/manifest_foo1";
+        manifest_file_id1 = "manifest_foo1";
+        data_file_path_pattern2 = tmp_dir + "/data2_{index}";
+        data_file_id_pattern2 = "data2_{index}";
+        manifest_file_path2 = tmp_dir + "/manifest_foo2";
+        manifest_file_id2 = "manifest_foo2";
+        dropDataOnDisk(tmp_dir);
+        createIfNotExist(tmp_dir);
     }

     void TearDown() override
     {
@@ -84,7 +100,11 @@ class S3GCManagerTest : public DB::base::TiFlashStorageTestBasic
     }

 protected:
-    String dir;
+    String tmp_dir;
+    String data_file_id_pattern1;
+    String data_file_path_pattern1;
+    String manifest_file_id1;
+    String manifest_file_path1;
+    String data_file_id_pattern2;
+    String data_file_path_pattern2;
+    String manifest_file_id2;
+    String manifest_file_path2;

     std::shared_ptr<TiFlashS3Client> mock_s3_client;
     std::unique_ptr<S3GCManager> gc_mgr;
diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp
index 8a32fe308ad..66c1e5ba5ad 100644
--- a/dbms/src/Storages/Transaction/TMTContext.cpp
+++ b/dbms/src/Storages/Transaction/TMTContext.cpp
@@ -33,6 +33,7 @@
 #include
 #include
+#include <magic_enum.hpp>
 #include

 namespace DB
@@ -99,10 +100,27 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
         s3gc_owner->campaignOwner(); // start campaign
         s3lock_client = std::make_shared<S3::S3LockClient>(cluster.get(), s3gc_owner);
-        S3::S3GCConfig gc_config;
-        gc_config.interval_seconds = context.getSettingsRef().remote_gc_interval_seconds; // TODO: make it reloadable
-        gc_config.method = S3::ClientFactory::instance().gc_method;
-        s3gc_manager = std::make_unique<S3::S3GCManagerService>(context, cluster->pd_client, s3gc_owner, s3lock_client, gc_config);
+        S3::S3GCConfig remote_gc_config;
+        // default to the method from the client factory; the setting below overrides it
+        remote_gc_config.method = S3::ClientFactory::instance().gc_method;
+        {
+            Int64 gc_method_int = context.getSettingsRef().remote_gc_method;
+            if (gc_method_int == 1)
+            {
+                remote_gc_config.method = S3::S3GCMethod::Lifecycle;
+                LOG_INFO(Logger::get(), "Using remote_gc_method={}", magic_enum::enum_name(remote_gc_config.method));
+            }
+            else if (gc_method_int == 2)
+            {
+                remote_gc_config.method = S3::S3GCMethod::ScanThenDelete;
+                LOG_INFO(Logger::get(), "Using remote_gc_method={}", magic_enum::enum_name(remote_gc_config.method));
+            }
+            else
+            {
+                LOG_WARNING(Logger::get(), "Unknown remote gc method from settings, using default method, value={} remote_gc_method={}", gc_method_int, magic_enum::enum_name(remote_gc_config.method));
+            }
+        }
+        remote_gc_config.interval_seconds = context.getSettingsRef().remote_gc_interval_seconds; // TODO: make it reloadable
+        s3gc_manager = std::make_unique<S3::S3GCManagerService>(context, cluster->pd_client, s3gc_owner, s3lock_client, remote_gc_config);
     }
 }
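A closing note on the settings plumbing: pinning Lifecycle = 1 in S3Common.h makes the enum values line up with the remote_gc_method setting (1 - lifecycle, 2 - scan), so the hand-written if/else above could equally be expressed with magic_enum, which this patch already uses for enum_name. An illustrative alternative, not part of the patch:

    #include <magic_enum.hpp>
    #include <optional>

    enum class S3GCMethod
    {
        Lifecycle = 1,
        ScanThenDelete,
    };

    // enum_cast returns std::nullopt for values outside {1, 2}, which would map
    // to the "unknown method" warning branch above.
    std::optional<S3GCMethod> parseGCMethod(long long v)
    {
        return magic_enum::enum_cast<S3GCMethod>(static_cast<int>(v));
    }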