Skip to content

Commit

Permalink
Make poco client can detect stream end correctly
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Apr 7, 2023
1 parent e625715 commit 97599ad
Show file tree
Hide file tree
Showing 21 changed files with 122 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
url = https://github.com/guanzhi/GmSSL.git
[submodule "contrib/aws"]
path = contrib/aws
url = https://github.com/aws/aws-sdk-cpp.git
url = https://github.com/JaySon-Huang/aws-sdk-cpp.git
[submodule "contrib/aws-c-auth"]
path = contrib/aws-c-auth
url = https://github.com/awslabs/aws-c-auth.git
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct StorageS3Config
String secret_access_key;
UInt64 max_connections = 4096;
UInt64 connection_timeout_ms = 1000;
UInt64 request_timeout_ms = 7000;
UInt64 request_timeout_ms = 30000;
UInt64 max_redirections = 10;
String root;

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/TiFlashException.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/S3/S3Common.h>

#ifndef NDEBUG
#include <sys/stat.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/FormatVersion.h>
#include <Storages/PathPool.h>
#include <Storages/S3/S3Common.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/InputStreamTestUtils.h>
#include <TestUtils/TiFlashStorageTestBasic.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <TestUtils/TiFlashTestEnv.h>
#include <common/logger_useful.h>
#include <gtest/gtest.h>

Expand All @@ -44,13 +42,9 @@ class S3LockLocalManagerTest : public testing::Test
{}

void SetUp() override
{
::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client);
}

void TearDown() override
{
::DB::tests::TiFlashTestEnv::deleteBucket(*s3_client);
::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client);
}

protected:
Expand Down
28 changes: 4 additions & 24 deletions dbms/src/Storages/S3/MockS3Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/StringUtils/StringUtils.h>
#include <Storages/S3/MockS3Client.h>
Expand Down Expand Up @@ -53,10 +52,6 @@
#include <mutex>
#include <string_view>

namespace DB::FailPoints
{
extern const char force_set_mocked_s3_object_mtime[];
} // namespace DB::FailPoints
namespace DB::S3::tests
{
using namespace Aws::S3;
Expand Down Expand Up @@ -91,7 +86,10 @@ Model::GetObjectOutcome MockS3Client::GetObject(const Model::GetObjectRequest &
boost::algorithm::split(v, request.GetRange().substr(prefix.size()), boost::algorithm::is_any_of("-"));
RUNTIME_CHECK(v.size() == 2, request.GetRange());
left = std::stoul(v[0]);
right = std::stoul(v[1]);
if (!v[1].empty())
{
right = std::stoul(v[1]);
}
}
auto size = right - left + 1;
Model::GetObjectResult result;
Expand Down Expand Up @@ -253,24 +251,6 @@ Model::HeadObjectOutcome MockS3Client::HeadObject(const Model::HeadObjectRequest
if (itr_obj != bucket_storage.end())
{
auto r = Model::HeadObjectResult{};
auto try_set_mtime = [&] {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::force_set_mocked_s3_object_mtime); v)
{
auto m = std::any_cast<std::map<String, Aws::Utils::DateTime>>(v.value());
const auto req_key = normalizedKey(request.GetKey());
if (auto iter_m = m.find(req_key); iter_m != m.end())
{
r.SetLastModified(iter_m->second);
LOG_WARNING(Logger::get(), "failpoint set mtime, key={} mtime={}", req_key, iter_m->second.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
}
else
{
LOG_WARNING(Logger::get(), "failpoint set mtime failed, key={}", req_key);
}
}
};
UNUSED(try_set_mtime);
fiu_do_on(FailPoints::force_set_mocked_s3_object_mtime, { try_set_mtime(); });
r.SetContentLength(itr_obj->second.size());
return r;
}
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/S3/PocoHTTPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ namespace DB::S3
{

PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
const std::shared_ptr<RemoteHostFilter> & remote_host_filter_,
UInt32 s3_max_redirects_,
bool enable_s3_requests_logging_)
: remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
Expand Down Expand Up @@ -317,7 +317,7 @@ void PocoHTTPClient::makeRequestInternal(
if (poco_response.getStatus() == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT)
{
auto location = poco_response.get("location");
remote_host_filter.checkURL(Poco::URI(location));
remote_host_filter->checkURL(Poco::URI(location));
uri = location;
if (enable_s3_requests_logging)
LOG_DEBUG(log, "Redirecting request to new location: {}", location);
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/S3/PocoHTTPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ struct PocoHTTPClientConfiguration
std::function<ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration = [](const Aws::Http::HttpRequest &) {
return ClientConfigurationPerRequest();
};
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
std::shared_ptr<RemoteHostFilter> remote_host_filter;
UInt32 s3_max_redirects;
bool enable_s3_requests_logging;
HTTPHeaderEntries extra_headers;

std::function<void(const ClientConfigurationPerRequest &)> error_report;

PocoHTTPClientConfiguration(
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
const std::shared_ptr<RemoteHostFilter> & remote_host_filter_,
UInt32 s3_max_redirects_,
bool enable_s3_requests_logging_);

/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
Expand Down Expand Up @@ -154,7 +154,7 @@ class PocoHTTPClient : public Aws::Http::HttpClient
std::function<ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
std::function<void(const ClientConfigurationPerRequest &)> error_report;
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
const std::shared_ptr<RemoteHostFilter> remote_host_filter;
const HTTPHeaderEntries extra_headers;
UInt32 s3_max_redirects;
bool enable_s3_requests_logging;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/PocoHTTPClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace DB::S3
{
PocoHTTPClientFactory::PocoHTTPClientFactory(PocoHTTPClientConfiguration & http_cfg)
PocoHTTPClientFactory::PocoHTTPClientFactory(const PocoHTTPClientConfiguration & http_cfg)
: poco_cfg(http_cfg)
{
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/PocoHTTPClientFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB::S3
class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory
{
public:
explicit PocoHTTPClientFactory(PocoHTTPClientConfiguration & http_cfg);
explicit PocoHTTPClientFactory(const PocoHTTPClientConfiguration & http_cfg);

~PocoHTTPClientFactory() override = default;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient>
Expand Down
50 changes: 49 additions & 1 deletion dbms/src/Storages/S3/S3Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/ProfileEvents.h>
#include <Common/RemoteHostFilter.h>
Expand Down Expand Up @@ -64,6 +65,7 @@
#include <pingcap/kv/internal/type_traits.h>
#include <re2/re2.h>

#include <any>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <filesystem>
Expand Down Expand Up @@ -153,6 +155,10 @@ namespace pingcap::kv
PINGCAP_DEFINE_TRAITS(disaggregated, GetDisaggConfig, GetDisaggConfig);
}

namespace DB::FailPoints
{
extern const char force_set_mocked_s3_object_mtime[];
} // namespace DB::FailPoints
namespace DB::S3
{

Expand Down Expand Up @@ -276,7 +282,10 @@ void ClientFactory::init(const StorageS3Config & config_, bool mock_s3_)
// Override the HTTP client, use PocoHTTPClient instead
aws_options.httpOptions.httpClientFactory_create_fn = [&config_] {
// TODO: do we need the remote host filter?
PocoHTTPClientConfiguration poco_cfg(RemoteHostFilter(), config_.max_redirections, /*enable_s3_requests_logging_*/ config_.verbose);
PocoHTTPClientConfiguration poco_cfg(
std::make_shared<RemoteHostFilter>(),
config_.max_redirections,
/*enable_s3_requests_logging_*/ config_.verbose);
return std::make_shared<PocoHTTPClientFactory>(poco_cfg);
};
Aws::InitAPI(aws_options);
Expand Down Expand Up @@ -896,7 +905,30 @@ ObjectInfo tryGetObjectInfo(
throw fromS3Error(o.GetError(), "S3 HeadObject failed, bucket={} root={} key={}", client.bucket(), client.root(), key);
}
// Else the object still exist
#ifndef FIU_ENABLE
const auto & res = o.GetResult();
#else
// handle the failpoint for hijacking the last modified time of returned object
auto & res = o.GetResult();
auto try_set_mtime = [&] {
if (auto v = FailPointHelper::getFailPointVal(FailPoints::force_set_mocked_s3_object_mtime); v)
{
auto m = std::any_cast<std::map<String, Aws::Utils::DateTime>>(v.value());
const auto & req_key = key;
if (auto iter_m = m.find(req_key); iter_m != m.end())
{
res.SetLastModified(iter_m->second);
LOG_WARNING(Logger::get(), "failpoint set mtime, key={} mtime={}", req_key, iter_m->second.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
}
else
{
LOG_WARNING(Logger::get(), "failpoint set mtime failed, key={}", req_key);
}
}
};
UNUSED(try_set_mtime);
fiu_do_on(FailPoints::force_set_mocked_s3_object_mtime, { try_set_mtime(); });
#endif
// "DeleteMark" of S3 service, don't know what will lead to this
RUNTIME_CHECK(!res.GetDeleteMarker(), client.bucket(), key);
return ObjectInfo{.exist = true, .size = res.GetContentLength(), .last_modification_time = res.GetLastModified()};
Expand Down Expand Up @@ -964,4 +996,20 @@ void rawListPrefix(
LOG_DEBUG(log, "rawListPrefix bucket={} prefix={} delimiter={} total_keys={} cost={:.2f}s", bucket, prefix, delimiter, num_keys, sw.elapsedSeconds());
}

void rawDeleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key)
{
Stopwatch sw;
Aws::S3::Model::DeleteObjectRequest req;
req.WithBucket(bucket).WithKey(key);
ProfileEvents::increment(ProfileEvents::S3DeleteObject);
auto o = client.DeleteObject(req);
if (!o.IsSuccess())
{
throw fromS3Error(o.GetError(), "S3 DeleteObject failed, bucket={} key={}", bucket, key);
}
const auto & res = o.GetResult();
UNUSED(res);
GET_METRIC(tiflash_storage_s3_request_seconds, type_delete_object).Observe(sw.elapsedSeconds());
}

} // namespace DB::S3
4 changes: 4 additions & 0 deletions dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ void rawListPrefix(
std::string_view delimiter,
std::function<PageResult(const Aws::S3::Model::ListObjectsV2Result & result)> pager);

// Unlike `deleteObject` or other method above, this does not handle
// the TiFlashS3Client `root`.
void rawDeleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key);

template <typename F, typename... T>
void retryWrapper(F f, const T &... args)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/S3RandomAccessFile.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/S3/S3RandomAccessFile.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/S3/tests/gtest_s3file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketCorsRequest.h>
#include <fmt/chrono.h>
#include <gtest/gtest.h>

#include <chrono>
Expand Down Expand Up @@ -515,10 +516,10 @@ try
FailPointHelper::enableFailPoint(
FailPoints::force_set_mocked_s3_object_mtime,
std::map<String, Aws::Utils::DateTime>{
{s3_client->root() + df_keys[0], test_infos[0].mtime},
{s3_client->root() + df_keys[1], test_infos[1].mtime},
{s3_client->root() + df_keys[2], test_infos[2].mtime},
{s3_client->root() + df_keys[3], test_infos[3].mtime},
{df_keys[0], test_infos[0].mtime},
{df_keys[1], test_infos[1].mtime},
{df_keys[2], test_infos[2].mtime},
{df_keys[3], test_infos[3].mtime},
});
SCOPE_EXIT({
FailPointHelper::disableFailPoint(FailPoints::force_set_mocked_s3_object_mtime);
Expand All @@ -529,7 +530,7 @@ try
for (size_t idx = 0; idx < test_infos.size(); ++idx)
{
ASSERT_EQ(remote_files_info.at(lock_keys[idx]).size, test_infos[idx].total_size);
ASSERT_EQ(remote_files_info.at(lock_keys[idx]).mtime, test_infos[idx].mtime);
ASSERT_EQ(remote_files_info.at(lock_keys[idx]).mtime, test_infos[idx].mtime) << fmt::format("remote_mtime:{:%Y-%m-%d %H:%M:%S} test_mtime:{:%Y-%m-%d %H:%M:%S}", remote_files_info.at(lock_keys[idx]).mtime, test_infos[idx].mtime);
}
ASSERT_EQ(remote_files_info.at(lock_keys[4]).size, -1); // not exist or exception happens

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/S3/tests/gtest_s3gcmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,10 @@ try
S3::uploadEmptyFile(*mock_s3_client, df.toFullKey());
S3::uploadEmptyFile(*mock_s3_client, delmark_key);
auto delmark_mtime = timepoint - std::chrono::milliseconds(3599 * 1000);
FailPointHelper::enableFailPoint(FailPoints::force_set_mocked_s3_object_mtime, std::map<String, Aws::Utils::DateTime>{{mock_s3_client->root() + delmark_key, delmark_mtime}});
FailPointHelper::enableFailPoint(FailPoints::force_set_mocked_s3_object_mtime, std::map<String, Aws::Utils::DateTime>{{delmark_key, delmark_mtime}});
SCOPE_EXIT({
FailPointHelper::disableFailPoint(FailPoints::force_set_mocked_s3_object_mtime);
});
// mock_s3_client->head_result_mtime = delmark_mtime;
gc_mgr->cleanOneLock(lock_key, lock_view, timepoint);

// lock is deleted, datafile and delmark remain
Expand All @@ -373,7 +372,7 @@ try
S3::uploadEmptyFile(*mock_s3_client, df.toFullKey());
S3::uploadEmptyFile(*mock_s3_client, delmark_key);
auto delmark_mtime = timepoint - std::chrono::milliseconds(3601 * 1000);
FailPointHelper::enableFailPoint(FailPoints::force_set_mocked_s3_object_mtime, std::map<String, Aws::Utils::DateTime>{{mock_s3_client->root() + delmark_key, delmark_mtime}});
FailPointHelper::enableFailPoint(FailPoints::force_set_mocked_s3_object_mtime, std::map<String, Aws::Utils::DateTime>{{delmark_key, delmark_mtime}});
SCOPE_EXIT({
FailPointHelper::disableFailPoint(FailPoints::force_set_mocked_s3_object_mtime);
});
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,11 @@ void TMTContext::updateSecurityConfig(const TiFlashRaftConfig & raft_config, con
{
// update the client config including pd_client
cluster->update(raft_config.pd_addrs, cluster_config);
// update the etcd_client after pd_client get updated
etcd_client->update(cluster_config);
if (etcd_client)
{
// update the etcd_client after pd_client get updated
etcd_client->update(cluster_config);
}
}
}

Expand Down
Loading

0 comments on commit 97599ad

Please sign in to comment.