
Commit 5dc9bbf

Verify the data files exist according to manifest && fix S3 lifecycle rule (#7394)

close #7328
JaySon-Huang authored May 5, 2023
1 parent 6a8179b commit 5dc9bbf
Showing 12 changed files with 97 additions and 23 deletions.
2 changes: 1 addition & 1 deletion contrib/aws
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
@@ -113,15 +113,15 @@ void AsynchronousMetrics::run()
 template <typename Max, typename T>
 static void calculateMax(Max & max, T x)
 {
-    if (Max(x) > max)
+    if (static_cast<Max>(x) > max)
         max = x;
 }
 
 template <typename Max, typename Sum, typename T>
 static void calculateMaxAndSum(Max & max, Sum & sum, T x)
 {
     sum += x;
-    if (Max(x) > max)
+    if (static_cast<Max>(x) > max)
         max = x;
 }
 
@@ -172,7 +172,7 @@ FileUsageStatistics AsynchronousMetrics::getPageStorageFileUsage()
     case DisaggregatedMode::Compute:
     {
         // disagg compute node without auto-scaler, the proxy data are stored in the `uni_ps`
-        if (auto uni_ps = context.getWriteNodePageStorage(); uni_ps != nullptr)
+        if (auto uni_ps = context.tryGetWriteNodePageStorage(); uni_ps != nullptr)
         {
             usage.merge(uni_ps->getFileUsageStatistics());
         }
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -232,6 +232,7 @@ struct Settings
     M(SettingInt64, remote_checkpoint_interval_seconds, 30, "The interval of uploading checkpoint to the remote store. Unit is second.") \
     M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
     M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
+    M(SettingInt64, remote_gc_verify_consistency, 0, "Verify the consistency of valid locks when doing GC") \
     M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
     M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
     M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
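The new remote_gc_verify_consistency setting defaults to 0 (off). As a sketch of how an operator might enable it, assuming it is loaded from the TiFlash config file like the other ClickHouse-style settings in this macro list (the [profiles.default] section name is an assumption, not taken from this diff):

    # tiflash.toml — hypothetical snippet
    [profiles.default]
    # any value > 0 enables the check (see the TMTContext.cpp hunk below)
    remote_gc_verify_consistency = 1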
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp
@@ -461,9 +461,11 @@ ColumnPtr convertColumnByColumnDefineIfNeed(const DataTypePtr & from_type, Colum
     auto [compatible, need_data_cast] = checkColumnTypeCompatibility(from_type, to_column_define.type);
     if (unlikely(!compatible))
     {
-        throw Exception("Reading mismatch data type pack. Cast from " + from_type->getName() + " to " + to_column_define.type->getName()
-                            + " is NOT supported!",
-                        ErrorCodes::NOT_IMPLEMENTED);
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED,
+                        "Reading mismatch data type pack. Cast from {} to {} is NOT supported, column_id={}",
+                        from_type->getName(),
+                        to_column_define.type->getName(),
+                        to_column_define.id);
     }
     if (unlikely(!need_data_cast))
     {
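With the fmt-style Exception constructor, the rendered message now also carries the offending column id, e.g. with illustrative values: "Reading mismatch data type pack. Cast from Int32 to Nullable(String) is NOT supported, column_id=3".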
14 changes: 14 additions & 0 deletions dbms/src/Storages/S3/MockS3Client.cpp
@@ -18,6 +18,7 @@
 #include <Storages/S3/MockS3Client.h>
 #include <aws/core/AmazonWebServiceRequest.h>
 #include <aws/core/AmazonWebServiceResult.h>
+#include <aws/core/NoResult.h>
 #include <aws/core/utils/DateTime.h>
 #include <aws/core/utils/stream/ResponseStream.h>
 #include <aws/core/utils/xml/XmlSerializer.h>
@@ -309,5 +310,18 @@ Model::DeleteBucketOutcome MockS3Client::DeleteBucket(const Model::DeleteBucketRequest & request) const
     return Model::DeleteBucketOutcome{};
 }
 
+Model::GetBucketLifecycleConfigurationOutcome MockS3Client::GetBucketLifecycleConfiguration(const Model::GetBucketLifecycleConfigurationRequest & request) const
+{
+    // just mock a stub
+    UNUSED(request);
+    return Model::GetBucketLifecycleConfigurationResult();
+}
+
+Model::PutBucketLifecycleConfigurationOutcome MockS3Client::PutBucketLifecycleConfiguration(const Model::PutBucketLifecycleConfigurationRequest & request) const
+{
+    // just mock a stub
+    UNUSED(request);
+    return Aws::NoResult();
+}
+
 } // namespace DB::S3::tests
3 changes: 3 additions & 0 deletions dbms/src/Storages/S3/MockS3Client.h
@@ -44,6 +44,9 @@ class MockS3Client final : public S3::TiFlashS3Client
     Model::CopyObjectOutcome CopyObject(const Model::CopyObjectRequest & request) const override;
     Model::GetObjectTaggingOutcome GetObjectTagging(const Model::GetObjectTaggingRequest & request) const override;
 
+    Model::GetBucketLifecycleConfigurationOutcome GetBucketLifecycleConfiguration(const Model::GetBucketLifecycleConfigurationRequest & request) const override;
+    Model::PutBucketLifecycleConfigurationOutcome PutBucketLifecycleConfiguration(const Model::PutBucketLifecycleConfigurationRequest & request) const override;
+
 private:
     static String normalizedKey(String ori_key);
 
16 changes: 8 additions & 8 deletions dbms/src/Storages/S3/S3Common.cpp
@@ -765,17 +765,17 @@ bool ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days)
     // Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/S3OutpostsLifecycleCLIJava.html
     LOG_INFO(client.log, "The lifecycle rule with filter \"{}\" has not been added, n_rules={}", TaggingObjectIsDeleted, old_rules.size());
     static_assert(TaggingObjectIsDeleted == "tiflash_deleted=true");
-    std::vector<Aws::S3::Model::Tag> filter_tags{Aws::S3::Model::Tag().WithKey("tiflash_deleted").WithValue("true")};
-    Aws::S3::Model::LifecycleRuleFilter filter;
-    filter.WithAnd(Aws::S3::Model::LifecycleRuleAndOperator()
-                       .WithPrefix("")
-                       .WithTags(filter_tags));
+    std::vector<Aws::S3::Model::Tag> filter_tags{
+        Aws::S3::Model::Tag().WithKey("tiflash_deleted").WithValue("true"),
+    };
 
     Aws::S3::Model::LifecycleRule rule;
     rule.WithStatus(Aws::S3::Model::ExpirationStatus::Enabled)
-        .WithFilter(filter)
+        .WithFilter(Aws::S3::Model::LifecycleRuleFilter()
+                        .WithAnd(Aws::S3::Model::LifecycleRuleAndOperator()
+                                     .WithPrefix("")
+                                     .WithTags(filter_tags)))
         .WithExpiration(Aws::S3::Model::LifecycleExpiration()
-                            .WithExpiredObjectDeleteMarker(false)
                             .WithDays(expire_days))
        .WithID("tiflashgc");
 
@@ -794,7 +794,7 @@ bool ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days)
     const auto & error = outcome.GetError();
     LOG_WARNING(
         client.log,
-        "Create lifecycle rule with filter \"{}\" failed, please check the bucket lifecycle configuration or create the lifecycle rule manually"
+        "Create lifecycle rule with tag filter \"{}\" failed, please check the bucket lifecycle configuration or create the lifecycle rule manually"
         ", bucket={} {}",
         TaggingObjectIsDeleted,
         client.bucket(),
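For reference, the rule that ensureLifecycleRuleExist builds corresponds roughly to the following bucket lifecycle configuration, shown here as a sketch in the JSON shape accepted by S3's PutBucketLifecycleConfiguration API ("Days": 7 is an illustrative stand-in for expire_days). Dropping WithExpiredObjectDeleteMarker is consistent with S3 rejecting ExpiredObjectDeleteMarker on rules that use a tag-based filter, which is presumably why the previous rule failed to apply:

    {
      "Rules": [
        {
          "ID": "tiflashgc",
          "Status": "Enabled",
          "Filter": {
            "And": {
              "Prefix": "",
              "Tags": [{ "Key": "tiflash_deleted", "Value": "true" }]
            }
          },
          "Expiration": { "Days": 7 }
        }
      ]
    }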
53 changes: 46 additions & 7 deletions dbms/src/Storages/S3/S3GCManager.cpp
@@ -143,17 +143,25 @@ bool S3GCManager::runOnAllStores()
         {
             s = iter->second.state();
         }
-        if (!s || *s == metapb::StoreState::Tombstone)
+        try
         {
-            if (!s)
+            if (!s || *s == metapb::StoreState::Tombstone)
             {
-                LOG_INFO(log, "store not found from pd, maybe already removed. gc_store_id={}", gc_store_id);
+                if (!s)
+                {
+                    LOG_INFO(log, "store not found from pd, maybe already removed. gc_store_id={}", gc_store_id);
+                }
+                runForTombstoneStore(gc_store_id);
             }
-            runForTombstoneStore(gc_store_id);
+            else
+            {
+                runForStore(gc_store_id);
+            }
         }
-        else
+        catch (...)
         {
-            runForStore(gc_store_id);
+            // log error and continue on next store_id
+            tryLogCurrentException(log, fmt::format("gc_store_id={}", gc_store_id));
         }
     }
     // always return false, run in fixed rate
@@ -196,6 +204,11 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
     removeOutdatedManifest(manifests, &gc_timepoint);
     GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_manifests).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);
 
+    if (config.verify_locks)
+    {
+        verifyLocks(valid_lock_files);
+    }
+
     switch (config.method)
     {
     case S3GCMethod::Lifecycle:
@@ -213,14 +226,15 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
         break;
     }
     }
+    LOG_INFO(log, "gc on store done, gc_store_id={}", gc_store_id);
 }
 
 void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
 {
     // get a timepoint at the begin, only remove objects that expired compare
     // to this timepoint
     const Aws::Utils::DateTime gc_timepoint = Aws::Utils::DateTime::Now();
-    LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={}", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
+    LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={} tombstone=true", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
 
     Stopwatch watch;
     // If the store id is tombstone, then run gc on the store as if no locks.
@@ -258,6 +272,7 @@ void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
         break;
     }
     }
+    LOG_INFO(log, "gc on store done, gc_store_id={} tombstone=true", gc_store_id);
 }
 
 void S3GCManager::cleanUnusedLocks(
@@ -567,6 +582,30 @@ void S3GCManager::physicalRemoveDataFile(const String & datafile_key, const LoggerPtr & sub_logger) const
     }
 }
 
+void S3GCManager::verifyLocks(const std::unordered_set<String> & valid_lock_files)
+{
+    for (const auto & lock_key : valid_lock_files)
+    {
+        const auto lock_view = S3FilenameView::fromKey(lock_key);
+        RUNTIME_CHECK(lock_view.isLockFile(), lock_key, magic_enum::enum_name(lock_view.type));
+        const auto data_file_view = lock_view.asDataFile();
+        auto client = S3::ClientFactory::instance().sharedTiFlashClient();
+        String data_file_check_key;
+        if (!data_file_view.isDMFile())
+        {
+            data_file_check_key = data_file_view.toFullKey();
+        }
+        else
+        {
+            data_file_check_key = data_file_view.toFullKey() + "/meta";
+        }
+        LOG_INFO(log, "Checking consistency, lock_key={} data_file_key={}", lock_key, data_file_check_key);
+        auto object_info = S3::tryGetObjectInfo(*client, data_file_check_key);
+        RUNTIME_ASSERT(object_info.exist, log, "S3 file has already been removed! lock_key={} data_file={}", lock_key, data_file_check_key);
+    }
+    LOG_INFO(log, "All valid lock consistency check passed, num_locks={}", valid_lock_files.size());
+}
+
 std::vector<UInt64> S3GCManager::getAllStoreIds()
 {
     std::vector<UInt64> all_store_ids;
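To illustrate what verifyLocks checks (the keys below are hypothetical; the real layout is defined by S3FilenameView): a valid lock key recorded in the manifest, say lock/s5/dat_10_0.lock_s5_100, is mapped back to the data file it protects, e.g. s5/data/dat_10_0, and tryGetObjectInfo asserts that the object still exists. A DMFile is stored as multiple objects under one prefix, so its existence is probed through its meta object instead, e.g. s5/data/dmf_57/meta. Any miss means a data file referenced by a live lock has been removed, and RUNTIME_ASSERT aborts so the inconsistency surfaces immediately.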
4 changes: 4 additions & 0 deletions dbms/src/Storages/S3/S3GCManager.h
@@ -74,6 +74,8 @@ struct S3GCConfig
 
     S3GCMethod method = S3GCMethod::Lifecycle;
 
+    bool verify_locks = false;
+
     // Only has meaning when method == ScanThenDelete
     Int64 delmark_expired_hour = 1;
 
@@ -123,6 +125,8 @@ class S3GCManager
     void lifecycleMarkDataFileDeleted(const String & datafile_key, const LoggerPtr & sub_logger);
     void physicalRemoveDataFile(const String & datafile_key, const LoggerPtr & sub_logger) const;
 
+    void verifyLocks(const std::unordered_set<String> & valid_lock_files);
+
     static std::vector<UInt64> getAllStoreIds();
 
     std::unordered_set<String> getValidLocksFromManifest(const Strings & manifest_keys);
7 changes: 7 additions & 0 deletions dbms/src/Storages/S3/tests/gtest_s3client.cpp
@@ -34,6 +34,13 @@ class S3ClientTest : public ::testing::Test
     std::shared_ptr<TiFlashS3Client> client;
 };
 
+TEST_F(S3ClientTest, LifecycleRule)
+try
+{
+    ASSERT_TRUE(ensureLifecycleRuleExist(*client, 1));
+}
+CATCH
+
 TEST_F(S3ClientTest, UploadRead)
 try
 {
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
@@ -119,7 +119,9 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config)
             }
         }
         remote_gc_config.interval_seconds = context.getSettingsRef().remote_gc_interval_seconds; // TODO: make it reloadable
-        remote_gc_config.method = S3::ClientFactory::instance().gc_method;
+        remote_gc_config.verify_locks = context.getSettingsRef().remote_gc_verify_consistency > 0;
+        // set the gc_method so that S3LockService can set tagging when creating the delmark
+        S3::ClientFactory::instance().gc_method = remote_gc_config.method;
         s3gc_manager = std::make_unique<S3::S3GCManagerService>(context, cluster->pd_client, s3gc_owner, s3lock_client, remote_gc_config);
     }
 }
2 changes: 2 additions & 0 deletions dbms/src/TestUtils/gtests_dbms_main.cpp
@@ -86,10 +86,12 @@ int main(int argc, char ** argv)
     const auto s3_endpoint = Poco::Environment::get("S3_ENDPOINT", "");
     const auto s3_bucket = Poco::Environment::get("S3_BUCKET", "mockbucket");
     const auto s3_root = Poco::Environment::get("S3_ROOT", "tiflash_ut/");
+    const auto s3_verbose = Poco::Environment::get("S3_VERBOSE", "false");
     const auto access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", "");
     const auto secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", "");
     const auto mock_s3 = Poco::Environment::get("MOCK_S3", "true"); // In unit-tests, use MockS3Client by default.
     auto s3config = DB::StorageS3Config{
+        .verbose = s3_verbose == "true",
         .endpoint = s3_endpoint,
         .bucket = s3_bucket,
        .access_key_id = access_key_id,
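With these environment switches, the new LifecycleRule test can run against either the default MockS3Client or a real bucket. A hypothetical invocation against a real S3-compatible endpoint (the gtests_dbms binary name is inferred from this file's name and may differ):

    MOCK_S3=false S3_ENDPOINT=http://127.0.0.1:9000 S3_BUCKET=mybucket \
    S3_VERBOSE=true AWS_ACCESS_KEY_ID=<key> AWS_SECRET_ACCESS_KEY=<secret> \
    ./gtests_dbms --gtest_filter='S3ClientTest.LifecycleRule'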
