diff --git a/contrib/kvproto b/contrib/kvproto index 1b2b4114103..b3251ae06bd 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 1b2b4114103afb06796b7e44f45f7e55133673c0 +Subproject commit b3251ae06bdd08fe5590eb5b7c5971c75db3d324 diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 06412649922..93d4f07fa62 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -60,6 +60,7 @@ M(IOLimiterPendingFgWriteReq) \ M(IOLimiterPendingBgReadReq) \ M(IOLimiterPendingFgReadReq) \ + M(S3LockServiceNumLatches) \ M(StoragePoolV2Only) \ M(StoragePoolV3Only) \ M(StoragePoolMixMode) \ diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 2b6db73cf59..6d8ea8fd81c 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -203,6 +203,14 @@ namespace DB F(type_bg_write_alloc_bytes, {"type", "bg_write_alloc_bytes"})) \ M(tiflash_storage_rough_set_filter_rate, "Bucketed histogram of rough set filter rate", Histogram, \ F(type_dtfile_pack, {{"type", "dtfile_pack"}}, EqualWidthBuckets{0, 6, 20})) \ + M(tiflash_disaggregated_object_lock_request_count, "Total number of S3 object lock/delete request", Counter, \ + F(type_lock, {"type", "lock"}), F(type_delete, {"type", "delete"}), \ + F(type_owner_changed, {"type", "owner_changed"}), F(type_error, {"type", "error"}), \ + F(type_lock_conflict, {"type", "lock_conflict"}), F(type_delete_conflict, {"type", "delete_conflict"}), \ + F(type_delete_risk, {"type", "delete_risk"})) \ + M(tiflash_disaggregated_object_lock_request_duration_seconds, "Bucketed histogram of S3 object lock/delete request duration", Histogram, \ + F(type_lock, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delete, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \ Histogram, /* these command usually cost servel seconds, increase the start bucket to 50ms */ \ F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \ diff --git a/dbms/src/Flash/CMakeLists.txt b/dbms/src/Flash/CMakeLists.txt index 3193ada560e..38eaf1ac695 100644 --- a/dbms/src/Flash/CMakeLists.txt +++ b/dbms/src/Flash/CMakeLists.txt @@ -28,6 +28,7 @@ add_headers_and_sources(flash_service ./Planner) add_headers_and_sources(flash_service ./Planner/Plans) add_headers_and_sources(flash_service ./Statistics) add_headers_and_sources(flash_service ./Management) +add_headers_and_sources(flash_service ./Disaggregated) add_library(flash_service ${flash_service_headers} ${flash_service_sources}) target_link_libraries(flash_service dbms) diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h b/dbms/src/Flash/Disaggregated/MockS3LockClient.h new file mode 100644 index 00000000000..3bc03dc595d --- /dev/null +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -0,0 +1,78 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB::S3 +{ + +// A simple mock lock client for testing. +// Notice: It does NOT guarantee atomicity between +// "try add lock" and "try mark delete" operations +// on the same `data_file_key`. +class MockS3LockClient : public IS3LockClient +{ +public: + explicit MockS3LockClient(std::shared_ptr c) + : s3_client(std::move(c)) + { + } + + std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override + { + // If the data file exist and no delmark exist, then create a lock file on `data_file_key` + auto view = S3FilenameView::fromKey(data_file_key); + if (!objectExists(*s3_client, s3_client->bucket(), data_file_key)) + { + return {false, ""}; + } + auto delmark_key = view.getDelMarkKey(); + if (objectExists(*s3_client, s3_client->bucket(), delmark_key)) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq)); + return {true, ""}; + } + + std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64) override + { + // If there is no lock on the given `data_file_key`, then mark as deleted + auto view = S3FilenameView::fromKey(data_file_key); + auto lock_prefix = view.getLockPrefix(); + bool any_lock_exist = false; + listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult { + if (!result.GetContents().empty()) + any_lock_exist = true; + return S3::PageResult{.num_keys = result.GetContents().size(), .more = false}; + }); + if (any_lock_exist) + { + return {false, ""}; + } + uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey()); + return {true, ""}; + } + +private: + std::shared_ptr s3_client; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockClient.cpp b/dbms/src/Flash/Disaggregated/S3LockClient.cpp new file mode 100644 index 00000000000..d5a4b1e80d4 --- /dev/null +++ b/dbms/src/Flash/Disaggregated/S3LockClient.cpp @@ -0,0 +1,209 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace DB::ErrorCodes +{ +extern const int TIMEOUT_EXCEEDED; +} + +namespace DB::S3 +{ +S3LockClient::S3LockClient( + pingcap::kv::Cluster * kv_cluster_, + OwnerManagerPtr s3gc_owner_) + : kv_cluster(kv_cluster_) + , s3gc_owner(s3gc_owner_) + , log(Logger::get()) +{ +} + +std::pair +S3LockClient::sendTryAddLockRequest( + const String & data_file_key, + UInt32 lock_store_id, + UInt32 lock_seq, + Int64 timeout_s) +{ + using Req = disaggregated::TryAddLockRequest; + using Resp = disaggregated::TryAddLockResponse; + Req req; + req.set_data_file_key(data_file_key); + req.set_lock_store_id(lock_store_id); + req.set_lock_seq(lock_seq); + auto tracing_log = log->getChild(fmt::format(",type=AddLock>", data_file_key, lock_store_id, lock_seq)); + + return makeCall( + [](grpc::ClientContext * grpc_context, + const std::shared_ptr & kvstub, + const Req & req, + Resp * response) { + return kvstub->stub->tryAddLock(grpc_context, req, response); + }, + req, + timeout_s, + tracing_log); +} + +std::pair +S3LockClient::sendTryMarkDeleteRequest( + const String & data_file_key, + Int64 timeout_s) +{ + using Req = disaggregated::TryMarkDeleteRequest; + using Resp = disaggregated::TryMarkDeleteResponse; + Req req; + req.set_data_file_key(data_file_key); + auto tracing_log = log->getChild(fmt::format("", data_file_key)); + + return makeCall( + [](grpc::ClientContext * grpc_context, + const std::shared_ptr & kvstub, + const Req & req, + Resp * response) { + return kvstub->stub->tryMarkDelete(grpc_context, req, response); + }, + req, + timeout_s, + tracing_log); +} + +// Try send the response to GC Owner in timeout_s seconds +// If the call success, return +// Otherwise return +// This method will update the owner info when owner changed. +// If deadline exceed or failed to get the owner info within +// `timeour_s`, it will throw exception. +template +std::pair S3LockClient::makeCall(SendRpc send, const Request & req, Int64 timeout_s, const LoggerPtr & tracing_log) +{ + const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(timeout_s); + String address = getOwnerAddr(deadline, tracing_log); + + do + { + LOG_DEBUG(tracing_log, "request, address={} req={}", address, req.ShortDebugString()); + auto kvstub = kv_cluster->rpc_client->getConnArray(address)->get(); + grpc::ClientContext grpc_context; + grpc_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout_s)); + Response resp; + auto status = send(&grpc_context, kvstub, req, &resp); + if (!status.ok()) + { + if (Clock::now() > deadline) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, " + tracing_log->identifier()); + } + // retry + LOG_ERROR(tracing_log, "meets error, code={} msg={}", status.error_code(), status.error_message()); + address = updateOwnerAddr(deadline, tracing_log); + } + + if (resp.result().has_success()) + { + // success + return {true, ""}; + } + else if (resp.result().has_conflict()) + { + // not retriable + LOG_INFO(tracing_log, "meets conflict, reason={}", resp.result().conflict().reason()); + return {false, resp.result().conflict().reason()}; + } + else if (resp.result().has_not_owner()) + { + if (Clock::now() > deadline) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, " + tracing_log->identifier()); + } + // retry + auto not_owner = resp.result().not_owner(); + LOG_WARNING(tracing_log, "meets not owner, retry"); + address = updateOwnerAddr(deadline, tracing_log); + } + } while (true); +} + +String S3LockClient::getOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log) +{ + { + std::shared_lock lk(mtx_owner); + if (!owner_cache.empty()) + return owner_cache; + } + // owner_cache is empty, try update + return updateOwnerAddr(deadline, tracing_log); +} + +String S3LockClient::updateOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log) +{ + using namespace std::chrono_literals; + while (true) + { + auto owner_info = s3gc_owner->getOwnerID(); + switch (owner_info.status) + { + case DB::OwnerType::IsOwner: + case DB::OwnerType::NotOwner: + { + { + std::unique_lock lk(mtx_owner); + owner_cache = owner_info.owner_id; + } + return owner_info.owner_id; + } + case DB::OwnerType::NoLeader: + { + if (Clock::now() > deadline) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, owner not available, " + tracing_log->identifier()); + } + LOG_ERROR(tracing_log, "owner not available"); + std::this_thread::sleep_for(500ms); + break; // continue retry + } + case DB::OwnerType::GrpcError: + { + if (Clock::now() > deadline) + { + throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, fmt::format("deadline exceed, owner not available, msg={} {}", owner_info.errMsg(), tracing_log->identifier())); + } + LOG_ERROR(tracing_log, "owner not available, msg={}", owner_info.errMsg()); + std::this_thread::sleep_for(500ms); + break; // continue retry + } + } + } +} + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockClient.h b/dbms/src/Flash/Disaggregated/S3LockClient.h new file mode 100644 index 00000000000..569e5950220 --- /dev/null +++ b/dbms/src/Flash/Disaggregated/S3LockClient.h @@ -0,0 +1,106 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ +class Context; +class Logger; +using LoggerPtr = std::shared_ptr; +} // namespace DB + +namespace DB::S3 +{ + +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; + +class IS3LockClient +{ +public: + virtual ~IS3LockClient() = default; + + // Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq` + // If the file is locked successfully, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0; + + // Try mark the `data_file_key` as deleted + // If the file is marked as deleted, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + virtual std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0; +}; + +class S3LockClient : public IS3LockClient +{ +public: + explicit S3LockClient( + pingcap::kv::Cluster * kv_cluster_, + OwnerManagerPtr s3gc_owner_); + + // Try add lock to the `data_file_key` by `lock_store_id` and `lock_seq` + // If the file is locked successfully, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + std::pair + sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) override; + + // Try mark the `data_file_key` as deleted + // If the file is marked as deleted, return + // Otherwise return + // This method will update the owner info when owner changed. + // If deadline exceed or failed to get the owner info within + // `timeour_s`, it will throw exception. + std::pair + sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) override; + +private: + template + std::pair makeCall( + SendRpc send, + const Request & req, + Int64 timeout_s, + const LoggerPtr & tracing_log); + + String getOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log); + String updateOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log); + +private: + pingcap::kv::Cluster * kv_cluster; + OwnerManagerPtr s3gc_owner; + + mutable std::shared_mutex mtx_owner; + String owner_cache; + + LoggerPtr log; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockService.cpp b/dbms/src/Flash/Disaggregated/S3LockService.cpp new file mode 100644 index 00000000000..6eba7e727f1 --- /dev/null +++ b/dbms/src/Flash/Disaggregated/S3LockService.cpp @@ -0,0 +1,329 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + + +namespace CurrentMetrics +{ +extern const Metric S3LockServiceNumLatches; +} + +namespace DB::S3 +{ + + +S3LockService::S3LockService(Context & context_) + : S3LockService( + context_.getGlobalContext().getTMTContext().getS3GCOwnerManager(), + S3::ClientFactory::instance().sharedTiFlashClient()) +{ +} + +S3LockService::S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr s3_cli_) + : gc_owner(std::move(owner_mgr_)) + , s3_client(std::move(s3_cli_)) + , log(Logger::get()) +{ +} + +grpc::Status S3LockService::tryAddLock(const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response) +{ + try + { + Stopwatch watch; + SCOPE_EXIT({ GET_METRIC(tiflash_disaggregated_object_lock_request_duration_seconds, type_lock).Observe(watch.elapsedSeconds()); }); + tryAddLockImpl(request->data_file_key(), request->lock_store_id(), request->lock_seq(), response); + if (response->result().has_conflict()) + { + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_lock_conflict).Increment(); + } + else if (response->result().has_not_owner()) + { + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_owner_changed).Increment(); + } + return grpc::Status::OK; + } + catch (const TiFlashException & e) + { + LOG_ERROR(log, "TiFlash Exception: {}\n{}", e.displayText(), e.getStackTrace().toString()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.standardText()); + } + catch (const Exception & e) + { + LOG_ERROR(log, "DB Exception: {}\n{}", e.message(), e.getStackTrace().toString()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(tiflashErrorCodeToGrpcStatusCode(e.code()), e.message()); + } + catch (const pingcap::Exception & e) + { + LOG_ERROR(log, "KV Client Exception: {}", e.message()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.message()); + } + catch (const std::exception & e) + { + LOG_ERROR(log, "std exception: {}", e.what()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + catch (...) + { + LOG_ERROR(log, "other exception"); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, "other exception"); + } +} + +grpc::Status S3LockService::tryMarkDelete(const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response) +{ + try + { + Stopwatch watch; + SCOPE_EXIT({ GET_METRIC(tiflash_disaggregated_object_lock_request_duration_seconds, type_delete).Observe(watch.elapsedSeconds()); }); + tryMarkDeleteImpl(request->data_file_key(), response); + if (response->result().has_conflict()) + { + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_delete_conflict).Increment(); + } + else if (response->result().has_not_owner()) + { + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_owner_changed).Increment(); + } + return grpc::Status::OK; + } + catch (const TiFlashException & e) + { + LOG_ERROR(log, "TiFlash Exception: {}\n{}", e.displayText(), e.getStackTrace().toString()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.standardText()); + } + catch (const Exception & e) + { + LOG_ERROR(log, "DB Exception: {}\n{}", e.message(), e.getStackTrace().toString()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(tiflashErrorCodeToGrpcStatusCode(e.code()), e.message()); + } + catch (const pingcap::Exception & e) + { + LOG_ERROR(log, "KV Client Exception: {}", e.message()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.message()); + } + catch (const std::exception & e) + { + LOG_ERROR(log, "std exception: {}", e.what()); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); + } + catch (...) + { + LOG_ERROR(log, "other exception"); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_error).Increment(); + return grpc::Status(grpc::StatusCode::INTERNAL, "other exception"); + } +} + +S3LockService::DataFileMutexPtr +S3LockService::getDataFileLatch(const String & data_file_key) +{ + std::unique_lock lock(file_latch_map_mutex); + auto it = file_latch_map.find(data_file_key); + if (it == file_latch_map.end()) + { + it = file_latch_map.emplace(data_file_key, std::make_shared()).first; + CurrentMetrics::add(CurrentMetrics::S3LockServiceNumLatches); + } + auto file_latch = it->second; + // must add ref count under the protection of `file_latch_map_mutex` + file_latch->addRefCount(); + return file_latch; +} + +bool S3LockService::tryAddLockImpl( + const String & data_file_key, + UInt64 lock_store_id, + UInt64 lock_seq, + disaggregated::TryAddLockResponse * response) +{ + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_lock).Increment(); + const S3FilenameView key_view = S3FilenameView::fromKey(data_file_key); + RUNTIME_CHECK(key_view.isDataFile(), data_file_key); + + if (!gc_owner->isOwner()) + { + // client should retry + response->mutable_result()->mutable_not_owner(); + return false; + } + + // Get the latch of the file, with ref count added + auto file_latch = getDataFileLatch(data_file_key); + file_latch->lock(); // prevent other request on the same key + SCOPE_EXIT({ + file_latch->unlock(); + std::unique_lock lock(file_latch_map_mutex); + // currently no request for the same key, release + if (file_latch->decreaseRefCount() == 0) + { + file_latch_map.erase(data_file_key); + CurrentMetrics::sub(CurrentMetrics::S3LockServiceNumLatches); + } + }); + + // make sure data file exists + if (!DB::S3::objectExists(*s3_client, s3_client->bucket(), data_file_key)) + { + auto * e = response->mutable_result()->mutable_conflict(); + e->set_reason(fmt::format("data file not exist, key={}", data_file_key)); + LOG_INFO(log, "data file lock conflict: not exist, key={}", data_file_key); + return false; + } + + // make sure data file is not mark as deleted + const auto delmark_key = key_view.getDelMarkKey(); + if (DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)) + { + auto * e = response->mutable_result()->mutable_conflict(); + e->set_reason(fmt::format("data file is mark deleted, key={} delmark={}", data_file_key, delmark_key)); + LOG_INFO(log, "data file lock conflict: mark deleted, key={} delmark={}", data_file_key, delmark_key); + return false; + } + + // Check whether this node is owner again before uploading lock file + if (!gc_owner->isOwner()) + { + // client should retry + response->mutable_result()->mutable_not_owner(); + LOG_INFO(log, "data file lock conflict: owner changed, key={}", data_file_key); + return false; + } + const auto lock_key = key_view.getLockKey(lock_store_id, lock_seq); + // upload lock file + DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), lock_key); + if (!gc_owner->isOwner()) + { + // altough the owner is changed after lock file is uploaded, but + // it is safe to return owner change and let the client retry. + // the obsolete lock file will finally get removed in S3 GC. + response->mutable_result()->mutable_not_owner(); + LOG_INFO(log, "data file lock conflict: owner changed after lock added, key={} lock_key={}", data_file_key, lock_key); + return false; + } + + LOG_INFO(log, "data file is locked, key={} lock_key={}", data_file_key, lock_key); + response->mutable_result()->mutable_success(); + return true; +} + +std::optional S3LockService::anyLockExist(const String & lock_prefix) const +{ + std::optional lock_key; + DB::S3::listPrefix( + *s3_client, + s3_client->bucket(), + lock_prefix, + [&lock_key](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult { + const auto & contents = result.GetContents(); + if (!contents.empty()) + { + lock_key = contents.front().GetKey(); + } + return S3::PageResult{ + .num_keys = contents.size(), + .more = false, // do not need more result + }; + }); + return lock_key; +} + +bool S3LockService::tryMarkDeleteImpl(const String & data_file_key, disaggregated::TryMarkDeleteResponse * response) +{ + const S3FilenameView key_view = S3FilenameView::fromKey(data_file_key); + RUNTIME_CHECK(key_view.isDataFile(), data_file_key); + + if (!gc_owner->isOwner()) + { + // client should retry + response->mutable_result()->mutable_not_owner(); + return false; + } + + // Get the latch of the file, with ref count added + auto file_latch = getDataFileLatch(data_file_key); + file_latch->lock(); // prevent other request on the same key + SCOPE_EXIT({ + file_latch->unlock(); + std::unique_lock lock(file_latch_map_mutex); + // currently no request for the same key, release + if (file_latch->decreaseRefCount() == 0) + { + file_latch_map.erase(data_file_key); + CurrentMetrics::sub(CurrentMetrics::S3LockServiceNumLatches); + } + }); + + // make sure data file has not been locked + const auto lock_prefix = key_view.getLockPrefix(); + std::optional lock_key = anyLockExist(lock_prefix); + if (lock_key) + { + auto * e = response->mutable_result()->mutable_conflict(); + e->set_reason(fmt::format("data file is locked, key={} lock_by={}", data_file_key, lock_key.value())); + LOG_INFO(log, "data file mark delete conflict: file is locked, key={} lock_by={}", data_file_key, lock_key.value()); + return false; + } + + // Check whether this node is owner again before marking delete + if (!gc_owner->isOwner()) + { + // client should retry + response->mutable_result()->mutable_not_owner(); + LOG_INFO(log, "data file mark delete conflict: owner changed, key={}", data_file_key); + return false; + } + // upload delete mark + const auto delmark_key = key_view.getDelMarkKey(); + DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), delmark_key); + if (!gc_owner->isOwner()) + { + // owner changed happens when delmark is uploading, can not + // ensure whether this is safe or not. + LOG_ERROR(log, "data file mark delete conflict: owner changed when marking file deleted! key={} delmark={}", data_file_key, delmark_key); + GET_METRIC(tiflash_disaggregated_object_lock_request_count, type_delete_risk).Increment(); + } + + LOG_INFO(log, "data file is mark deleted, key={} delmark={}", data_file_key, delmark_key); + response->mutable_result()->mutable_success(); + return true; +} + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/S3LockService.h b/dbms/src/Flash/Disaggregated/S3LockService.h new file mode 100644 index 00000000000..71f8b368cce --- /dev/null +++ b/dbms/src/Flash/Disaggregated/S3LockService.h @@ -0,0 +1,116 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" +#ifdef __clang__ +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#endif +#include +#include +#pragma GCC diagnostic pop + +namespace DB +{ +class Context; +} + +namespace DB::S3 +{ + +// Disaggregated TiFlash could share the same S3 object by multiple TiFlash +// instances. And a TiFlash instance claims the ownership of the S3 object +// by "add lock". +// The S3 GC will respect the lock and only "mark delete" when there is +// no any lock on a given S3 object. +// +// This class provide atomic "add lock" and "mark deleted" service for a +// given S3 object. +class S3LockService final : private boost::noncopyable +{ +public: + explicit S3LockService(Context & context_); + + S3LockService(OwnerManagerPtr owner_mgr_, std::shared_ptr s3_cli_); + + ~S3LockService() = default; + + grpc::Status tryAddLock(const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response); + + grpc::Status tryMarkDelete(const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response); + +private: + struct DataFileMutex; + using DataFileMutexPtr = std::shared_ptr; + struct DataFileMutex + { + std::mutex file_mutex; + UInt32 ref_count = 0; + + void lock() + { + file_mutex.lock(); + } + + void unlock() + { + file_mutex.unlock(); + } + + // must be protected by the mutex on the whole map + void addRefCount() + { + ++ref_count; + } + + // must be protected by the mutex on the whole map + UInt32 decreaseRefCount() + { + --ref_count; + return ref_count; + } + }; + + bool tryAddLockImpl(const String & data_file_key, UInt64 lock_store_id, UInt64 lock_seq, disaggregated::TryAddLockResponse * response); + + bool tryMarkDeleteImpl(const String & data_file_key, disaggregated::TryMarkDeleteResponse * response); + + DataFileMutexPtr getDataFileLatch(const String & data_file_key); + + std::optional anyLockExist(const String & lock_prefix) const; + +private: + std::unordered_map file_latch_map; + std::mutex file_latch_map_mutex; + + OwnerManagerPtr gc_owner; + const std::shared_ptr s3_client; + + LoggerPtr log; +}; + + +} // namespace DB::S3 diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp new file mode 100644 index 00000000000..8ea3b1db23c --- /dev/null +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -0,0 +1,410 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB::S3::tests +{ + +class S3LockServiceTest + : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + db_context = std::make_unique(DB::tests::TiFlashTestEnv::getContext()); + log = Logger::get(); + + auto & client_factory = DB::S3::ClientFactory::instance(); + is_s3_test_enabled = client_factory.isEnabled(); + + owner_manager = std::static_pointer_cast(OwnerManager::createMockOwner("owner_0")); + owner_manager->campaignOwner(); + + if (is_s3_test_enabled) + { + s3_client = client_factory.sharedTiFlashClient(); + } + else + { + s3_client = std::make_shared(); + } + s3_lock_service = std::make_unique(owner_manager, s3_client); + createS3DataFiles(); + } + + void createS3DataFiles() + { + // create 5 data files + for (size_t i = 1; i <= 5; ++i) + { + auto data_filename = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id}); + DB::S3::uploadEmptyFile(*s3_client, s3_client->bucket(), data_filename.toFullKey()); + ++dm_file_id; + } + } + + void TearDown() override + { + // clean data files + while (dm_file_id > 0) + { + --dm_file_id; + auto data_filename = S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id}); + DB::S3::deleteObject(*s3_client, s3_client->bucket(), data_filename.toFullKey()); + } + } + + S3Filename getDataFilename(std::optional get_fid = std::nullopt) + { + auto file_id = get_fid.has_value() ? get_fid.value() : dm_file_id - 1; // the last uploaded dmfile id + return S3Filename::fromDMFileOID(DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = file_id}); + } + +protected: + bool is_s3_test_enabled = false; + + std::shared_ptr owner_manager; + std::unique_ptr s3_lock_service; + + std::shared_ptr s3_client; + const UInt64 store_id = 1; + const Int64 physical_table_id = 1; + UInt64 dm_file_id = 1; + UInt64 lock_seq = 0; + const UInt64 lock_store_id = 2; + LoggerPtr log; +}; + +#define CHECK_S3_ENABLED \ + if (!is_s3_test_enabled) \ + { \ + const auto * t = ::testing::UnitTest::GetInstance()->current_test_info(); \ + LOG_INFO( \ + log, \ + "{}.{} is skipped because S3ClientFactory is not inited.", \ + t->test_case_name(), \ + t->name()); \ + return; \ + } + + +TEST_F(S3LockServiceTest, SingleTryAddLockRequest) +try +{ + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + + auto request = ::disaggregated::TryAddLockRequest(); + request.set_data_file_key(data_file_key); + request.set_lock_store_id(lock_store_id); + request.set_lock_seq(lock_seq); + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + + ASSERT_TRUE(status_code.ok()) << status_code.error_message(); + ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)); + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key); +} +CATCH + + +TEST_F(S3LockServiceTest, SingleTryMarkDeleteTest) +try +{ + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + auto delmark_key = data_filename.toView().getDelMarkKey(); + + auto request = ::disaggregated::TryMarkDeleteRequest(); + request.set_data_file_key(data_file_key); + auto response = ::disaggregated::TryMarkDeleteResponse(); + auto status_code = s3_lock_service->tryMarkDelete(&request, &response); + + ASSERT_TRUE(status_code.ok()) << status_code.error_message(); + ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)); + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key); +} +CATCH + +TEST_F(S3LockServiceTest, SingleTryAddLockRequestWithDeleteFileTest) +try +{ + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + auto delmark_key = data_filename.toView().getDelMarkKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + + // Add delete file first + { + auto request = ::disaggregated::TryMarkDeleteRequest(); + request.set_data_file_key(data_file_key); + auto response = ::disaggregated::TryMarkDeleteResponse(); + auto status_code = s3_lock_service->tryMarkDelete(&request, &response); + + ASSERT_TRUE(status_code.ok()) << status_code.error_message(); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)); + } + + // Try add lock file, should fail + { + auto request = ::disaggregated::TryAddLockRequest(); + request.set_data_file_key(data_file_key); + request.set_lock_seq(lock_seq); + request.set_lock_store_id(lock_store_id); + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString(); + ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)); + } + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key); +} +CATCH + +TEST_F(S3LockServiceTest, SingleTryMarkDeleteRequestWithLockFileTest) +try +{ + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + auto delmark_key = data_filename.toView().getDelMarkKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + + // Add lock file first + { + auto request = ::disaggregated::TryAddLockRequest(); + + request.set_data_file_key(data_file_key); + request.set_lock_store_id(lock_store_id); + request.set_lock_seq(lock_seq); + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)); + } + + // Try add delete mark, should fail + { + auto request = ::disaggregated::TryMarkDeleteRequest(); + + request.set_data_file_key(data_file_key); + auto response = ::disaggregated::TryMarkDeleteResponse(); + auto status_code = s3_lock_service->tryMarkDelete(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString(); + ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)); + } + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key); +} +CATCH + +TEST_F(S3LockServiceTest, SingleTryAddLockRequestWithDataFileLostTest) +try +{ + auto data_filename = getDataFilename(dm_file_id); // not created dmfile key + auto data_file_key = data_filename.toFullKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + + // Try add lock file, data file is not exist, should fail + { + auto request = ::disaggregated::TryAddLockRequest(); + + request.set_data_file_key(data_file_key); + request.set_lock_seq(lock_seq); + request.set_lock_store_id(lock_store_id); + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(!response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(response.result().has_conflict()) << response.ShortDebugString(); + ASSERT_TRUE(!DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)); + } +} +CATCH + +TEST_F(S3LockServiceTest, MultipleTryAddLockRequest) +try +{ + auto job = [&](size_t store_id) -> void { + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + + // Try add lock file simultaneously, should success + { + auto lock_key = data_filename.toView().getLockKey(store_id, lock_seq); + + auto request = ::disaggregated::TryAddLockRequest(); + request.set_data_file_key(data_file_key); + request.set_lock_store_id(store_id); + request.set_lock_seq(lock_seq); + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(response.result().has_success()) << response.ShortDebugString(); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)); + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key); + } + }; + + std::vector threads; + const size_t thread_num = 10; + threads.reserve(thread_num); + for (size_t i = 0; i < thread_num; ++i) + { + threads.emplace_back(job, /*store_id*/ 40 + i); + } + for (auto & thread : threads) + { + thread.join(); + } +} +CATCH + +TEST_F(S3LockServiceTest, MultipleTryMarkDeleteRequest) +try +{ + auto data_filename = getDataFilename(); + auto data_file_key = data_filename.toFullKey(); + auto delmark_key = data_filename.toView().getDelMarkKey(); + + auto job = [&]() -> void { + auto request = ::disaggregated::TryMarkDeleteRequest(); + request.set_data_file_key(data_file_key); + auto response = ::disaggregated::TryMarkDeleteResponse(); + auto status_code = s3_lock_service->tryMarkDelete(&request, &response); + + ASSERT_TRUE(status_code.ok()); + ASSERT_TRUE(DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)); + }; + + std::vector threads; + const size_t thread_num = 10; + threads.reserve(thread_num); + for (size_t i = 0; i < thread_num; ++i) + { + threads.emplace_back(job); + } + for (auto & thread : threads) + { + thread.join(); + } + + DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key); +} +CATCH + +TEST_F(S3LockServiceTest, MultipleMixRequest) +try +{ + auto lock_job = [&](size_t file_id) -> void { + auto data_filename = getDataFilename(file_id); + auto data_file_key = data_filename.toFullKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + + auto request = ::disaggregated::TryAddLockRequest(); + request.set_data_file_key(data_file_key); + request.set_lock_seq(lock_seq); + request.set_lock_store_id(lock_store_id); + + auto response = ::disaggregated::TryAddLockResponse(); + auto status_code = s3_lock_service->tryAddLock(&request, &response); + ASSERT_TRUE(status_code.ok()); + }; + + auto delete_job = [&](size_t file_id) -> void { + auto data_filename = getDataFilename(file_id); + auto data_file_key = data_filename.toFullKey(); + auto delmark_key = data_filename.toView().getDelMarkKey(); + + auto request = ::disaggregated::TryMarkDeleteRequest(); + request.set_data_file_key(data_file_key); + + auto response = ::disaggregated::TryMarkDeleteResponse(); + auto status_code = s3_lock_service->tryMarkDelete(&request, &response); + ASSERT_TRUE(status_code.ok()) << status_code.error_message(); + }; + + std::vector threads; + const size_t thread_num = (dm_file_id - 1) * 2; + threads.reserve(thread_num); + for (size_t i = 0; i < thread_num; ++i) + { + if (random() % 2 == 0) + threads.emplace_back(lock_job, i / 2 + 1); + else + threads.emplace_back(delete_job, i / 2 + 1); + } + + for (auto & thread : threads) + { + thread.join(); + } + + for (size_t i = 1; i < dm_file_id; ++i) + { + auto data_filename = getDataFilename(i); + auto data_file_key = data_filename.toFullKey(); + auto lock_key = data_filename.toView().getLockKey(lock_store_id, lock_seq); + auto delmark_key = data_filename.toView().getDelMarkKey(); + + // Either lock or delete file should exist + if (DB::S3::objectExists(*s3_client, s3_client->bucket(), delmark_key)) + { + DB::S3::deleteObject(*s3_client, s3_client->bucket(), delmark_key); + } + else if (DB::S3::objectExists(*s3_client, s3_client->bucket(), lock_key)) + { + DB::S3::deleteObject(*s3_client, s3_client->bucket(), lock_key); + } + else + { + ASSERT_TRUE(false) << fmt::format("none of delmark or lock exist! data_key={} delmark={} lock={}", data_file_key, delmark_key, lock_key); + } + } +} +CATCH + +} // namespace DB::S3::tests diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 7f9443b7a7f..6cc845804e9 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -74,6 +76,10 @@ void FlashService::init(Context & context_) context->getGlobalContext(), context->getGlobalContext().getSettingsRef()); + // Only when the s3 storage is enabled on write node, provide the lock service interfaces + if (!context->isDisaggregatedComputeMode() && S3::ClientFactory::instance().isEnabled()) + s3_lock_service = std::make_unique(*context); + auto settings = context->getSettingsRef(); enable_local_tunnel = settings.enable_local_tunnel; enable_async_grpc_client = settings.enable_async_grpc_client; @@ -549,6 +555,44 @@ grpc::Status FlashService::Compact(grpc::ServerContext * grpc_context, const kvr return manual_compact_manager->handleRequest(request, response); } +grpc::Status FlashService::tryAddLock(grpc::ServerContext * grpc_context, const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response) +{ + if (!s3_lock_service) + { + return grpc::Status(::grpc::StatusCode::INTERNAL, + fmt::format( + "can not handle tryAddLock, s3enabled={} compute_node={}", + S3::ClientFactory::instance().isEnabled(), + context->isDisaggregatedComputeMode())); + } + + CPUAffinityManager::getInstance().bindSelfGrpcThread(); + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + return s3_lock_service->tryAddLock(request, response); +} + +grpc::Status FlashService::tryMarkDelete(grpc::ServerContext * grpc_context, const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response) +{ + if (!s3_lock_service) + { + return grpc::Status(::grpc::StatusCode::INTERNAL, + fmt::format( + "can not handle tryMarkDelete, s3enabled={} compute_node={}", + S3::ClientFactory::instance().isEnabled(), + context->isDisaggregatedComputeMode())); + } + + CPUAffinityManager::getInstance().bindSelfGrpcThread(); + auto check_result = checkGrpcContext(grpc_context); + if (!check_result.ok()) + return check_result; + + return s3_lock_service->tryMarkDelete(request, response); +} + void FlashService::setMockStorage(MockStorage * mock_storage_) { mock_storage = mock_storage_; diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 01b6399de2b..e5282305d30 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -43,6 +43,10 @@ namespace Management { class ManualCompactManager; } // namespace Management +namespace S3 +{ +class S3LockService; +} // namespace S3 class FlashService : public tikvpb::Tikv::Service , public std::enable_shared_from_this @@ -80,6 +84,11 @@ class FlashService : public tikvpb::Tikv::Service grpc::Status Compact(grpc::ServerContext * grpc_context, const kvrpcpb::CompactRequest * request, kvrpcpb::CompactResponse * response) override; + + // For S3 Lock Service + grpc::Status tryAddLock(grpc::ServerContext * /*context*/, const disaggregated::TryAddLockRequest * request, disaggregated::TryAddLockResponse * response) override; + grpc::Status tryMarkDelete(grpc::ServerContext * /*context*/, const disaggregated::TryMarkDeleteRequest * request, disaggregated::TryMarkDeleteResponse * response) override; + void setMockStorage(MockStorage * mock_storage_); void setMockMPPServerInfo(MockMPPServerInfo & mpp_test_info_); Context * getContext() { return context; } @@ -96,6 +105,7 @@ class FlashService : public tikvpb::Tikv::Service bool enable_async_grpc_client = false; std::unique_ptr manual_compact_manager; + std::unique_ptr s3_lock_service; /// for mpp unit test. MockStorage * mock_storage = nullptr; diff --git a/dbms/src/Storages/S3/MockS3Client.cpp b/dbms/src/Storages/S3/MockS3Client.cpp new file mode 100644 index 00000000000..d620a85e75c --- /dev/null +++ b/dbms/src/Storages/S3/MockS3Client.cpp @@ -0,0 +1,121 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::S3::tests +{ +using namespace Aws::S3; + +Model::PutObjectOutcome MockS3Client::PutObject(const Model::PutObjectRequest & r) const +{ + put_keys.emplace_back(r.GetKey()); + return Model::PutObjectOutcome{Aws::AmazonWebServiceResult{}}; +} + +Model::DeleteObjectOutcome MockS3Client::DeleteObject(const Model::DeleteObjectRequest & r) const +{ + delete_keys.emplace_back(r.GetKey()); + return Model::DeleteObjectOutcome{Aws::AmazonWebServiceResult{}}; +} + +Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjectsV2Request & r) const +{ + Model::ListObjectsV2Result resp; + for (const auto & k : put_keys) + { + if (startsWith(k, r.GetPrefix())) + { + bool is_deleted = false; + for (const auto & d : delete_keys) + { + if (k == d) + { + is_deleted = true; + break; + } + } + if (is_deleted) + continue; + Model::Object o; + o.SetKey(k); + resp.AddContents(o); + } + } + for (const auto & k : list_result) + { + if (startsWith(k, r.GetPrefix())) + { + bool is_deleted = false; + for (const auto & d : delete_keys) + { + if (k == d) + { + is_deleted = true; + break; + } + } + if (is_deleted) + continue; + Model::Object o; + o.SetKey(k); + resp.AddContents(o); + } + } + return Model::ListObjectsV2Outcome{resp}; +} + +Model::HeadObjectOutcome MockS3Client::HeadObject(const Model::HeadObjectRequest & r) const +{ + for (const auto & k : put_keys) + { + if (r.GetKey() == k) + { + Model::HeadObjectResult resp; + return Model::HeadObjectOutcome{resp}; + } + } + + if (!head_result_mtime) + { + Aws::Client::AWSError error(S3Errors::NO_SUCH_KEY, false); + return Model::HeadObjectOutcome{error}; + } + Model::HeadObjectResult resp; + resp.SetLastModified(head_result_mtime.value()); + return Model::HeadObjectOutcome{resp}; +} + +void MockS3Client::clear() +{ + put_keys.clear(); + delete_keys.clear(); + list_result.clear(); + head_result_mtime.reset(); +} + +} // namespace DB::S3::tests diff --git a/dbms/src/Storages/S3/MockS3Client.h b/dbms/src/Storages/S3/MockS3Client.h new file mode 100644 index 00000000000..5290dc69206 --- /dev/null +++ b/dbms/src/Storages/S3/MockS3Client.h @@ -0,0 +1,46 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB::S3::tests +{ +class MockS3Client final : public S3::TiFlashS3Client +{ +public: + MockS3Client() + : TiFlashS3Client("") + {} + + ~MockS3Client() override = default; + + void clear(); + + Aws::S3::Model::PutObjectOutcome PutObject(const Aws::S3::Model::PutObjectRequest & r) const override; + mutable Strings put_keys; + + Aws::S3::Model::DeleteObjectOutcome DeleteObject(const Aws::S3::Model::DeleteObjectRequest & r) const override; + mutable Strings delete_keys; + + Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request & r) const override; + mutable Strings list_result; + + std::optional head_result_mtime; + Aws::S3::Model::HeadObjectOutcome HeadObject(const Aws::S3::Model::HeadObjectRequest & request) const override; +}; +} // namespace DB::S3::tests diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index 95f2c37c20f..2157f187ba1 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -110,6 +111,10 @@ class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface namespace DB::S3 { +TiFlashS3Client::TiFlashS3Client(const String & bucket_name_) + : bucket_name(bucket_name_) +{} + TiFlashS3Client::TiFlashS3Client( const String & bucket_name_, const Aws::Auth::AWSCredentials & credentials, @@ -121,16 +126,36 @@ TiFlashS3Client::TiFlashS3Client( { } -void ClientFactory::init(const StorageS3Config & config_) +TiFlashS3Client::TiFlashS3Client(const String & bucket_name_, std::unique_ptr && raw_client) + : Aws::S3::S3Client(std::move(*raw_client)) + , bucket_name(bucket_name_) +{ +} + +bool ClientFactory::isEnabled() const +{ + return config.isS3Enabled(); +} + +void ClientFactory::init(const StorageS3Config & config_, bool mock_s3_) { config = config_; Aws::InitAPI(aws_options); Aws::Utils::Logging::InitializeAWSLogging(std::make_shared()); - shared_client = create(); + shared_client = mock_s3_ ? std::make_unique() : create(); + if (!mock_s3_) + { + shared_tiflash_client = std::make_shared(config.bucket, create()); + } + else + { + shared_tiflash_client = std::make_shared(config.bucket, std::make_unique()); + } } void ClientFactory::shutdown() { + shared_tiflash_client.reset(); shared_client.reset(); // Reset S3Client before Aws::ShutdownAPI. Aws::Utils::Logging::ShutdownAWSLogging(); Aws::ShutdownAPI(aws_options); @@ -163,6 +188,11 @@ std::shared_ptr ClientFactory::sharedClient() const return shared_client; } +std::shared_ptr ClientFactory::sharedTiFlashClient() const +{ + return shared_tiflash_client; +} + std::unique_ptr ClientFactory::create(const StorageS3Config & config_) { Aws::Client::ClientConfiguration cfg; @@ -195,22 +225,6 @@ std::unique_ptr ClientFactory::create(const StorageS3Config & } } -std::unique_ptr ClientFactory::createWithBucket() const -{ - auto scheme = parseScheme(config.endpoint); - Aws::Client::ClientConfiguration cfg; - cfg.endpointOverride = config.endpoint; - cfg.scheme = scheme; - cfg.verifySSL = scheme == Aws::Http::Scheme::HTTPS; - Aws::Auth::AWSCredentials cred(config.access_key_id, config.secret_access_key); - return std::make_unique( - config.bucket, - cred, - cfg, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, - /*useVirtualAddressing*/ true); -} - Aws::Http::Scheme ClientFactory::parseScheme(std::string_view endpoint) { return boost::algorithm::starts_with(endpoint, "https://") ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP; diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index 655a7bd6bc4..b21cac4074c 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -31,6 +31,8 @@ class TiFlashS3Client : public Aws::S3::S3Client // Usually one tiflash instance only need access one bucket. // Store the bucket name to simpilfy some param passing. + explicit TiFlashS3Client(const String & bucket_name_); + TiFlashS3Client( const String & bucket_name_, const Aws::Auth::AWSCredentials & credentials, @@ -38,7 +40,9 @@ class TiFlashS3Client : public Aws::S3::S3Client Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy signPayloads, bool useVirtualAddressing); - const String & bucket() { return bucket_name; } + TiFlashS3Client(const String & bucket_name_, std::unique_ptr && raw_client); + + const String & bucket() const { return bucket_name; } private: const String bucket_name; @@ -52,13 +56,15 @@ class ClientFactory static ClientFactory & instance(); - void init(const StorageS3Config & config_); + bool isEnabled() const; + + void init(const StorageS3Config & config_, bool mock_s3_ = false); void shutdown(); const String & bucket() const; std::shared_ptr sharedClient() const; - std::unique_ptr createWithBucket() const; + std::shared_ptr sharedTiFlashClient() const; private: ClientFactory() = default; @@ -71,6 +77,7 @@ class ClientFactory Aws::SDKOptions aws_options; StorageS3Config config; std::shared_ptr shared_client; + std::shared_ptr shared_tiflash_client; }; struct ObjectInfo diff --git a/dbms/src/Storages/S3/S3Filename.cpp b/dbms/src/Storages/S3/S3Filename.cpp index bec35a2e886..6ab41cb8a88 100644 --- a/dbms/src/Storages/S3/S3Filename.cpp +++ b/dbms/src/Storages/S3/S3Filename.cpp @@ -29,6 +29,7 @@ namespace DB::S3 namespace details { +// Ref: https://github.com/google/re2/wiki/Syntax /// parsing LockFile const static re2::RE2 rgx_lock("^lock/s(?P[0-9]+)/(?P.+)$"); diff --git a/dbms/src/Storages/S3/S3Filename.h b/dbms/src/Storages/S3/S3Filename.h index 4545d2057a3..f21b9e3b5d4 100644 --- a/dbms/src/Storages/S3/S3Filename.h +++ b/dbms/src/Storages/S3/S3Filename.h @@ -27,7 +27,7 @@ namespace DB::S3 struct DMFileOID { StoreID store_id = 0; - Int64 table_id = 0; + TableID table_id = 0; UInt64 file_id = 0; }; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index a9a2ecb8c79..34a9c9ccd1e 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -13,11 +13,13 @@ // limitations under the License. #include +#include #include #include #include #include #include +#include #include #include #include @@ -90,13 +92,17 @@ TMTContext::TMTContext(Context & context_, const TiFlashRaftConfig & raft_config , read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS) , wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC) { - if (!raft_config.pd_addrs.empty()) + if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled() && !context.isDisaggregatedComputeMode()) { etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config); s3gc_owner = OwnerManager::createS3GCOwner(context, /*id*/ raft_config.flash_server_addr, etcd_client); + s3gc_owner->campaignOwner(); // start campaign + s3_lock_client = std::make_shared(cluster.get(), s3gc_owner); } } +TMTContext::~TMTContext() = default; + void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config) { if (!raft_config.pd_addrs.empty()) @@ -129,6 +135,8 @@ void TMTContext::shutdown() { if (s3gc_owner) { + // stop the campaign loop, so the S3LockService will + // let client retry s3gc_owner->cancel(); s3gc_owner = nullptr; } diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index d51fb4b4bbe..818d0841e22 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -55,6 +55,13 @@ using ClientPtr = std::shared_ptr; } // namespace Etcd class OwnerManager; using OwnerManagerPtr = std::shared_ptr; +namespace S3 +{ +class IS3LockClient; +using S3LockClientPtr = std::shared_ptr; +class S3GCManagerService; +using S3GCManagerServicePtr = std::unique_ptr; +} // namespace S3 class TMTContext : private boost::noncopyable { @@ -92,6 +99,7 @@ class TMTContext : private boost::noncopyable explicit TMTContext(Context & context_, const TiFlashRaftConfig & raft_config, const pingcap::ClusterConfig & cluster_config_); + ~TMTContext(); SchemaSyncerPtr getSchemaSyncer() const; @@ -142,6 +150,7 @@ class TMTContext : private boost::noncopyable Etcd::ClientPtr etcd_client; OwnerManagerPtr s3gc_owner; + S3::S3LockClientPtr s3_lock_client; mutable std::mutex mutex; diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index 229f6ed01cd..2932b1ad36b 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -13,13 +13,17 @@ // limitations under the License. #include +#include +#include #include #include #include +#include #include #include #include + namespace DB::FailPoints { extern const char force_set_dtfile_exist_when_acquire_id[]; @@ -70,6 +74,19 @@ int main(int argc, char ** argv) DB::tests::TiFlashTestEnv::getGlobalContext().getSettingsRef().dt_read_thread_count_scale); DB::DM::SegmentReadTaskScheduler::instance(); + const auto s3_endpoint = Poco::Environment::get("S3_ENDPOINT", ""); + const auto s3_bucket = Poco::Environment::get("S3_BUCKET", ""); + const auto access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", ""); + const auto secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", ""); + auto s3config = DB::StorageS3Config{ + .endpoint = s3_endpoint, + .bucket = s3_bucket, + .access_key_id = access_key_id, + .secret_access_key = secret_access_key, + }; + Poco::Environment::set("AWS_EC2_METADATA_DISABLED", "true"); // disable to speedup testing + DB::S3::ClientFactory::instance().init(s3config); + #ifdef FIU_ENABLE fiu_init(0); // init failpoint @@ -86,6 +103,7 @@ int main(int argc, char ** argv) // Stop threads explicitly before `TiFlashTestEnv::shutdown()`. DB::DM::SegmentReaderPoolManager::instance().stop(); DB::tests::TiFlashTestEnv::shutdown(); + DB::S3::ClientFactory::instance().shutdown(); return ret; }