Commit 72f347a
This is an automated cherry-pick of pingcap#7168
Signed-off-by: ti-chi-bot <[email protected]>
JaySon-Huang authored and ti-chi-bot committed Mar 28, 2023
1 parent 16a60b0 commit 72f347a
Showing 12 changed files with 200 additions and 24 deletions.
19 changes: 18 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -229,7 +229,7 @@ namespace DB
M(tiflash_disaggregated_details, "", Counter, \
F(type_cftiny_read, {{"type", "cftiny_read"}}), \
F(type_cftiny_fetch, {{"type", "cftiny_fetch"}})) \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot and ingest SST", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \
F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \
F(type_ingest_sst_sst2dt, {{"type", "ingest_sst_sst2dt"}}, ExpBuckets{0.05, 2, 10}), \
@@ -321,7 +321,24 @@ namespace DB
F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
<<<<<<< HEAD
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))
=======
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_s3_gc_seconds, "S3 GC subprocess duration in seconds", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 500ms */ \
F(type_total, {{"type", "total"}}, ExpBuckets{0.5, 2, 20}), \
F(type_one_store, {{"type", "one_store"}}, ExpBuckets{0.5, 2, 20}), \
F(type_read_locks, {{"type", "read_locks"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_locks, {{"type", "clean_locks"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_manifests, {{"type", "clean_manifests"}}, ExpBuckets{0.5, 2, 20}), \
F(type_scan_then_clean_data_files, {{"type", "scan_then_clean_data_files"}}, ExpBuckets{0.5, 2, 20}), \
F(type_clean_one_lock, {{"type", "clean_one_lock"}}, ExpBuckets{0.5, 2, 20})) \
M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.001, 2, 20}))
>>>>>>> 197bdcd0dd (Fix S3GC is broken on AWS (#7168))

// clang-format on

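The new tiflash_storage_s3_gc_seconds histogram above uses ExpBuckets{0.5, 2, 20}. As a rough illustration of the bucket layout that implies — a sketch only, assuming ExpBuckets{start, base, size} yields `size` exponentially growing boundaries, which matches how the surrounding metrics are described — the helper below is a hypothetical stand-in, not the actual ExpBuckets implementation:

#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for the ExpBuckets helper in TiFlashMetrics.h:
// ExpBuckets{start, base, size} -> boundaries start, start*base, start*base^2, ...
std::vector<double> expBuckets(double start, double base, std::size_t size)
{
    std::vector<double> buckets;
    buckets.reserve(size);
    for (double v = start; buckets.size() < size; v *= base)
        buckets.push_back(v);
    return buckets;
}

int main()
{
    // ExpBuckets{0.5, 2, 20}, as used by tiflash_storage_s3_gc_seconds:
    // 0.5s, 1s, 2s, ... with the largest bucket on the order of days.
    for (double b : expBuckets(0.5, 2, 20))
        std::printf("%gs ", b);
    std::printf("\n");
    return 0;
}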
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Settings.h
@@ -244,7 +244,12 @@ struct Settings
M(SettingInt64, dt_compression_level, 1, "The compression level.") \
\
M(SettingInt64, remote_checkpoint_interval_seconds, 30, "The interval of uploading checkpoint to the remote store. Unit is second.") \
M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
<<<<<<< HEAD
=======
M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
>>>>>>> 197bdcd0dd (Fix S3GC is broken on AWS (#7168))
M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
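The integer values of remote_gc_method ("1 - lifecycle, 2 - scan") line up with the S3GCMethod enum touched later in this commit, where Lifecycle is now pinned to 1. A minimal sketch of that mapping; the helper name and error handling are illustrative, not the actual TiFlash wiring:

#include <cstdint>
#include <stdexcept>

enum class S3GCMethod
{
    Lifecycle = 1,
    ScanThenDelete,
};

// Hypothetical helper: translate the remote_gc_method setting into the enum.
inline S3GCMethod parseRemoteGCMethod(std::int64_t value)
{
    switch (value)
    {
    case 1:
        return S3GCMethod::Lifecycle;
    case 2:
        return S3GCMethod::ScanThenDelete;
    default:
        throw std::invalid_argument("remote_gc_method must be 1 (lifecycle) or 2 (scan)");
    }
}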
4 changes: 2 additions & 2 deletions dbms/src/Server/StorageConfigParser.h
@@ -99,9 +99,9 @@ struct StorageS3Config
String bucket;
String access_key_id;
String secret_access_key;
UInt64 max_connections = 1024;
UInt64 max_connections = 4096;
UInt64 connection_timeout_ms = 1000;
UInt64 request_timeout_ms = 3000;
UInt64 request_timeout_ms = 7000;
String root;

inline static String S3_ACCESS_KEY_ID = "S3_ACCESS_KEY_ID";
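The raised defaults above (max_connections 1024 -> 4096, request_timeout_ms 3000 -> 7000) presumably feed the AWS SDK client configuration. A hedged sketch of that mapping, assuming the standard Aws::Client::ClientConfiguration fields and that StorageS3Config lives in namespace DB; the actual plumbing is in TiFlash's S3 client factory:

#include <aws/core/client/ClientConfiguration.h>
#include <Server/StorageConfigParser.h> // for StorageS3Config (the file changed above)

// Illustrative only: copy the parsed settings into an AWS client configuration.
Aws::Client::ClientConfiguration makeClientConfig(const DB::StorageS3Config & config)
{
    Aws::Client::ClientConfiguration client_config;
    client_config.maxConnections = config.max_connections;         // default now 4096
    client_config.connectTimeoutMs = config.connection_timeout_ms; // 1000 ms
    client_config.requestTimeoutMs = config.request_timeout_ms;    // raised to 7000 ms
    return client_config;
}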
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h
@@ -94,7 +94,19 @@ class IDataStore : boost::noncopyable
*/
virtual bool putCheckpointFiles(const PS::V3::LocalCheckpointFiles & local_files, StoreID store_id, UInt64 upload_seq) = 0;

<<<<<<< HEAD
virtual std::unordered_map<String, Int64> getDataFileSizes(const std::unordered_set<String> & lock_keys) = 0;
=======
struct DataFileInfo
{
Int64 size = -1;
std::chrono::system_clock::time_point mtime; // last_modification_time
};
virtual std::unordered_map<String, DataFileInfo> getDataFilesInfo(const std::unordered_set<String> & lock_keys) = 0;

// Attach tagging to the keys on remote store
virtual void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) = 0;
>>>>>>> 197bdcd0dd (Fix S3GC is broken on AWS (#7168))
};


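The new DataFileInfo carries mtime so that GC can honour the remote_gc_min_age_seconds setting added in Settings.h: files modified too recently should not be compacted. A minimal, self-contained sketch of that age check — the function name is illustrative, and the real check sits in the S3 GC code:

#include <chrono>
#include <cstdint>

// Illustrative age check: a data file qualifies for compaction only when its
// last modification (DataFileInfo::mtime) is older than remote_gc_min_age_seconds.
bool isOldEnoughForGC(std::chrono::system_clock::time_point mtime, std::int64_t min_age_seconds)
{
    using namespace std::chrono;
    const auto age_seconds = duration_cast<seconds>(system_clock::now() - mtime).count();
    return age_seconds >= min_age_seconds;
}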
19 changes: 19 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp
@@ -156,6 +156,7 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
const auto remote_dir = S3::S3Filename::fromDMFileOID(remote_oid).toFullKey();
std::vector<std::future<void>> results;
results.reserve(target_short_fnames.size());
for (const auto & fname : target_short_fnames)
{
auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
@@ -175,6 +176,24 @@ void DataStoreS3::copyToLocal(const S3::DMFileOID & remote_oid, const std::vecto
}
}

void DataStoreS3::setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging)
{
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
std::vector<std::future<void>> results;
results.reserve(keys.size());
for (const auto & k : keys)
{
auto task = std::make_shared<std::packaged_task<void()>>([&s3_client, &tagging, key = k] {
rewriteObjectWithTagging(*s3_client, key, String(tagging));
});
results.emplace_back(task->get_future());
DataStoreS3Pool::get().scheduleOrThrowOnError([task] { (*task)(); });
}
for (auto & f : results)
{
f.get();
}
}

IPreparedDMFileTokenPtr DataStoreS3::prepareDMFile(const S3::DMFileOID & oid, UInt64 page_id)
{
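setTaggingsForKeys fans the per-key work out to the DataStoreS3Pool, tagging each object via rewriteObjectWithTagging (a CopyObject onto itself; see the S3Common.cpp change below). A hedged usage sketch — the key names and tag string are made up for illustration, and the namespace and include path are assumed from the file layout:

#include <string_view>
#include <vector>
#include <Storages/DeltaMerge/Remote/DataStore/DataStore.h> // interface changed in this commit

// Illustrative only: mark a batch of unreferenced data files so that an S3
// lifecycle rule filtering on this tag can expire them later.
void markFilesAsDeleted(DB::DM::Remote::IDataStore & data_store, const std::vector<String> & expired_keys)
{
    constexpr std::string_view tagging = "tiflash_deleted=true"; // hypothetical tag value
    data_store.setTaggingsForKeys(expired_keys, tagging);
}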
dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h
@@ -53,6 +53,7 @@ class DataStoreS3 final : public IDataStore

std::unordered_map<String, Int64> getDataFileSizes(const std::unordered_set<String> & lock_keys) override;

void setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging) override;
#ifndef DBMS_PUBLIC_GTEST
private:
#else
3 changes: 3 additions & 0 deletions dbms/src/Storages/S3/S3Common.cpp
@@ -38,6 +38,7 @@
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <aws/s3/model/MetadataDirective.h>
#include <aws/s3/model/PutBucketLifecycleConfigurationRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/TaggingDirective.h>
@@ -362,7 +363,9 @@ void rewriteObjectWithTagging(const TiFlashS3Client & client, const String & key
// The copy_source format is "${source_bucket}/${source_key}"
auto copy_source = client.bucket() + "/" + (client.root() == "/" ? "" : client.root()) + key;
client.setBucketAndKeyWithRoot(req, key);
// metadata directive and tagging directive must be set to `REPLACE`
req.WithCopySource(copy_source) //
.WithMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE)
.WithTagging(tagging)
.WithTaggingDirective(Aws::S3::Model::TaggingDirective::REPLACE);
ProfileEvents::increment(ProfileEvents::S3CopyObject);
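The MetadataDirective::REPLACE added above is the heart of the AWS fix: S3 rejects a CopyObject whose source and destination are the same key unless the request declares that metadata (not just tags) is being replaced. A hedged sketch of the same self-copy pattern in isolation — bucket handling via setBucketAndKeyWithRoot is omitted, and the function name is illustrative:

#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/MetadataDirective.h>
#include <aws/s3/model/TaggingDirective.h>

// Sketch of a self-copy that only rewrites tags: without MetadataDirective::REPLACE,
// AWS S3 rejects copying an object onto itself.
Aws::S3::Model::CopyObjectRequest makeSelfCopyWithTag(const Aws::String & bucket, const Aws::String & key, const Aws::String & tagging)
{
    Aws::S3::Model::CopyObjectRequest req;
    req.WithBucket(bucket)
        .WithKey(key)
        .WithCopySource(bucket + "/" + key)
        .WithMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE)
        .WithTagging(tagging)
        .WithTaggingDirective(Aws::S3::Model::TaggingDirective::REPLACE);
    return req;
}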
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/S3Common.h
@@ -85,7 +85,7 @@ class TiFlashS3Client : public Aws::S3::S3Client

enum class S3GCMethod
{
Lifecycle,
Lifecycle = 1,
ScanThenDelete,
};
