Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix S3GC is broken on AWS #7168

Merged
merged 10 commits into from
Mar 28, 2023
11 changes: 10 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ namespace DB
M(tiflash_disaggregated_details, "", Counter, \
F(type_cftiny_read, {{"type", "cftiny_read"}}), \
F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot and ingest SST", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \
F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \
F(type_ingest_sst_sst2dt, {{"type", "ingest_sst_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \
Expand Down Expand Up @@ -331,6 +331,15 @@ namespace DB
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_s3_gc_seconds, "S3 GC subprocess duration in seconds", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 500ms */ \
F(type_total, {{"type", "total"}}, ExpBuckets{0.5, 2, 20}), \
F(type_one_store, {{"type", "one_store"}}, ExpBuckets{0.5, 2, 20}), \
F(type_read_locks, {{"type", "read_locks"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_locks, {{"type", "clean_locks"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_manifests, {{"type", "clean_manifests"}}, ExpBuckets{0.5, 2, 20}), \
F(type_scan_then_clean_data_files, {{"type", "scan_then_clean_data_files"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_one_lock, {{"type", "clean_one_lock"}}, ExpBuckets{0.5, 2, 20})) \
M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ struct Settings
M(SettingInt64, dt_compression_level, 1, "The compression level.") \
\
M(SettingInt64, remote_checkpoint_interval_seconds, 30, "The interval of uploading checkpoint to the remote store. Unit is second.") \
M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ struct StorageS3Config
String bucket;
String access_key_id;
String secret_access_key;
UInt64 max_connections = 1024;
UInt64 max_connections = 4096;
UInt64 connection_timeout_ms = 1000;
UInt64 request_timeout_ms = 3000;
UInt64 request_timeout_ms = 7000;
String root;

inline static String S3_ACCESS_KEY_ID = "S3_ACCESS_KEY_ID";
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ class IDataStore : boost::noncopyable
std::chrono::system_clock::time_point mtime; // last_modification_time
};
virtual std::unordered_map<String, DataFileInfo> getDataFilesInfo(const std::unordered_set<String> & lock_keys) = 0;

// Attach tagging to the keys on remote store
virtual void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) = 0;
};


Expand Down
19 changes: 19 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
const auto remote_dir = S3::S3Filename::fromDMFileOID(remote_oid).toFullKey();
std::vector<std::future<void>> results;
results.reserve(target_short_fnames.size());
for (const auto & fname : target_short_fnames)
{
auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
Expand All @@ -183,6 +184,24 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
}
}

void DataStoreS3::setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging)
{
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
std::vector<std::future<void>> results;
results.reserve(keys.size());
for (const auto & k : keys)
{
auto task = std::make_shared<std::packaged_task<void()>>([&s3_client, &tagging, key = k] {
rewriteObjectWithTagging(*s3_client, key, String(tagging));
});
results.emplace_back(task->get_future());
DataStoreS3Pool::get().scheduleOrThrowOnError([task] { (*task)(); });
}
for (auto & f : results)
{
f.get();
}
}

IPreparedDMFileTokenPtr DataStoreS3::prepareDMFile(const S3::DMFileOID & oid, UInt64 page_id)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DataStoreS3 final : public IDataStore

std::unordered_map<String, DataFileInfo> getDataFilesInfo(const std::unordered_set<String> & lock_keys) override;

void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) override;
#ifndef DBMS_PUBLIC_GTEST
private:
#else
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <aws/s3/model/MetadataDirective.h>
#include <aws/s3/model/PutBucketLifecycleConfigurationRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/TaggingDirective.h>
Expand Down Expand Up @@ -531,7 +532,9 @@ void rewriteObjectWithTagging(const TiFlashS3Client & client, const String & key
// The copy_source format is "${source_bucket}/${source_key}"
auto copy_source = client.bucket() + "/" + (client.root() == "/" ? "" : client.root()) + key;
client.setBucketAndKeyWithRoot(req, key);
// metadata directive and tagging directive must be set to `REPLACE`
req.WithCopySource(copy_source) //
.WithMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE)
.WithTagging(tagging)
.WithTaggingDirective(Aws::S3::Model::TaggingDirective::REPLACE);
ProfileEvents::increment(ProfileEvents::S3CopyObject);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class TiFlashS3Client : public Aws::S3::S3Client

enum class S3GCMethod
{
Lifecycle,
Lifecycle = 1,
ScanThenDelete,
};

Expand Down
Loading