Skip to content

Commit

Permalink
S3 Lock service (#6874)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
JaySon-Huang authored Feb 28, 2023
1 parent ce1b09b commit dc28555
Show file tree
Hide file tree
Showing 21 changed files with 1,560 additions and 24 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
M(IOLimiterPendingFgWriteReq) \
M(IOLimiterPendingBgReadReq) \
M(IOLimiterPendingFgReadReq) \
M(S3LockServiceNumLatches) \
M(StoragePoolV2Only) \
M(StoragePoolV3Only) \
M(StoragePoolMixMode) \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ namespace DB
F(type_bg_write_alloc_bytes, {"type", "bg_write_alloc_bytes"})) \
M(tiflash_storage_rough_set_filter_rate, "Bucketed histogram of rough set filter rate", Histogram, \
F(type_dtfile_pack, {{"type", "dtfile_pack"}}, EqualWidthBuckets{0, 6, 20})) \
M(tiflash_disaggregated_object_lock_request_count, "Total number of S3 object lock/delete request", Counter, \
F(type_lock, {"type", "lock"}), F(type_delete, {"type", "delete"}), \
F(type_owner_changed, {"type", "owner_changed"}), F(type_error, {"type", "error"}), \
F(type_lock_conflict, {"type", "lock_conflict"}), F(type_delete_conflict, {"type", "delete_conflict"}), \
F(type_delete_risk, {"type", "delete_risk"})) \
M(tiflash_disaggregated_object_lock_request_duration_seconds, "Bucketed histogram of S3 object lock/delete request duration", Histogram, \
F(type_lock, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \
Histogram, /* these command usually cost servel seconds, increase the start bucket to 50ms */ \
F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/Plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)
add_headers_and_sources(flash_service ./Disaggregated)

add_library(flash_service ${flash_service_headers} ${flash_service_sources})
target_link_libraries(flash_service dbms)
78 changes: 78 additions & 0 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>

namespace DB::S3
{

// A simple mock lock client for testing.
// Notice: It does NOT guarantee atomicity between
// "try add lock" and "try mark delete" operations
// on the same `data_file_key`.
class MockS3LockClient : public IS3LockClient
{
public:
explicit MockS3LockClient(std::shared_ptr<TiFlashS3Client> c)
: s3_client(std::move(c))
{
}

std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64) override
{
// If the data file exist and no delmark exist, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
if (!objectExists(*s3_client, s3_client->bucket(), data_file_key))
{
return {false, ""};
}
auto delmark_key = view.getDelMarkKey();
if (objectExists(*s3_client, s3_client->bucket(), delmark_key))
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getLockKey(lock_store_id, lock_seq));
return {true, ""};
}

std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64) override
{
// If there is no lock on the given `data_file_key`, then mark as deleted
auto view = S3FilenameView::fromKey(data_file_key);
auto lock_prefix = view.getLockPrefix();
bool any_lock_exist = false;
listPrefix(*s3_client, s3_client->bucket(), lock_prefix, [&any_lock_exist](const Aws::S3::Model::ListObjectsV2Result & result) -> S3::PageResult {
if (!result.GetContents().empty())
any_lock_exist = true;
return S3::PageResult{.num_keys = result.GetContents().size(), .more = false};
});
if (any_lock_exist)
{
return {false, ""};
}
uploadEmptyFile(*s3_client, s3_client->bucket(), view.getDelMarkKey());
return {true, ""};
}

private:
std::shared_ptr<TiFlashS3Client> s3_client;
};

} // namespace DB::S3
209 changes: 209 additions & 0 deletions dbms/src/Flash/Disaggregated/S3LockClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>
#include <TiDB/OwnerInfo.h>
#include <common/defines.h>
#include <common/types.h>
#include <grpcpp/client_context.h>
#include <grpcpp/support/status_code_enum.h>
#include <kvproto/disaggregated.pb.h>
#include <pingcap/kv/Rpc.h>
#include <pingcap/kv/internal/conn.h>

#include <chrono>
#include <magic_enum.hpp>
#include <mutex>
#include <thread>

namespace DB::ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}

namespace DB::S3
{
S3LockClient::S3LockClient(
pingcap::kv::Cluster * kv_cluster_,
OwnerManagerPtr s3gc_owner_)
: kv_cluster(kv_cluster_)
, s3gc_owner(s3gc_owner_)
, log(Logger::get())
{
}

std::pair<bool, String>
S3LockClient::sendTryAddLockRequest(
const String & data_file_key,
UInt32 lock_store_id,
UInt32 lock_seq,
Int64 timeout_s)
{
using Req = disaggregated::TryAddLockRequest;
using Resp = disaggregated::TryAddLockResponse;
Req req;
req.set_data_file_key(data_file_key);
req.set_lock_store_id(lock_store_id);
req.set_lock_seq(lock_seq);
auto tracing_log = log->getChild(fmt::format("<key=<{},{},{}>,type=AddLock>", data_file_key, lock_store_id, lock_seq));

return makeCall<Resp>(
[](grpc::ClientContext * grpc_context,
const std::shared_ptr<pingcap::kv::KvConnClient> & kvstub,
const Req & req,
Resp * response) {
return kvstub->stub->tryAddLock(grpc_context, req, response);
},
req,
timeout_s,
tracing_log);
}

std::pair<bool, String>
S3LockClient::sendTryMarkDeleteRequest(
const String & data_file_key,
Int64 timeout_s)
{
using Req = disaggregated::TryMarkDeleteRequest;
using Resp = disaggregated::TryMarkDeleteResponse;
Req req;
req.set_data_file_key(data_file_key);
auto tracing_log = log->getChild(fmt::format("<key={},type=MarkDelete>", data_file_key));

return makeCall<Resp>(
[](grpc::ClientContext * grpc_context,
const std::shared_ptr<pingcap::kv::KvConnClient> & kvstub,
const Req & req,
Resp * response) {
return kvstub->stub->tryMarkDelete(grpc_context, req, response);
},
req,
timeout_s,
tracing_log);
}

// Try send the response to GC Owner in timeout_s seconds
// If the call success, return <true, "">
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
template <typename Response, typename Request, typename SendRpc>
std::pair<bool, String> S3LockClient::makeCall(SendRpc send, const Request & req, Int64 timeout_s, const LoggerPtr & tracing_log)
{
const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(timeout_s);
String address = getOwnerAddr(deadline, tracing_log);

do
{
LOG_DEBUG(tracing_log, "request, address={} req={}", address, req.ShortDebugString());
auto kvstub = kv_cluster->rpc_client->getConnArray(address)->get();
grpc::ClientContext grpc_context;
grpc_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(timeout_s));
Response resp;
auto status = send(&grpc_context, kvstub, req, &resp);
if (!status.ok())
{
if (Clock::now() > deadline)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, " + tracing_log->identifier());
}
// retry
LOG_ERROR(tracing_log, "meets error, code={} msg={}", status.error_code(), status.error_message());
address = updateOwnerAddr(deadline, tracing_log);
}

if (resp.result().has_success())
{
// success
return {true, ""};
}
else if (resp.result().has_conflict())
{
// not retriable
LOG_INFO(tracing_log, "meets conflict, reason={}", resp.result().conflict().reason());
return {false, resp.result().conflict().reason()};
}
else if (resp.result().has_not_owner())
{
if (Clock::now() > deadline)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, " + tracing_log->identifier());
}
// retry
auto not_owner = resp.result().not_owner();
LOG_WARNING(tracing_log, "meets not owner, retry");
address = updateOwnerAddr(deadline, tracing_log);
}
} while (true);
}

String S3LockClient::getOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log)
{
{
std::shared_lock lk(mtx_owner);
if (!owner_cache.empty())
return owner_cache;
}
// owner_cache is empty, try update
return updateOwnerAddr(deadline, tracing_log);
}

String S3LockClient::updateOwnerAddr(const Timepoint & deadline, const LoggerPtr & tracing_log)
{
using namespace std::chrono_literals;
while (true)
{
auto owner_info = s3gc_owner->getOwnerID();
switch (owner_info.status)
{
case DB::OwnerType::IsOwner:
case DB::OwnerType::NotOwner:
{
{
std::unique_lock lk(mtx_owner);
owner_cache = owner_info.owner_id;
}
return owner_info.owner_id;
}
case DB::OwnerType::NoLeader:
{
if (Clock::now() > deadline)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "deadline exceed, owner not available, " + tracing_log->identifier());
}
LOG_ERROR(tracing_log, "owner not available");
std::this_thread::sleep_for(500ms);
break; // continue retry
}
case DB::OwnerType::GrpcError:
{
if (Clock::now() > deadline)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, fmt::format("deadline exceed, owner not available, msg={} {}", owner_info.errMsg(), tracing_log->identifier()));
}
LOG_ERROR(tracing_log, "owner not available, msg={}", owner_info.errMsg());
std::this_thread::sleep_for(500ms);
break; // continue retry
}
}
}
}

} // namespace DB::S3
Loading

0 comments on commit dc28555

Please sign in to comment.