Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify the data files exist according to manifest && fix S3 lifecycle rule (#7394) #7424

Merged
2 changes: 1 addition & 1 deletion contrib/aws
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ void AsynchronousMetrics::run()
/// Update `max` to `x` if `x` is larger, comparing in the `Max` domain.
/// `static_cast` (rather than the functional-style `Max(x)`) makes the
/// intentional conversion explicit and greppable.
template <typename Max, typename T>
static void calculateMax(Max & max, T x)
{
    if (static_cast<Max>(x) > max)
        max = x;
}

/// Accumulate `x` into `sum` and update `max` if `x` is larger.
/// The comparison is done after converting `x` into the `Max` domain
/// via an explicit `static_cast`.
template <typename Max, typename Sum, typename T>
static void calculateMaxAndSum(Max & max, Sum & sum, T x)
{
    sum += x;
    if (static_cast<Max>(x) > max)
        max = x;
}

Expand Down Expand Up @@ -172,7 +172,7 @@ FileUsageStatistics AsynchronousMetrics::getPageStorageFileUsage()
case DisaggregatedMode::Compute:
{
// disagg compute node without auto-scaler, the proxy data are stored in the `uni_ps`
if (auto uni_ps = context.getWriteNodePageStorage(); uni_ps != nullptr)
if (auto uni_ps = context.tryGetWriteNodePageStorage(); uni_ps != nullptr)
{
usage.merge(uni_ps->getFileUsageStatistics());
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ struct Settings
M(SettingInt64, remote_checkpoint_interval_seconds, 30, "The interval of uploading checkpoint to the remote store. Unit is second.") \
M(SettingInt64, remote_gc_method, 1, "The method of running GC task on the remote store. 1 - lifecycle, 2 - scan.") \
M(SettingInt64, remote_gc_interval_seconds, 3600, "The interval of running GC task on the remote store. Unit is second.") \
M(SettingInt64, remote_gc_verify_consistency, 0, "Verify the consistenct of valid locks when doing GC") \
M(SettingInt64, remote_gc_min_age_seconds, 3600, "The file will NOT be compacted when the time difference between the last modification is less than this threshold") \
M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,11 @@ ColumnPtr convertColumnByColumnDefineIfNeed(const DataTypePtr & from_type, Colum
auto [compatible, need_data_cast] = checkColumnTypeCompatibility(from_type, to_column_define.type);
if (unlikely(!compatible))
{
throw Exception("Reading mismatch data type pack. Cast from " + from_type->getName() + " to " + to_column_define.type->getName()
+ " is NOT supported!",
ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading mismatch data type pack. Cast from {} to {} is NOT supported, column_id={}",
from_type->getName(),
to_column_define.type->getName(),
to_column_define.id);
}
if (unlikely(!need_data_cast))
{
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/S3/MockS3Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Storages/S3/MockS3Client.h>
#include <aws/core/AmazonWebServiceRequest.h>
#include <aws/core/AmazonWebServiceResult.h>
#include <aws/core/NoResult.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/stream/ResponseStream.h>
#include <aws/core/utils/xml/XmlSerializer.h>
Expand Down Expand Up @@ -309,5 +310,18 @@ Model::DeleteBucketOutcome MockS3Client::DeleteBucket(const Model::DeleteBucketR
return Model::DeleteBucketOutcome{};
}

Model::GetBucketLifecycleConfigurationOutcome MockS3Client::GetBucketLifecycleConfiguration(const Model::GetBucketLifecycleConfigurationRequest & request) const
{
    // Stub for tests: the mock does not persist lifecycle configuration,
    // so always report an empty (successful) result.
    UNUSED(request);
    return Model::GetBucketLifecycleConfigurationResult{};
}

Model::PutBucketLifecycleConfigurationOutcome MockS3Client::PutBucketLifecycleConfiguration(const Model::PutBucketLifecycleConfigurationRequest & request) const
{
    // Stub for tests: accept any lifecycle configuration unconditionally.
    UNUSED(request);
    return Aws::NoResult{};
}

} // namespace DB::S3::tests
3 changes: 3 additions & 0 deletions dbms/src/Storages/S3/MockS3Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class MockS3Client final : public S3::TiFlashS3Client
Model::CopyObjectOutcome CopyObject(const Model::CopyObjectRequest & request) const override;
Model::GetObjectTaggingOutcome GetObjectTagging(const Model::GetObjectTaggingRequest & request) const override;

Model::GetBucketLifecycleConfigurationOutcome GetBucketLifecycleConfiguration(const Model::GetBucketLifecycleConfigurationRequest & request) const override;
Model::PutBucketLifecycleConfigurationOutcome PutBucketLifecycleConfiguration(const Model::PutBucketLifecycleConfigurationRequest & request) const override;

private:
static String normalizedKey(String ori_key);

Expand Down
16 changes: 8 additions & 8 deletions dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,17 +754,17 @@ bool ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days)
// Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/S3OutpostsLifecycleCLIJava.html
LOG_INFO(client.log, "The lifecycle rule with filter \"{}\" has not been added, n_rules={}", TaggingObjectIsDeleted, old_rules.size());
static_assert(TaggingObjectIsDeleted == "tiflash_deleted=true");
std::vector<Aws::S3::Model::Tag> filter_tags{Aws::S3::Model::Tag().WithKey("tiflash_deleted").WithValue("true")};
Aws::S3::Model::LifecycleRuleFilter filter;
filter.WithAnd(Aws::S3::Model::LifecycleRuleAndOperator()
.WithPrefix("")
.WithTags(filter_tags));
std::vector<Aws::S3::Model::Tag> filter_tags{
Aws::S3::Model::Tag().WithKey("tiflash_deleted").WithValue("true"),
};

Aws::S3::Model::LifecycleRule rule;
rule.WithStatus(Aws::S3::Model::ExpirationStatus::Enabled)
.WithFilter(filter)
.WithFilter(Aws::S3::Model::LifecycleRuleFilter()
.WithAnd(Aws::S3::Model::LifecycleRuleAndOperator()
.WithPrefix("")
.WithTags(filter_tags)))
.WithExpiration(Aws::S3::Model::LifecycleExpiration()
.WithExpiredObjectDeleteMarker(false)
.WithDays(expire_days))
.WithID("tiflashgc");

Expand All @@ -783,7 +783,7 @@ bool ensureLifecycleRuleExist(const TiFlashS3Client & client, Int32 expire_days)
const auto & error = outcome.GetError();
LOG_WARNING(
client.log,
"Create lifecycle rule with filter \"{}\" failed, please check the bucket lifecycle configuration or create the lifecycle rule manually"
"Create lifecycle rule with tag filter \"{}\" failed, please check the bucket lifecycle configuration or create the lifecycle rule manually"
", bucket={} {}",
TaggingObjectIsDeleted,
client.bucket(),
Expand Down
53 changes: 46 additions & 7 deletions dbms/src/Storages/S3/S3GCManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,25 @@ bool S3GCManager::runOnAllStores()
{
s = iter->second.state();
}
if (!s || *s == metapb::StoreState::Tombstone)
try
{
if (!s)
if (!s || *s == metapb::StoreState::Tombstone)
{
LOG_INFO(log, "store not found from pd, maybe already removed. gc_store_id={}", gc_store_id);
if (!s)
{
LOG_INFO(log, "store not found from pd, maybe already removed. gc_store_id={}", gc_store_id);
}
runForTombstoneStore(gc_store_id);
}
else
{
runForStore(gc_store_id);
}
runForTombstoneStore(gc_store_id);
}
else
catch (...)
{
runForStore(gc_store_id);
// log error and continue on next store_id
tryLogCurrentException(log, fmt::format("gc_store_id={}", gc_store_id));
}
}
// always return false, run in fixed rate
Expand Down Expand Up @@ -196,6 +204,11 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
removeOutdatedManifest(manifests, &gc_timepoint);
GET_METRIC(tiflash_storage_s3_gc_seconds, type_clean_manifests).Observe(watch.elapsedMillisecondsFromLastTime() / 1000.0);

if (config.verify_locks)
{
verifyLocks(valid_lock_files);
}

switch (config.method)
{
case S3GCMethod::Lifecycle:
Expand All @@ -213,14 +226,15 @@ void S3GCManager::runForStore(UInt64 gc_store_id)
break;
}
}
LOG_INFO(log, "gc on store done, gc_store_id={}", gc_store_id);
}

void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
{
// get a timepoint at the begin, only remove objects that expired compare
// to this timepoint
const Aws::Utils::DateTime gc_timepoint = Aws::Utils::DateTime::Now();
LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={}", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
LOG_DEBUG(log, "run gc, gc_store_id={} timepoint={} tombstone=true", gc_store_id, gc_timepoint.ToGmtString(Aws::Utils::DateFormat::ISO_8601));

Stopwatch watch;
// If the store id is tombstone, then run gc on the store as if no locks.
Expand Down Expand Up @@ -258,6 +272,7 @@ void S3GCManager::runForTombstoneStore(UInt64 gc_store_id)
break;
}
}
LOG_INFO(log, "gc on store done, gc_store_id={} tombstone=true", gc_store_id);
}

void S3GCManager::cleanUnusedLocks(
Expand Down Expand Up @@ -567,6 +582,30 @@ void S3GCManager::physicalRemoveDataFile(const String & datafile_key, const Logg
}
}

/// Consistency check: every data file referenced by a still-valid lock must
/// still exist on S3. A missing data file means data that is still locked
/// has been removed — fail fast via RUNTIME_ASSERT instead of continuing.
void S3GCManager::verifyLocks(const std::unordered_set<String> & valid_lock_files)
{
    // The shared client is loop-invariant; fetch it once instead of once
    // per lock file.
    auto client = S3::ClientFactory::instance().sharedTiFlashClient();
    for (const auto & lock_key : valid_lock_files)
    {
        const auto lock_view = S3FilenameView::fromKey(lock_key);
        RUNTIME_CHECK(lock_view.isLockFile(), lock_key, magic_enum::enum_name(lock_view.type));
        const auto data_file_view = lock_view.asDataFile();
        String data_file_check_key;
        if (!data_file_view.isDMFile())
        {
            data_file_check_key = data_file_view.toFullKey();
        }
        else
        {
            // For DMFiles probe the "/meta" sub-key — presumably a DMFile is
            // stored as multiple objects under a key prefix, so the prefix
            // itself is not an object. TODO(review): confirm layout.
            data_file_check_key = data_file_view.toFullKey() + "/meta";
        }
        LOG_INFO(log, "Checking consistency, lock_key={} data_file_key={}", lock_key, data_file_check_key);
        auto object_info = S3::tryGetObjectInfo(*client, data_file_check_key);
        RUNTIME_ASSERT(object_info.exist, log, "S3 file has already been removed! lock_key={} data_file={}", lock_key, data_file_check_key);
    }
    LOG_INFO(log, "All valid lock consistency check passed, num_locks={}", valid_lock_files.size());
}

std::vector<UInt64> S3GCManager::getAllStoreIds()
{
std::vector<UInt64> all_store_ids;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/S3/S3GCManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ struct S3GCConfig

S3GCMethod method = S3GCMethod::Lifecycle;

bool verify_locks = false;

// Only has meaning when method == ScanThenDelete
Int64 delmark_expired_hour = 1;

Expand Down Expand Up @@ -123,6 +125,8 @@ class S3GCManager
void lifecycleMarkDataFileDeleted(const String & datafile_key, const LoggerPtr & sub_logger);
void physicalRemoveDataFile(const String & datafile_key, const LoggerPtr & sub_logger) const;

void verifyLocks(const std::unordered_set<String> & valid_lock_files);

static std::vector<UInt64> getAllStoreIds();

std::unordered_set<String> getValidLocksFromManifest(const Strings & manifest_keys);
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/S3/tests/gtest_s3client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ class S3ClientTest : public ::testing::Test
std::shared_ptr<TiFlashS3Client> client;
};

// Smoke test: ensureLifecycleRuleExist should succeed against the test
// client (expire_days = 1), i.e. the lifecycle rule is either already
// present or gets created.
TEST_F(S3ClientTest, LifecycleRule)
try
{
    ASSERT_TRUE(ensureLifecycleRuleExist(*client, 1));
}
CATCH

TEST_F(S3ClientTest, UploadRead)
try
{
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config
}
}
remote_gc_config.interval_seconds = context.getSettingsRef().remote_gc_interval_seconds; // TODO: make it reloadable
remote_gc_config.method = S3::ClientFactory::instance().gc_method;
remote_gc_config.verify_locks = context.getSettingsRef().remote_gc_verify_consistency > 0;
// set the gc_method so that S3LockService can set tagging when create delmark
S3::ClientFactory::instance().gc_method = remote_gc_config.method;
s3gc_manager = std::make_unique<S3::S3GCManagerService>(context, cluster->pd_client, s3gc_owner, s3lock_client, remote_gc_config);
}
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/TestUtils/gtests_dbms_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ int main(int argc, char ** argv)
const auto s3_endpoint = Poco::Environment::get("S3_ENDPOINT", "");
const auto s3_bucket = Poco::Environment::get("S3_BUCKET", "mockbucket");
const auto s3_root = Poco::Environment::get("S3_ROOT", "tiflash_ut/");
const auto s3_verbose = Poco::Environment::get("S3_VERBOSE", "false");
const auto access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", "");
const auto secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", "");
const auto mock_s3 = Poco::Environment::get("MOCK_S3", "true"); // In unit-tests, use MockS3Client by default.
auto s3config = DB::StorageS3Config{
.verbose = s3_verbose == "true",
.endpoint = s3_endpoint,
.bucket = s3_bucket,
.access_key_id = access_key_id,
Expand Down