Skip to content

Commit

Permalink
Storage: DMFile supports read/write from S3. (#6913)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
JinheLin authored Mar 2, 2023
1 parent f57bc16 commit 0b863a4
Show file tree
Hide file tree
Showing 20 changed files with 1,459 additions and 133 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@
M(S3CompleteMultipartUpload) \
M(S3PutObject) \
M(S3GetObject) \
M(S3HeadObject)
M(S3HeadObject) \
M(S3ListObjects) \
M(S3DeleteObject)

namespace ProfileEvents
{
Expand Down
18 changes: 14 additions & 4 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,21 @@ namespace DB
F(type_mpp, {{"type", "mpp"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, {{"type", "cop"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) \
M(tiflash_shared_block_schemas, "statistics about shared block schemas of ColumnFiles", Gauge, \
M(tiflash_shared_block_schemas, "statistics about shared block schemas of ColumnFiles", Gauge, \
F(type_current_size, {{"type", "current_size"}}), \
F(type_still_used_when_evict, {{"type", "still_used_when_evict"}}), \
F(type_miss_count, {{"type", "miss_count"}}), \
F(type_hit_count, {{"type", "hit_count"}}))
F(type_still_used_when_evict, {{"type", "still_used_when_evict"}}), \
F(type_miss_count, {{"type", "miss_count"}}), \
F(type_hit_count, {{"type", "hit_count"}})) \
M(tiflash_storage_s3_request_seconds, "S3 request duration in seconds", Histogram, \
F(type_put_object, {{"type", "put_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_get_object, {{"type", "get_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_create_multi_part_upload, {{"type", "create_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
F(type_upload_part, {{"type", "upload_part"}}, ExpBuckets{0.001, 2, 20}), \
F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))

// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
26 changes: 24 additions & 2 deletions dbms/src/Encryption/FileProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include <Encryption/PosixWritableFile.h>
#include <Encryption/PosixWriteReadableFile.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
#include <Storages/S3/S3WritableFile.h>
#include <Storages/Transaction/FileEncryption.h>
#include <common/likely.h>

Expand All @@ -32,7 +36,16 @@ RandomAccessFilePtr FileProvider::newRandomAccessFile(
const ReadLimiterPtr & read_limiter,
int flags) const
{
RandomAccessFilePtr file = std::make_shared<PosixRandomAccessFile>(file_path_, flags, read_limiter);
RandomAccessFilePtr file;
if (auto view = S3::S3FilenameView::fromKeyWithPrefix(file_path_); view.isValid())
{
file = S3::S3RandomAccessFile::create(view.toFullKey());
}
else
{
RUNTIME_CHECK(Poco::Path(file_path_).isAbsolute(), file_path_);
file = std::make_shared<PosixRandomAccessFile>(file_path_, flags, read_limiter);
}
auto encryption_info = key_manager->getFile(encryption_path_.full_path);
if (encryption_info.res != FileEncryptionRes::Disabled && encryption_info.method != EncryptionMethod::Plaintext)
{
Expand All @@ -50,7 +63,16 @@ WritableFilePtr FileProvider::newWritableFile(
int flags,
mode_t mode) const
{
WritableFilePtr file = std::make_shared<PosixWritableFile>(file_path_, truncate_if_exists_, flags, mode, write_limiter_);
WritableFilePtr file;
if (auto view = S3::S3FilenameView::fromKeyWithPrefix(file_path_); view.isValid())
{
file = S3::S3WritableFile::create(view.toFullKey());
}
else
{
RUNTIME_CHECK(Poco::Path(file_path_).isAbsolute(), file_path_);
file = std::make_shared<PosixWritableFile>(file_path_, truncate_if_exists_, flags, mode, write_limiter_);
}
if (encryption_enabled && create_new_encryption_info_)
{
auto encryption_info = key_manager->newFile(encryption_path_.full_path);
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <TiDB/MockOwnerManager.h>
#include <TiDB/OwnerManager.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/CreateBucketResult.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <common/types.h>
Expand All @@ -39,6 +41,7 @@ class S3LockServiceTest
{
public:
void SetUp() override
try
{
db_context = std::make_unique<Context>(DB::tests::TiFlashTestEnv::getContext());
log = Logger::get();
Expand All @@ -58,8 +61,32 @@ class S3LockServiceTest
s3_client = std::make_shared<MockS3Client>();
}
s3_lock_service = std::make_unique<DB::S3::S3LockService>(owner_manager, s3_client);
createBucketIfNotExist();
createS3DataFiles();
}
CATCH

bool createBucketIfNotExist()
{
Aws::S3::Model::CreateBucketRequest request;
const auto & bucket = s3_client->bucket();
request.SetBucket(bucket);
auto outcome = s3_client->CreateBucket(request);
if (outcome.IsSuccess())
{
LOG_DEBUG(log, "Created bucket {}", bucket);
}
else if (outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou")
{
LOG_DEBUG(log, "Bucket {} already exist", bucket);
}
else
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "CreateBucket: {}:{}", err.GetExceptionName(), err.GetMessage());
}
return outcome.IsSuccess() || outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou";
}

void createS3DataFiles()
{
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -917,9 +917,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setUseAutoScaler(useAutoScaler(config()));

/// Init File Provider
bool enable_encryption = false;
if (proxy_conf.is_proxy_runnable)
{
bool enable_encryption = tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled();
enable_encryption = tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled();
if (enable_encryption)
{
auto method = tiflash_instance_wrap.proxy_helper->getEncryptionMethod();
Expand Down Expand Up @@ -949,6 +950,11 @@ int Server::main(const std::vector<std::string> & /*args*/)

if (storage_config.s3_config.isS3Enabled())
{
if (enable_encryption)
{
LOG_ERROR(log, "Cannot support S3 when encryption enabled.");
throw Exception("Cannot support S3 when encryption enabled.");
}
S3::ClientFactory::instance().init(storage_config.s3_config);
}

Expand Down
19 changes: 13 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/S3/S3Filename.h>
#include <boost_wrapper/string_split.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
Expand Down Expand Up @@ -138,18 +140,23 @@ DMFilePtr DMFile::restore(
const String & parent_path,
const ReadMetaMode & read_meta_mode)
{
String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
// The path may be dropped by another thread in some cases
auto poco_file = Poco::File(path);
if (!poco_file.exists())
return nullptr;
auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile();
if (!is_s3_file)
{
RUNTIME_CHECK(Poco::Path(parent_path).isAbsolute(), parent_path);
String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
// The path may be dropped by another thread in some cases
auto poco_file = Poco::File(path);
if (!poco_file.exists())
return nullptr;
}

DMFilePtr dmfile(new DMFile(
file_id,
page_id,
parent_path,
Status::READABLE));
if (Poco::File meta_file(dmfile->metav2Path()); meta_file.exists())
if (is_s3_file || Poco::File(dmfile->metav2Path()).exists())
{
auto s = dmfile->readMetaV2(file_provider);
dmfile->parseMetaV2(std::string_view(s.data(), s.size()));
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ DMFileReader::Stream::Stream(
}
else
{
auto filename = reader.dmfile->colDataFileName(file_name_base);
estimated_size = Poco::File(reader.dmfile->subFilePath(filename)).getSize();
estimated_size = reader.dmfile->colDataSize(col_id);
}

buffer_size = std::min(buffer_size, max_read_buffer_size);
Expand Down
Loading

0 comments on commit 0b863a4

Please sign in to comment.