diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index c99d8a53f96..f2c93bd0e80 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -121,7 +121,9 @@ M(S3CompleteMultipartUpload) \ M(S3PutObject) \ M(S3GetObject) \ - M(S3HeadObject) + M(S3HeadObject) \ + M(S3ListObjects) \ + M(S3DeleteObject) namespace ProfileEvents { diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 6d8ea8fd81c..d983aff40ad 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -286,11 +286,21 @@ namespace DB F(type_mpp, {{"type", "mpp"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \ F(type_cop, {{"type", "cop"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \ F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) \ - M(tiflash_shared_block_schemas, "statistics about shared block schemas of ColumnFiles", Gauge, \ + M(tiflash_shared_block_schemas, "statistics about shared block schemas of ColumnFiles", Gauge, \ F(type_current_size, {{"type", "current_size"}}), \ - F(type_still_used_when_evict, {{"type", "still_used_when_evict"}}), \ - F(type_miss_count, {{"type", "miss_count"}}), \ - F(type_hit_count, {{"type", "hit_count"}})) + F(type_still_used_when_evict, {{"type", "still_used_when_evict"}}), \ + F(type_miss_count, {{"type", "miss_count"}}), \ + F(type_hit_count, {{"type", "hit_count"}})) \ + M(tiflash_storage_s3_request_seconds, "S3 request duration in seconds", Histogram, \ + F(type_put_object, {{"type", "put_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_get_object, {{"type", "get_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_create_multi_part_upload, {{"type", "create_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_upload_part, {{"type", "upload_part"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) + // clang-format on /// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)] diff --git a/dbms/src/Encryption/FileProvider.cpp b/dbms/src/Encryption/FileProvider.cpp index f2f96fa8568..767678e8f85 100644 --- a/dbms/src/Encryption/FileProvider.cpp +++ b/dbms/src/Encryption/FileProvider.cpp @@ -21,6 +21,10 @@ #include #include #include +#include +#include +#include +#include #include #include @@ -32,7 +36,16 @@ RandomAccessFilePtr FileProvider::newRandomAccessFile( const ReadLimiterPtr & read_limiter, int flags) const { - RandomAccessFilePtr file = std::make_shared(file_path_, flags, read_limiter); + RandomAccessFilePtr file; + if (auto view = S3::S3FilenameView::fromKeyWithPrefix(file_path_); view.isValid()) + { + file = S3::S3RandomAccessFile::create(view.toFullKey()); + } + else + { + RUNTIME_CHECK(Poco::Path(file_path_).isAbsolute(), file_path_); + file = std::make_shared(file_path_, flags, read_limiter); + } auto encryption_info = key_manager->getFile(encryption_path_.full_path); if (encryption_info.res != FileEncryptionRes::Disabled && encryption_info.method != EncryptionMethod::Plaintext) { @@ -50,7 
+63,16 @@ WritableFilePtr FileProvider::newWritableFile( int flags, mode_t mode) const { - WritableFilePtr file = std::make_shared(file_path_, truncate_if_exists_, flags, mode, write_limiter_); + WritableFilePtr file; + if (auto view = S3::S3FilenameView::fromKeyWithPrefix(file_path_); view.isValid()) + { + file = S3::S3WritableFile::create(view.toFullKey()); + } + else + { + RUNTIME_CHECK(Poco::Path(file_path_).isAbsolute(), file_path_); + file = std::make_shared(file_path_, truncate_if_exists_, flags, mode, write_limiter_); + } if (encryption_enabled && create_new_encryption_info_) { auto encryption_info = key_manager->newFile(encryption_path_.full_path); diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp index 8ea3b1db23c..63db63625c4 100644 --- a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -39,6 +41,7 @@ class S3LockServiceTest { public: void SetUp() override + try { db_context = std::make_unique(DB::tests::TiFlashTestEnv::getContext()); log = Logger::get(); @@ -58,8 +61,32 @@ class S3LockServiceTest s3_client = std::make_shared(); } s3_lock_service = std::make_unique(owner_manager, s3_client); + createBucketIfNotExist(); createS3DataFiles(); } + CATCH + + bool createBucketIfNotExist() + { + Aws::S3::Model::CreateBucketRequest request; + const auto & bucket = s3_client->bucket(); + request.SetBucket(bucket); + auto outcome = s3_client->CreateBucket(request); + if (outcome.IsSuccess()) + { + LOG_DEBUG(log, "Created bucket {}", bucket); + } + else if (outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou") + { + LOG_DEBUG(log, "Bucket {} already exist", bucket); + } + else + { + const auto & err = outcome.GetError(); + LOG_ERROR(log, "CreateBucket: {}:{}", err.GetExceptionName(), err.GetMessage()); + } + return outcome.IsSuccess() || outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou"; + } void createS3DataFiles() { diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index ec02cdf1511..fed94f37df5 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -917,9 +917,10 @@ int Server::main(const std::vector & /*args*/) global_context->setUseAutoScaler(useAutoScaler(config())); /// Init File Provider + bool enable_encryption = false; if (proxy_conf.is_proxy_runnable) { - bool enable_encryption = tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled(); + enable_encryption = tiflash_instance_wrap.proxy_helper->checkEncryptionEnabled(); if (enable_encryption) { auto method = tiflash_instance_wrap.proxy_helper->getEncryptionMethod(); @@ -949,6 +950,11 @@ int Server::main(const std::vector & /*args*/) if (storage_config.s3_config.isS3Enabled()) { + if (enable_encryption) + { + LOG_ERROR(log, "Cannot support S3 when encryption enabled."); + throw Exception("Cannot support S3 when encryption enabled."); + } S3::ClientFactory::instance().init(storage_config.s3_config); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 87424e55b0f..16388d6380d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -22,8 +22,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -138,18 +140,23 @@ DMFilePtr DMFile::restore( 
const String & parent_path, const ReadMetaMode & read_meta_mode) { - String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); - // The path may be dropped by another thread in some cases - auto poco_file = Poco::File(path); - if (!poco_file.exists()) - return nullptr; + auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile(); + if (!is_s3_file) + { + RUNTIME_CHECK(Poco::Path(parent_path).isAbsolute(), parent_path); + String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); + // The path may be dropped by another thread in some cases + auto poco_file = Poco::File(path); + if (!poco_file.exists()) + return nullptr; + } DMFilePtr dmfile(new DMFile( file_id, page_id, parent_path, Status::READABLE)); - if (Poco::File meta_file(dmfile->metav2Path()); meta_file.exists()) + if (is_s3_file || Poco::File(dmfile->metav2Path()).exists()) { auto s = dmfile->readMetaV2(file_provider); dmfile->parseMetaV2(std::string_view(s.data(), s.size())); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 6d87595592b..4ac17be5312 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -130,8 +130,7 @@ DMFileReader::Stream::Stream( } else { - auto filename = reader.dmfile->colDataFileName(file_name_base); - estimated_size = Poco::File(reader.dmfile->subFilePath(filename)).getSize(); + estimated_size = reader.dmfile->colDataSize(col_id); } buffer_size = std::min(buffer_size, max_read_buffer_size); diff --git a/dbms/src/Storages/S3/MockS3Client.cpp b/dbms/src/Storages/S3/MockS3Client.cpp index d620a85e75c..cf4350850ca 100644 --- a/dbms/src/Storages/S3/MockS3Client.cpp +++ b/dbms/src/Storages/S3/MockS3Client.cpp @@ -16,106 +16,164 @@ #include #include #include +#include #include +#include #include #include +#include +#include +#include +#include #include +#include +#include #include #include #include #include #include #include +#include +#include namespace DB::S3::tests { using namespace Aws::S3; -Model::PutObjectOutcome MockS3Client::PutObject(const Model::PutObjectRequest & r) const +Model::GetObjectOutcome MockS3Client::GetObject(const Model::GetObjectRequest & request) const { - put_keys.emplace_back(r.GetKey()); - return Model::PutObjectOutcome{Aws::AmazonWebServiceResult{}}; + std::lock_guard lock(mtx); + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) + { + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); + } + const auto & bucket_storage = itr->second; + auto itr_obj = bucket_storage.find(request.GetKey()); + if (itr_obj == bucket_storage.end()) + { + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchKey"); + } + auto * ss = new std::stringstream(itr_obj->second); + Model::GetObjectResult result; + result.ReplaceBody(ss); + result.SetContentLength(itr_obj->second.size()); + return result; } -Model::DeleteObjectOutcome MockS3Client::DeleteObject(const Model::DeleteObjectRequest & r) const +Model::PutObjectOutcome MockS3Client::PutObject(const Model::PutObjectRequest & request) const { - delete_keys.emplace_back(r.GetKey()); - return Model::DeleteObjectOutcome{Aws::AmazonWebServiceResult{}}; + std::lock_guard lock(mtx); + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) + { + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); + } + auto & bucket_storage = itr->second; + bucket_storage[request.GetKey()] = 
String{std::istreambuf_iterator(*request.GetBody()), {}}; + return Model::PutObjectResult{}; } -Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjectsV2Request & r) const +Model::DeleteObjectOutcome MockS3Client::DeleteObject(const Model::DeleteObjectRequest & request) const { - Model::ListObjectsV2Result resp; - for (const auto & k : put_keys) + std::lock_guard lock(mtx); + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) { - if (startsWith(k, r.GetPrefix())) - { - bool is_deleted = false; - for (const auto & d : delete_keys) - { - if (k == d) - { - is_deleted = true; - break; - } - } - if (is_deleted) - continue; - Model::Object o; - o.SetKey(k); - resp.AddContents(o); - } + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); } - for (const auto & k : list_result) + auto & bucket_storage = itr->second; + bucket_storage.erase(request.GetKey()); + return Model::DeleteObjectResult{}; +} + +Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjectsV2Request & request) const +{ + std::lock_guard lock(mtx); + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) { - if (startsWith(k, r.GetPrefix())) + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); + } + const auto & bucket_storage = itr->second; + Model::ListObjectsV2Result result; + for (auto itr_obj = bucket_storage.lower_bound(request.GetPrefix()); itr_obj != bucket_storage.end(); ++itr_obj) + { + if (startsWith(itr_obj->first, request.GetPrefix())) + { + Model::Object obj; + obj.SetKey(itr_obj->first); + obj.SetSize(itr_obj->second.size()); + result.AddContents(std::move(obj)); + } + else { - bool is_deleted = false; - for (const auto & d : delete_keys) - { - if (k == d) - { - is_deleted = true; - break; - } - } - if (is_deleted) - continue; - Model::Object o; - o.SetKey(k); - resp.AddContents(o); + break; } } - return Model::ListObjectsV2Outcome{resp}; + return result; } -Model::HeadObjectOutcome MockS3Client::HeadObject(const Model::HeadObjectRequest & r) const +Model::HeadObjectOutcome MockS3Client::HeadObject(const Model::HeadObjectRequest & request) const { - for (const auto & k : put_keys) + std::lock_guard lock(mtx); + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) { - if (r.GetKey() == k) - { - Model::HeadObjectResult resp; - return Model::HeadObjectOutcome{resp}; - } + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); + } + const auto & bucket_storage = itr->second; + auto itr_obj = bucket_storage.find(request.GetKey()); + if (itr_obj != bucket_storage.end()) + { + return Model::HeadObjectResult{}; } + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchKey"); +} + +Model::CreateMultipartUploadOutcome MockS3Client::CreateMultipartUpload(const Model::CreateMultipartUploadRequest & /*request*/) const +{ + static std::atomic upload_id{0}; + Model::CreateMultipartUploadResult result; + result.SetUploadId(std::to_string(upload_id++)); + return result; +} - if (!head_result_mtime) +Model::UploadPartOutcome MockS3Client::UploadPart(const Model::UploadPartRequest & request) const +{ + std::lock_guard lock(mtx); + upload_parts[request.GetUploadId()][request.GetPartNumber()] = String{std::istreambuf_iterator(*request.GetBody()), {}}; + Model::UploadPartResult result; + result.SetETag(std::to_string(request.GetPartNumber())); + return result; +} + +Model::CompleteMultipartUploadOutcome MockS3Client::CompleteMultipartUpload(const Model::CompleteMultipartUploadRequest & request) 
const +{ + std::lock_guard lock(mtx); + const auto & parts = upload_parts[request.GetUploadId()]; + String s; + for (const auto & p : parts) { - Aws::Client::AWSError error(S3Errors::NO_SUCH_KEY, false); - return Model::HeadObjectOutcome{error}; + s += p.second; } - Model::HeadObjectResult resp; - resp.SetLastModified(head_result_mtime.value()); - return Model::HeadObjectOutcome{resp}; + auto itr = storage.find(request.GetBucket()); + if (itr == storage.end()) + { + return Aws::S3::S3ErrorMapper::GetErrorForName("NoSuchBucket"); + } + auto & bucket_storage = itr->second; + bucket_storage[request.GetKey()] = s; + return Model::CompleteMultipartUploadResult{}; } -void MockS3Client::clear() +Model::CreateBucketOutcome MockS3Client::CreateBucket(const Model::CreateBucketRequest & request) const { - put_keys.clear(); - delete_keys.clear(); - list_result.clear(); - head_result_mtime.reset(); + std::lock_guard lock(mtx); + [[maybe_unused]] auto & bucket_storage = storage[request.GetBucket()]; + return Model::CreateBucketResult{}; } + } // namespace DB::S3::tests diff --git a/dbms/src/Storages/S3/MockS3Client.h b/dbms/src/Storages/S3/MockS3Client.h index 5290dc69206..c89b9a69b7a 100644 --- a/dbms/src/Storages/S3/MockS3Client.h +++ b/dbms/src/Storages/S3/MockS3Client.h @@ -20,27 +20,32 @@ namespace DB::S3::tests { +using namespace Aws::S3; + class MockS3Client final : public S3::TiFlashS3Client { public: - MockS3Client() - : TiFlashS3Client("") + explicit MockS3Client(const String & bucket = "") + : TiFlashS3Client(bucket) {} ~MockS3Client() override = default; - void clear(); - - Aws::S3::Model::PutObjectOutcome PutObject(const Aws::S3::Model::PutObjectRequest & r) const override; - mutable Strings put_keys; - - Aws::S3::Model::DeleteObjectOutcome DeleteObject(const Aws::S3::Model::DeleteObjectRequest & r) const override; - mutable Strings delete_keys; - - Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request & r) const override; - mutable Strings list_result; - - std::optional head_result_mtime; - Aws::S3::Model::HeadObjectOutcome HeadObject(const Aws::S3::Model::HeadObjectRequest & request) const override; + Model::GetObjectOutcome GetObject(const Model::GetObjectRequest & request) const override; + Model::PutObjectOutcome PutObject(const Model::PutObjectRequest & request) const override; + Model::ListObjectsV2Outcome ListObjectsV2(const Model::ListObjectsV2Request & request) const override; + Model::CreateMultipartUploadOutcome CreateMultipartUpload(const Model::CreateMultipartUploadRequest & request) const override; + Model::UploadPartOutcome UploadPart(const Model::UploadPartRequest & request) const override; + Model::CompleteMultipartUploadOutcome CompleteMultipartUpload(const Model::CompleteMultipartUploadRequest & request) const override; + Model::CreateBucketOutcome CreateBucket(const Model::CreateBucketRequest & request) const override; + Model::DeleteObjectOutcome DeleteObject(const Model::DeleteObjectRequest & request) const override; + Model::HeadObjectOutcome HeadObject(const Model::HeadObjectRequest & request) const override; + +private: + using BucketStorage = std::map; + using UploadParts = std::map; + mutable std::mutex mtx; + mutable std::unordered_map storage; + mutable std::unordered_map upload_parts; }; } // namespace DB::S3::tests diff --git a/dbms/src/Storages/S3/S3Common.cpp b/dbms/src/Storages/S3/S3Common.cpp index 2157f187ba1..5897c8077c2 100644 --- a/dbms/src/Storages/S3/S3Common.cpp +++ b/dbms/src/Storages/S3/S3Common.cpp @@ 
-12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include +#include #include #include #include @@ -33,21 +33,20 @@ #include #include -#include namespace ProfileEvents { extern const Event S3HeadObject; -} - -namespace DB::ErrorCodes -{ -extern const int S3_ERROR; -} +extern const Event S3GetObject; +extern const Event S3ReadBytes; +extern const Event S3PutObject; +extern const Event S3WriteBytes; +extern const Event S3ListObjects; +extern const Event S3DeleteObject; +} // namespace ProfileEvents namespace { - Poco::Message::Priority convertLogLevel(Aws::Utils::Logging::LogLevel log_level) { switch (log_level) @@ -149,7 +148,7 @@ void ClientFactory::init(const StorageS3Config & config_, bool mock_s3_) } else { - shared_tiflash_client = std::make_shared(config.bucket, std::make_unique()); + shared_tiflash_client = std::make_unique(config.bucket); } } @@ -235,21 +234,11 @@ bool isNotFoundError(Aws::S3::S3Errors error) return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY; } -namespace details -{ -template -Exception fromS3Error(const Aws::S3::S3Error & e, const std::string & fmt, Args &&... args) -{ - return DB::Exception( - ErrorCodes::S3_ERROR, - fmt + fmt::format(" s3error={} s3msg={}", magic_enum::enum_name(e.GetErrorType()), e.GetMessage()), - args...); -} -} // namespace details - Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id) { ProfileEvents::increment(ProfileEvents::S3HeadObject); + Stopwatch sw; + SCOPE_EXIT({ GET_METRIC(tiflash_storage_s3_request_seconds, type_head_object).Observe(sw.elapsedSeconds()); }); Aws::S3::Model::HeadObjectRequest req; req.SetBucket(bucket); req.SetKey(key); @@ -271,7 +260,7 @@ S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bu } else if (throw_on_error) { - throw details::fromS3Error(outcome.GetError(), "Failed to HEAD object, key={}", key); + throw fromS3Error(outcome.GetError(), "Failed to HEAD object, key={}", key); } return {}; } @@ -293,7 +282,7 @@ bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const { return false; } - throw details::fromS3Error(outcome.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); + throw fromS3Error(outcome.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); } void uploadEmptyFile(const Aws::S3::S3Client & client, const String & bucket, const String & key) @@ -304,13 +293,16 @@ void uploadEmptyFile(const Aws::S3::S3Client & client, const String & bucket, co req.SetContentType("binary/octet-stream"); auto istr = Aws::MakeShared("EmptyObjectInputStream", "", std::ios_base::in | std::ios_base::binary); req.SetBody(istr); + ProfileEvents::increment(ProfileEvents::S3PutObject); auto result = client.PutObject(req); if (!result.IsSuccess()) { - throw details::fromS3Error(result.GetError(), "S3 PutEmptyObject failed, bucket={} key={}", bucket, key); + throw fromS3Error(result.GetError(), "S3 PutEmptyObject failed, bucket={} key={}", bucket, key); } - static auto * log = &Poco::Logger::get("S3UploadFile"); - LOG_DEBUG(log, "remote_fname={}, cost={}ms", key, sw.elapsedMilliseconds()); + auto elapsed_seconds = sw.elapsedSeconds(); + GET_METRIC(tiflash_storage_s3_request_seconds, type_put_object).Observe(elapsed_seconds); + static auto log = Logger::get(); + LOG_DEBUG(log, 
"remote_fname={}, cost={}ms", key, elapsed_seconds); } void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname) @@ -320,14 +312,20 @@ void uploadFile(const Aws::S3::S3Client & client, const String & bucket, const S req.WithBucket(bucket).WithKey(remote_fname); req.SetContentType("binary/octet-stream"); auto istr = Aws::MakeShared("PutObjectInputStream", local_fname, std::ios_base::in | std::ios_base::binary); + RUNTIME_CHECK_MSG(istr->is_open(), "Open {} fail: {}", local_fname, strerror(errno)); req.SetBody(istr); + ProfileEvents::increment(ProfileEvents::S3PutObject); auto result = client.PutObject(req); if (!result.IsSuccess()) { - throw details::fromS3Error(result.GetError(), "S3 PutObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname); + throw fromS3Error(result.GetError(), "S3 PutObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname); } + auto write_bytes = istr->tellg(); + ProfileEvents::increment(ProfileEvents::S3WriteBytes, write_bytes); + auto elapsed_seconds = sw.elapsedSeconds(); + GET_METRIC(tiflash_storage_s3_request_seconds, type_put_object).Observe(elapsed_seconds); static auto log = Logger::get(); - LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); + LOG_DEBUG(log, "local_fname={}, remote_fname={}, write_bytes={} cost={}ms", local_fname, remote_fname, write_bytes, elapsed_seconds); } void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const String & local_fname, const String & remote_fname) @@ -336,15 +334,18 @@ void downloadFile(const Aws::S3::S3Client & client, const String & bucket, const Aws::S3::Model::GetObjectRequest req; req.SetBucket(bucket); req.SetKey(remote_fname); - auto result = client.GetObject(req); - if (!result.IsSuccess()) + ProfileEvents::increment(ProfileEvents::S3GetObject); + auto outcome = client.GetObject(req); + if (!outcome.IsSuccess()) { - throw details::fromS3Error(result.GetError(), "S3 GetObject failed, local_fname={} bucket={} key={}", local_fname, bucket, remote_fname); + throw fromS3Error(outcome.GetError(), "remote_fname={}", remote_fname); } + ProfileEvents::increment(ProfileEvents::S3ReadBytes, outcome.GetResult().GetContentLength()); + GET_METRIC(tiflash_storage_s3_request_seconds, type_get_object).Observe(sw.elapsedSeconds()); Aws::OFStream ostr(local_fname, std::ios_base::out | std::ios_base::binary); - ostr << result.GetResult().GetBody().rdbuf(); - static auto log = Logger::get(); - LOG_DEBUG(log, "local_fname={}, remote_fname={}, cost={}ms", local_fname, remote_fname, sw.elapsedMilliseconds()); + RUNTIME_CHECK_MSG(ostr.is_open(), "Open {} fail: {}", local_fname, strerror(errno)); + ostr << outcome.GetResult().GetBody().rdbuf(); + RUNTIME_CHECK_MSG(ostr.good(), "Write {} fail: {}", local_fname, strerror(errno)); } void listPrefix( @@ -380,11 +381,14 @@ void listPrefix( size_t num_keys = 0; while (!done) { + Stopwatch sw_list; + ProfileEvents::increment(ProfileEvents::S3ListObjects); auto outcome = client.ListObjectsV2(req); if (!outcome.IsSuccess()) { - throw details::fromS3Error(outcome.GetError(), "S3 ListObjects failed, bucket={} prefix={} delimiter={}", bucket, prefix, delimiter); + throw fromS3Error(outcome.GetError(), "S3 ListObjects failed, bucket={} prefix={} delimiter={}", bucket, prefix, delimiter); } + GET_METRIC(tiflash_storage_s3_request_seconds, type_list_objects).Observe(sw_list.elapsedSeconds()); 
const auto & result = outcome.GetResult(); PageResult page_res = pager(result); @@ -430,7 +434,7 @@ std::pair tryGetObjectModifiedTime( { return {false, {}}; } - throw details::fromS3Error(o.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); + throw fromS3Error(o.GetError(), "Failed to check existence of object, bucket={} key={}", bucket, key); } // Else the object still exist const auto & res = o.GetResult(); @@ -441,14 +445,17 @@ std::pair tryGetObjectModifiedTime( void deleteObject(const Aws::S3::S3Client & client, const String & bucket, const String & key) { + Stopwatch sw; Aws::S3::Model::DeleteObjectRequest req; req.SetBucket(bucket); req.SetKey(key); + ProfileEvents::increment(ProfileEvents::S3DeleteObject); auto o = client.DeleteObject(req); RUNTIME_CHECK(o.IsSuccess(), o.GetError().GetMessage()); const auto & res = o.GetResult(); // "DeleteMark" of S3 service, don't know what will lead to this RUNTIME_CHECK(!res.GetDeleteMarker(), bucket, key); + GET_METRIC(tiflash_storage_s3_request_seconds, type_delete_object).Observe(sw.elapsedSeconds()); } } // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3Common.h b/dbms/src/Storages/S3/S3Common.h index b21cac4074c..979dd081c09 100644 --- a/dbms/src/Storages/S3/S3Common.h +++ b/dbms/src/Storages/S3/S3Common.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -22,8 +23,23 @@ #include #include +#include + +namespace DB::ErrorCodes { +extern const int S3_ERROR; +} + namespace DB::S3 { +template +Exception fromS3Error(const Aws::S3::S3Error & e, const std::string & fmt, Args &&... args) +{ + return DB::Exception( + ErrorCodes::S3_ERROR, + fmt + fmt::format(" s3error={} s3msg={}", magic_enum::enum_name(e.GetErrorType()), e.GetMessage()), + args...); +} class TiFlashS3Client : public Aws::S3::S3Client { diff --git a/dbms/src/Storages/S3/S3Filename.cpp b/dbms/src/Storages/S3/S3Filename.cpp index 6ab41cb8a88..51b13524874 100644 --- a/dbms/src/Storages/S3/S3Filename.cpp +++ b/dbms/src/Storages/S3/S3Filename.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -24,7 +25,6 @@ namespace DB::S3 { - //==== Serialize/Deserialize ====// namespace details @@ -52,6 +52,7 @@ constexpr static std::string_view fmt_subpath_manifest = " constexpr static std::string_view fmt_datafile_prefix = "s{store_id}/data/"; constexpr static std::string_view fmt_data_file = "s{store_id}/data/{subpath}"; constexpr static std::string_view fmt_subpath_checkpoint_data = "dat_{seq}_{index}"; +constexpr static std::string_view fmt_subpath_dttable = "t_{table_id}"; constexpr static std::string_view fmt_subpath_dtfile = "t_{table_id}/dmf_{id}"; constexpr static std::string_view fmt_subpath_keyspace_dtfile = "ks_{keyspace}_t_{table_id}/dmf_{id}"; @@ -61,6 +62,8 @@ constexpr static std::string_view fmt_lock_prefix = "lock/"; constexpr static std::string_view fmt_lock_datafile_prefix = "lock/s{store_id}/{subpath}.lock_"; constexpr static std::string_view fmt_lock_file = "lock/s{store_id}/{subpath}.lock_s{lock_store}_{lock_seq}"; +// If you want to read/write an S3 object as a file through `FileProvider`, the file path must start with `s3_filename_prefix`. 
+constexpr static std::string_view s3_filename_prefix = "s3://"; // clang-format on @@ -92,6 +95,11 @@ String S3Filename::toFullKey() const return details::toFullKey(type, store_id, data_subpath); } +String S3Filename::toFullKeyWithPrefix() const +{ + return fmt::format("{}{}", details::s3_filename_prefix, toFullKey()); +} + String S3Filename::toManifestPrefix() const { RUNTIME_CHECK(type == S3FilenameType::StorePrefix); @@ -189,6 +197,15 @@ S3FilenameView S3FilenameView::fromStoreKeyPrefix(const std::string_view prefix) return res; } +S3FilenameView S3FilenameView::fromKeyWithPrefix(std::string_view fullpath) +{ + if (startsWith(fullpath, details::s3_filename_prefix)) + { + return fromKey(fullpath.substr(details::s3_filename_prefix.size())); + } + return S3FilenameView{}; // Invalid +} + //==== Data file utils ====// String S3FilenameView::getLockPrefix() const @@ -293,6 +310,15 @@ S3Filename S3Filename::fromDMFileOID(const DMFileOID & oid) }; } +S3Filename S3Filename::fromTableID(StoreID store_id, TableID table_id) +{ + return S3Filename{ + .type = S3FilenameType::DataFile, + .store_id = store_id, + .data_subpath = fmt::format(details::fmt_subpath_dttable, fmt::arg("table_id", table_id)), + }; +} + S3Filename S3Filename::newCheckpointData(StoreID store_id, UInt64 upload_seq, UInt64 file_idx) { return S3Filename{ diff --git a/dbms/src/Storages/S3/S3Filename.h b/dbms/src/Storages/S3/S3Filename.h index f21b9e3b5d4..cde48e2bca0 100644 --- a/dbms/src/Storages/S3/S3Filename.h +++ b/dbms/src/Storages/S3/S3Filename.h @@ -23,7 +23,6 @@ namespace DB::S3 { - struct DMFileOID { StoreID store_id = 0; @@ -112,12 +111,16 @@ struct S3FilenameView ALWAYS_INLINE bool isDelMark() const { return type == S3FilenameType::DelMark; } + ALWAYS_INLINE bool isValid() const { return type != S3FilenameType::Invalid; } + public: // The result return a view from the `fullpath`. // If parsing from a raw char ptr, do NOT create a temporary String object. static S3FilenameView fromKey(std::string_view fullpath); static S3FilenameView fromStoreKeyPrefix(std::string_view prefix); + + static S3FilenameView fromKeyWithPrefix(std::string_view fullpath); }; // Use for generating the S3 object key @@ -129,11 +132,17 @@ struct S3Filename static S3Filename fromStoreId(StoreID store_id); static S3Filename fromDMFileOID(const DMFileOID & oid); + static S3Filename fromTableID(StoreID store_id, TableID table_id); static S3Filename newCheckpointData(StoreID store_id, UInt64 upload_seq, UInt64 file_idx); static S3Filename newCheckpointManifest(StoreID store_id, UInt64 upload_seq); String toFullKey() const; + // `toFullKeyWithPrefix` adds an `s3://` prefix to the full key. + // You can pass a full key with the prefix to `FileProvider` as a file path, + // if you want to read/write an S3 object as a file. + String toFullKeyWithPrefix() const; + String toManifestPrefix() const; String toDataPrefix() const; diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp new file mode 100644 index 00000000000..482c79d3fa7 --- /dev/null +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -0,0 +1,95 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace ProfileEvents +{ +extern const Event S3GetObject; +extern const Event S3ReadBytes; +} // namespace ProfileEvents + +namespace DB::S3 +{ +S3RandomAccessFile::S3RandomAccessFile( + std::shared_ptr client_ptr_, + const String & bucket_, + const String & remote_fname_) + : client_ptr(std::move(client_ptr_)) + , bucket(bucket_) + , remote_fname(remote_fname_) + , log(Logger::get("S3RandomAccessFile")) +{ + initialize(); +} + +ssize_t S3RandomAccessFile::read(char * buf, size_t size) +{ + auto & istr = read_result.GetBody(); + istr.read(buf, size); + size_t gcount = istr.gcount(); + if (gcount == 0 && !istr.eof()) + { + LOG_ERROR(log, "Cannot read from istream. bucket={} key={}", bucket, remote_fname); + return -1; + } + ProfileEvents::increment(ProfileEvents::S3ReadBytes, gcount); + return gcount; +} + +off_t S3RandomAccessFile::seek(off_t offset_, int whence) +{ + if (unlikely(whence != SEEK_SET)) + { + LOG_ERROR(log, "Only SEEK_SET mode is allowed, but {} is received", whence); + return -1; + } + if (unlikely(offset_ < 0)) + { + LOG_ERROR(log, "Seek position is out of bounds. Offset: {}", offset_); + return -1; + } + auto & istr = read_result.GetBody(); + istr.seekg(offset_); + return istr.tellg(); +} + +void S3RandomAccessFile::initialize() +{ + Stopwatch sw; + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(bucket); + req.SetKey(remote_fname); + ProfileEvents::increment(ProfileEvents::S3GetObject); + auto outcome = client_ptr->GetObject(req); + if (!outcome.IsSuccess()) + { + throw S3::fromS3Error(outcome.GetError(), "bucket={} key={}", bucket, remote_fname); + } + ProfileEvents::increment(ProfileEvents::S3ReadBytes, outcome.GetResult().GetContentLength()); + GET_METRIC(tiflash_storage_s3_request_seconds, type_get_object).Observe(sw.elapsedSeconds()); + read_result = outcome.GetResultWithOwnership(); +} + +RandomAccessFilePtr S3RandomAccessFile::create(const String & remote_fname) +{ + auto & ins = S3::ClientFactory::instance(); + return std::make_shared(ins.sharedClient(), ins.bucket(), remote_fname); +} +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.h b/dbms/src/Storages/S3/S3RandomAccessFile.h new file mode 100644 index 00000000000..767dbe49da2 --- /dev/null +++ b/dbms/src/Storages/S3/S3RandomAccessFile.h @@ -0,0 +1,87 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include + +namespace Aws::S3 +{ +class S3Client; +} + +namespace DB::ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +} + +namespace DB::S3 +{ +class S3RandomAccessFile final : public RandomAccessFile +{ +public: + static RandomAccessFilePtr create(const String & remote_fname); + + S3RandomAccessFile( + std::shared_ptr client_ptr_, + const String & bucket_, + const String & remote_fname_); + + off_t seek(off_t offset, int whence) override; + + ssize_t read(char * buf, size_t size) override; + + std::string getFileName() const override + { + return fmt::format("{}/{}", bucket, remote_fname); + } + + ssize_t pread(char * /*buf*/, size_t /*size*/, off_t /*offset*/) const override + { + throw Exception("S3RandomAccessFile not support pread", ErrorCodes::NOT_IMPLEMENTED); + } + + int getFd() const override + { + return -1; + } + + bool isClosed() const override + { + return is_close; + } + + void close() override + { + is_close = true; + } + +private: + void initialize(); + + std::shared_ptr client_ptr; + String bucket; + String remote_fname; + + Aws::S3::Model::GetObjectResult read_result; + + DB::LoggerPtr log; + bool is_close = false; +}; + +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3WritableFile.cpp b/dbms/src/Storages/S3/S3WritableFile.cpp new file mode 100644 index 00000000000..30cbfa291b0 --- /dev/null +++ b/dbms/src/Storages/S3/S3WritableFile.cpp @@ -0,0 +1,295 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace ProfileEvents +{ +extern const Event S3WriteBytes; +extern const Event S3CreateMultipartUpload; +extern const Event S3UploadPart; +extern const Event S3CompleteMultipartUpload; +extern const Event S3PutObject; +} // namespace ProfileEvents + +namespace DB::ErrorCodes +{ +extern const int CORRUPTED_DATA; +} + +namespace DB::S3 +{ +struct S3WritableFile::UploadPartTask +{ + Aws::S3::Model::UploadPartRequest req; + bool is_finished = false; + std::string tag; + std::exception_ptr exception; +}; + +struct S3WritableFile::PutObjectTask +{ + Aws::S3::Model::PutObjectRequest req; + bool is_finished = false; + std::exception_ptr exception; +}; + +S3WritableFile::S3WritableFile( + std::shared_ptr client_ptr_, + const String & bucket_, + const String & remote_fname_, + const WriteSettings & write_settings_) + : bucket(bucket_) + , remote_fname(remote_fname_) + , client_ptr(std::move(client_ptr_)) + , write_settings(write_settings_) + , log(Logger::get("S3WritableFile")) +{ + allocateBuffer(); +} + +S3WritableFile::~S3WritableFile() = default; + +ssize_t S3WritableFile::write(char * buf, size_t size) +{ + temporary_buffer->write(buf, size); + if (!temporary_buffer->good()) + { + LOG_ERROR(log, "write size={} failed: bucket={} key={}", size, bucket, remote_fname); + return -1; + } + ProfileEvents::increment(ProfileEvents::S3WriteBytes, size); + last_part_size += size; + total_write_bytes += size; + + // Data size exceeds singlepart upload threshold, need to use multipart upload. + if (multipart_upload_id.empty() && last_part_size > write_settings.max_single_part_upload_size) + { + createMultipartUpload(); + } + + if (!multipart_upload_id.empty() && last_part_size > write_settings.upload_part_size) + { + writePart(); + allocateBuffer(); + } + return size; +} + +void S3WritableFile::allocateBuffer() +{ + temporary_buffer = Aws::MakeShared("temporary buffer"); + last_part_size = 0; +} + +int S3WritableFile::fsync() +{ + if (multipart_upload_id.empty()) + { + makeSinglepartUpload(); + } + else + { + // Write rest of the data as last part. + writePart(); + } + finalize(); + return 0; +} + +void S3WritableFile::finalize() +{ + if (!multipart_upload_id.empty()) + { + completeMultipartUpload(); + } + if (write_settings.check_objects_after_upload) + { + // TODO(jinhe): check checksums. + auto resp = S3::headObject(*client_ptr, bucket, remote_fname); + checkS3Outcome(resp); + } +} + +void S3WritableFile::createMultipartUpload() +{ + Stopwatch sw; + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_s3_request_seconds, type_create_multi_part_upload).Observe(sw.elapsedSeconds()); + }); + Aws::S3::Model::CreateMultipartUploadRequest req; + req.SetBucket(bucket); + req.SetKey(remote_fname); + req.SetContentType("binary/octet-stream"); + ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload); + auto outcome = client_ptr->CreateMultipartUpload(req); + checkS3Outcome(outcome); + multipart_upload_id = outcome.GetResult().GetUploadId(); +} + +void S3WritableFile::writePart() +{ + auto size = temporary_buffer->tellp(); + if (size < 0) + { + throw Exception(ErrorCodes::CORRUPTED_DATA, "Buffer is in bad state. bucket={}, key={}", bucket, remote_fname); + } + if (size == 0) + { + LOG_DEBUG(log, "Skipping writing part. 
Buffer is empty."); + return; + } + + UploadPartTask task; + fillUploadRequest(task.req); + processUploadRequest(task); + part_tags.push_back(task.tag); +} + +void S3WritableFile::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req) +{ + // Increase part number. + ++part_number; + // Setup request. + req.SetBucket(bucket); + req.SetKey(remote_fname); + req.SetPartNumber(static_cast(part_number)); + req.SetUploadId(multipart_upload_id); + req.SetContentLength(temporary_buffer->tellp()); + req.SetBody(temporary_buffer); + req.SetContentType("binary/octet-stream"); +} + +void S3WritableFile::processUploadRequest(UploadPartTask & task) +{ + Stopwatch sw; + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_s3_request_seconds, type_upload_part).Observe(sw.elapsedSeconds()); + }); + ProfileEvents::increment(ProfileEvents::S3UploadPart); + auto outcome = client_ptr->UploadPart(task.req); + checkS3Outcome(outcome); + task.tag = outcome.GetResult().GetETag(); +} + +void S3WritableFile::completeMultipartUpload() +{ + RUNTIME_CHECK_MSG(!part_tags.empty(), "Failed to complete multipart upload. No parts have uploaded. bucket={}, key={}", bucket, remote_fname); + + Aws::S3::Model::CompleteMultipartUploadRequest req; + req.SetBucket(bucket); + req.SetKey(remote_fname); + req.SetUploadId(multipart_upload_id); + Aws::S3::Model::CompletedMultipartUpload multipart_upload; + for (size_t i = 0; i < part_tags.size(); ++i) + { + Aws::S3::Model::CompletedPart part; + multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast(i + 1))); + } + req.SetMultipartUpload(multipart_upload); + + size_t max_retry = std::max(write_settings.max_unexpected_write_error_retries, 1UL); + for (size_t i = 0; i < max_retry; ++i) + { + Stopwatch sw; + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_s3_request_seconds, type_complete_multi_part_upload).Observe(sw.elapsedSeconds()); + }); + ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload); + auto outcome = client_ptr->CompleteMultipartUpload(req); + if (outcome.IsSuccess()) + { + LOG_DEBUG(log, "Multipart upload has completed. bucket={} key={} upload_id={} parts={}", bucket, remote_fname, multipart_upload_id, part_tags.size()); + break; + } + if (i + 1 < max_retry) + { + const auto & e = outcome.GetError(); + LOG_INFO(log, "Multipart upload failed and need retry: bucket={} key={} upload_id={} parts={} error={} message={}", bucket, remote_fname, multipart_upload_id, part_tags.size(), magic_enum::enum_name(e.GetErrorType()), e.GetMessage()); + } + else + { + throw fromS3Error(outcome.GetError(), "bucket={} key={} upload_id={} parts={}", bucket, remote_fname, multipart_upload_id, part_tags.size()); + } + } +} + +void S3WritableFile::makeSinglepartUpload() +{ + auto size = temporary_buffer->tellp(); + if (size < 0) + { + throw Exception(ErrorCodes::CORRUPTED_DATA, "Buffer is in bad state. 
bucket={}, key={}", bucket, remote_fname); + } + PutObjectTask task; + fillPutRequest(task.req); + processPutRequest(task); +} + +void S3WritableFile::fillPutRequest(Aws::S3::Model::PutObjectRequest & req) +{ + req.SetBucket(bucket); + req.SetKey(remote_fname); + req.SetContentLength(temporary_buffer->tellp()); + req.SetBody(temporary_buffer); + req.SetContentType("binary/octet-stream"); +} + +void S3WritableFile::processPutRequest(const PutObjectTask & task) +{ + size_t max_retry = std::max(write_settings.max_unexpected_write_error_retries, 1UL); + for (size_t i = 0; i < max_retry; ++i) + { + Stopwatch sw; + SCOPE_EXIT({ + GET_METRIC(tiflash_storage_s3_request_seconds, type_put_object).Observe(sw.elapsedSeconds()); + }); + ProfileEvents::increment(ProfileEvents::S3PutObject); + auto outcome = client_ptr->PutObject(task.req); + if (outcome.IsSuccess()) + { + LOG_DEBUG(log, "Single part upload has completed. bucket={}, key={}, size={}", bucket, remote_fname, task.req.GetContentLength()); + break; + } + if (i + 1 < max_retry) + { + const auto & e = outcome.GetError(); + LOG_INFO(log, "Single part upload failed: bucket={} key={} error={} message={}", bucket, remote_fname, magic_enum::enum_name(e.GetErrorType()), e.GetMessage()); + } + else + { + throw fromS3Error(outcome.GetError(), "bucket={} key={}", bucket, remote_fname); + } + } +} + +std::shared_ptr S3WritableFile::create(const String & remote_fname_) +{ + return std::make_shared( + S3::ClientFactory::instance().sharedClient(), + S3::ClientFactory::instance().bucket(), + remote_fname_, + WriteSettings{}); +} +} // namespace DB::S3 diff --git a/dbms/src/Storages/S3/S3WritableFile.h b/dbms/src/Storages/S3/S3WritableFile.h new file mode 100644 index 00000000000..5d8b7988bbb --- /dev/null +++ b/dbms/src/Storages/S3/S3WritableFile.h @@ -0,0 +1,166 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace Aws::S3 +{ +class S3Client; +} + +namespace Aws::S3::Model +{ +class UploadPartRequest; +class PutObjectRequest; +} // namespace Aws::S3::Model + +namespace DB::ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +} + +namespace DB::S3 +{ +struct WriteSettings +{ + size_t upload_part_size = 16 * 1024 * 1024; + size_t max_single_part_upload_size = 32 * 1024 * 1024; + bool check_objects_after_upload = false; + size_t max_unexpected_write_error_retries = 4; +}; + +class S3WritableFile final : public WritableFile +{ +public: + static std::shared_ptr create(const String & remote_fname_); + + S3WritableFile( + std::shared_ptr client_ptr_, + const String & bucket_, + const String & remote_fname_, + const WriteSettings & write_settings_); + + ~S3WritableFile() override; + + ssize_t write(char * buf, size_t size) override; + + // To ensure that the data is uploaded to S3, the caller must call fsync after all write is finished. 
+ int fsync() override; + + std::string getFileName() const override + { + return fmt::format("{}/{}", bucket, remote_fname); + } + + void close() override + { + is_close = true; + } + + bool isClosed() const override + { + return is_close; + } + + ssize_t pwrite(char * /*buf*/, size_t /*size*/, off_t /*offset*/) const override + { + throw Exception("S3WritableFile not support pwrite", ErrorCodes::NOT_IMPLEMENTED); + } + + int getFd() const override + { + return -1; + } + + void open() override + { + throw Exception("S3WritableFile not support open", ErrorCodes::NOT_IMPLEMENTED); + } + + int ftruncate(off_t /*length*/) override + { + throw Exception("S3WritatbleFile not support ftruncate", ErrorCodes::NOT_IMPLEMENTED); + } + + void hardLink(const std::string & /*existing_file*/) override + { + throw Exception("S3WritableFile not support hardLink", ErrorCodes::NOT_IMPLEMENTED); + } + + // `getUploadInfo` is used for test. + struct UploadInfo + { + size_t part_number; + String multipart_upload_id; + std::vector part_tags; + size_t total_write_bytes; + }; + UploadInfo getUploadInfo() const + { + return UploadInfo{part_number, multipart_upload_id, part_tags, total_write_bytes}; + } + +private: + void allocateBuffer(); + + void createMultipartUpload(); + void writePart(); + void completeMultipartUpload(); + + void makeSinglepartUpload(); + + void finalize(); + + struct UploadPartTask; + void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req); + void processUploadRequest(UploadPartTask & task); + + struct PutObjectTask; + void fillPutRequest(Aws::S3::Model::PutObjectRequest & req); + void processPutRequest(const PutObjectTask & task); + + template + void checkS3Outcome(const T & outcome) + { + if (!outcome.IsSuccess()) + { + throw S3::fromS3Error(outcome.GetError(), "bucket={} key={}", bucket, remote_fname); + } + } + + const String bucket; + const String remote_fname; + const std::shared_ptr client_ptr; + const WriteSettings write_settings; + + std::shared_ptr temporary_buffer; // Buffer to accumulate data. + size_t last_part_size = 0; + size_t part_number = 0; + UInt64 total_write_bytes = 0; + + // Upload in S3 is made in parts. + String multipart_upload_id; + std::vector part_tags; + + LoggerPtr log; + + bool is_close = false; +}; + +} // namespace DB::S3 \ No newline at end of file diff --git a/dbms/src/Storages/S3/tests/gtest_s3file.cpp b/dbms/src/Storages/S3/tests/gtest_s3file.cpp new file mode 100644 index 00000000000..99a9f8dbb2a --- /dev/null +++ b/dbms/src/Storages/S3/tests/gtest_s3file.cpp @@ -0,0 +1,406 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace std::chrono_literals; +using namespace DB::DM; +using namespace DB::DM::tests; +using namespace DB::S3; + +namespace DB::tests +{ +using DMFileBlockOutputStreamPtr = std::shared_ptr; +using DMFileBlockInputStreamPtr = std::shared_ptr; + +class S3FileTest : public DB::base::TiFlashStorageTestBasic +{ +public: + static void SetUpTestCase() {} + + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + reload(); + + log = Logger::get(); + + buf_unit.resize(256); + std::iota(buf_unit.begin(), buf_unit.end(), 0); + + s3_client = S3::ClientFactory::instance().sharedClient(); + bucket = S3::ClientFactory::instance().bucket(); + ASSERT_TRUE(createBucketIfNotExist()); + } + + void reload() + { + TiFlashStorageTestBasic::reload(); + } + + Context & dbContext() { return *db_context; } + +protected: + bool createBucketIfNotExist() + { + Aws::S3::Model::CreateBucketRequest request; + request.SetBucket(bucket); + auto outcome = s3_client->CreateBucket(request); + if (outcome.IsSuccess()) + { + LOG_DEBUG(log, "Created bucket {}", bucket); + } + else if (outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou") + { + LOG_DEBUG(log, "Bucket {} already exist", bucket); + } + else + { + const auto & err = outcome.GetError(); + LOG_ERROR(log, "CreateBucket: {}:{}", err.GetExceptionName(), err.GetMessage()); + } + return outcome.IsSuccess() || outcome.GetError().GetExceptionName() == "BucketAlreadyOwnedByYou"; + } + void writeFile(const String & key, size_t size, const WriteSettings & write_setting) + { + S3WritableFile file(s3_client, bucket, key, write_setting); + size_t write_size = 0; + while (write_size < size) + { + auto to_write = std::min(buf_unit.size(), size - write_size); + auto n = file.write(buf_unit.data(), to_write); + ASSERT_EQ(n, to_write); + write_size += n; + } + auto r = file.fsync(); + ASSERT_EQ(r, 0); + last_upload_info = file.getUploadInfo(); + } + + void verifyFile(const String & key, size_t size) + { + S3RandomAccessFile file(s3_client, bucket, key); + std::vector tmp_buf; + size_t read_size = 0; + while (read_size < size) + { + tmp_buf.resize(256); + std::iota(tmp_buf.begin(), tmp_buf.end(), 1); + auto n = file.read(tmp_buf.data(), tmp_buf.size()); + ASSERT_GE(n, 0); + if (static_cast(n) == buf_unit.size()) + { + ASSERT_EQ(tmp_buf, buf_unit); + } + else + { + ASSERT_EQ(std::vector(tmp_buf.begin(), tmp_buf.begin() + n), + std::vector(buf_unit.begin(), buf_unit.begin() + n)); + } + read_size += n; + } + ASSERT_EQ(read_size, size); + } + + static String md5(const std::string & filename) + { + Poco::MD5Engine md5_engine; + Poco::DigestOutputStream output_stream(md5_engine); + std::ifstream fstr(filename); + Poco::StreamCopier::copyStream(fstr, output_stream); + output_stream.close(); + auto md5_val = md5_engine.digest(); + String res; + static constexpr const char * hex_table = "0123456789abcdef"; + for (int i = 0; i < 16; ++i) + { + auto c = md5_val[i]; + res += hex_table[c >> 4]; + res += hex_table[c & 15]; + } + return res; + } + + static std::vector getLocalFiles(const String & dir) + { + std::vector filenames; + Poco::DirectoryIterator end; + for (auto itr = Poco::DirectoryIterator{dir}; itr != end; ++itr) + { + if (itr->isFile()) + { + // `NGC` file is unused in Cloud-Native mode. 
+ if (itr.name() != "NGC") + { + filenames.push_back(itr.name()); + } + } + } + return filenames; + } + + std::vector uploadDMFile(DMFilePtr local_dmfile, const DMFileOID & oid) + { + Stopwatch sw; + RUNTIME_CHECK(local_dmfile->fileId() == oid.file_id); + + const auto local_dir = local_dmfile->path(); + auto local_files = getLocalFiles(local_dir); + RUNTIME_CHECK(!local_files.empty()); + + const auto remote_dir = S3::S3Filename::fromDMFileOID(oid).toFullKey(); + LOG_DEBUG(log, "Start upload DMFile, local_dir={} remote_dir={} local_files={}", local_dir, remote_dir, local_files); + + std::vector> upload_results; + for (const auto & fname : local_files) + { + if (fname == DMFile::metav2FileName()) + { + // meta file will be upload at last. + continue; + } + auto local_fname = fmt::format("{}/{}", local_dir, fname); + auto remote_fname = fmt::format("{}/{}", remote_dir, fname); + S3::uploadFile(*s3_client, bucket, local_fname, remote_fname); + } + + // Only when the meta upload is successful, the dmfile upload can be considered successful. + auto local_meta_fname = fmt::format("{}/{}", local_dir, DMFile::metav2FileName()); + auto remote_meta_fname = fmt::format("{}/{}", remote_dir, DMFile::metav2FileName()); + S3::uploadFile(*s3_client, bucket, local_meta_fname, remote_meta_fname); + + LOG_DEBUG(log, "Upload DMFile finished, remote={}, cost={}ms", remote_dir, sw.elapsedMilliseconds()); + return local_files; + } + + void downloadDMFile(const DMFileOID & remote_oid, const String & local_dir, const std::vector & target_files) + { + Stopwatch sw; + const auto remote_dir = S3::S3Filename::fromDMFileOID(remote_oid).toFullKey(); + std::vector> download_results; + for (const auto & name : target_files) + { + auto remote_fname = fmt::format("{}/{}", remote_dir, name); + auto local_fname = fmt::format("{}/{}", local_dir, name); + S3::downloadFile(*s3_client, bucket, local_fname, remote_fname); + } + LOG_DEBUG(log, "Download DMFile meta finished, remote_dir={}, local_dir={} cost={}ms", remote_dir, local_dir, sw.elapsedMilliseconds()); + } + + std::unordered_map listFiles(const DMFileOID & oid) + { + auto dmfile_dir = DMFile::getPathByStatus( + S3::S3Filename::fromTableID(oid.store_id, oid.table_id).toFullKey(), + oid.file_id, + DMFile::Status::READABLE); + return S3::listPrefixWithSize(*s3_client, bucket, dmfile_dir + "/"); + } + + DMFilePtr restoreDMFile(const DMFileOID & oid) + { + return DMFile::restore(db_context->getFileProvider(), oid.file_id, oid.file_id, S3::S3Filename::fromTableID(oid.store_id, oid.table_id).toFullKeyWithPrefix(), DMFile::ReadMetaMode::all()); + } + + LoggerPtr log; + std::vector buf_unit; + std::shared_ptr s3_client; + String bucket; + S3WritableFile::UploadInfo last_upload_info; +}; + +TEST_F(S3FileTest, SinglePart) +try +{ + for (int i = 0; i < 10; i++) + { + const size_t size = 256 * i + ::rand() % 256; + const String key = "/a/b/c/singlepart"; + writeFile(key, size, WriteSettings{}); + ASSERT_EQ(last_upload_info.part_number, 0); + ASSERT_TRUE(last_upload_info.multipart_upload_id.empty()); + ASSERT_TRUE(last_upload_info.part_tags.empty()); + ASSERT_EQ(last_upload_info.total_write_bytes, size); + verifyFile(key, size); + } +} +CATCH + +TEST_F(S3FileTest, MultiPart) +try +{ + const auto size = 1024 * 1024 * 18; // 18MB + WriteSettings write_setting; + write_setting.max_single_part_upload_size = 1024 * 1024 * 6; // 6MB + write_setting.upload_part_size = 1024 * 1024 * 5; // 5MB + const String key = "/a/b/c/multipart"; + writeFile(key, size, write_setting); + 
ASSERT_EQ(last_upload_info.part_number, 4); + ASSERT_FALSE(last_upload_info.multipart_upload_id.empty()); + ASSERT_EQ(last_upload_info.part_tags.size(), last_upload_info.part_number); + ASSERT_EQ(last_upload_info.total_write_bytes, size); + verifyFile(key, size); +} +CATCH + +TEST_F(S3FileTest, Seek) +try +{ + const auto size = 1024 * 1024 * 10; // 10MB + WriteSettings write_setting; + const String key = "/a/b/c/seek"; + writeFile(key, size, write_setting); + S3RandomAccessFile file(s3_client, bucket, key); + { + std::vector tmp_buf(256); + auto n = file.read(tmp_buf.data(), tmp_buf.size()); + ASSERT_EQ(n, tmp_buf.size()); + ASSERT_EQ(tmp_buf, buf_unit); + } + { + auto offset = file.seek(513, SEEK_SET); + ASSERT_EQ(offset, 513); + std::vector tmp_buf(256); + auto n = file.read(tmp_buf.data(), tmp_buf.size()); + ASSERT_EQ(n, tmp_buf.size()); + + std::vector expected(256); + std::iota(expected.begin(), expected.end(), 1); + ASSERT_EQ(tmp_buf, expected); + } +} +CATCH + +TEST_F(S3FileTest, WriteRead) +try +{ + auto cols = DMTestEnv::getDefaultColumns(); + + const size_t num_rows_write = 128; + + DMFileBlockOutputStream::BlockProperty block_property1; + block_property1.effective_num_rows = 1; + block_property1.gc_hint_version = 1; + block_property1.deleted_rows = 1; + DMFileBlockOutputStream::BlockProperty block_property2; + block_property2.effective_num_rows = 2; + block_property2.gc_hint_version = 2; + block_property2.deleted_rows = 2; + std::vector block_propertys; + block_propertys.push_back(block_property1); + block_propertys.push_back(block_property2); + auto parent_path = TiFlashStorageTestBasic::getTemporaryPath(); + DMFilePtr dmfile; + DMFileOID oid; + oid.store_id = 1; + oid.table_id = 1; + oid.file_id = std::chrono::time_point_cast(std::chrono::system_clock::now()).time_since_epoch().count(); + { + // Prepare for write + // Block 1: [0, 64) + Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write / 2, false); + // Block 2: [64, 128) + Block block2 = DMTestEnv::prepareSimpleWriteBlock(num_rows_write / 2, num_rows_write, false); + + auto configuration = std::make_optional(); + dmfile = DMFile::create(oid.file_id, parent_path, std::move(configuration), DMFileFormat::V3); + auto stream = std::make_shared(dbContext(), dmfile, *cols); + stream->writePrefix(); + stream->write(block1, block_property1); + stream->write(block2, block_property2); + stream->writeSuffix(); + + ASSERT_EQ(dmfile->getPackProperties().property_size(), 2); + } + + std::vector uploaded_files; + { + uploaded_files = uploadDMFile(dmfile, oid); + auto files_with_size = listFiles(oid); + ASSERT_EQ(uploaded_files.size(), files_with_size.size()); + LOG_TRACE(log, "{}\n", files_with_size); + } + + { + auto dmfile_dir = dmfile->path(); + auto copy_dir = fmt::format("{}_copy", dmfile_dir); + Poco::File file(copy_dir); + if (file.exists()) + { + file.remove(true); + } + file.createDirectory(); + downloadDMFile(oid, copy_dir, uploaded_files); + Poco::File poco_copy_dir(copy_dir); + std::vector filenames; + poco_copy_dir.list(filenames); + + ASSERT_FALSE(filenames.empty()); + + for (const auto & filename : filenames) + { + auto local_fname = fmt::format("{}/{}", dmfile_dir, filename); + auto copy_fname = fmt::format("{}/{}", copy_dir, filename); + auto local_md5 = md5(local_fname); + auto copy_md5 = md5(copy_fname); + ASSERT_EQ(copy_md5, local_md5) << fmt::format("local_fname={}, copy_fname={}", local_fname, copy_fname); + LOG_TRACE(log, "local_fname={}, copy_fname={}, md5={}", local_fname, copy_fname, local_md5); + 
+        }
+    }
+
+    // Restore the DMFile directly from S3 and verify all written rows can be read back
+    auto dmfile_from_s3 = restoreDMFile(oid);
+    ASSERT_NE(dmfile_from_s3, nullptr);
+    try
+    {
+        DMFileBlockInputStreamBuilder builder(dbContext());
+        auto stream = builder.build(dmfile_from_s3, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}, std::make_shared<ScanContext>());
+        ASSERT_INPUTSTREAM_COLS_UR(
+            stream,
+            Strings({DMTestEnv::pk_name}),
+            createColumns({
+                createColumn<Int64>(createNumbers<Int64>(0, num_rows_write)),
+            }));
+    }
+    catch (...)
+    {
+        tryLogCurrentException("restore...");
+        std::abort();
+    }
+}
+CATCH
+} // namespace DB::tests
\ No newline at end of file
diff --git a/dbms/src/Storages/S3/tests/gtest_s3filename.cpp b/dbms/src/Storages/S3/tests/gtest_s3filename.cpp
index b5e7a9d350d..b210f7ed0dc 100644
--- a/dbms/src/Storages/S3/tests/gtest_s3filename.cpp
+++ b/dbms/src/Storages/S3/tests/gtest_s3filename.cpp
@@ -139,6 +139,88 @@ TEST(S3FilenameTest, StableFile)
     check(r.toView());
 }
 
+TEST(S3FilenameTest, Prefix)
+{
+    String dmf = "s2077/data/t_44/dmf_57";
+    String dmf_with_prefix = "s3://s2077/data/t_44/dmf_57";
+    ASSERT_FALSE(S3FilenameView::fromKeyWithPrefix(dmf).isValid());
+    ASSERT_TRUE(S3FilenameView::fromKey(dmf).isValid());
+    ASSERT_TRUE(S3FilenameView::fromKeyWithPrefix(dmf_with_prefix).isValid());
+    ASSERT_FALSE(S3FilenameView::fromKey(dmf_with_prefix).isValid());
+
+    {
+        auto meta = fmt::format("{}/meta", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(meta);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/meta");
+    }
+
+    {
+        auto data = fmt::format("{}/1.dat", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(data);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/1.dat");
+    }
+
+    {
+        auto mark = fmt::format("{}/1.mrk", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(mark);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/1.mrk");
+    }
+
+    {
+        auto null_map = fmt::format("{}/1.null.dat", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(null_map);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/1.null.dat");
+    }
+
+    {
+        auto null_mrk = fmt::format("{}/1.null.mrk", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(null_mrk);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/1.null.mrk");
+    }
+
+    {
+        auto index = fmt::format("{}/1.idx", dmf_with_prefix);
+        auto v = S3FilenameView::fromKeyWithPrefix(index);
+        ASSERT_TRUE(v.isValid());
+        ASSERT_EQ(v.data_subpath, "t_44/dmf_57/1.idx");
+    }
+
+    DMFileOID oid{.store_id = 2077, .table_id = 44, .file_id = 57};
+    {
+        auto s3_fname = S3Filename::fromDMFileOID(oid);
+        ASSERT_EQ(s3_fname.toFullKey(), dmf);
+        ASSERT_EQ(s3_fname.toFullKeyWithPrefix(), dmf_with_prefix);
+    }
+    {
+        String table = "s2077/data/t_44";
+        String table_with_prefix = "s3://s2077/data/t_44";
+        auto s3_fname = S3Filename::fromTableID(oid.store_id, oid.table_id);
+        ASSERT_EQ(s3_fname.toFullKey(), table);
+        ASSERT_EQ(s3_fname.toFullKeyWithPrefix(), table_with_prefix);
+    }
+}
+
+TEST(S3FilenameTest, StableTable)
+{
+    UInt64 test_store_id = 2077;
+    Int64 test_table_id = 44;
+    String table_key = "s2077/data/t_44";
+
+    auto name = S3Filename::fromTableID(test_store_id, test_table_id);
+    ASSERT_EQ(name.toFullKey(), table_key);
+
+    auto view = S3FilenameView::fromKey(table_key);
+    ASSERT_TRUE(view.isValid()) << table_key;
+    ASSERT_TRUE(view.isDataFile()) << table_key;
+    ASSERT_EQ(view.store_id, test_store_id) << table_key;
+    ASSERT_EQ(view.data_subpath, "t_44") << table_key;
+}
+
 TEST(S3FilenameTest, StorePrefix)
 {
     {
diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp
index 2932b1ad36b..524dbb4fc90 100644
--- a/dbms/src/TestUtils/gtests_dbms_main.cpp
+++ b/dbms/src/TestUtils/gtests_dbms_main.cpp
@@ -75,9 +75,10 @@ int main(int argc, char ** argv)
     DB::DM::SegmentReadTaskScheduler::instance();
 
     const auto s3_endpoint = Poco::Environment::get("S3_ENDPOINT", "");
-    const auto s3_bucket = Poco::Environment::get("S3_BUCKET", "");
+    const auto s3_bucket = Poco::Environment::get("S3_BUCKET", "mock_bucket");
     const auto access_key_id = Poco::Environment::get("AWS_ACCESS_KEY_ID", "");
     const auto secret_access_key = Poco::Environment::get("AWS_SECRET_ACCESS_KEY", "");
+    const auto mock_s3 = Poco::Environment::get("MOCK_S3", "true"); // In unit-tests, use MockS3Client by default.
     auto s3config = DB::StorageS3Config{
         .endpoint = s3_endpoint,
         .bucket = s3_bucket,
@@ -85,7 +86,7 @@ int main(int argc, char ** argv)
         .secret_access_key = secret_access_key,
     };
     Poco::Environment::set("AWS_EC2_METADATA_DISABLED", "true"); // disable to speedup testing
-    DB::S3::ClientFactory::instance().init(s3config);
+    DB::S3::ClientFactory::instance().init(s3config, mock_s3 == "true");
 
 #ifdef FIU_ENABLE
     fiu_init(0); // init failpoint